From 9306023c5973940eb1d827d88252bb3e18f79f01 Mon Sep 17 00:00:00 2001 From: tnsetting Date: Wed, 27 Jan 2021 14:16:54 +0100 Subject: [PATCH 1/9] Add publisher for events --- pkg/async/notifications/factory.go | 51 +++++++++++ .../implementations/event_publisher.go | 87 +++++++++++++++++++ pkg/manager/impl/task_execution_manager.go | 30 ++++--- .../impl/task_execution_manager_test.go | 42 +++------ pkg/rpc/adminservice/base.go | 5 +- .../interfaces/application_configuration.go | 10 +++ 6 files changed, 182 insertions(+), 43 deletions(-) create mode 100644 pkg/async/notifications/implementations/event_publisher.go diff --git a/pkg/async/notifications/factory.go b/pkg/async/notifications/factory.go index 83ee8c663..f5aa73e42 100644 --- a/pkg/async/notifications/factory.go +++ b/pkg/async/notifications/factory.go @@ -148,3 +148,54 @@ func NewNotificationsPublisher(config runtimeInterfaces.NotificationsConfig, sco return implementations.NewNoopPublish() } } + +func NewEventsPublisher(config runtimeInterfaces.NotificationsConfig, scope promutils.Scope) interfaces.Publisher { + reconnectAttempts := config.ReconnectAttempts + reconnectDelay := time.Duration(config.ReconnectDelaySeconds) * time.Second + switch config.Type { + case common.AWS: + snsConfig := gizmoAWS.SNSConfig{ + Topic: config.NotificationsPublisherConfig.TopicName, + } + if config.AWSConfig.Region != "" { + snsConfig.Region = config.AWSConfig.Region + } else { + snsConfig.Region = config.Region + } + + var publisher pubsub.Publisher + var err error + err = async.Retry(reconnectAttempts, reconnectDelay, func() error { + publisher, err = gizmoAWS.NewPublisher(snsConfig) + return err + }) + + // Any persistent errors initiating Publisher with Amazon configurations results in a failed start up. + if err != nil { + panic(err) + } + return implementations.NewEventsPublisher(publisher, scope, config.EventPublisherConfig.EventTypes) + case common.GCP: + pubsubConfig := gizmoGCP.Config{ + Topic: config.EventPublisherConfig.TopicName, + } + pubsubConfig.ProjectID = config.GCPConfig.ProjectID + var publisher pubsub.MultiPublisher + var err error + err = async.Retry(reconnectAttempts, reconnectDelay, func() error { + publisher, err = gizmoGCP.NewPublisher(context.TODO(), pubsubConfig) + return err + }) + + if err != nil { + panic(err) + } + return implementations.NewEventsPublisher(publisher, scope, config.EventPublisherConfig.EventTypes) + case common.Local: + fallthrough + default: + logger.Infof(context.Background(), + "Using default noop events publisher implementation for config type [%s]", config.Type) + return implementations.NewNoopPublish() + } +} diff --git a/pkg/async/notifications/implementations/event_publisher.go b/pkg/async/notifications/implementations/event_publisher.go new file mode 100644 index 000000000..870d922d3 --- /dev/null +++ b/pkg/async/notifications/implementations/event_publisher.go @@ -0,0 +1,87 @@ +package implementations + +import ( + "context" + "github.com/lyft/flyteidl/gen/pb-go/flyteidl/admin" + "strings" + + "github.com/lyft/flyteadmin/pkg/async/notifications/interfaces" + + "github.com/NYTimes/gizmo/pubsub" + "github.com/golang/protobuf/proto" + "github.com/lyft/flytestdlib/logger" + "github.com/lyft/flytestdlib/promutils" + "github.com/prometheus/client_golang/prometheus" +) + +type eventPublisherSystemMetrics struct { + Scope promutils.Scope + PublishTotal prometheus.Counter + PublishError prometheus.Counter +} + +// TODO: Add a counter that encompasses the publisher stats grouped by project and domain. +type EventPublisher struct { + pub pubsub.Publisher + systemMetrics eventPublisherSystemMetrics + events []string +} + +// The key is the notification type as defined as an enum. +func (p *EventPublisher) Publish(ctx context.Context, notificationType string, msg proto.Message) error { + p.systemMetrics.PublishTotal.Inc() + logger.Debugf(ctx, "Publishing the following message [%s]", msg.String()) + + if !p.shouldPublishEvent(notificationType) { + return nil + } + + err := p.pub.Publish(ctx, notificationType, msg) + if err != nil { + p.systemMetrics.PublishError.Inc() + logger.Errorf(ctx, "Failed to publish a message with key [%s] and message [%s] and error: %v", notificationType, msg.String(), err) + } + return err +} + +func (p *EventPublisher) shouldPublishEvent(notificationType string) bool { + for _, e := range p.events { + if e == notificationType { + return true + } + } + return false +} + +func newEventPublisherSystemMetrics(scope promutils.Scope) eventPublisherSystemMetrics { + return eventPublisherSystemMetrics{ + Scope: scope, + PublishTotal: scope.MustNewCounter("event_publish_total", "overall count of publish messages"), + PublishError: scope.MustNewCounter("event_publish_errors", "count of publish errors"), + } +} + +func NewEventsPublisher(pub pubsub.Publisher, scope promutils.Scope, eventTypes string) interfaces.Publisher { + events := strings.Split(eventTypes, ",") + var eventList = make([]string, 0) + + for _, event := range events { + switch event { + case "task": + var taskExecutionReq admin.TaskExecutionEventRequest + eventList = append(eventList, proto.MessageName(&taskExecutionReq)) + case "node": + var nodeExecutionReq admin.NodeExecutionEventRequest + eventList = append(eventList, proto.MessageName(&nodeExecutionReq)) + case "workflow": + var workflowExecutionReq admin.WorkflowExecutionEventRequest + eventList = append(eventList, proto.MessageName(&workflowExecutionReq)) + } + } + + return &EventPublisher{ + pub: pub, + systemMetrics: newEventPublisherSystemMetrics(scope.NewSubScope("events_publisher")), + events: eventList, + } +} diff --git a/pkg/manager/impl/task_execution_manager.go b/pkg/manager/impl/task_execution_manager.go index ddb927e7c..13cfb4155 100644 --- a/pkg/manager/impl/task_execution_manager.go +++ b/pkg/manager/impl/task_execution_manager.go @@ -3,6 +3,8 @@ package impl import ( "context" "fmt" + "github.com/golang/protobuf/proto" + notificationInterfaces "github.com/lyft/flyteadmin/pkg/async/notifications/interfaces" "strconv" "github.com/lyft/flytestdlib/storage" @@ -43,11 +45,12 @@ type taskExecutionMetrics struct { } type TaskExecutionManager struct { - db repositories.RepositoryInterface - config runtimeInterfaces.Configuration - storageClient *storage.DataStore - metrics taskExecutionMetrics - urlData dataInterfaces.RemoteURLInterface + db repositories.RepositoryInterface + config runtimeInterfaces.Configuration + storageClient *storage.DataStore + metrics taskExecutionMetrics + urlData dataInterfaces.RemoteURLInterface + notificationClient notificationInterfaces.Publisher } func getTaskExecutionContext(ctx context.Context, identifier *core.TaskExecutionIdentifier) context.Context { @@ -173,6 +176,8 @@ func (m *TaskExecutionManager) CreateTaskExecutionEvent(ctx context.Context, req m.metrics.TaskExecutionsTerminated.Inc() } + m.notificationClient.Publish(ctx, proto.MessageName(&request), &request) + m.metrics.TaskExecutionEventsCreated.Inc() logger.Debugf(ctx, "Successfully recorded task execution event [%v]", request.Event) // TODO: we will want to return some scope information here soon! @@ -310,9 +315,7 @@ func (m *TaskExecutionManager) GetTaskExecutionData( return response, nil } -func NewTaskExecutionManager( - db repositories.RepositoryInterface, config runtimeInterfaces.Configuration, storageClient *storage.DataStore, - scope promutils.Scope, urlData dataInterfaces.RemoteURLInterface) interfaces.TaskExecutionInterface { +func NewTaskExecutionManager(db repositories.RepositoryInterface, config runtimeInterfaces.Configuration, storageClient *storage.DataStore, scope promutils.Scope, urlData dataInterfaces.RemoteURLInterface, publisher notificationInterfaces.Publisher) interfaces.TaskExecutionInterface { metrics := taskExecutionMetrics{ Scope: scope, ActiveTaskExecutions: scope.MustNewGauge("active_executions", @@ -335,10 +338,11 @@ func NewTaskExecutionManager( "size in bytes of serialized node execution outputs"), } return &TaskExecutionManager{ - db: db, - config: config, - storageClient: storageClient, - metrics: metrics, - urlData: urlData, + db: db, + config: config, + storageClient: storageClient, + metrics: metrics, + urlData: urlData, + notificationClient: publisher, } } diff --git a/pkg/manager/impl/task_execution_manager_test.go b/pkg/manager/impl/task_execution_manager_test.go index 9cfb181b5..ea3b92b1d 100644 --- a/pkg/manager/impl/task_execution_manager_test.go +++ b/pkg/manager/impl/task_execution_manager_test.go @@ -175,8 +175,7 @@ func TestCreateTaskEvent(t *testing.T) { }, input) return nil }) - taskExecManager := NewTaskExecutionManager(repository, getMockExecutionsConfigProvider(), - getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockTaskExecutionRemoteURL) + taskExecManager := NewTaskExecutionManager(repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockTaskExecutionRemoteURL, nil) resp, err := taskExecManager.CreateTaskExecutionEvent(context.Background(), taskEventRequest) assert.True(t, getTaskCalled) assert.True(t, createTaskCalled) @@ -289,8 +288,7 @@ func TestCreateTaskEvent_Update(t *testing.T) { OutputUri: expectedOutputResult.OutputUri, } - taskExecManager := NewTaskExecutionManager(repository, getMockExecutionsConfigProvider(), - getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockTaskExecutionRemoteURL) + taskExecManager := NewTaskExecutionManager(repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockTaskExecutionRemoteURL, nil) resp, err := taskExecManager.CreateTaskExecutionEvent(context.Background(), taskEventRequest) assert.True(t, getTaskCalled) assert.True(t, updateTaskCalled) @@ -305,8 +303,7 @@ func TestCreateTaskEvent_MissingExecution(t *testing.T) { func(ctx context.Context, input interfaces.GetNodeExecutionInput) (models.NodeExecution, error) { return models.NodeExecution{}, expectedErr }) - taskExecManager := NewTaskExecutionManager(repository, getMockExecutionsConfigProvider(), - getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockTaskExecutionRemoteURL) + taskExecManager := NewTaskExecutionManager(repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockTaskExecutionRemoteURL, nil) resp, err := taskExecManager.CreateTaskExecutionEvent(context.Background(), taskEventRequest) assert.EqualError(t, err, "failed to get existing node execution id: [node_id:\"node-id\""+ " execution_id: ] "+ @@ -326,8 +323,7 @@ func TestCreateTaskEvent_CreateDatabaseError(t *testing.T) { func(ctx context.Context, input models.TaskExecution) error { return expectedErr }) - taskExecManager := NewTaskExecutionManager(repository, getMockExecutionsConfigProvider(), - getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockTaskExecutionRemoteURL) + taskExecManager := NewTaskExecutionManager(repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockTaskExecutionRemoteURL, nil) resp, err := taskExecManager.CreateTaskExecutionEvent(context.Background(), taskEventRequest) assert.EqualError(t, err, expectedErr.Error()) assert.Nil(t, resp) @@ -367,8 +363,7 @@ func TestCreateTaskEvent_UpdateDatabaseError(t *testing.T) { func(ctx context.Context, execution models.TaskExecution) error { return expectedErr }) - nodeExecManager := NewTaskExecutionManager(repository, getMockExecutionsConfigProvider(), - getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockTaskExecutionRemoteURL) + nodeExecManager := NewTaskExecutionManager(repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockTaskExecutionRemoteURL, nil) resp, err := nodeExecManager.CreateTaskExecutionEvent(context.Background(), taskEventRequest) assert.EqualError(t, err, expectedErr.Error()) assert.Nil(t, resp) @@ -401,8 +396,7 @@ func TestCreateTaskEvent_UpdateTerminalEventError(t *testing.T) { }, nil }) taskEventRequest.Event.Phase = core.TaskExecution_RUNNING - taskExecManager := NewTaskExecutionManager(repository, getMockExecutionsConfigProvider(), - getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockTaskExecutionRemoteURL) + taskExecManager := NewTaskExecutionManager(repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockTaskExecutionRemoteURL, nil) resp, err := taskExecManager.CreateTaskExecutionEvent(context.Background(), taskEventRequest) assert.Nil(t, resp) @@ -469,8 +463,7 @@ func TestCreateTaskEvent_PhaseVersionChange(t *testing.T) { taskEventRequest.Event.PhaseVersion = uint32(1) taskEventRequest.Event.OccurredAt = taskEventUpdatedAtProto - taskExecManager := NewTaskExecutionManager(repository, getMockExecutionsConfigProvider(), - getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockTaskExecutionRemoteURL) + taskExecManager := NewTaskExecutionManager(repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockTaskExecutionRemoteURL, nil) resp, err := taskExecManager.CreateTaskExecutionEvent(context.Background(), taskEventRequest) assert.True(t, getTaskCalled) assert.True(t, updateTaskCalled) @@ -536,8 +529,7 @@ func TestGetTaskExecution(t *testing.T) { }, }, nil }) - taskExecManager := NewTaskExecutionManager(repository, getMockExecutionsConfigProvider(), - getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockTaskExecutionRemoteURL) + taskExecManager := NewTaskExecutionManager(repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockTaskExecutionRemoteURL, nil) taskExecution, err := taskExecManager.GetTaskExecution(context.Background(), admin.TaskExecutionGetRequest{ Id: &core.TaskExecutionIdentifier{ TaskId: sampleTaskID, @@ -587,8 +579,7 @@ func TestGetTaskExecution_TransformerError(t *testing.T) { Closure: []byte("i'm an invalid task closure"), }, nil }) - taskExecManager := NewTaskExecutionManager(repository, getMockExecutionsConfigProvider(), - getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockTaskExecutionRemoteURL) + taskExecManager := NewTaskExecutionManager(repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockTaskExecutionRemoteURL, nil) taskExecution, err := taskExecManager.GetTaskExecution(context.Background(), admin.TaskExecutionGetRequest{ Id: &core.TaskExecutionIdentifier{ TaskId: sampleTaskID, @@ -698,8 +689,7 @@ func TestListTaskExecutions(t *testing.T) { }, }, nil }) - taskExecManager := NewTaskExecutionManager(repository, getMockExecutionsConfigProvider(), - getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockTaskExecutionRemoteURL) + taskExecManager := NewTaskExecutionManager(repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockTaskExecutionRemoteURL, nil) taskExecutions, err := taskExecManager.ListTaskExecutions(context.Background(), admin.TaskExecutionListRequest{ NodeExecutionId: &core.NodeExecutionIdentifier{ NodeId: "nodey b", @@ -770,8 +760,7 @@ func TestListTaskExecutions_NoFilters(t *testing.T) { listTaskCalled = true return interfaces.TaskExecutionCollectionOutput{}, nil }) - taskExecManager := NewTaskExecutionManager(repository, getMockExecutionsConfigProvider(), - getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockTaskExecutionRemoteURL) + taskExecManager := NewTaskExecutionManager(repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockTaskExecutionRemoteURL, nil) _, err := taskExecManager.ListTaskExecutions(context.Background(), admin.TaskExecutionListRequest{ Token: "1", Limit: 99, @@ -789,8 +778,7 @@ func TestListTaskExecutions_NoLimit(t *testing.T) { getTaskCalled = true return interfaces.TaskExecutionCollectionOutput{}, nil }) - taskExecManager := NewTaskExecutionManager(repository, getMockExecutionsConfigProvider(), - getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockTaskExecutionRemoteURL) + taskExecManager := NewTaskExecutionManager(repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockTaskExecutionRemoteURL, nil) _, err := taskExecManager.ListTaskExecutions(context.Background(), admin.TaskExecutionListRequest{ Limit: 0, }) @@ -821,8 +809,7 @@ func TestListTaskExecutions_NothingToReturn(t *testing.T) { listTasksCalled = true return interfaces.TaskCollectionOutput{}, nil }) - taskExecManager := NewTaskExecutionManager(repository, getMockExecutionsConfigProvider(), - getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockTaskExecutionRemoteURL) + taskExecManager := NewTaskExecutionManager(repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockTaskExecutionRemoteURL, nil) _, err := taskExecManager.ListTaskExecutions(context.Background(), admin.TaskExecutionListRequest{ NodeExecutionId: &core.NodeExecutionIdentifier{ ExecutionId: &core.WorkflowExecutionIdentifier{ @@ -924,8 +911,7 @@ func TestGetTaskExecutionData(t *testing.T) { } return fmt.Errorf("unexpected call to find value in storage [%v]", reference.String()) } - taskExecManager := NewTaskExecutionManager(repository, getMockExecutionsConfigProvider(), mockStorage, - mockScope.NewTestScope(), mockTaskExecutionRemoteURL) + taskExecManager := NewTaskExecutionManager(repository, getMockExecutionsConfigProvider(), mockStorage, mockScope.NewTestScope(), mockTaskExecutionRemoteURL, nil) dataResponse, err := taskExecManager.GetTaskExecutionData(context.Background(), admin.TaskExecutionGetDataRequest{ Id: &core.TaskExecutionIdentifier{ TaskId: sampleTaskID, diff --git a/pkg/rpc/adminservice/base.go b/pkg/rpc/adminservice/base.go index 376233868..89fce681b 100644 --- a/pkg/rpc/adminservice/base.go +++ b/pkg/rpc/adminservice/base.go @@ -99,6 +99,7 @@ func NewAdminServer(kubeConfig, master string) *AdminService { publisher := notifications.NewNotificationsPublisher(*configuration.ApplicationConfiguration().GetNotificationsConfig(), adminScope) processor := notifications.NewNotificationsProcessor(*configuration.ApplicationConfiguration().GetNotificationsConfig(), adminScope) + eventPublisher := notifications.NewEventsPublisher(*configuration.ApplicationConfiguration().GetNotificationsConfig(), adminScope) go func() { logger.Info(context.Background(), "Started processing notifications.") processor.StartProcessing() @@ -162,8 +163,8 @@ func NewAdminServer(kubeConfig, master string) *AdminService { NamedEntityManager: namedEntityManager, NodeExecutionManager: manager.NewNodeExecutionManager( db, configuration, dataStorageClient, adminScope.NewSubScope("node_execution_manager"), urlData), - TaskExecutionManager: manager.NewTaskExecutionManager( - db, configuration, dataStorageClient, adminScope.NewSubScope("task_execution_manager"), urlData), + TaskExecutionManager: manager.NewTaskExecutionManager(db, configuration, dataStorageClient, + adminScope.NewSubScope("task_execution_manager"), urlData, eventPublisher), ProjectManager: manager.NewProjectManager(db, configuration), ResourceManager: resources.NewResourceManager(db, configuration.ApplicationConfiguration()), Metrics: InitMetrics(adminScope), diff --git a/pkg/runtime/interfaces/application_configuration.go b/pkg/runtime/interfaces/application_configuration.go index d00878522..9b0d49480 100644 --- a/pkg/runtime/interfaces/application_configuration.go +++ b/pkg/runtime/interfaces/application_configuration.go @@ -146,6 +146,14 @@ type NotificationsEmailerConfig struct { Body string `json:"body"` } +// This section handles configuration for the workflow notifications pipeline. +type EventsPublisherConfig struct { + // The topic which events should be published, e.g. node, task, workflow + TopicName string `json:"topicName"` + // Event types: task, node, workflow executions + EventTypes string `json:"eventTypes"` +} + // Configuration specific to notifications handling type NotificationsConfig struct { // Defines the cloud provider that backs the scheduler. In the absence of a specification the no-op, 'local' @@ -162,6 +170,8 @@ type NotificationsConfig struct { ReconnectAttempts int `json:"reconnectAttempts"` // Specifies the time interval to wait before attempting to reconnect the notifications processor client. ReconnectDelaySeconds int `json:"reconnectDelaySeconds"` + // Publish events to a pubsub tops + EventPublisherConfig EventsPublisherConfig `json:"eventPublisher"` } // Domains are always globally set in the application config, whereas individual projects can be individually registered. From 6fa0eabfd5bb0bd216c52c242ea71fa1c97c3b76 Mon Sep 17 00:00:00 2001 From: tnsetting Date: Tue, 2 Feb 2021 14:53:22 +0100 Subject: [PATCH 2/9] sending events to pubsub --- pkg/async/notifications/factory.go | 11 +- .../implementations/event_publisher.go | 36 ++-- .../implementations/event_publisher_test.go | 172 ++++++++++++++++ pkg/manager/impl/execution_manager.go | 15 +- pkg/manager/impl/execution_manager_test.go | 194 +++++------------- pkg/manager/impl/node_execution_manager.go | 27 +-- .../impl/node_execution_manager_test.go | 60 ++---- pkg/rpc/adminservice/base.go | 9 +- .../interfaces/application_configuration.go | 6 +- 9 files changed, 297 insertions(+), 233 deletions(-) create mode 100644 pkg/async/notifications/implementations/event_publisher_test.go diff --git a/pkg/async/notifications/factory.go b/pkg/async/notifications/factory.go index f5aa73e42..18252aa0d 100644 --- a/pkg/async/notifications/factory.go +++ b/pkg/async/notifications/factory.go @@ -150,12 +150,15 @@ func NewNotificationsPublisher(config runtimeInterfaces.NotificationsConfig, sco } func NewEventsPublisher(config runtimeInterfaces.NotificationsConfig, scope promutils.Scope) interfaces.Publisher { + if config.ExternalEvent.EventPublisherConfig.TopicName == "" { + return implementations.NewNoopPublish() + } reconnectAttempts := config.ReconnectAttempts reconnectDelay := time.Duration(config.ReconnectDelaySeconds) * time.Second switch config.Type { case common.AWS: snsConfig := gizmoAWS.SNSConfig{ - Topic: config.NotificationsPublisherConfig.TopicName, + Topic: config.ExternalEvent.EventPublisherConfig.TopicName, } if config.AWSConfig.Region != "" { snsConfig.Region = config.AWSConfig.Region @@ -174,10 +177,10 @@ func NewEventsPublisher(config runtimeInterfaces.NotificationsConfig, scope prom if err != nil { panic(err) } - return implementations.NewEventsPublisher(publisher, scope, config.EventPublisherConfig.EventTypes) + return implementations.NewEventsPublisher(publisher, scope, config.ExternalEvent.EventPublisherConfig.EventTypes) case common.GCP: pubsubConfig := gizmoGCP.Config{ - Topic: config.EventPublisherConfig.TopicName, + Topic: config.ExternalEvent.EventPublisherConfig.TopicName, } pubsubConfig.ProjectID = config.GCPConfig.ProjectID var publisher pubsub.MultiPublisher @@ -190,7 +193,7 @@ func NewEventsPublisher(config runtimeInterfaces.NotificationsConfig, scope prom if err != nil { panic(err) } - return implementations.NewEventsPublisher(publisher, scope, config.EventPublisherConfig.EventTypes) + return implementations.NewEventsPublisher(publisher, scope, config.ExternalEvent.EventPublisherConfig.EventTypes) case common.Local: fallthrough default: diff --git a/pkg/async/notifications/implementations/event_publisher.go b/pkg/async/notifications/implementations/event_publisher.go index 870d922d3..9099b187d 100644 --- a/pkg/async/notifications/implementations/event_publisher.go +++ b/pkg/async/notifications/implementations/event_publisher.go @@ -27,6 +27,17 @@ type EventPublisher struct { events []string } +func getSupportedEvents() map[string]string { + supportedEvents := make(map[string]string) + var taskExecutionReq admin.TaskExecutionEventRequest + supportedEvents["task"] = proto.MessageName(&taskExecutionReq) + var nodeExecutionReq admin.NodeExecutionEventRequest + supportedEvents["node"] = proto.MessageName(&nodeExecutionReq) + var workflowExecutionReq admin.WorkflowExecutionEventRequest + supportedEvents["workflow"] = proto.MessageName(&workflowExecutionReq) + return supportedEvents +} + // The key is the notification type as defined as an enum. func (p *EventPublisher) Publish(ctx context.Context, notificationType string, msg proto.Message) error { p.systemMetrics.PublishTotal.Inc() @@ -62,20 +73,19 @@ func newEventPublisherSystemMetrics(scope promutils.Scope) eventPublisherSystemM } func NewEventsPublisher(pub pubsub.Publisher, scope promutils.Scope, eventTypes string) interfaces.Publisher { - events := strings.Split(eventTypes, ",") - var eventList = make([]string, 0) + supportedEvents := getSupportedEvents() - for _, event := range events { - switch event { - case "task": - var taskExecutionReq admin.TaskExecutionEventRequest - eventList = append(eventList, proto.MessageName(&taskExecutionReq)) - case "node": - var nodeExecutionReq admin.NodeExecutionEventRequest - eventList = append(eventList, proto.MessageName(&nodeExecutionReq)) - case "workflow": - var workflowExecutionReq admin.WorkflowExecutionEventRequest - eventList = append(eventList, proto.MessageName(&workflowExecutionReq)) + var eventList = make([]string, 0) + if strings.Contains(eventTypes, "*") || strings.Contains(eventTypes, "all") { + for _, e := range supportedEvents { + eventList = append(eventList, e) + } + } else { + events := strings.Split(eventTypes, ",") + for _, event := range events { + if e, found := supportedEvents[event]; found { + eventList = append(eventList, e) + } } } diff --git a/pkg/async/notifications/implementations/event_publisher_test.go b/pkg/async/notifications/implementations/event_publisher_test.go new file mode 100644 index 000000000..c1f4142fe --- /dev/null +++ b/pkg/async/notifications/implementations/event_publisher_test.go @@ -0,0 +1,172 @@ +package implementations + +import ( + "context" + "errors" + "github.com/golang/protobuf/ptypes" + "github.com/lyft/flyteidl/gen/pb-go/flyteidl/admin" + "github.com/lyft/flyteidl/gen/pb-go/flyteidl/core" + "github.com/lyft/flyteidl/gen/pb-go/flyteidl/event" + "testing" + "time" + + "github.com/NYTimes/gizmo/pubsub" + "github.com/NYTimes/gizmo/pubsub/pubsubtest" + "github.com/golang/protobuf/proto" + "github.com/lyft/flytestdlib/promutils" + "github.com/stretchr/testify/assert" +) + +var testEventPublisher pubsubtest.TestPublisher +var mockEventPublisher pubsub.Publisher = &testEventPublisher + +var executionID = core.WorkflowExecutionIdentifier{ + Project: "project", + Domain: "domain", + Name: "name", +} +var nodeExecutionID = core.NodeExecutionIdentifier{ + NodeId: "node id", + ExecutionId: &executionID, +} + +var taskID = &core.Identifier{ + ResourceType: core.ResourceType_TASK, + Project: "p", + Domain: "d", + Version: "v", + Name: "n", +} + +var occurredAt = time.Now().UTC() +var occurredAtProto, _ = ptypes.TimestampProto(occurredAt) + +var taskPhase = core.TaskExecution_RUNNING + +var retryAttempt = uint32(1) + +const requestID = "request id" + +var taskRequest = &admin.TaskExecutionEventRequest{ + RequestId: requestID, + Event: &event.TaskExecutionEvent{ + TaskId: taskID, + ParentNodeExecutionId: &nodeExecutionID, + RetryAttempt: retryAttempt, + Phase: taskPhase, + OccurredAt: occurredAtProto, + }, +} + +var nodeRequest = &admin.NodeExecutionEventRequest{ + RequestId: requestID, + Event: &event.NodeExecutionEvent{ + ProducerId: "propeller", + Id: &nodeExecutionID, + OccurredAt: occurredAtProto, + Phase: core.NodeExecution_RUNNING, + InputUri: "input uri", + }, +} + +var workflowRequest = &admin.WorkflowExecutionEventRequest{ + Event: &event.WorkflowExecutionEvent{ + Phase: core.WorkflowExecution_SUCCEEDED, + OutputResult: &event.WorkflowExecutionEvent_OutputUri{ + OutputUri: "somestring", + }, + ExecutionId: &executionID, + }, +} + +// This method should be invoked before every test around Publisher. +func initializeEventPublisher() { + testEventPublisher.Published = nil + testEventPublisher.GivenError = nil + testEventPublisher.FoundError = nil +} + +func TestNewEventsPublisher_EventTypes(t *testing.T) { + { + tests := []struct { + name string + eventTypes string + events []proto.Message + shouldSendEvent []bool + expectedSendCnt int + }{ + {"eventTypes as workflow,node", "workflow,node", + []proto.Message{workflowRequest, nodeRequest, taskRequest}, + []bool{true, true, false}, + 2}, + {"eventTypes as workflow,task", "workflow,task", + []proto.Message{workflowRequest, nodeRequest, taskRequest}, + []bool{true, false, true}, + 2}, + {"eventTypes as workflow,task", "node,task", + []proto.Message{workflowRequest, nodeRequest, taskRequest}, + []bool{false, true, true}, + 2}, + {"eventTypes as task", "task", + []proto.Message{taskRequest}, + []bool{true}, + 1}, + {"eventTypes as node", "node", + []proto.Message{nodeRequest}, + []bool{true}, + 1}, + {"eventTypes as workflow", "workflow", + []proto.Message{workflowRequest}, + []bool{true}, + 1}, + {"eventTypes as workflow", "workflow", + []proto.Message{nodeRequest, taskRequest}, + []bool{false, false}, + 0}, + {"eventTypes as task", "task", + []proto.Message{workflowRequest, nodeRequest}, + []bool{false, false}, + 0}, + {"eventTypes as node", "node", + []proto.Message{workflowRequest, taskRequest}, + []bool{false, false}, + 0}, + {"eventTypes as all", "all", + []proto.Message{workflowRequest, nodeRequest, taskRequest}, + []bool{true, true, true}, + 3}, + {"eventTypes as *", "*", + []proto.Message{workflowRequest, nodeRequest, taskRequest}, + []bool{true, true, true}, + 3}, + } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + initializeEventPublisher() + var currentEventPublisher = NewEventsPublisher(mockEventPublisher, promutils.NewTestScope(), test.eventTypes) + var cnt = 0 + for id, event := range test.events { + assert.Nil(t, currentEventPublisher.Publish(context.Background(), proto.MessageName(event), + event)) + if test.shouldSendEvent[id] { + assert.Equal(t, proto.MessageName(event), testEventPublisher.Published[cnt].Key) + marshalledData, err := proto.Marshal(event) + assert.Nil(t, err) + assert.Equal(t, marshalledData, testEventPublisher.Published[cnt].Body) + cnt++ + } + } + assert.Equal(t, test.expectedSendCnt, len(testEventPublisher.Published)) + }) + } + } +} + +func TestEventPublisher_PublishError(t *testing.T) { + initializeEventPublisher() + currentEventPublisher := NewEventsPublisher(mockEventPublisher, promutils.NewTestScope(), "*") + var publishError = errors.New("publish() returns an error") + testEventPublisher.GivenError = publishError + assert.Equal(t, publishError, currentEventPublisher.Publish(context.Background(), + proto.MessageName(taskRequest), taskRequest)) +} diff --git a/pkg/manager/impl/execution_manager.go b/pkg/manager/impl/execution_manager.go index dfe85367e..7bc6b1477 100644 --- a/pkg/manager/impl/execution_manager.go +++ b/pkg/manager/impl/execution_manager.go @@ -89,6 +89,7 @@ type ExecutionManager struct { namedEntityManager interfaces.NamedEntityInterface resourceManager interfaces.ResourceInterface qualityOfServiceAllocator executions.QualityOfServiceAllocator + eventPublisher notificationInterfaces.Publisher } func getExecutionContext(ctx context.Context, id *core.WorkflowExecutionIdentifier) context.Context { @@ -1004,6 +1005,7 @@ func (m *ExecutionManager) CreateWorkflowEvent(ctx context.Context, request admi return nil, err } } + m.eventPublisher.Publish(ctx, proto.MessageName(&request), &request) m.systemMetrics.ExecutionEventsCreated.Inc() return &admin.WorkflowExecutionEventResponse{}, nil @@ -1329,17 +1331,7 @@ func newExecutionSystemMetrics(scope promutils.Scope) executionSystemMetrics { } } -func NewExecutionManager( - db repositories.RepositoryInterface, - config runtimeInterfaces.Configuration, - storageClient *storage.DataStore, - workflowExecutor workflowengineInterfaces.Executor, - systemScope promutils.Scope, - userScope promutils.Scope, - publisher notificationInterfaces.Publisher, - urlData dataInterfaces.RemoteURLInterface, - workflowManager interfaces.WorkflowInterface, - namedEntityManager interfaces.NamedEntityInterface) interfaces.ExecutionInterface { +func NewExecutionManager(db repositories.RepositoryInterface, config runtimeInterfaces.Configuration, storageClient *storage.DataStore, workflowExecutor workflowengineInterfaces.Executor, systemScope promutils.Scope, userScope promutils.Scope, publisher notificationInterfaces.Publisher, urlData dataInterfaces.RemoteURLInterface, workflowManager interfaces.WorkflowInterface, namedEntityManager interfaces.NamedEntityInterface, eventPublisher notificationInterfaces.Publisher) interfaces.ExecutionInterface { queueAllocator := executions.NewQueueAllocator(config, db) systemMetrics := newExecutionSystemMetrics(systemScope) @@ -1369,6 +1361,7 @@ func NewExecutionManager( namedEntityManager: namedEntityManager, resourceManager: resourceManager, qualityOfServiceAllocator: executions.NewQualityOfServiceAllocator(config, resourceManager), + eventPublisher: eventPublisher, } } diff --git a/pkg/manager/impl/execution_manager_test.go b/pkg/manager/impl/execution_manager_test.go index f0e5d85ba..4f25f9e23 100644 --- a/pkg/manager/impl/execution_manager_test.go +++ b/pkg/manager/impl/execution_manager_test.go @@ -256,9 +256,7 @@ func TestCreateExecution(t *testing.T) { mockConfig := getMockExecutionsConfigProvider() mockConfig.(*runtimeMocks.MockConfigurationProvider).AddQualityOfServiceConfiguration(qosProvider) - execManager := NewExecutionManager( - repository, mockConfig, getMockStorageForExecTest(context.Background()), mockExecutor, - mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil) + execManager := NewExecutionManager(repository, mockConfig, getMockStorageForExecTest(context.Background()), mockExecutor, mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil, nil) request := testutils.GetExecutionRequest() request.Spec.Metadata = &admin.ExecutionMetadata{ Principal: "unused - populated from authenticated context", @@ -332,9 +330,7 @@ func TestCreateExecutionFromWorkflowNode(t *testing.T) { }, ) - execManager := NewExecutionManager( - repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), workflowengineMocks.NewMockExecutor(), - mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil) + execManager := NewExecutionManager(repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), workflowengineMocks.NewMockExecutor(), mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil, nil) request := testutils.GetExecutionRequest() request.Spec.Metadata = &admin.ExecutionMetadata{ Mode: admin.ExecutionMetadata_CHILD_WORKFLOW, @@ -372,8 +368,7 @@ func TestCreateExecution_NoAssignedName(t *testing.T) { Cluster: testCluster, }, nil }) - execManager := NewExecutionManager( - repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), mockExecutor, mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil) + execManager := NewExecutionManager(repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), mockExecutor, mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil, nil) request := testutils.GetExecutionRequest() request.Name = "" response, err := execManager.CreateExecution(context.Background(), request, requestedAt) @@ -421,8 +416,7 @@ func TestCreateExecution_TaggedQueue(t *testing.T) { Cluster: testCluster, }, nil }) - execManager := NewExecutionManager( - repository, configProvider, getMockStorageForExecTest(context.Background()), mockExecutor, mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil) + execManager := NewExecutionManager(repository, configProvider, getMockStorageForExecTest(context.Background()), mockExecutor, mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil, nil) request := testutils.GetExecutionRequest() response, err := execManager.CreateExecution(context.Background(), request, requestedAt) @@ -438,9 +432,7 @@ func TestCreateExecution_TaggedQueue(t *testing.T) { func TestCreateExecutionValidationError(t *testing.T) { repository := getMockRepositoryForExecTest() setDefaultLpCallbackForExecTest(repository) - execManager := NewExecutionManager( - repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), workflowengineMocks.NewMockExecutor(), - mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil) + execManager := NewExecutionManager(repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), workflowengineMocks.NewMockExecutor(), mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil, nil) request := testutils.GetExecutionRequest() request.Domain = "" @@ -452,9 +444,7 @@ func TestCreateExecutionValidationError(t *testing.T) { func TestCreateExecution_InvalidLpIdentifier(t *testing.T) { repository := getMockRepositoryForExecTest() setDefaultLpCallbackForExecTest(repository) - execManager := NewExecutionManager( - repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), workflowengineMocks.NewMockExecutor(), - mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil) + execManager := NewExecutionManager(repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), workflowengineMocks.NewMockExecutor(), mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil, nil) request := testutils.GetExecutionRequest() request.Spec.LaunchPlan = nil @@ -466,9 +456,7 @@ func TestCreateExecution_InvalidLpIdentifier(t *testing.T) { func TestCreateExecutionInCompatibleInputs(t *testing.T) { repository := getMockRepositoryForExecTest() setDefaultLpCallbackForExecTest(repository) - execManager := NewExecutionManager( - repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), workflowengineMocks.NewMockExecutor(), - mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil) + execManager := NewExecutionManager(repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), workflowengineMocks.NewMockExecutor(), mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil, nil) request := testutils.GetExecutionRequest() request.Inputs = &core.LiteralMap{ @@ -492,8 +480,7 @@ func TestCreateExecutionPropellerFailure(t *testing.T) { return nil, expectedErr } mockExecutor.(*workflowengineMocks.MockExecutor).SetExecuteWorkflowCallback(createFunc) - execManager := NewExecutionManager( - repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), mockExecutor, mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil) + execManager := NewExecutionManager(repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), mockExecutor, mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil, nil) request := testutils.GetExecutionRequest() @@ -512,9 +499,7 @@ func TestCreateExecutionDatabaseFailure(t *testing.T) { } repository.ExecutionRepo().(*repositoryMocks.MockExecutionRepo).SetCreateCallback(exCreateFunc) - execManager := NewExecutionManager( - repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), workflowengineMocks.NewMockExecutor(), - mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil) + execManager := NewExecutionManager(repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), workflowengineMocks.NewMockExecutor(), mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil, nil) request := testutils.GetExecutionRequest() response, err := execManager.CreateExecution(context.Background(), request, requestedAt) @@ -577,9 +562,7 @@ func TestCreateExecutionVerifyDbModel(t *testing.T) { } repository.ExecutionRepo().(*repositoryMocks.MockExecutionRepo).SetCreateCallback(exCreateFunc) - execManager := NewExecutionManager( - repository, getMockExecutionsConfigProvider(), storageClient, workflowengineMocks.NewMockExecutor(), - mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil) + execManager := NewExecutionManager(repository, getMockExecutionsConfigProvider(), storageClient, workflowengineMocks.NewMockExecutor(), mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil, nil) execManager.(*ExecutionManager)._clock = mockClock @@ -617,9 +600,7 @@ func TestCreateExecutionDefaultNotifications(t *testing.T) { return nil } repository.ExecutionRepo().(*repositoryMocks.MockExecutionRepo).SetCreateCallback(exCreateFunc) - execManager := NewExecutionManager( - repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), workflowengineMocks.NewMockExecutor(), mockScope.NewTestScope(), - mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil) + execManager := NewExecutionManager(repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), workflowengineMocks.NewMockExecutor(), mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil, nil) response, err := execManager.CreateExecution(context.Background(), request, requestedAt) assert.Nil(t, err) @@ -653,9 +634,7 @@ func TestCreateExecutionDisableNotifications(t *testing.T) { return nil } repository.ExecutionRepo().(*repositoryMocks.MockExecutionRepo).SetCreateCallback(exCreateFunc) - execManager := NewExecutionManager( - repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), workflowengineMocks.NewMockExecutor(), mockScope.NewTestScope(), - mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil) + execManager := NewExecutionManager(repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), workflowengineMocks.NewMockExecutor(), mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil, nil) response, err := execManager.CreateExecution(context.Background(), request, requestedAt) assert.Nil(t, err) @@ -720,9 +699,7 @@ func TestCreateExecutionNoNotifications(t *testing.T) { return nil } repository.ExecutionRepo().(*repositoryMocks.MockExecutionRepo).SetCreateCallback(exCreateFunc) - execManager := NewExecutionManager( - repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), workflowengineMocks.NewMockExecutor(), - mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil) + execManager := NewExecutionManager(repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), workflowengineMocks.NewMockExecutor(), mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil, nil) response, err := execManager.CreateExecution(context.Background(), request, requestedAt) assert.Nil(t, err) @@ -751,9 +728,7 @@ func TestCreateExecutionDynamicLabelsAndAnnotations(t *testing.T) { Cluster: testCluster, }, nil }) - execManager := NewExecutionManager( - repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), mockExecutor, - mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil) + execManager := NewExecutionManager(repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), mockExecutor, mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil, nil) request := testutils.GetExecutionRequest() request.Spec.Labels = &admin.Labels{ Values: map[string]string{ @@ -833,9 +808,7 @@ func TestRelaunchExecution(t *testing.T) { // Set up mocks. repository := getMockRepositoryForExecTest() setDefaultLpCallbackForExecTest(repository) - execManager := NewExecutionManager( - repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), workflowengineMocks.NewMockExecutor(), - mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil) + execManager := NewExecutionManager(repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), workflowengineMocks.NewMockExecutor(), mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil, nil) startTime := time.Now() startTimeProto, _ := ptypes.TimestampProto(startTime) existingClosure := admin.ExecutionClosure{ @@ -892,9 +865,7 @@ func TestRelaunchExecution_GetExistingFailure(t *testing.T) { // Set up mocks. repository := getMockRepositoryForExecTest() setDefaultLpCallbackForExecTest(repository) - execManager := NewExecutionManager( - repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), workflowengineMocks.NewMockExecutor(), - mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil) + execManager := NewExecutionManager(repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), workflowengineMocks.NewMockExecutor(), mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil, nil) expectedErr := errors.New("expected error") repository.ExecutionRepo().(*repositoryMocks.MockExecutionRepo).SetGetCallback( @@ -928,9 +899,7 @@ func TestRelaunchExecution_CreateFailure(t *testing.T) { // Set up mocks. repository := getMockRepositoryForExecTest() setDefaultLpCallbackForExecTest(repository) - execManager := NewExecutionManager( - repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), workflowengineMocks.NewMockExecutor(), - mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil) + execManager := NewExecutionManager(repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), workflowengineMocks.NewMockExecutor(), mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil, nil) startTime := time.Now() startTimeProto, _ := ptypes.TimestampProto(startTime) existingClosure := admin.ExecutionClosure{ @@ -1015,9 +984,7 @@ func TestCreateWorkflowEvent(t *testing.T) { return nil } repository.ExecutionRepo().(*repositoryMocks.MockExecutionRepo).SetUpdateCallback(updateExecutionFunc) - execManager := NewExecutionManager( - repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), workflowengineMocks.NewMockExecutor(), - mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil) + execManager := NewExecutionManager(repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), workflowengineMocks.NewMockExecutor(), mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil, nil) resp, err := execManager.CreateWorkflowEvent(context.Background(), admin.WorkflowExecutionEventRequest{ RequestId: "1", Event: &event.WorkflowExecutionEvent{ @@ -1055,9 +1022,7 @@ func TestCreateWorkflowEvent_TerminalState(t *testing.T) { return nil } repository.ExecutionRepo().(*repositoryMocks.MockExecutionRepo).SetUpdateCallback(updateExecutionFunc) - execManager := NewExecutionManager( - repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), workflowengineMocks.NewMockExecutor(), - mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil) + execManager := NewExecutionManager(repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), workflowengineMocks.NewMockExecutor(), mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil, nil) resp, err := execManager.CreateWorkflowEvent(context.Background(), admin.WorkflowExecutionEventRequest{ RequestId: "1", @@ -1112,9 +1077,7 @@ func TestCreateWorkflowEvent_StartedRunning(t *testing.T) { return nil } repository.ExecutionRepo().(*repositoryMocks.MockExecutionRepo).SetUpdateCallback(updateExecutionFunc) - execManager := NewExecutionManager( - repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), workflowengineMocks.NewMockExecutor(), - mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil) + execManager := NewExecutionManager(repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), workflowengineMocks.NewMockExecutor(), mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil, nil) occurredAtTimestamp, _ := ptypes.TimestampProto(occurredAt) resp, err := execManager.CreateWorkflowEvent(context.Background(), admin.WorkflowExecutionEventRequest{ RequestId: "1", @@ -1153,9 +1116,7 @@ func TestCreateWorkflowEvent_DuplicateRunning(t *testing.T) { }, ) - execManager := NewExecutionManager( - repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), workflowengineMocks.NewMockExecutor(), - mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil) + execManager := NewExecutionManager(repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), workflowengineMocks.NewMockExecutor(), mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil, nil) occurredAtTimestamp, _ := ptypes.TimestampProto(occurredAt) resp, err := execManager.CreateWorkflowEvent(context.Background(), admin.WorkflowExecutionEventRequest{ RequestId: "1", @@ -1196,9 +1157,7 @@ func TestCreateWorkflowEvent_InvalidPhaseChange(t *testing.T) { }, ) - execManager := NewExecutionManager( - repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), workflowengineMocks.NewMockExecutor(), - mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil) + execManager := NewExecutionManager(repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), workflowengineMocks.NewMockExecutor(), mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil, nil) occurredAtTimestamp, _ := ptypes.TimestampProto(occurredAt) resp, err := execManager.CreateWorkflowEvent(context.Background(), admin.WorkflowExecutionEventRequest{ RequestId: "1", @@ -1232,9 +1191,7 @@ func TestCreateWorkflowEvent_InvalidEvent(t *testing.T) { return nil } repository.ExecutionRepo().(*repositoryMocks.MockExecutionRepo).SetUpdateCallback(updateExecutionFunc) - execManager := NewExecutionManager( - repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), workflowengineMocks.NewMockExecutor(), - mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil) + execManager := NewExecutionManager(repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), workflowengineMocks.NewMockExecutor(), mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil, nil) resp, err := execManager.CreateWorkflowEvent(context.Background(), admin.WorkflowExecutionEventRequest{ RequestId: "1", Event: &event.WorkflowExecutionEvent{ @@ -1262,9 +1219,7 @@ func TestCreateWorkflowEvent_UpdateModelError(t *testing.T) { Message: "bar baz", } - execManager := NewExecutionManager( - repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), workflowengineMocks.NewMockExecutor(), - mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil) + execManager := NewExecutionManager(repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), workflowengineMocks.NewMockExecutor(), mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil, nil) resp, err := execManager.CreateWorkflowEvent(context.Background(), admin.WorkflowExecutionEventRequest{ RequestId: "1", Event: &event.WorkflowExecutionEvent{ @@ -1296,9 +1251,7 @@ func TestCreateWorkflowEvent_DatabaseGetError(t *testing.T) { Code: "foo", Message: "bar baz", } - execManager := NewExecutionManager( - repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), workflowengineMocks.NewMockExecutor(), - mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil) + execManager := NewExecutionManager(repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), workflowengineMocks.NewMockExecutor(), mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil, nil) resp, err := execManager.CreateWorkflowEvent(context.Background(), admin.WorkflowExecutionEventRequest{ RequestId: "1", Event: &event.WorkflowExecutionEvent{ @@ -1332,9 +1285,7 @@ func TestCreateWorkflowEvent_DatabaseUpdateError(t *testing.T) { return expectedErr } repository.ExecutionRepo().(*repositoryMocks.MockExecutionRepo).SetUpdateCallback(updateExecutionFunc) - execManager := NewExecutionManager( - repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), workflowengineMocks.NewMockExecutor(), - mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil) + execManager := NewExecutionManager(repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), workflowengineMocks.NewMockExecutor(), mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil, nil) resp, err := execManager.CreateWorkflowEvent(context.Background(), admin.WorkflowExecutionEventRequest{ RequestId: "1", Event: &event.WorkflowExecutionEvent{ @@ -1373,9 +1324,7 @@ func TestGetExecution(t *testing.T) { }, nil } repository.ExecutionRepo().(*repositoryMocks.MockExecutionRepo).SetGetCallback(executionGetFunc) - execManager := NewExecutionManager( - repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), workflowengineMocks.NewMockExecutor(), - mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil) + execManager := NewExecutionManager(repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), workflowengineMocks.NewMockExecutor(), mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil, nil) execution, err := execManager.GetExecution(context.Background(), admin.WorkflowExecutionGetRequest{ Id: &executionIdentifier, }) @@ -1396,9 +1345,7 @@ func TestGetExecution_DatabaseError(t *testing.T) { return models.Execution{}, expectedErr } repository.ExecutionRepo().(*repositoryMocks.MockExecutionRepo).SetGetCallback(executionGetFunc) - execManager := NewExecutionManager( - repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), workflowengineMocks.NewMockExecutor(), - mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil) + execManager := NewExecutionManager(repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), workflowengineMocks.NewMockExecutor(), mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil, nil) execution, err := execManager.GetExecution(context.Background(), admin.WorkflowExecutionGetRequest{ Id: &executionIdentifier, }) @@ -1428,9 +1375,7 @@ func TestGetExecution_TransformerError(t *testing.T) { }, nil } repository.ExecutionRepo().(*repositoryMocks.MockExecutionRepo).SetGetCallback(executionGetFunc) - execManager := NewExecutionManager( - repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), workflowengineMocks.NewMockExecutor(), - mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil) + execManager := NewExecutionManager(repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), workflowengineMocks.NewMockExecutor(), mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil, nil) execution, err := execManager.GetExecution(context.Background(), admin.WorkflowExecutionGetRequest{ Id: &executionIdentifier, }) @@ -1490,9 +1435,7 @@ func TestListExecutions(t *testing.T) { }, nil } repository.ExecutionRepo().(*repositoryMocks.MockExecutionRepo).SetListCallback(executionListFunc) - execManager := NewExecutionManager( - repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), workflowengineMocks.NewMockExecutor(), - mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil) + execManager := NewExecutionManager(repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), workflowengineMocks.NewMockExecutor(), mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil, nil) executionList, err := execManager.ListExecutions(context.Background(), admin.ResourceListRequest{ Id: &admin.NamedEntityIdentifier{ @@ -1523,10 +1466,7 @@ func TestListExecutions(t *testing.T) { } func TestListExecutions_MissingParameters(t *testing.T) { - execManager := NewExecutionManager( - repositoryMocks.NewMockRepository(), - getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), workflowengineMocks.NewMockExecutor(), - mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil) + execManager := NewExecutionManager(repositoryMocks.NewMockRepository(), getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), workflowengineMocks.NewMockExecutor(), mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil, nil) _, err := execManager.ListExecutions(context.Background(), admin.ResourceListRequest{ Id: &admin.NamedEntityIdentifier{ Domain: domainValue, @@ -1563,9 +1503,7 @@ func TestListExecutions_DatabaseError(t *testing.T) { return interfaces.ExecutionCollectionOutput{}, expectedErr } repository.ExecutionRepo().(*repositoryMocks.MockExecutionRepo).SetListCallback(executionListFunc) - execManager := NewExecutionManager( - repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), workflowengineMocks.NewMockExecutor(), - mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil) + execManager := NewExecutionManager(repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), workflowengineMocks.NewMockExecutor(), mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil, nil) _, err := execManager.ListExecutions(context.Background(), admin.ResourceListRequest{ Id: &admin.NamedEntityIdentifier{ Project: projectValue, @@ -1596,9 +1534,7 @@ func TestListExecutions_TransformerError(t *testing.T) { }, nil } repository.ExecutionRepo().(*repositoryMocks.MockExecutionRepo).SetListCallback(executionListFunc) - execManager := NewExecutionManager( - repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), workflowengineMocks.NewMockExecutor(), - mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil) + execManager := NewExecutionManager(repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), workflowengineMocks.NewMockExecutor(), mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil, nil) executionList, err := execManager.ListExecutions(context.Background(), admin.ResourceListRequest{ Id: &admin.NamedEntityIdentifier{ @@ -1905,9 +1841,7 @@ func TestTerminateExecution(t *testing.T) { }, input.ExecutionID)) return nil }) - execManager := NewExecutionManager( - repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), mockExecutor, - mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil) + execManager := NewExecutionManager(repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), mockExecutor, mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil, nil) ctx := context.WithValue(context.Background(), auth.PrincipalContextKey, principal) resp, err := execManager.TerminateExecution(ctx, admin.ExecutionTerminateRequest{ @@ -1937,9 +1871,7 @@ func TestTerminateExecution_PropellerError(t *testing.T) { t.Fatal("update should not be called when propeller fails to terminate an execution") return nil }) - execManager := NewExecutionManager( - repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), mockExecutor, - mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil) + execManager := NewExecutionManager(repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), mockExecutor, mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil, nil) resp, err := execManager.TerminateExecution(context.Background(), admin.ExecutionTerminateRequest{ Id: &core.WorkflowExecutionIdentifier{ @@ -1966,9 +1898,7 @@ func TestTerminateExecution_DatabaseError(t *testing.T) { } repository.ExecutionRepo().(*repositoryMocks.MockExecutionRepo).SetUpdateExecutionCallback(updateExecutionFunc) - execManager := NewExecutionManager( - repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), workflowengineMocks.NewMockExecutor(), - mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil) + execManager := NewExecutionManager(repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), workflowengineMocks.NewMockExecutor(), mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil, nil) resp, err := execManager.TerminateExecution(context.Background(), admin.ExecutionTerminateRequest{ Id: &core.WorkflowExecutionIdentifier{ Project: "project", @@ -2056,9 +1986,7 @@ func TestGetExecutionData(t *testing.T) { } repository.ExecutionRepo().(*repositoryMocks.MockExecutionRepo).SetGetCallback(executionGetFunc) - execManager := NewExecutionManager( - repository, getMockExecutionsConfigProvider(), mockStorage, workflowengineMocks.NewMockExecutor(), - mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil) + execManager := NewExecutionManager(repository, getMockExecutionsConfigProvider(), mockStorage, workflowengineMocks.NewMockExecutor(), mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil, nil) dataResponse, err := execManager.GetExecutionData(context.Background(), admin.WorkflowExecutionGetDataRequest{ Id: &executionIdentifier, }) @@ -2087,9 +2015,7 @@ func TestAddLabelsAndAnnotationsRuntimeLimitsObserved(t *testing.T) { configProvider := getMockExecutionsConfigProvider() configProvider.(*runtimeMocks.MockConfigurationProvider).AddRegistrationValidationConfiguration( mockRegistrationValidationConfig) - execManager := NewExecutionManager( - repository, configProvider, getMockStorageForExecTest(context.Background()), workflowengineMocks.NewMockExecutor(), - mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil) + execManager := NewExecutionManager(repository, configProvider, getMockStorageForExecTest(context.Background()), workflowengineMocks.NewMockExecutor(), mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil, nil) request := testutils.GetExecutionRequest() request.Spec.Labels = &admin.Labels{ Values: map[string]string{ @@ -2153,9 +2079,7 @@ func TestAddPluginOverrides(t *testing.T) { } partiallyPopulatedInputs := workflowengineInterfaces.ExecuteWorkflowInput{} - execManager := NewExecutionManager( - db, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), workflowengineMocks.NewMockExecutor(), - mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil) + execManager := NewExecutionManager(db, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), workflowengineMocks.NewMockExecutor(), mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil, nil) taskPluginOverrides, err := execManager.(*ExecutionManager).addPluginOverrides( context.Background(), executionID, workflowName, launchPlanName) @@ -2186,9 +2110,7 @@ func TestPluginOverrides_ResourceGetFailure(t *testing.T) { models.Resource, error) { return models.Resource{}, flyteAdminErrors.NewFlyteAdminErrorf(codes.Aborted, "uh oh") } - execManager := NewExecutionManager( - db, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), workflowengineMocks.NewMockExecutor(), - mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil) + execManager := NewExecutionManager(db, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), workflowengineMocks.NewMockExecutor(), mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil, nil) _, err := execManager.(*ExecutionManager).addPluginOverrides( context.Background(), executionID, workflowName, launchPlanName) @@ -2217,9 +2139,7 @@ func TestGetExecution_Legacy(t *testing.T) { }, nil } repository.ExecutionRepo().(*repositoryMocks.MockExecutionRepo).SetGetCallback(executionGetFunc) - execManager := NewExecutionManager( - repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), workflowengineMocks.NewMockExecutor(), - mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil) + execManager := NewExecutionManager(repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), workflowengineMocks.NewMockExecutor(), mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil, nil) execution, err := execManager.GetExecution(context.Background(), admin.WorkflowExecutionGetRequest{ Id: &executionIdentifier, }) @@ -2254,9 +2174,7 @@ func TestGetExecution_LegacyClient_OffloadedData(t *testing.T) { } repository.ExecutionRepo().(*repositoryMocks.MockExecutionRepo).SetGetCallback(executionGetFunc) storageClient := getMockStorageForExecTest(context.Background()) - execManager := NewExecutionManager( - repository, getMockExecutionsConfigProvider(), storageClient, workflowengineMocks.NewMockExecutor(), - mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil) + execManager := NewExecutionManager(repository, getMockExecutionsConfigProvider(), storageClient, workflowengineMocks.NewMockExecutor(), mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil, nil) _ = storageClient.WriteProtobuf(context.Background(), storage.DataReference(shared.UserInputs), storage.Options{}, getLegacySpec().Inputs) _ = storageClient.WriteProtobuf(context.Background(), storage.DataReference(shared.Inputs), storage.Options{}, getLegacyClosure().ComputedInputs) execution, err := execManager.GetExecution(context.Background(), admin.WorkflowExecutionGetRequest{ @@ -2316,9 +2234,7 @@ func TestGetExecutionData_LegacyModel(t *testing.T) { repository.ExecutionRepo().(*repositoryMocks.MockExecutionRepo).SetGetCallback(executionGetFunc) storageClient := getMockStorageForExecTest(context.Background()) - execManager := NewExecutionManager( - repository, getMockExecutionsConfigProvider(), storageClient, workflowengineMocks.NewMockExecutor(), - mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil) + execManager := NewExecutionManager(repository, getMockExecutionsConfigProvider(), storageClient, workflowengineMocks.NewMockExecutor(), mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil, nil) dataResponse, err := execManager.GetExecutionData(context.Background(), admin.WorkflowExecutionGetDataRequest{ Id: &executionIdentifier, }) @@ -2363,9 +2279,7 @@ func TestCreateExecution_LegacyClient(t *testing.T) { Cluster: testCluster, }, nil }) - execManager := NewExecutionManager( - repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), mockExecutor, - mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil) + execManager := NewExecutionManager(repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), mockExecutor, mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil, nil) response, err := execManager.CreateExecution(context.Background(), *getLegacyExecutionRequest(), requestedAt) assert.Nil(t, err) @@ -2381,9 +2295,7 @@ func TestRelaunchExecution_LegacyModel(t *testing.T) { repository := getMockRepositoryForExecTest() setDefaultLpCallbackForExecTest(repository) storageClient := getMockStorageForExecTest(context.Background()) - execManager := NewExecutionManager( - repository, getMockExecutionsConfigProvider(), storageClient, workflowengineMocks.NewMockExecutor(), - mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil) + execManager := NewExecutionManager(repository, getMockExecutionsConfigProvider(), storageClient, workflowengineMocks.NewMockExecutor(), mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil, nil) startTime := time.Now() startTimeProto, _ := ptypes.TimestampProto(startTime) existingClosure := getLegacyClosure() @@ -2494,9 +2406,7 @@ func TestListExecutions_LegacyModel(t *testing.T) { }, nil } repository.ExecutionRepo().(*repositoryMocks.MockExecutionRepo).SetListCallback(executionListFunc) - execManager := NewExecutionManager( - repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), workflowengineMocks.NewMockExecutor(), - mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil) + execManager := NewExecutionManager(repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), workflowengineMocks.NewMockExecutor(), mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil, nil) executionList, err := execManager.ListExecutions(context.Background(), admin.ResourceListRequest{ Id: &admin.NamedEntityIdentifier{ @@ -2698,9 +2608,7 @@ func TestSetDefaults(t *testing.T) { mockConfig := runtimeMocks.NewMockConfigurationProvider( testutils.GetApplicationConfigWithDefaultDomains(), nil, nil, &taskConfig, runtimeMocks.NewMockWhitelistConfiguration(), nil) - execManager := NewExecutionManager( - repositoryMocks.NewMockRepository(), mockConfig, getMockStorageForExecTest(context.Background()), workflowengineMocks.NewMockExecutor(), - mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil) + execManager := NewExecutionManager(repositoryMocks.NewMockRepository(), mockConfig, getMockStorageForExecTest(context.Background()), workflowengineMocks.NewMockExecutor(), mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil, nil) execManager.(*ExecutionManager).setCompiledTaskDefaults(context.Background(), task, "workflow") assert.True(t, proto.Equal( &core.Container{ @@ -2767,9 +2675,7 @@ func TestSetDefaults_MissingDefaults(t *testing.T) { mockConfig := runtimeMocks.NewMockConfigurationProvider( testutils.GetApplicationConfigWithDefaultDomains(), nil, nil, &taskConfig, runtimeMocks.NewMockWhitelistConfiguration(), nil) - execManager := NewExecutionManager( - repositoryMocks.NewMockRepository(), mockConfig, getMockStorageForExecTest(context.Background()), workflowengineMocks.NewMockExecutor(), - mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil) + execManager := NewExecutionManager(repositoryMocks.NewMockRepository(), mockConfig, getMockStorageForExecTest(context.Background()), workflowengineMocks.NewMockExecutor(), mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil, nil) execManager.(*ExecutionManager).setCompiledTaskDefaults(context.Background(), task, "workflow") assert.True(t, proto.Equal( &core.Container{ @@ -2952,9 +2858,7 @@ func TestCreateSingleTaskExecution(t *testing.T) { getMockWorkflowConfigProvider(), getMockWorkflowCompiler(), mockStorage, storagePrefix, mockScope.NewTestScope()) namedEntityManager := NewNamedEntityManager(repository, getMockConfigForNETest(), mockScope.NewTestScope()) - execManager := NewExecutionManager( - repository, getMockExecutionsConfigProvider(), mockStorage, workflowengineMocks.NewMockExecutor(), - mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, workflowManager, namedEntityManager) + execManager := NewExecutionManager(repository, getMockExecutionsConfigProvider(), mockStorage, workflowengineMocks.NewMockExecutor(), mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, workflowManager, namedEntityManager, nil) request := admin.ExecutionCreateRequest{ Project: "flytekit", Domain: "production", diff --git a/pkg/manager/impl/node_execution_manager.go b/pkg/manager/impl/node_execution_manager.go index 3e37e9c87..25a2c4f6a 100644 --- a/pkg/manager/impl/node_execution_manager.go +++ b/pkg/manager/impl/node_execution_manager.go @@ -2,6 +2,7 @@ package impl import ( "context" + notificationInterfaces "github.com/lyft/flyteadmin/pkg/async/notifications/interfaces" "strconv" "github.com/lyft/flytestdlib/storage" @@ -46,11 +47,12 @@ type nodeExecutionMetrics struct { } type NodeExecutionManager struct { - db repositories.RepositoryInterface - config runtimeInterfaces.Configuration - storageClient *storage.DataStore - metrics nodeExecutionMetrics - urlData dataInterfaces.RemoteURLInterface + db repositories.RepositoryInterface + config runtimeInterfaces.Configuration + storageClient *storage.DataStore + metrics nodeExecutionMetrics + urlData dataInterfaces.RemoteURLInterface + eventPublisher notificationInterfaces.Publisher } type updateNodeExecutionStatus int @@ -427,9 +429,7 @@ func (m *NodeExecutionManager) GetNodeExecutionData( return response, nil } -func NewNodeExecutionManager( - db repositories.RepositoryInterface, config runtimeInterfaces.Configuration, storageClient *storage.DataStore, - scope promutils.Scope, urlData dataInterfaces.RemoteURLInterface) interfaces.NodeExecutionInterface { +func NewNodeExecutionManager(db repositories.RepositoryInterface, config runtimeInterfaces.Configuration, storageClient *storage.DataStore, scope promutils.Scope, urlData dataInterfaces.RemoteURLInterface, eventPublisher notificationInterfaces.Publisher) interfaces.NodeExecutionInterface { metrics := nodeExecutionMetrics{ Scope: scope, ActiveNodeExecutions: scope.MustNewGauge("active_node_executions", @@ -450,10 +450,11 @@ func NewNodeExecutionManager( "size in bytes of serialized node execution outputs"), } return &NodeExecutionManager{ - db: db, - config: config, - storageClient: storageClient, - metrics: metrics, - urlData: urlData, + db: db, + config: config, + storageClient: storageClient, + metrics: metrics, + urlData: urlData, + eventPublisher: eventPublisher, } } diff --git a/pkg/manager/impl/node_execution_manager_test.go b/pkg/manager/impl/node_execution_manager_test.go index 676baa495..2e71ac406 100644 --- a/pkg/manager/impl/node_execution_manager_test.go +++ b/pkg/manager/impl/node_execution_manager_test.go @@ -134,8 +134,7 @@ func TestCreateNodeEvent(t *testing.T) { }, *input) return nil }) - nodeExecManager := NewNodeExecutionManager(repository, getMockExecutionsConfigProvider(), - getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockNodeExecutionRemoteURL) + nodeExecManager := NewNodeExecutionManager(repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockNodeExecutionRemoteURL, nil) resp, err := nodeExecManager.CreateNodeEvent(context.Background(), request) assert.Nil(t, err) assert.NotNil(t, resp) @@ -190,8 +189,7 @@ func TestCreateNodeEvent_Update(t *testing.T) { return nil }) - nodeExecManager := NewNodeExecutionManager(repository, getMockExecutionsConfigProvider(), - getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockNodeExecutionRemoteURL) + nodeExecManager := NewNodeExecutionManager(repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockNodeExecutionRemoteURL, nil) resp, err := nodeExecManager.CreateNodeEvent(context.Background(), request) assert.Nil(t, err) assert.NotNil(t, resp) @@ -204,8 +202,7 @@ func TestCreateNodeEvent_MissingExecution(t *testing.T) { func(ctx context.Context, input interfaces.GetResourceInput) (models.Execution, error) { return models.Execution{}, expectedErr }) - nodeExecManager := NewNodeExecutionManager(repository, getMockExecutionsConfigProvider(), - getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockNodeExecutionRemoteURL) + nodeExecManager := NewNodeExecutionManager(repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockNodeExecutionRemoteURL, nil) resp, err := nodeExecManager.CreateNodeEvent(context.Background(), request) assert.EqualError(t, err, "failed to get existing execution id: [project:\"project\""+ " domain:\"domain\" name:\"name\" ] with err: expected error") @@ -225,8 +222,7 @@ func TestCreateNodeEvent_CreateDatabaseError(t *testing.T) { func(ctx context.Context, event *models.NodeExecutionEvent, input *models.NodeExecution) error { return expectedErr }) - nodeExecManager := NewNodeExecutionManager(repository, getMockExecutionsConfigProvider(), - getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockNodeExecutionRemoteURL) + nodeExecManager := NewNodeExecutionManager(repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockNodeExecutionRemoteURL, nil) resp, err := nodeExecManager.CreateNodeEvent(context.Background(), request) assert.EqualError(t, err, expectedErr.Error()) assert.Nil(t, resp) @@ -261,8 +257,7 @@ func TestCreateNodeEvent_UpdateDatabaseError(t *testing.T) { func(ctx context.Context, event *models.NodeExecutionEvent, nodeExecution *models.NodeExecution) error { return expectedErr }) - nodeExecManager := NewNodeExecutionManager(repository, getMockExecutionsConfigProvider(), - getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockNodeExecutionRemoteURL) + nodeExecManager := NewNodeExecutionManager(repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockNodeExecutionRemoteURL, nil) resp, err := nodeExecManager.CreateNodeEvent(context.Background(), request) assert.EqualError(t, err, expectedErr.Error()) assert.Nil(t, resp) @@ -291,8 +286,7 @@ func TestCreateNodeEvent_UpdateTerminalEventError(t *testing.T) { StartedAt: &occurredAt, }, nil }) - nodeExecManager := NewNodeExecutionManager(repository, getMockExecutionsConfigProvider(), - getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockNodeExecutionRemoteURL) + nodeExecManager := NewNodeExecutionManager(repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockNodeExecutionRemoteURL, nil) resp, err := nodeExecManager.CreateNodeEvent(context.Background(), request) assert.Nil(t, resp) assert.NotNil(t, err) @@ -327,8 +321,7 @@ func TestCreateNodeEvent_UpdateDuplicateEventError(t *testing.T) { StartedAt: &occurredAt, }, nil }) - nodeExecManager := NewNodeExecutionManager(repository, getMockExecutionsConfigProvider(), - getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockNodeExecutionRemoteURL) + nodeExecManager := NewNodeExecutionManager(repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockNodeExecutionRemoteURL, nil) resp, err := nodeExecManager.CreateNodeEvent(context.Background(), request) assert.Equal(t, codes.AlreadyExists, err.(flyteAdminErrors.FlyteAdminError).Code()) assert.Nil(t, resp) @@ -341,8 +334,7 @@ func TestCreateNodeEvent_FirstEventIsTerminal(t *testing.T) { func(ctx context.Context, input interfaces.GetNodeExecutionInput) (models.NodeExecution, error) { return models.NodeExecution{}, flyteAdminErrors.NewFlyteAdminError(codes.NotFound, "foo") }) - nodeExecManager := NewNodeExecutionManager(repository, getMockExecutionsConfigProvider(), - getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockNodeExecutionRemoteURL) + nodeExecManager := NewNodeExecutionManager(repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockNodeExecutionRemoteURL, nil) succeededRequest := admin.NodeExecutionEventRequest{ RequestId: "request id", Event: &event.NodeExecutionEvent{ @@ -403,8 +395,7 @@ func TestGetNodeExecution(t *testing.T) { NodeExecutionMetadata: metadataBytes, }, nil }) - nodeExecManager := NewNodeExecutionManager(repository, getMockExecutionsConfigProvider(), - getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockNodeExecutionRemoteURL) + nodeExecManager := NewNodeExecutionManager(repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockNodeExecutionRemoteURL, nil) nodeExecution, err := nodeExecManager.GetNodeExecution(context.Background(), admin.NodeExecutionGetRequest{ Id: &nodeExecutionIdentifier, }) @@ -467,8 +458,7 @@ func TestGetNodeExecutionParentNode(t *testing.T) { }, }, nil }) - nodeExecManager := NewNodeExecutionManager(repository, getMockExecutionsConfigProvider(), - getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockNodeExecutionRemoteURL) + nodeExecManager := NewNodeExecutionManager(repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockNodeExecutionRemoteURL, nil) nodeExecution, err := nodeExecManager.GetNodeExecution(context.Background(), admin.NodeExecutionGetRequest{ Id: &nodeExecutionIdentifier, }) @@ -489,8 +479,7 @@ func TestGetNodeExecution_DatabaseError(t *testing.T) { func(ctx context.Context, input interfaces.GetNodeExecutionInput) (models.NodeExecution, error) { return models.NodeExecution{}, expectedErr }) - nodeExecManager := NewNodeExecutionManager(repository, getMockExecutionsConfigProvider(), - getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockNodeExecutionRemoteURL) + nodeExecManager := NewNodeExecutionManager(repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockNodeExecutionRemoteURL, nil) nodeExecution, err := nodeExecManager.GetNodeExecution(context.Background(), admin.NodeExecutionGetRequest{ Id: &nodeExecutionIdentifier, }) @@ -517,8 +506,7 @@ func TestGetNodeExecution_TransformerError(t *testing.T) { Closure: []byte("i'm invalid"), }, nil }) - nodeExecManager := NewNodeExecutionManager(repository, getMockExecutionsConfigProvider(), - getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockNodeExecutionRemoteURL) + nodeExecManager := NewNodeExecutionManager(repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockNodeExecutionRemoteURL, nil) nodeExecution, err := nodeExecManager.GetNodeExecution(context.Background(), admin.NodeExecutionGetRequest{ Id: &nodeExecutionIdentifier, }) @@ -587,8 +575,7 @@ func TestListNodeExecutionsLevelZero(t *testing.T) { }, }, nil }) - nodeExecManager := NewNodeExecutionManager(repository, getMockExecutionsConfigProvider(), - getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockNodeExecutionRemoteURL) + nodeExecManager := NewNodeExecutionManager(repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockNodeExecutionRemoteURL, nil) nodeExecutions, err := nodeExecManager.ListNodeExecutions(context.Background(), admin.NodeExecutionListRequest{ WorkflowExecutionId: &core.WorkflowExecutionIdentifier{ Project: "project", @@ -687,8 +674,7 @@ func TestListNodeExecutionsWithParent(t *testing.T) { }, }, nil }) - nodeExecManager := NewNodeExecutionManager(repository, getMockExecutionsConfigProvider(), - getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockNodeExecutionRemoteURL) + nodeExecManager := NewNodeExecutionManager(repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockNodeExecutionRemoteURL, nil) nodeExecutions, err := nodeExecManager.ListNodeExecutions(context.Background(), admin.NodeExecutionListRequest{ WorkflowExecutionId: &core.WorkflowExecutionIdentifier{ Project: "project", @@ -722,8 +708,7 @@ func TestListNodeExecutionsWithParent(t *testing.T) { } func TestListNodeExecutions_InvalidParams(t *testing.T) { - nodeExecManager := NewNodeExecutionManager(nil, getMockExecutionsConfigProvider(), - getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockNodeExecutionRemoteURL) + nodeExecManager := NewNodeExecutionManager(nil, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockNodeExecutionRemoteURL, nil) _, err := nodeExecManager.ListNodeExecutions(context.Background(), admin.NodeExecutionListRequest{ Filters: "eq(execution.project, project)", }) @@ -749,8 +734,7 @@ func TestListNodeExecutions_DatabaseError(t *testing.T) { interfaces.NodeExecutionCollectionOutput, error) { return interfaces.NodeExecutionCollectionOutput{}, expectedErr }) - nodeExecManager := NewNodeExecutionManager(repository, getMockExecutionsConfigProvider(), - getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockNodeExecutionRemoteURL) + nodeExecManager := NewNodeExecutionManager(repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockNodeExecutionRemoteURL, nil) nodeExecutions, err := nodeExecManager.ListNodeExecutions(context.Background(), admin.NodeExecutionListRequest{ WorkflowExecutionId: &core.WorkflowExecutionIdentifier{ Project: "project", @@ -788,8 +772,7 @@ func TestListNodeExecutions_TransformerError(t *testing.T) { }, }, nil }) - nodeExecManager := NewNodeExecutionManager(repository, getMockExecutionsConfigProvider(), - getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockNodeExecutionRemoteURL) + nodeExecManager := NewNodeExecutionManager(repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockNodeExecutionRemoteURL, nil) nodeExecutions, err := nodeExecManager.ListNodeExecutions(context.Background(), admin.NodeExecutionListRequest{ WorkflowExecutionId: &core.WorkflowExecutionIdentifier{ Project: "project", @@ -817,8 +800,7 @@ func TestListNodeExecutions_NothingToReturn(t *testing.T) { listExecutionsCalled = true return interfaces.ExecutionCollectionOutput{}, nil }) - nodeExecManager := NewNodeExecutionManager(repository, getMockExecutionsConfigProvider(), - getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockNodeExecutionRemoteURL) + nodeExecManager := NewNodeExecutionManager(repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockNodeExecutionRemoteURL, nil) _, err := nodeExecManager.ListNodeExecutions(context.Background(), admin.NodeExecutionListRequest{ WorkflowExecutionId: &core.WorkflowExecutionIdentifier{ Project: "project", @@ -915,8 +897,7 @@ func TestListNodeExecutionsForTask(t *testing.T) { }, }, nil }) - nodeExecManager := NewNodeExecutionManager(repository, getMockExecutionsConfigProvider(), - getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockNodeExecutionRemoteURL) + nodeExecManager := NewNodeExecutionManager(repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockNodeExecutionRemoteURL, nil) nodeExecutions, err := nodeExecManager.ListNodeExecutionsForTask(context.Background(), admin.NodeExecutionForTaskListRequest{ TaskExecutionId: &core.TaskExecutionIdentifier{ NodeExecutionId: &core.NodeExecutionIdentifier{ @@ -1041,8 +1022,7 @@ func TestGetNodeExecutionData(t *testing.T) { } return fmt.Errorf("unexpected call to find value in storage [%v]", reference.String()) } - nodeExecManager := NewNodeExecutionManager(repository, getMockExecutionsConfigProvider(), mockStorage, - mockScope.NewTestScope(), mockNodeExecutionRemoteURL) + nodeExecManager := NewNodeExecutionManager(repository, getMockExecutionsConfigProvider(), mockStorage, mockScope.NewTestScope(), mockNodeExecutionRemoteURL, nil) dataResponse, err := nodeExecManager.GetNodeExecutionData(context.Background(), admin.NodeExecutionGetDataRequest{ Id: &nodeExecutionIdentifier, }) diff --git a/pkg/rpc/adminservice/base.go b/pkg/rpc/adminservice/base.go index 89fce681b..9c7923ce2 100644 --- a/pkg/rpc/adminservice/base.go +++ b/pkg/rpc/adminservice/base.go @@ -132,10 +132,7 @@ func NewAdminServer(kubeConfig, master string) *AdminService { db, configuration, workflowengine.NewCompiler(), dataStorageClient, applicationConfiguration.MetadataStoragePrefix, adminScope.NewSubScope("workflow_manager")) namedEntityManager := manager.NewNamedEntityManager(db, configuration, adminScope.NewSubScope("named_entity_manager")) - executionManager := manager.NewExecutionManager( - db, configuration, dataStorageClient, workflowExecutor, adminScope.NewSubScope("execution_manager"), - adminScope.NewSubScope("user_execution_metrics"), publisher, urlData, workflowManager, - namedEntityManager) + executionManager := manager.NewExecutionManager(db, configuration, dataStorageClient, workflowExecutor, adminScope.NewSubScope("execution_manager"), adminScope.NewSubScope("user_execution_metrics"), publisher, urlData, workflowManager, namedEntityManager, eventPublisher) scheduledWorkflowExecutor := workflowScheduler.GetWorkflowExecutor(executionManager, launchPlanManager) logger.Info(context.Background(), "Successfully initialized a new scheduled workflow executor") @@ -161,8 +158,8 @@ func NewAdminServer(kubeConfig, master string) *AdminService { LaunchPlanManager: launchPlanManager, ExecutionManager: executionManager, NamedEntityManager: namedEntityManager, - NodeExecutionManager: manager.NewNodeExecutionManager( - db, configuration, dataStorageClient, adminScope.NewSubScope("node_execution_manager"), urlData), + NodeExecutionManager: manager.NewNodeExecutionManager(db, configuration, dataStorageClient, + adminScope.NewSubScope("node_execution_manager"), urlData, eventPublisher), TaskExecutionManager: manager.NewTaskExecutionManager(db, configuration, dataStorageClient, adminScope.NewSubScope("task_execution_manager"), urlData, eventPublisher), ProjectManager: manager.NewProjectManager(db, configuration), diff --git a/pkg/runtime/interfaces/application_configuration.go b/pkg/runtime/interfaces/application_configuration.go index 9b0d49480..359c5edaa 100644 --- a/pkg/runtime/interfaces/application_configuration.go +++ b/pkg/runtime/interfaces/application_configuration.go @@ -154,6 +154,10 @@ type EventsPublisherConfig struct { EventTypes string `json:"eventTypes"` } +type ExternalEvent struct { + EventPublisherConfig EventsPublisherConfig `json:"eventPublisher"` +} + // Configuration specific to notifications handling type NotificationsConfig struct { // Defines the cloud provider that backs the scheduler. In the absence of a specification the no-op, 'local' @@ -171,7 +175,7 @@ type NotificationsConfig struct { // Specifies the time interval to wait before attempting to reconnect the notifications processor client. ReconnectDelaySeconds int `json:"reconnectDelaySeconds"` // Publish events to a pubsub tops - EventPublisherConfig EventsPublisherConfig `json:"eventPublisher"` + ExternalEvent ExternalEvent `json:"externalEvent"` } // Domains are always globally set in the application config, whereas individual projects can be individually registered. From 3e44470919c949366e6fcabcaf94ddc22631ca90 Mon Sep 17 00:00:00 2001 From: tnsetting Date: Tue, 2 Feb 2021 15:27:10 +0100 Subject: [PATCH 3/9] Fix test and lint --- .../notifications/implementations/event_publisher.go | 3 ++- .../implementations/event_publisher_test.go | 5 +++-- pkg/manager/impl/execution_manager.go | 8 +++++++- pkg/manager/impl/execution_manager_test.go | 4 ++-- pkg/manager/impl/node_execution_manager.go | 12 +++++++++++- pkg/manager/impl/task_execution_manager.go | 11 +++++++++-- pkg/manager/impl/task_execution_manager_test.go | 4 ++-- 7 files changed, 36 insertions(+), 11 deletions(-) diff --git a/pkg/async/notifications/implementations/event_publisher.go b/pkg/async/notifications/implementations/event_publisher.go index 9099b187d..747a4912f 100644 --- a/pkg/async/notifications/implementations/event_publisher.go +++ b/pkg/async/notifications/implementations/event_publisher.go @@ -2,9 +2,10 @@ package implementations import ( "context" - "github.com/lyft/flyteidl/gen/pb-go/flyteidl/admin" "strings" + "github.com/lyft/flyteidl/gen/pb-go/flyteidl/admin" + "github.com/lyft/flyteadmin/pkg/async/notifications/interfaces" "github.com/NYTimes/gizmo/pubsub" diff --git a/pkg/async/notifications/implementations/event_publisher_test.go b/pkg/async/notifications/implementations/event_publisher_test.go index c1f4142fe..3f23ab677 100644 --- a/pkg/async/notifications/implementations/event_publisher_test.go +++ b/pkg/async/notifications/implementations/event_publisher_test.go @@ -3,12 +3,13 @@ package implementations import ( "context" "errors" + "testing" + "time" + "github.com/golang/protobuf/ptypes" "github.com/lyft/flyteidl/gen/pb-go/flyteidl/admin" "github.com/lyft/flyteidl/gen/pb-go/flyteidl/core" "github.com/lyft/flyteidl/gen/pb-go/flyteidl/event" - "testing" - "time" "github.com/NYTimes/gizmo/pubsub" "github.com/NYTimes/gizmo/pubsub/pubsubtest" diff --git a/pkg/manager/impl/execution_manager.go b/pkg/manager/impl/execution_manager.go index 7bc6b1477..1453fb88c 100644 --- a/pkg/manager/impl/execution_manager.go +++ b/pkg/manager/impl/execution_manager.go @@ -64,6 +64,7 @@ type executionSystemMetrics struct { SpecSizeBytes prometheus.Summary ClosureSizeBytes prometheus.Summary AcceptanceDelay prometheus.Summary + PublishEventError prometheus.Counter } type executionUserMetrics struct { @@ -1005,7 +1006,10 @@ func (m *ExecutionManager) CreateWorkflowEvent(ctx context.Context, request admi return nil, err } } - m.eventPublisher.Publish(ctx, proto.MessageName(&request), &request) + if err := m.eventPublisher.Publish(ctx, proto.MessageName(&request), &request); err != nil { + m.systemMetrics.PublishEventError.Inc() + logger.Infof(ctx, "error publishing event [%+v] with err: [%v]", request.RequestId, err) + } m.systemMetrics.ExecutionEventsCreated.Inc() return &admin.WorkflowExecutionEventResponse{}, nil @@ -1328,6 +1332,8 @@ func newExecutionSystemMetrics(scope promutils.Scope) executionSystemMetrics { ClosureSizeBytes: scope.MustNewSummary("closure_size_bytes", "size in bytes of serialized execution closure"), AcceptanceDelay: scope.MustNewSummary("acceptance_delay", "delay in seconds from when an execution was requested to be created and when it actually was"), + PublishEventError: scope.MustNewCounter("publish_event_error", + "overall count of publish event errors when invoking publish()"), } } diff --git a/pkg/manager/impl/execution_manager_test.go b/pkg/manager/impl/execution_manager_test.go index 4f25f9e23..605fcce0a 100644 --- a/pkg/manager/impl/execution_manager_test.go +++ b/pkg/manager/impl/execution_manager_test.go @@ -984,7 +984,7 @@ func TestCreateWorkflowEvent(t *testing.T) { return nil } repository.ExecutionRepo().(*repositoryMocks.MockExecutionRepo).SetUpdateCallback(updateExecutionFunc) - execManager := NewExecutionManager(repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), workflowengineMocks.NewMockExecutor(), mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil, nil) + execManager := NewExecutionManager(repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), workflowengineMocks.NewMockExecutor(), mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil, &mockPublisher) resp, err := execManager.CreateWorkflowEvent(context.Background(), admin.WorkflowExecutionEventRequest{ RequestId: "1", Event: &event.WorkflowExecutionEvent{ @@ -1077,7 +1077,7 @@ func TestCreateWorkflowEvent_StartedRunning(t *testing.T) { return nil } repository.ExecutionRepo().(*repositoryMocks.MockExecutionRepo).SetUpdateCallback(updateExecutionFunc) - execManager := NewExecutionManager(repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), workflowengineMocks.NewMockExecutor(), mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil, nil) + execManager := NewExecutionManager(repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), workflowengineMocks.NewMockExecutor(), mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil, &mockPublisher) occurredAtTimestamp, _ := ptypes.TimestampProto(occurredAt) resp, err := execManager.CreateWorkflowEvent(context.Background(), admin.WorkflowExecutionEventRequest{ RequestId: "1", diff --git a/pkg/manager/impl/node_execution_manager.go b/pkg/manager/impl/node_execution_manager.go index 25a2c4f6a..af9f51d07 100644 --- a/pkg/manager/impl/node_execution_manager.go +++ b/pkg/manager/impl/node_execution_manager.go @@ -2,9 +2,11 @@ package impl import ( "context" - notificationInterfaces "github.com/lyft/flyteadmin/pkg/async/notifications/interfaces" "strconv" + "github.com/gogo/protobuf/proto" + notificationInterfaces "github.com/lyft/flyteadmin/pkg/async/notifications/interfaces" + "github.com/lyft/flytestdlib/storage" "github.com/lyft/flytestdlib/contextutils" @@ -44,6 +46,7 @@ type nodeExecutionMetrics struct { ClosureSizeBytes prometheus.Summary NodeExecutionInputBytes prometheus.Summary NodeExecutionOutputBytes prometheus.Summary + PublishEventError prometheus.Counter } type NodeExecutionManager struct { @@ -234,6 +237,11 @@ func (m *NodeExecutionManager) CreateNodeEvent(ctx context.Context, request admi } m.metrics.NodeExecutionEventsCreated.Inc() + if err := m.eventPublisher.Publish(ctx, proto.MessageName(&request), &request); err != nil { + m.metrics.PublishEventError.Inc() + logger.Infof(ctx, "error publishing event [%+v] with err: [%v]", request.RequestId, err) + } + return &admin.NodeExecutionEventResponse{}, nil } @@ -448,6 +456,8 @@ func NewNodeExecutionManager(db repositories.RepositoryInterface, config runtime "size in bytes of serialized node execution inputs"), NodeExecutionOutputBytes: scope.MustNewSummary("output_size_bytes", "size in bytes of serialized node execution outputs"), + PublishEventError: scope.MustNewCounter("publish_event_error", + "overall count of publish event errors when invoking publish()"), } return &NodeExecutionManager{ db: db, diff --git a/pkg/manager/impl/task_execution_manager.go b/pkg/manager/impl/task_execution_manager.go index 13cfb4155..4a44790fe 100644 --- a/pkg/manager/impl/task_execution_manager.go +++ b/pkg/manager/impl/task_execution_manager.go @@ -3,9 +3,10 @@ package impl import ( "context" "fmt" + "strconv" + "github.com/golang/protobuf/proto" notificationInterfaces "github.com/lyft/flyteadmin/pkg/async/notifications/interfaces" - "strconv" "github.com/lyft/flytestdlib/storage" @@ -42,6 +43,7 @@ type taskExecutionMetrics struct { ClosureSizeBytes prometheus.Summary TaskExecutionInputBytes prometheus.Summary TaskExecutionOutputBytes prometheus.Summary + PublishEventError prometheus.Counter } type TaskExecutionManager struct { @@ -176,7 +178,10 @@ func (m *TaskExecutionManager) CreateTaskExecutionEvent(ctx context.Context, req m.metrics.TaskExecutionsTerminated.Inc() } - m.notificationClient.Publish(ctx, proto.MessageName(&request), &request) + if err = m.notificationClient.Publish(ctx, proto.MessageName(&request), &request); err != nil { + m.metrics.PublishEventError.Inc() + logger.Infof(ctx, "error publishing event [%+v] with err: [%v]", request.RequestId, err) + } m.metrics.TaskExecutionEventsCreated.Inc() logger.Debugf(ctx, "Successfully recorded task execution event [%v]", request.Event) @@ -336,6 +341,8 @@ func NewTaskExecutionManager(db repositories.RepositoryInterface, config runtime "size in bytes of serialized node execution inputs"), TaskExecutionOutputBytes: scope.MustNewSummary("output_size_bytes", "size in bytes of serialized node execution outputs"), + PublishEventError: scope.MustNewCounter("publish_event_error", + "overall count of publish event errors when invoking publish()"), } return &TaskExecutionManager{ db: db, diff --git a/pkg/manager/impl/task_execution_manager_test.go b/pkg/manager/impl/task_execution_manager_test.go index ea3b92b1d..e7af57a34 100644 --- a/pkg/manager/impl/task_execution_manager_test.go +++ b/pkg/manager/impl/task_execution_manager_test.go @@ -288,7 +288,7 @@ func TestCreateTaskEvent_Update(t *testing.T) { OutputUri: expectedOutputResult.OutputUri, } - taskExecManager := NewTaskExecutionManager(repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockTaskExecutionRemoteURL, nil) + taskExecManager := NewTaskExecutionManager(repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockTaskExecutionRemoteURL, &mockPublisher) resp, err := taskExecManager.CreateTaskExecutionEvent(context.Background(), taskEventRequest) assert.True(t, getTaskCalled) assert.True(t, updateTaskCalled) @@ -463,7 +463,7 @@ func TestCreateTaskEvent_PhaseVersionChange(t *testing.T) { taskEventRequest.Event.PhaseVersion = uint32(1) taskEventRequest.Event.OccurredAt = taskEventUpdatedAtProto - taskExecManager := NewTaskExecutionManager(repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockTaskExecutionRemoteURL, nil) + taskExecManager := NewTaskExecutionManager(repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockTaskExecutionRemoteURL, &mockPublisher) resp, err := taskExecManager.CreateTaskExecutionEvent(context.Background(), taskEventRequest) assert.True(t, getTaskCalled) assert.True(t, updateTaskCalled) From 33ed0609dd096f271bc3f3eee78b0c36d15fffd6 Mon Sep 17 00:00:00 2001 From: tnsetting Date: Tue, 2 Feb 2021 15:36:45 +0100 Subject: [PATCH 4/9] More fixes for testcases --- pkg/manager/impl/execution_manager_test.go | 2 +- pkg/manager/impl/node_execution_manager_test.go | 11 +++++++---- 2 files changed, 8 insertions(+), 5 deletions(-) diff --git a/pkg/manager/impl/execution_manager_test.go b/pkg/manager/impl/execution_manager_test.go index 605fcce0a..1e5dba15e 100644 --- a/pkg/manager/impl/execution_manager_test.go +++ b/pkg/manager/impl/execution_manager_test.go @@ -256,7 +256,7 @@ func TestCreateExecution(t *testing.T) { mockConfig := getMockExecutionsConfigProvider() mockConfig.(*runtimeMocks.MockConfigurationProvider).AddQualityOfServiceConfiguration(qosProvider) - execManager := NewExecutionManager(repository, mockConfig, getMockStorageForExecTest(context.Background()), mockExecutor, mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil, nil) + execManager := NewExecutionManager(repository, mockConfig, getMockStorageForExecTest(context.Background()), mockExecutor, mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil, &mockPublisher) request := testutils.GetExecutionRequest() request.Spec.Metadata = &admin.ExecutionMetadata{ Principal: "unused - populated from authenticated context", diff --git a/pkg/manager/impl/node_execution_manager_test.go b/pkg/manager/impl/node_execution_manager_test.go index 2e71ac406..aa0306ae0 100644 --- a/pkg/manager/impl/node_execution_manager_test.go +++ b/pkg/manager/impl/node_execution_manager_test.go @@ -134,7 +134,9 @@ func TestCreateNodeEvent(t *testing.T) { }, *input) return nil }) - nodeExecManager := NewNodeExecutionManager(repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockNodeExecutionRemoteURL, nil) + nodeExecManager := NewNodeExecutionManager(repository, getMockExecutionsConfigProvider(), + getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockNodeExecutionRemoteURL, + &mockPublisher) resp, err := nodeExecManager.CreateNodeEvent(context.Background(), request) assert.Nil(t, err) assert.NotNil(t, resp) @@ -189,7 +191,8 @@ func TestCreateNodeEvent_Update(t *testing.T) { return nil }) - nodeExecManager := NewNodeExecutionManager(repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockNodeExecutionRemoteURL, nil) + nodeExecManager := NewNodeExecutionManager(repository, getMockExecutionsConfigProvider(), + getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockNodeExecutionRemoteURL, &mockPublisher) resp, err := nodeExecManager.CreateNodeEvent(context.Background(), request) assert.Nil(t, err) assert.NotNil(t, resp) @@ -202,7 +205,7 @@ func TestCreateNodeEvent_MissingExecution(t *testing.T) { func(ctx context.Context, input interfaces.GetResourceInput) (models.Execution, error) { return models.Execution{}, expectedErr }) - nodeExecManager := NewNodeExecutionManager(repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockNodeExecutionRemoteURL, nil) + nodeExecManager := NewNodeExecutionManager(repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockNodeExecutionRemoteURL, &mockPublisher) resp, err := nodeExecManager.CreateNodeEvent(context.Background(), request) assert.EqualError(t, err, "failed to get existing execution id: [project:\"project\""+ " domain:\"domain\" name:\"name\" ] with err: expected error") @@ -334,7 +337,7 @@ func TestCreateNodeEvent_FirstEventIsTerminal(t *testing.T) { func(ctx context.Context, input interfaces.GetNodeExecutionInput) (models.NodeExecution, error) { return models.NodeExecution{}, flyteAdminErrors.NewFlyteAdminError(codes.NotFound, "foo") }) - nodeExecManager := NewNodeExecutionManager(repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockNodeExecutionRemoteURL, nil) + nodeExecManager := NewNodeExecutionManager(repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockNodeExecutionRemoteURL, &mockPublisher) succeededRequest := admin.NodeExecutionEventRequest{ RequestId: "request id", Event: &event.NodeExecutionEvent{ From 7c2a395a5701bab6b51b814282da5dab50c54655 Mon Sep 17 00:00:00 2001 From: tnsetting Date: Fri, 5 Feb 2021 16:18:25 +0100 Subject: [PATCH 5/9] Address comments and use new config section --- pkg/async/notifications/factory.go | 18 +++--- .../implementations/event_publisher.go | 56 +++++++++---------- pkg/rpc/adminservice/base.go | 2 +- pkg/runtime/application_config_provider.go | 7 +++ .../interfaces/application_configuration.go | 18 ++++-- .../mocks/mock_application_provider.go | 21 +++++-- 6 files changed, 72 insertions(+), 50 deletions(-) diff --git a/pkg/async/notifications/factory.go b/pkg/async/notifications/factory.go index 18252aa0d..e8fe6463d 100644 --- a/pkg/async/notifications/factory.go +++ b/pkg/async/notifications/factory.go @@ -149,8 +149,8 @@ func NewNotificationsPublisher(config runtimeInterfaces.NotificationsConfig, sco } } -func NewEventsPublisher(config runtimeInterfaces.NotificationsConfig, scope promutils.Scope) interfaces.Publisher { - if config.ExternalEvent.EventPublisherConfig.TopicName == "" { +func NewEventsPublisher(config runtimeInterfaces.ExternalEventsConfig, scope promutils.Scope) interfaces.Publisher { + if config.Enable == false { return implementations.NewNoopPublish() } reconnectAttempts := config.ReconnectAttempts @@ -158,13 +158,9 @@ func NewEventsPublisher(config runtimeInterfaces.NotificationsConfig, scope prom switch config.Type { case common.AWS: snsConfig := gizmoAWS.SNSConfig{ - Topic: config.ExternalEvent.EventPublisherConfig.TopicName, - } - if config.AWSConfig.Region != "" { - snsConfig.Region = config.AWSConfig.Region - } else { - snsConfig.Region = config.Region + Topic: config.EventsPublisherConfig.TopicName, } + snsConfig.Region = config.AWSConfig.Region var publisher pubsub.Publisher var err error @@ -177,10 +173,10 @@ func NewEventsPublisher(config runtimeInterfaces.NotificationsConfig, scope prom if err != nil { panic(err) } - return implementations.NewEventsPublisher(publisher, scope, config.ExternalEvent.EventPublisherConfig.EventTypes) + return implementations.NewEventsPublisher(publisher, scope, config.EventsPublisherConfig.EventTypes) case common.GCP: pubsubConfig := gizmoGCP.Config{ - Topic: config.ExternalEvent.EventPublisherConfig.TopicName, + Topic: config.EventsPublisherConfig.TopicName, } pubsubConfig.ProjectID = config.GCPConfig.ProjectID var publisher pubsub.MultiPublisher @@ -193,7 +189,7 @@ func NewEventsPublisher(config runtimeInterfaces.NotificationsConfig, scope prom if err != nil { panic(err) } - return implementations.NewEventsPublisher(publisher, scope, config.ExternalEvent.EventPublisherConfig.EventTypes) + return implementations.NewEventsPublisher(publisher, scope, config.EventsPublisherConfig.EventTypes) case common.Local: fallthrough default: diff --git a/pkg/async/notifications/implementations/event_publisher.go b/pkg/async/notifications/implementations/event_publisher.go index 747a4912f..51ddcaed2 100644 --- a/pkg/async/notifications/implementations/event_publisher.go +++ b/pkg/async/notifications/implementations/event_publisher.go @@ -2,6 +2,7 @@ package implementations import ( "context" + "k8s.io/apimachinery/pkg/util/sets" "strings" "github.com/lyft/flyteidl/gen/pb-go/flyteidl/admin" @@ -16,33 +17,33 @@ import ( ) type eventPublisherSystemMetrics struct { - Scope promutils.Scope - PublishTotal prometheus.Counter - PublishError prometheus.Counter + Scope promutils.Scope + PublishTotal prometheus.Counter + PublishSuccess prometheus.Counter + PublishError prometheus.Counter } // TODO: Add a counter that encompasses the publisher stats grouped by project and domain. type EventPublisher struct { pub pubsub.Publisher systemMetrics eventPublisherSystemMetrics - events []string + events sets.String } -func getSupportedEvents() map[string]string { - supportedEvents := make(map[string]string) - var taskExecutionReq admin.TaskExecutionEventRequest - supportedEvents["task"] = proto.MessageName(&taskExecutionReq) - var nodeExecutionReq admin.NodeExecutionEventRequest - supportedEvents["node"] = proto.MessageName(&nodeExecutionReq) - var workflowExecutionReq admin.WorkflowExecutionEventRequest - supportedEvents["workflow"] = proto.MessageName(&workflowExecutionReq) - return supportedEvents +var taskExecutionReq admin.TaskExecutionEventRequest +var nodeExecutionReq admin.NodeExecutionEventRequest +var workflowExecutionReq admin.WorkflowExecutionEventRequest + +var supportedEvents = map[string]string{ + "task": proto.MessageName(&taskExecutionReq), + "node": proto.MessageName(&nodeExecutionReq), + "workflow": proto.MessageName(&workflowExecutionReq), } // The key is the notification type as defined as an enum. func (p *EventPublisher) Publish(ctx context.Context, notificationType string, msg proto.Message) error { p.systemMetrics.PublishTotal.Inc() - logger.Debugf(ctx, "Publishing the following message [%s]", msg.String()) + logger.Debugf(ctx, "Publishing the following message [%+v]", msg) if !p.shouldPublishEvent(notificationType) { return nil @@ -52,40 +53,39 @@ func (p *EventPublisher) Publish(ctx context.Context, notificationType string, m if err != nil { p.systemMetrics.PublishError.Inc() logger.Errorf(ctx, "Failed to publish a message with key [%s] and message [%s] and error: %v", notificationType, msg.String(), err) + } else { + p.systemMetrics.PublishSuccess.Inc() } return err } func (p *EventPublisher) shouldPublishEvent(notificationType string) bool { - for _, e := range p.events { - if e == notificationType { - return true - } - } - return false + return p.events.Has(notificationType) } func newEventPublisherSystemMetrics(scope promutils.Scope) eventPublisherSystemMetrics { return eventPublisherSystemMetrics{ - Scope: scope, - PublishTotal: scope.MustNewCounter("event_publish_total", "overall count of publish messages"), - PublishError: scope.MustNewCounter("event_publish_errors", "count of publish errors"), + Scope: scope, + PublishTotal: scope.MustNewCounter("event_publish_total", "overall count of publish messages"), + PublishSuccess: scope.MustNewCounter("event_publish_success", "sucess count of publish messages"), + PublishError: scope.MustNewCounter("event_publish_errors", "count of publish errors"), } } func NewEventsPublisher(pub pubsub.Publisher, scope promutils.Scope, eventTypes string) interfaces.Publisher { - supportedEvents := getSupportedEvents() - var eventList = make([]string, 0) + eventSet := sets.NewString() if strings.Contains(eventTypes, "*") || strings.Contains(eventTypes, "all") { for _, e := range supportedEvents { - eventList = append(eventList, e) + eventSet = eventSet.Insert(e) } } else { events := strings.Split(eventTypes, ",") for _, event := range events { if e, found := supportedEvents[event]; found { - eventList = append(eventList, e) + eventSet = eventSet.Insert(e) + } else { + logger.Errorf(context.Background(), "Unsupported event type [%s] in the config") } } } @@ -93,6 +93,6 @@ func NewEventsPublisher(pub pubsub.Publisher, scope promutils.Scope, eventTypes return &EventPublisher{ pub: pub, systemMetrics: newEventPublisherSystemMetrics(scope.NewSubScope("events_publisher")), - events: eventList, + events: eventSet, } } diff --git a/pkg/rpc/adminservice/base.go b/pkg/rpc/adminservice/base.go index 9c7923ce2..806970fa4 100644 --- a/pkg/rpc/adminservice/base.go +++ b/pkg/rpc/adminservice/base.go @@ -99,7 +99,7 @@ func NewAdminServer(kubeConfig, master string) *AdminService { publisher := notifications.NewNotificationsPublisher(*configuration.ApplicationConfiguration().GetNotificationsConfig(), adminScope) processor := notifications.NewNotificationsProcessor(*configuration.ApplicationConfiguration().GetNotificationsConfig(), adminScope) - eventPublisher := notifications.NewEventsPublisher(*configuration.ApplicationConfiguration().GetNotificationsConfig(), adminScope) + eventPublisher := notifications.NewEventsPublisher(*configuration.ApplicationConfiguration().GetExternalEventsConfig(), adminScope) go func() { logger.Info(context.Background(), "Started processing notifications.") processor.StartProcessing() diff --git a/pkg/runtime/application_config_provider.go b/pkg/runtime/application_config_provider.go index 58032b67e..1ad15c8ef 100644 --- a/pkg/runtime/application_config_provider.go +++ b/pkg/runtime/application_config_provider.go @@ -16,6 +16,7 @@ const scheduler = "scheduler" const remoteData = "remoteData" const notifications = "notifications" const domains = "domains" +const externalEvents = "externalEvents" var databaseConfig = config.MustRegisterSection(database, &interfaces.DbConfigSection{}) var flyteAdminConfig = config.MustRegisterSection(flyteAdmin, &interfaces.ApplicationConfig{}) @@ -23,6 +24,7 @@ var schedulerConfig = config.MustRegisterSection(scheduler, &interfaces.Schedule var remoteDataConfig = config.MustRegisterSection(remoteData, &interfaces.RemoteDataConfig{}) var notificationsConfig = config.MustRegisterSection(notifications, &interfaces.NotificationsConfig{}) var domainsConfig = config.MustRegisterSection(domains, &interfaces.DomainsConfig{}) +var externalEventsConfig = config.MustRegisterSection(externalEvents, &interfaces.ExternalEventsConfig{}) // Implementation of an interfaces.ApplicationConfiguration type ApplicationConfigurationProvider struct{} @@ -72,6 +74,11 @@ func (p *ApplicationConfigurationProvider) GetNotificationsConfig() *interfaces. func (p *ApplicationConfigurationProvider) GetDomainsConfig() *interfaces.DomainsConfig { return domainsConfig.GetConfig().(*interfaces.DomainsConfig) } + +func (p *ApplicationConfigurationProvider) GetExternalEventsConfig() *interfaces.ExternalEventsConfig { + return externalEventsConfig.GetConfig().(*interfaces.ExternalEventsConfig) +} + func NewApplicationConfigurationProvider() interfaces.ApplicationConfiguration { return &ApplicationConfigurationProvider{} } diff --git a/pkg/runtime/interfaces/application_configuration.go b/pkg/runtime/interfaces/application_configuration.go index 359c5edaa..882426288 100644 --- a/pkg/runtime/interfaces/application_configuration.go +++ b/pkg/runtime/interfaces/application_configuration.go @@ -154,8 +154,19 @@ type EventsPublisherConfig struct { EventTypes string `json:"eventTypes"` } -type ExternalEvent struct { - EventPublisherConfig EventsPublisherConfig `json:"eventPublisher"` +type ExternalEventsConfig struct { + Enable bool `json:"enable"` + // Defines the cloud provider that backs the scheduler. In the absence of a specification the no-op, 'local' + // scheme is used. + Type string `json:"type"` + AWSConfig AWSConfig `json:"aws"` + GCPConfig GCPConfig `json:"gcp"` + // Publish events to a pubsub tops + EventsPublisherConfig EventsPublisherConfig `json:"eventsPublisher"` + // Number of times to attempt recreating a notifications processor client should there be any disruptions. + ReconnectAttempts int `json:"reconnectAttempts"` + // Specifies the time interval to wait before attempting to reconnect the notifications processor client. + ReconnectDelaySeconds int `json:"reconnectDelaySeconds"` } // Configuration specific to notifications handling @@ -174,8 +185,6 @@ type NotificationsConfig struct { ReconnectAttempts int `json:"reconnectAttempts"` // Specifies the time interval to wait before attempting to reconnect the notifications processor client. ReconnectDelaySeconds int `json:"reconnectDelaySeconds"` - // Publish events to a pubsub tops - ExternalEvent ExternalEvent `json:"externalEvent"` } // Domains are always globally set in the application config, whereas individual projects can be individually registered. @@ -196,4 +205,5 @@ type ApplicationConfiguration interface { GetRemoteDataConfig() *RemoteDataConfig GetNotificationsConfig() *NotificationsConfig GetDomainsConfig() *DomainsConfig + GetExternalEventsConfig() *ExternalEventsConfig } diff --git a/pkg/runtime/mocks/mock_application_provider.go b/pkg/runtime/mocks/mock_application_provider.go index 0d9973dbc..33ac9a9b9 100644 --- a/pkg/runtime/mocks/mock_application_provider.go +++ b/pkg/runtime/mocks/mock_application_provider.go @@ -5,12 +5,13 @@ import ( ) type MockApplicationProvider struct { - dbConfig interfaces.DbConfig - topLevelConfig interfaces.ApplicationConfig - schedulerConfig interfaces.SchedulerConfig - remoteDataConfig interfaces.RemoteDataConfig - notificationsConfig interfaces.NotificationsConfig - domainsConfig interfaces.DomainsConfig + dbConfig interfaces.DbConfig + topLevelConfig interfaces.ApplicationConfig + schedulerConfig interfaces.SchedulerConfig + remoteDataConfig interfaces.RemoteDataConfig + notificationsConfig interfaces.NotificationsConfig + domainsConfig interfaces.DomainsConfig + externalEventsConfig interfaces.ExternalEventsConfig } func (p *MockApplicationProvider) GetDbConfig() interfaces.DbConfig { @@ -60,3 +61,11 @@ func (p *MockApplicationProvider) GetDomainsConfig() *interfaces.DomainsConfig { func (p *MockApplicationProvider) SetDomainsConfig(domainsConfig interfaces.DomainsConfig) { p.domainsConfig = domainsConfig } + +func (p *MockApplicationProvider) SetExternalEventsConfig(externalEventsConfig interfaces.ExternalEventsConfig) { + p.externalEventsConfig = externalEventsConfig +} + +func (p *MockApplicationProvider) GetExternalEventsConfig() *interfaces.ExternalEventsConfig { + return &p.externalEventsConfig +} From 3b170a18984b8f97654a42baba56e6b5f55c9104 Mon Sep 17 00:00:00 2001 From: tnsetting Date: Fri, 5 Feb 2021 16:25:37 +0100 Subject: [PATCH 6/9] Fix goimports --- pkg/async/notifications/factory.go | 2 +- pkg/async/notifications/implementations/event_publisher.go | 5 +++-- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/pkg/async/notifications/factory.go b/pkg/async/notifications/factory.go index e8fe6463d..2293ff926 100644 --- a/pkg/async/notifications/factory.go +++ b/pkg/async/notifications/factory.go @@ -150,7 +150,7 @@ func NewNotificationsPublisher(config runtimeInterfaces.NotificationsConfig, sco } func NewEventsPublisher(config runtimeInterfaces.ExternalEventsConfig, scope promutils.Scope) interfaces.Publisher { - if config.Enable == false { + if !config.Enable { return implementations.NewNoopPublish() } reconnectAttempts := config.ReconnectAttempts diff --git a/pkg/async/notifications/implementations/event_publisher.go b/pkg/async/notifications/implementations/event_publisher.go index 51ddcaed2..960ef2b82 100644 --- a/pkg/async/notifications/implementations/event_publisher.go +++ b/pkg/async/notifications/implementations/event_publisher.go @@ -2,9 +2,10 @@ package implementations import ( "context" - "k8s.io/apimachinery/pkg/util/sets" "strings" + "k8s.io/apimachinery/pkg/util/sets" + "github.com/lyft/flyteidl/gen/pb-go/flyteidl/admin" "github.com/lyft/flyteadmin/pkg/async/notifications/interfaces" @@ -67,7 +68,7 @@ func newEventPublisherSystemMetrics(scope promutils.Scope) eventPublisherSystemM return eventPublisherSystemMetrics{ Scope: scope, PublishTotal: scope.MustNewCounter("event_publish_total", "overall count of publish messages"), - PublishSuccess: scope.MustNewCounter("event_publish_success", "sucess count of publish messages"), + PublishSuccess: scope.MustNewCounter("event_publish_success", "success count of publish messages"), PublishError: scope.MustNewCounter("event_publish_errors", "count of publish errors"), } } From 9224c6c5a2fd4233ad46cc0a5473764b753a19e4 Mon Sep 17 00:00:00 2001 From: tnsetting Date: Fri, 5 Feb 2021 22:25:43 +0100 Subject: [PATCH 7/9] just for trigger the build --- pkg/async/notifications/implementations/event_publisher.go | 1 - 1 file changed, 1 deletion(-) diff --git a/pkg/async/notifications/implementations/event_publisher.go b/pkg/async/notifications/implementations/event_publisher.go index 960ef2b82..10bbc3ea2 100644 --- a/pkg/async/notifications/implementations/event_publisher.go +++ b/pkg/async/notifications/implementations/event_publisher.go @@ -74,7 +74,6 @@ func newEventPublisherSystemMetrics(scope promutils.Scope) eventPublisherSystemM } func NewEventsPublisher(pub pubsub.Publisher, scope promutils.Scope, eventTypes string) interfaces.Publisher { - eventSet := sets.NewString() if strings.Contains(eventTypes, "*") || strings.Contains(eventTypes, "all") { for _, e := range supportedEvents { From 1f91d61d2e5b6fe96c7b095647613e7cdab18c46 Mon Sep 17 00:00:00 2001 From: tnsetting Date: Sat, 6 Feb 2021 22:13:31 +0100 Subject: [PATCH 8/9] more comments fix --- .../implementations/event_publisher.go | 39 +++++++++++-------- .../implementations/event_publisher_test.go | 26 ++++++------- pkg/manager/impl/node_execution_manager.go | 2 +- .../interfaces/application_configuration.go | 2 +- 4 files changed, 38 insertions(+), 31 deletions(-) diff --git a/pkg/async/notifications/implementations/event_publisher.go b/pkg/async/notifications/implementations/event_publisher.go index 10bbc3ea2..9f306af8e 100644 --- a/pkg/async/notifications/implementations/event_publisher.go +++ b/pkg/async/notifications/implementations/event_publisher.go @@ -2,7 +2,6 @@ package implementations import ( "context" - "strings" "k8s.io/apimachinery/pkg/util/sets" @@ -35,20 +34,28 @@ var taskExecutionReq admin.TaskExecutionEventRequest var nodeExecutionReq admin.NodeExecutionEventRequest var workflowExecutionReq admin.WorkflowExecutionEventRequest +const ( + Task = "task" + Node = "node" + Workflow = "workflow" + AllTypes = "all" + AllTypesShort = "*" +) + var supportedEvents = map[string]string{ - "task": proto.MessageName(&taskExecutionReq), - "node": proto.MessageName(&nodeExecutionReq), - "workflow": proto.MessageName(&workflowExecutionReq), + Task: proto.MessageName(&taskExecutionReq), + Node: proto.MessageName(&nodeExecutionReq), + Workflow: proto.MessageName(&workflowExecutionReq), } // The key is the notification type as defined as an enum. func (p *EventPublisher) Publish(ctx context.Context, notificationType string, msg proto.Message) error { p.systemMetrics.PublishTotal.Inc() - logger.Debugf(ctx, "Publishing the following message [%+v]", msg) if !p.shouldPublishEvent(notificationType) { return nil } + logger.Debugf(ctx, "Publishing the following message [%+v]", msg) err := p.pub.Publish(ctx, notificationType, msg) if err != nil { @@ -73,20 +80,20 @@ func newEventPublisherSystemMetrics(scope promutils.Scope) eventPublisherSystemM } } -func NewEventsPublisher(pub pubsub.Publisher, scope promutils.Scope, eventTypes string) interfaces.Publisher { +func NewEventsPublisher(pub pubsub.Publisher, scope promutils.Scope, eventTypes []string) interfaces.Publisher { eventSet := sets.NewString() - if strings.Contains(eventTypes, "*") || strings.Contains(eventTypes, "all") { - for _, e := range supportedEvents { - eventSet = eventSet.Insert(e) - } - } else { - events := strings.Split(eventTypes, ",") - for _, event := range events { - if e, found := supportedEvents[event]; found { + + for _, event := range eventTypes { + if event == AllTypes || event == AllTypesShort { + for _, e := range supportedEvents { eventSet = eventSet.Insert(e) - } else { - logger.Errorf(context.Background(), "Unsupported event type [%s] in the config") } + break + } + if e, found := supportedEvents[event]; found { + eventSet = eventSet.Insert(e) + } else { + logger.Errorf(context.Background(), "Unsupported event type [%s] in the config") } } diff --git a/pkg/async/notifications/implementations/event_publisher_test.go b/pkg/async/notifications/implementations/event_publisher_test.go index 3f23ab677..dbfed5607 100644 --- a/pkg/async/notifications/implementations/event_publisher_test.go +++ b/pkg/async/notifications/implementations/event_publisher_test.go @@ -91,52 +91,52 @@ func TestNewEventsPublisher_EventTypes(t *testing.T) { { tests := []struct { name string - eventTypes string + eventTypes []string events []proto.Message shouldSendEvent []bool expectedSendCnt int }{ - {"eventTypes as workflow,node", "workflow,node", + {"eventTypes as workflow,node", []string{"workflow", "node"}, []proto.Message{workflowRequest, nodeRequest, taskRequest}, []bool{true, true, false}, 2}, - {"eventTypes as workflow,task", "workflow,task", + {"eventTypes as workflow,task", []string{"workflow", "task"}, []proto.Message{workflowRequest, nodeRequest, taskRequest}, []bool{true, false, true}, 2}, - {"eventTypes as workflow,task", "node,task", + {"eventTypes as workflow,task", []string{"node", "task"}, []proto.Message{workflowRequest, nodeRequest, taskRequest}, []bool{false, true, true}, 2}, - {"eventTypes as task", "task", + {"eventTypes as task", []string{"task"}, []proto.Message{taskRequest}, []bool{true}, 1}, - {"eventTypes as node", "node", + {"eventTypes as node", []string{"node"}, []proto.Message{nodeRequest}, []bool{true}, 1}, - {"eventTypes as workflow", "workflow", + {"eventTypes as workflow", []string{"workflow"}, []proto.Message{workflowRequest}, []bool{true}, 1}, - {"eventTypes as workflow", "workflow", + {"eventTypes as workflow", []string{"workflow"}, []proto.Message{nodeRequest, taskRequest}, []bool{false, false}, 0}, - {"eventTypes as task", "task", + {"eventTypes as task", []string{"task"}, []proto.Message{workflowRequest, nodeRequest}, []bool{false, false}, 0}, - {"eventTypes as node", "node", + {"eventTypes as node", []string{"node"}, []proto.Message{workflowRequest, taskRequest}, []bool{false, false}, 0}, - {"eventTypes as all", "all", + {"eventTypes as all", []string{"all"}, []proto.Message{workflowRequest, nodeRequest, taskRequest}, []bool{true, true, true}, 3}, - {"eventTypes as *", "*", + {"eventTypes as *", []string{"*"}, []proto.Message{workflowRequest, nodeRequest, taskRequest}, []bool{true, true, true}, 3}, @@ -165,7 +165,7 @@ func TestNewEventsPublisher_EventTypes(t *testing.T) { func TestEventPublisher_PublishError(t *testing.T) { initializeEventPublisher() - currentEventPublisher := NewEventsPublisher(mockEventPublisher, promutils.NewTestScope(), "*") + currentEventPublisher := NewEventsPublisher(mockEventPublisher, promutils.NewTestScope(), []string{"*"}) var publishError = errors.New("publish() returns an error") testEventPublisher.GivenError = publishError assert.Equal(t, publishError, currentEventPublisher.Publish(context.Background(), diff --git a/pkg/manager/impl/node_execution_manager.go b/pkg/manager/impl/node_execution_manager.go index af9f51d07..87f2dbbd6 100644 --- a/pkg/manager/impl/node_execution_manager.go +++ b/pkg/manager/impl/node_execution_manager.go @@ -4,7 +4,7 @@ import ( "context" "strconv" - "github.com/gogo/protobuf/proto" + "github.com/golang/protobuf/proto" notificationInterfaces "github.com/lyft/flyteadmin/pkg/async/notifications/interfaces" "github.com/lyft/flytestdlib/storage" diff --git a/pkg/runtime/interfaces/application_configuration.go b/pkg/runtime/interfaces/application_configuration.go index 882426288..6cffceb29 100644 --- a/pkg/runtime/interfaces/application_configuration.go +++ b/pkg/runtime/interfaces/application_configuration.go @@ -151,7 +151,7 @@ type EventsPublisherConfig struct { // The topic which events should be published, e.g. node, task, workflow TopicName string `json:"topicName"` // Event types: task, node, workflow executions - EventTypes string `json:"eventTypes"` + EventTypes []string `json:"eventTypes"` } type ExternalEventsConfig struct { From e242acb04c9fb56166763f8593fba85d04af5104 Mon Sep 17 00:00:00 2001 From: tnsetting Date: Mon, 8 Feb 2021 22:03:26 +0100 Subject: [PATCH 9/9] Add config example --- flyteadmin_config.yaml | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/flyteadmin_config.yaml b/flyteadmin_config.yaml index 11749e42d..e62521ada 100644 --- a/flyteadmin_config.yaml +++ b/flyteadmin_config.yaml @@ -78,6 +78,14 @@ notifications: Execution \"{{ name }}\" has {{ phase }} in \"{{ domain }}\". View details at http://example.com/projects/{{ project }}/domains/{{ domain }}/executions/{{ name }}. {{ error }} +externalEvents: + Enable: false + type: gcp + gcp: + projectId: "foo" + eventsPublisher: + topicName: "bar" + eventTypes: all Logger: show-source: true level: 6