From 7c2a395a5701bab6b51b814282da5dab50c54655 Mon Sep 17 00:00:00 2001 From: tnsetting Date: Fri, 5 Feb 2021 16:18:25 +0100 Subject: [PATCH] Address comments and use new config section --- pkg/async/notifications/factory.go | 18 +++--- .../implementations/event_publisher.go | 56 +++++++++---------- pkg/rpc/adminservice/base.go | 2 +- pkg/runtime/application_config_provider.go | 7 +++ .../interfaces/application_configuration.go | 18 ++++-- .../mocks/mock_application_provider.go | 21 +++++-- 6 files changed, 72 insertions(+), 50 deletions(-) diff --git a/pkg/async/notifications/factory.go b/pkg/async/notifications/factory.go index 18252aa0d..e8fe6463d 100644 --- a/pkg/async/notifications/factory.go +++ b/pkg/async/notifications/factory.go @@ -149,8 +149,8 @@ func NewNotificationsPublisher(config runtimeInterfaces.NotificationsConfig, sco } } -func NewEventsPublisher(config runtimeInterfaces.NotificationsConfig, scope promutils.Scope) interfaces.Publisher { - if config.ExternalEvent.EventPublisherConfig.TopicName == "" { +func NewEventsPublisher(config runtimeInterfaces.ExternalEventsConfig, scope promutils.Scope) interfaces.Publisher { + if config.Enable == false { return implementations.NewNoopPublish() } reconnectAttempts := config.ReconnectAttempts @@ -158,13 +158,9 @@ func NewEventsPublisher(config runtimeInterfaces.NotificationsConfig, scope prom switch config.Type { case common.AWS: snsConfig := gizmoAWS.SNSConfig{ - Topic: config.ExternalEvent.EventPublisherConfig.TopicName, - } - if config.AWSConfig.Region != "" { - snsConfig.Region = config.AWSConfig.Region - } else { - snsConfig.Region = config.Region + Topic: config.EventsPublisherConfig.TopicName, } + snsConfig.Region = config.AWSConfig.Region var publisher pubsub.Publisher var err error @@ -177,10 +173,10 @@ func NewEventsPublisher(config runtimeInterfaces.NotificationsConfig, scope prom if err != nil { panic(err) } - return implementations.NewEventsPublisher(publisher, scope, config.ExternalEvent.EventPublisherConfig.EventTypes) + return implementations.NewEventsPublisher(publisher, scope, config.EventsPublisherConfig.EventTypes) case common.GCP: pubsubConfig := gizmoGCP.Config{ - Topic: config.ExternalEvent.EventPublisherConfig.TopicName, + Topic: config.EventsPublisherConfig.TopicName, } pubsubConfig.ProjectID = config.GCPConfig.ProjectID var publisher pubsub.MultiPublisher @@ -193,7 +189,7 @@ func NewEventsPublisher(config runtimeInterfaces.NotificationsConfig, scope prom if err != nil { panic(err) } - return implementations.NewEventsPublisher(publisher, scope, config.ExternalEvent.EventPublisherConfig.EventTypes) + return implementations.NewEventsPublisher(publisher, scope, config.EventsPublisherConfig.EventTypes) case common.Local: fallthrough default: diff --git a/pkg/async/notifications/implementations/event_publisher.go b/pkg/async/notifications/implementations/event_publisher.go index 747a4912f..51ddcaed2 100644 --- a/pkg/async/notifications/implementations/event_publisher.go +++ b/pkg/async/notifications/implementations/event_publisher.go @@ -2,6 +2,7 @@ package implementations import ( "context" + "k8s.io/apimachinery/pkg/util/sets" "strings" "github.com/lyft/flyteidl/gen/pb-go/flyteidl/admin" @@ -16,33 +17,33 @@ import ( ) type eventPublisherSystemMetrics struct { - Scope promutils.Scope - PublishTotal prometheus.Counter - PublishError prometheus.Counter + Scope promutils.Scope + PublishTotal prometheus.Counter + PublishSuccess prometheus.Counter + PublishError prometheus.Counter } // TODO: Add a counter that encompasses the publisher stats grouped by project and domain. type EventPublisher struct { pub pubsub.Publisher systemMetrics eventPublisherSystemMetrics - events []string + events sets.String } -func getSupportedEvents() map[string]string { - supportedEvents := make(map[string]string) - var taskExecutionReq admin.TaskExecutionEventRequest - supportedEvents["task"] = proto.MessageName(&taskExecutionReq) - var nodeExecutionReq admin.NodeExecutionEventRequest - supportedEvents["node"] = proto.MessageName(&nodeExecutionReq) - var workflowExecutionReq admin.WorkflowExecutionEventRequest - supportedEvents["workflow"] = proto.MessageName(&workflowExecutionReq) - return supportedEvents +var taskExecutionReq admin.TaskExecutionEventRequest +var nodeExecutionReq admin.NodeExecutionEventRequest +var workflowExecutionReq admin.WorkflowExecutionEventRequest + +var supportedEvents = map[string]string{ + "task": proto.MessageName(&taskExecutionReq), + "node": proto.MessageName(&nodeExecutionReq), + "workflow": proto.MessageName(&workflowExecutionReq), } // The key is the notification type as defined as an enum. func (p *EventPublisher) Publish(ctx context.Context, notificationType string, msg proto.Message) error { p.systemMetrics.PublishTotal.Inc() - logger.Debugf(ctx, "Publishing the following message [%s]", msg.String()) + logger.Debugf(ctx, "Publishing the following message [%+v]", msg) if !p.shouldPublishEvent(notificationType) { return nil @@ -52,40 +53,39 @@ func (p *EventPublisher) Publish(ctx context.Context, notificationType string, m if err != nil { p.systemMetrics.PublishError.Inc() logger.Errorf(ctx, "Failed to publish a message with key [%s] and message [%s] and error: %v", notificationType, msg.String(), err) + } else { + p.systemMetrics.PublishSuccess.Inc() } return err } func (p *EventPublisher) shouldPublishEvent(notificationType string) bool { - for _, e := range p.events { - if e == notificationType { - return true - } - } - return false + return p.events.Has(notificationType) } func newEventPublisherSystemMetrics(scope promutils.Scope) eventPublisherSystemMetrics { return eventPublisherSystemMetrics{ - Scope: scope, - PublishTotal: scope.MustNewCounter("event_publish_total", "overall count of publish messages"), - PublishError: scope.MustNewCounter("event_publish_errors", "count of publish errors"), + Scope: scope, + PublishTotal: scope.MustNewCounter("event_publish_total", "overall count of publish messages"), + PublishSuccess: scope.MustNewCounter("event_publish_success", "sucess count of publish messages"), + PublishError: scope.MustNewCounter("event_publish_errors", "count of publish errors"), } } func NewEventsPublisher(pub pubsub.Publisher, scope promutils.Scope, eventTypes string) interfaces.Publisher { - supportedEvents := getSupportedEvents() - var eventList = make([]string, 0) + eventSet := sets.NewString() if strings.Contains(eventTypes, "*") || strings.Contains(eventTypes, "all") { for _, e := range supportedEvents { - eventList = append(eventList, e) + eventSet = eventSet.Insert(e) } } else { events := strings.Split(eventTypes, ",") for _, event := range events { if e, found := supportedEvents[event]; found { - eventList = append(eventList, e) + eventSet = eventSet.Insert(e) + } else { + logger.Errorf(context.Background(), "Unsupported event type [%s] in the config") } } } @@ -93,6 +93,6 @@ func NewEventsPublisher(pub pubsub.Publisher, scope promutils.Scope, eventTypes return &EventPublisher{ pub: pub, systemMetrics: newEventPublisherSystemMetrics(scope.NewSubScope("events_publisher")), - events: eventList, + events: eventSet, } } diff --git a/pkg/rpc/adminservice/base.go b/pkg/rpc/adminservice/base.go index 9c7923ce2..806970fa4 100644 --- a/pkg/rpc/adminservice/base.go +++ b/pkg/rpc/adminservice/base.go @@ -99,7 +99,7 @@ func NewAdminServer(kubeConfig, master string) *AdminService { publisher := notifications.NewNotificationsPublisher(*configuration.ApplicationConfiguration().GetNotificationsConfig(), adminScope) processor := notifications.NewNotificationsProcessor(*configuration.ApplicationConfiguration().GetNotificationsConfig(), adminScope) - eventPublisher := notifications.NewEventsPublisher(*configuration.ApplicationConfiguration().GetNotificationsConfig(), adminScope) + eventPublisher := notifications.NewEventsPublisher(*configuration.ApplicationConfiguration().GetExternalEventsConfig(), adminScope) go func() { logger.Info(context.Background(), "Started processing notifications.") processor.StartProcessing() diff --git a/pkg/runtime/application_config_provider.go b/pkg/runtime/application_config_provider.go index 58032b67e..1ad15c8ef 100644 --- a/pkg/runtime/application_config_provider.go +++ b/pkg/runtime/application_config_provider.go @@ -16,6 +16,7 @@ const scheduler = "scheduler" const remoteData = "remoteData" const notifications = "notifications" const domains = "domains" +const externalEvents = "externalEvents" var databaseConfig = config.MustRegisterSection(database, &interfaces.DbConfigSection{}) var flyteAdminConfig = config.MustRegisterSection(flyteAdmin, &interfaces.ApplicationConfig{}) @@ -23,6 +24,7 @@ var schedulerConfig = config.MustRegisterSection(scheduler, &interfaces.Schedule var remoteDataConfig = config.MustRegisterSection(remoteData, &interfaces.RemoteDataConfig{}) var notificationsConfig = config.MustRegisterSection(notifications, &interfaces.NotificationsConfig{}) var domainsConfig = config.MustRegisterSection(domains, &interfaces.DomainsConfig{}) +var externalEventsConfig = config.MustRegisterSection(externalEvents, &interfaces.ExternalEventsConfig{}) // Implementation of an interfaces.ApplicationConfiguration type ApplicationConfigurationProvider struct{} @@ -72,6 +74,11 @@ func (p *ApplicationConfigurationProvider) GetNotificationsConfig() *interfaces. func (p *ApplicationConfigurationProvider) GetDomainsConfig() *interfaces.DomainsConfig { return domainsConfig.GetConfig().(*interfaces.DomainsConfig) } + +func (p *ApplicationConfigurationProvider) GetExternalEventsConfig() *interfaces.ExternalEventsConfig { + return externalEventsConfig.GetConfig().(*interfaces.ExternalEventsConfig) +} + func NewApplicationConfigurationProvider() interfaces.ApplicationConfiguration { return &ApplicationConfigurationProvider{} } diff --git a/pkg/runtime/interfaces/application_configuration.go b/pkg/runtime/interfaces/application_configuration.go index 359c5edaa..882426288 100644 --- a/pkg/runtime/interfaces/application_configuration.go +++ b/pkg/runtime/interfaces/application_configuration.go @@ -154,8 +154,19 @@ type EventsPublisherConfig struct { EventTypes string `json:"eventTypes"` } -type ExternalEvent struct { - EventPublisherConfig EventsPublisherConfig `json:"eventPublisher"` +type ExternalEventsConfig struct { + Enable bool `json:"enable"` + // Defines the cloud provider that backs the scheduler. In the absence of a specification the no-op, 'local' + // scheme is used. + Type string `json:"type"` + AWSConfig AWSConfig `json:"aws"` + GCPConfig GCPConfig `json:"gcp"` + // Publish events to a pubsub tops + EventsPublisherConfig EventsPublisherConfig `json:"eventsPublisher"` + // Number of times to attempt recreating a notifications processor client should there be any disruptions. + ReconnectAttempts int `json:"reconnectAttempts"` + // Specifies the time interval to wait before attempting to reconnect the notifications processor client. + ReconnectDelaySeconds int `json:"reconnectDelaySeconds"` } // Configuration specific to notifications handling @@ -174,8 +185,6 @@ type NotificationsConfig struct { ReconnectAttempts int `json:"reconnectAttempts"` // Specifies the time interval to wait before attempting to reconnect the notifications processor client. ReconnectDelaySeconds int `json:"reconnectDelaySeconds"` - // Publish events to a pubsub tops - ExternalEvent ExternalEvent `json:"externalEvent"` } // Domains are always globally set in the application config, whereas individual projects can be individually registered. @@ -196,4 +205,5 @@ type ApplicationConfiguration interface { GetRemoteDataConfig() *RemoteDataConfig GetNotificationsConfig() *NotificationsConfig GetDomainsConfig() *DomainsConfig + GetExternalEventsConfig() *ExternalEventsConfig } diff --git a/pkg/runtime/mocks/mock_application_provider.go b/pkg/runtime/mocks/mock_application_provider.go index 0d9973dbc..33ac9a9b9 100644 --- a/pkg/runtime/mocks/mock_application_provider.go +++ b/pkg/runtime/mocks/mock_application_provider.go @@ -5,12 +5,13 @@ import ( ) type MockApplicationProvider struct { - dbConfig interfaces.DbConfig - topLevelConfig interfaces.ApplicationConfig - schedulerConfig interfaces.SchedulerConfig - remoteDataConfig interfaces.RemoteDataConfig - notificationsConfig interfaces.NotificationsConfig - domainsConfig interfaces.DomainsConfig + dbConfig interfaces.DbConfig + topLevelConfig interfaces.ApplicationConfig + schedulerConfig interfaces.SchedulerConfig + remoteDataConfig interfaces.RemoteDataConfig + notificationsConfig interfaces.NotificationsConfig + domainsConfig interfaces.DomainsConfig + externalEventsConfig interfaces.ExternalEventsConfig } func (p *MockApplicationProvider) GetDbConfig() interfaces.DbConfig { @@ -60,3 +61,11 @@ func (p *MockApplicationProvider) GetDomainsConfig() *interfaces.DomainsConfig { func (p *MockApplicationProvider) SetDomainsConfig(domainsConfig interfaces.DomainsConfig) { p.domainsConfig = domainsConfig } + +func (p *MockApplicationProvider) SetExternalEventsConfig(externalEventsConfig interfaces.ExternalEventsConfig) { + p.externalEventsConfig = externalEventsConfig +} + +func (p *MockApplicationProvider) GetExternalEventsConfig() *interfaces.ExternalEventsConfig { + return &p.externalEventsConfig +}