Skip to content
This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

Commit

Permalink
Address comments and use new config section
Browse files Browse the repository at this point in the history
  • Loading branch information
tnsetting committed Feb 5, 2021
1 parent 33ed060 commit 7c2a395
Show file tree
Hide file tree
Showing 6 changed files with 72 additions and 50 deletions.
18 changes: 7 additions & 11 deletions pkg/async/notifications/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,22 +149,18 @@ func NewNotificationsPublisher(config runtimeInterfaces.NotificationsConfig, sco
}
}

func NewEventsPublisher(config runtimeInterfaces.NotificationsConfig, scope promutils.Scope) interfaces.Publisher {
if config.ExternalEvent.EventPublisherConfig.TopicName == "" {
func NewEventsPublisher(config runtimeInterfaces.ExternalEventsConfig, scope promutils.Scope) interfaces.Publisher {
if config.Enable == false {
return implementations.NewNoopPublish()
}
reconnectAttempts := config.ReconnectAttempts
reconnectDelay := time.Duration(config.ReconnectDelaySeconds) * time.Second
switch config.Type {
case common.AWS:
snsConfig := gizmoAWS.SNSConfig{
Topic: config.ExternalEvent.EventPublisherConfig.TopicName,
}
if config.AWSConfig.Region != "" {
snsConfig.Region = config.AWSConfig.Region
} else {
snsConfig.Region = config.Region
Topic: config.EventsPublisherConfig.TopicName,
}
snsConfig.Region = config.AWSConfig.Region

var publisher pubsub.Publisher
var err error
Expand All @@ -177,10 +173,10 @@ func NewEventsPublisher(config runtimeInterfaces.NotificationsConfig, scope prom
if err != nil {
panic(err)
}
return implementations.NewEventsPublisher(publisher, scope, config.ExternalEvent.EventPublisherConfig.EventTypes)
return implementations.NewEventsPublisher(publisher, scope, config.EventsPublisherConfig.EventTypes)
case common.GCP:
pubsubConfig := gizmoGCP.Config{
Topic: config.ExternalEvent.EventPublisherConfig.TopicName,
Topic: config.EventsPublisherConfig.TopicName,
}
pubsubConfig.ProjectID = config.GCPConfig.ProjectID
var publisher pubsub.MultiPublisher
Expand All @@ -193,7 +189,7 @@ func NewEventsPublisher(config runtimeInterfaces.NotificationsConfig, scope prom
if err != nil {
panic(err)
}
return implementations.NewEventsPublisher(publisher, scope, config.ExternalEvent.EventPublisherConfig.EventTypes)
return implementations.NewEventsPublisher(publisher, scope, config.EventsPublisherConfig.EventTypes)
case common.Local:
fallthrough
default:
Expand Down
56 changes: 28 additions & 28 deletions pkg/async/notifications/implementations/event_publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package implementations

import (
"context"
"k8s.io/apimachinery/pkg/util/sets"
"strings"

"github.com/lyft/flyteidl/gen/pb-go/flyteidl/admin"
Expand All @@ -16,33 +17,33 @@ import (
)

type eventPublisherSystemMetrics struct {
Scope promutils.Scope
PublishTotal prometheus.Counter
PublishError prometheus.Counter
Scope promutils.Scope
PublishTotal prometheus.Counter
PublishSuccess prometheus.Counter
PublishError prometheus.Counter
}

// TODO: Add a counter that encompasses the publisher stats grouped by project and domain.
type EventPublisher struct {
pub pubsub.Publisher
systemMetrics eventPublisherSystemMetrics
events []string
events sets.String
}

func getSupportedEvents() map[string]string {
supportedEvents := make(map[string]string)
var taskExecutionReq admin.TaskExecutionEventRequest
supportedEvents["task"] = proto.MessageName(&taskExecutionReq)
var nodeExecutionReq admin.NodeExecutionEventRequest
supportedEvents["node"] = proto.MessageName(&nodeExecutionReq)
var workflowExecutionReq admin.WorkflowExecutionEventRequest
supportedEvents["workflow"] = proto.MessageName(&workflowExecutionReq)
return supportedEvents
var taskExecutionReq admin.TaskExecutionEventRequest
var nodeExecutionReq admin.NodeExecutionEventRequest
var workflowExecutionReq admin.WorkflowExecutionEventRequest

var supportedEvents = map[string]string{
"task": proto.MessageName(&taskExecutionReq),
"node": proto.MessageName(&nodeExecutionReq),
"workflow": proto.MessageName(&workflowExecutionReq),
}

// The key is the notification type as defined as an enum.
func (p *EventPublisher) Publish(ctx context.Context, notificationType string, msg proto.Message) error {
p.systemMetrics.PublishTotal.Inc()
logger.Debugf(ctx, "Publishing the following message [%s]", msg.String())
logger.Debugf(ctx, "Publishing the following message [%+v]", msg)

if !p.shouldPublishEvent(notificationType) {
return nil
Expand All @@ -52,47 +53,46 @@ func (p *EventPublisher) Publish(ctx context.Context, notificationType string, m
if err != nil {
p.systemMetrics.PublishError.Inc()
logger.Errorf(ctx, "Failed to publish a message with key [%s] and message [%s] and error: %v", notificationType, msg.String(), err)
} else {
p.systemMetrics.PublishSuccess.Inc()
}
return err
}

func (p *EventPublisher) shouldPublishEvent(notificationType string) bool {
for _, e := range p.events {
if e == notificationType {
return true
}
}
return false
return p.events.Has(notificationType)
}

func newEventPublisherSystemMetrics(scope promutils.Scope) eventPublisherSystemMetrics {
return eventPublisherSystemMetrics{
Scope: scope,
PublishTotal: scope.MustNewCounter("event_publish_total", "overall count of publish messages"),
PublishError: scope.MustNewCounter("event_publish_errors", "count of publish errors"),
Scope: scope,
PublishTotal: scope.MustNewCounter("event_publish_total", "overall count of publish messages"),
PublishSuccess: scope.MustNewCounter("event_publish_success", "sucess count of publish messages"),
PublishError: scope.MustNewCounter("event_publish_errors", "count of publish errors"),
}
}

func NewEventsPublisher(pub pubsub.Publisher, scope promutils.Scope, eventTypes string) interfaces.Publisher {
supportedEvents := getSupportedEvents()

var eventList = make([]string, 0)
eventSet := sets.NewString()
if strings.Contains(eventTypes, "*") || strings.Contains(eventTypes, "all") {
for _, e := range supportedEvents {
eventList = append(eventList, e)
eventSet = eventSet.Insert(e)
}
} else {
events := strings.Split(eventTypes, ",")
for _, event := range events {
if e, found := supportedEvents[event]; found {
eventList = append(eventList, e)
eventSet = eventSet.Insert(e)
} else {
logger.Errorf(context.Background(), "Unsupported event type [%s] in the config")
}
}
}

return &EventPublisher{
pub: pub,
systemMetrics: newEventPublisherSystemMetrics(scope.NewSubScope("events_publisher")),
events: eventList,
events: eventSet,
}
}
2 changes: 1 addition & 1 deletion pkg/rpc/adminservice/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func NewAdminServer(kubeConfig, master string) *AdminService {

publisher := notifications.NewNotificationsPublisher(*configuration.ApplicationConfiguration().GetNotificationsConfig(), adminScope)
processor := notifications.NewNotificationsProcessor(*configuration.ApplicationConfiguration().GetNotificationsConfig(), adminScope)
eventPublisher := notifications.NewEventsPublisher(*configuration.ApplicationConfiguration().GetNotificationsConfig(), adminScope)
eventPublisher := notifications.NewEventsPublisher(*configuration.ApplicationConfiguration().GetExternalEventsConfig(), adminScope)
go func() {
logger.Info(context.Background(), "Started processing notifications.")
processor.StartProcessing()
Expand Down
7 changes: 7 additions & 0 deletions pkg/runtime/application_config_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,15 @@ const scheduler = "scheduler"
const remoteData = "remoteData"
const notifications = "notifications"
const domains = "domains"
const externalEvents = "externalEvents"

var databaseConfig = config.MustRegisterSection(database, &interfaces.DbConfigSection{})
var flyteAdminConfig = config.MustRegisterSection(flyteAdmin, &interfaces.ApplicationConfig{})
var schedulerConfig = config.MustRegisterSection(scheduler, &interfaces.SchedulerConfig{})
var remoteDataConfig = config.MustRegisterSection(remoteData, &interfaces.RemoteDataConfig{})
var notificationsConfig = config.MustRegisterSection(notifications, &interfaces.NotificationsConfig{})
var domainsConfig = config.MustRegisterSection(domains, &interfaces.DomainsConfig{})
var externalEventsConfig = config.MustRegisterSection(externalEvents, &interfaces.ExternalEventsConfig{})

// Implementation of an interfaces.ApplicationConfiguration
type ApplicationConfigurationProvider struct{}
Expand Down Expand Up @@ -72,6 +74,11 @@ func (p *ApplicationConfigurationProvider) GetNotificationsConfig() *interfaces.
func (p *ApplicationConfigurationProvider) GetDomainsConfig() *interfaces.DomainsConfig {
return domainsConfig.GetConfig().(*interfaces.DomainsConfig)
}

func (p *ApplicationConfigurationProvider) GetExternalEventsConfig() *interfaces.ExternalEventsConfig {
return externalEventsConfig.GetConfig().(*interfaces.ExternalEventsConfig)
}

func NewApplicationConfigurationProvider() interfaces.ApplicationConfiguration {
return &ApplicationConfigurationProvider{}
}
18 changes: 14 additions & 4 deletions pkg/runtime/interfaces/application_configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,8 +154,19 @@ type EventsPublisherConfig struct {
EventTypes string `json:"eventTypes"`
}

type ExternalEvent struct {
EventPublisherConfig EventsPublisherConfig `json:"eventPublisher"`
type ExternalEventsConfig struct {
Enable bool `json:"enable"`
// Defines the cloud provider that backs the scheduler. In the absence of a specification the no-op, 'local'
// scheme is used.
Type string `json:"type"`
AWSConfig AWSConfig `json:"aws"`
GCPConfig GCPConfig `json:"gcp"`
// Publish events to a pubsub tops
EventsPublisherConfig EventsPublisherConfig `json:"eventsPublisher"`
// Number of times to attempt recreating a notifications processor client should there be any disruptions.
ReconnectAttempts int `json:"reconnectAttempts"`
// Specifies the time interval to wait before attempting to reconnect the notifications processor client.
ReconnectDelaySeconds int `json:"reconnectDelaySeconds"`
}

// Configuration specific to notifications handling
Expand All @@ -174,8 +185,6 @@ type NotificationsConfig struct {
ReconnectAttempts int `json:"reconnectAttempts"`
// Specifies the time interval to wait before attempting to reconnect the notifications processor client.
ReconnectDelaySeconds int `json:"reconnectDelaySeconds"`
// Publish events to a pubsub tops
ExternalEvent ExternalEvent `json:"externalEvent"`
}

// Domains are always globally set in the application config, whereas individual projects can be individually registered.
Expand All @@ -196,4 +205,5 @@ type ApplicationConfiguration interface {
GetRemoteDataConfig() *RemoteDataConfig
GetNotificationsConfig() *NotificationsConfig
GetDomainsConfig() *DomainsConfig
GetExternalEventsConfig() *ExternalEventsConfig
}
21 changes: 15 additions & 6 deletions pkg/runtime/mocks/mock_application_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,13 @@ import (
)

type MockApplicationProvider struct {
dbConfig interfaces.DbConfig
topLevelConfig interfaces.ApplicationConfig
schedulerConfig interfaces.SchedulerConfig
remoteDataConfig interfaces.RemoteDataConfig
notificationsConfig interfaces.NotificationsConfig
domainsConfig interfaces.DomainsConfig
dbConfig interfaces.DbConfig
topLevelConfig interfaces.ApplicationConfig
schedulerConfig interfaces.SchedulerConfig
remoteDataConfig interfaces.RemoteDataConfig
notificationsConfig interfaces.NotificationsConfig
domainsConfig interfaces.DomainsConfig
externalEventsConfig interfaces.ExternalEventsConfig
}

func (p *MockApplicationProvider) GetDbConfig() interfaces.DbConfig {
Expand Down Expand Up @@ -60,3 +61,11 @@ func (p *MockApplicationProvider) GetDomainsConfig() *interfaces.DomainsConfig {
func (p *MockApplicationProvider) SetDomainsConfig(domainsConfig interfaces.DomainsConfig) {
p.domainsConfig = domainsConfig
}

func (p *MockApplicationProvider) SetExternalEventsConfig(externalEventsConfig interfaces.ExternalEventsConfig) {
p.externalEventsConfig = externalEventsConfig
}

func (p *MockApplicationProvider) GetExternalEventsConfig() *interfaces.ExternalEventsConfig {
return &p.externalEventsConfig
}

0 comments on commit 7c2a395

Please sign in to comment.