Skip to content
This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

Publish raw events #151

Merged
merged 9 commits into from
Feb 9, 2021
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 54 additions & 0 deletions pkg/async/notifications/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,3 +148,57 @@ func NewNotificationsPublisher(config runtimeInterfaces.NotificationsConfig, sco
return implementations.NewNoopPublish()
}
}

// NewEventsPublisher builds the publisher used for raw events from config.
//
// It returns a no-op publisher when no event topic name is configured, or when
// config.Type is neither common.AWS nor common.GCP. For AWS and GCP the
// underlying gizmo publisher is created through async.Retry using the
// configured reconnect attempts and delay; if creation still fails, this
// function panics.
func NewEventsPublisher(config runtimeInterfaces.NotificationsConfig, scope promutils.Scope) interfaces.Publisher {
if config.ExternalEvent.EventPublisherConfig.TopicName == "" {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we explicitly just use the Type of the Notificationsconfig. and if it's not no-op, error when Topic Name is empty?

return implementations.NewNoopPublish()
}
// Retry settings shared by the AWS and GCP branches below.
reconnectAttempts := config.ReconnectAttempts
reconnectDelay := time.Duration(config.ReconnectDelaySeconds) * time.Second
switch config.Type {
case common.AWS:
snsConfig := gizmoAWS.SNSConfig{
Topic: config.ExternalEvent.EventPublisherConfig.TopicName,
}
// Prefer the region set in AWSConfig; fall back to the top-level Region.
if config.AWSConfig.Region != "" {
snsConfig.Region = config.AWSConfig.Region
} else {
snsConfig.Region = config.Region
}

var publisher pubsub.Publisher
var err error
// The closure assigns the outer publisher and err (plain "=", not ":="),
// so the last attempt's result is visible after Retry returns.
err = async.Retry(reconnectAttempts, reconnectDelay, func() error {
publisher, err = gizmoAWS.NewPublisher(snsConfig)
return err
})

// Any persistent errors initiating Publisher with Amazon configurations results in a failed start up.
if err != nil {
panic(err)
}
return implementations.NewEventsPublisher(publisher, scope, config.ExternalEvent.EventPublisherConfig.EventTypes)
case common.GCP:
pubsubConfig := gizmoGCP.Config{
Topic: config.ExternalEvent.EventPublisherConfig.TopicName,
}
pubsubConfig.ProjectID = config.GCPConfig.ProjectID
var publisher pubsub.MultiPublisher
var err error
// Same retry pattern as the AWS branch: the closure writes to the outer
// publisher and err.
err = async.Retry(reconnectAttempts, reconnectDelay, func() error {
publisher, err = gizmoGCP.NewPublisher(context.TODO(), pubsubConfig)
return err
})

// As with AWS, a persistent failure to create the publisher aborts start up.
if err != nil {
panic(err)
}
return implementations.NewEventsPublisher(publisher, scope, config.ExternalEvent.EventPublisherConfig.EventTypes)
case common.Local:
fallthrough
default:
// Local and any unrecognized type share the no-op implementation.
logger.Infof(context.Background(),
"Using default noop events publisher implementation for config type [%s]", config.Type)
return implementations.NewNoopPublish()
}
}
98 changes: 98 additions & 0 deletions pkg/async/notifications/implementations/event_publisher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
package implementations

import (
"context"
"strings"

"github.com/lyft/flyteidl/gen/pb-go/flyteidl/admin"

"github.com/lyft/flyteadmin/pkg/async/notifications/interfaces"

"github.com/NYTimes/gizmo/pubsub"
"github.com/golang/protobuf/proto"
"github.com/lyft/flytestdlib/logger"
"github.com/lyft/flytestdlib/promutils"
"github.com/prometheus/client_golang/prometheus"
)

// eventPublisherSystemMetrics groups the Prometheus counters recorded by
// EventPublisher together with the scope they were registered under.
type eventPublisherSystemMetrics struct {
	// Scope is the metrics scope the counters below were created from.
	Scope promutils.Scope
	// PublishTotal is incremented on every call to Publish.
	PublishTotal prometheus.Counter
	// PublishError is incremented when the underlying publisher returns an error.
	PublishError prometheus.Counter
}

// EventPublisher forwards events to a pubsub.Publisher, restricted to the
// notification types held in events.
//
// TODO: Add a counter that encompasses the publisher stats grouped by project and domain.
type EventPublisher struct {
	// pub is the underlying publisher that accepted messages are handed to.
	pub pubsub.Publisher
	// systemMetrics holds the publish counters.
	systemMetrics eventPublisherSystemMetrics
	// events lists the notification types that are allowed to be published.
	events []string
}

// getSupportedEvents returns a freshly built map from the short event type
// names ("task", "node", "workflow") to the proto message name of the
// corresponding admin event request type.
func getSupportedEvents() map[string]string {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: instead of using a func you can actually initialize the map in golang as a var. see https://stackoverflow.com/questions/41078272/initializing-a-go-map-in-a-single-statement for an example

supportedEvents := make(map[string]string)
// Zero-value requests are enough here: only the message name is read.
var taskExecutionReq admin.TaskExecutionEventRequest
supportedEvents["task"] = proto.MessageName(&taskExecutionReq)
var nodeExecutionReq admin.NodeExecutionEventRequest
supportedEvents["node"] = proto.MessageName(&nodeExecutionReq)
var workflowExecutionReq admin.WorkflowExecutionEventRequest
supportedEvents["workflow"] = proto.MessageName(&workflowExecutionReq)
return supportedEvents
}

// Publish forwards msg to the underlying publisher under the key
// notificationType, but only when notificationType is one of the event types
// this publisher was configured with; otherwise it returns nil without
// publishing. An error from the underlying publisher is counted, logged and
// returned to the caller.
//
// NOTE(review): PublishTotal is incremented before the event-type filter, so
// it also counts messages that are skipped — confirm this is intended.
func (p *EventPublisher) Publish(ctx context.Context, notificationType string, msg proto.Message) error {
p.systemMetrics.PublishTotal.Inc()
logger.Debugf(ctx, "Publishing the following message [%s]", msg.String())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does String() return a human readable message? if not can we use ...[%+v]", msg) instead?


// Event types that were not enabled are dropped silently.
if !p.shouldPublishEvent(notificationType) {
return nil
}

err := p.pub.Publish(ctx, notificationType, msg)
if err != nil {
p.systemMetrics.PublishError.Inc()
logger.Errorf(ctx, "Failed to publish a message with key [%s] and message [%s] and error: %v", notificationType, msg.String(), err)
}
return err
}

// shouldPublishEvent reports whether notificationType is among the event
// types this publisher was configured to forward.
func (p *EventPublisher) shouldPublishEvent(notificationType string) bool {
	allowed := p.events
	for i := 0; i < len(allowed); i++ {
		if allowed[i] != notificationType {
			continue
		}
		return true
	}
	return false
}

// newEventPublisherSystemMetrics creates the "event_publish_total" and
// "event_publish_errors" counters on scope and returns them bundled with
// that scope.
func newEventPublisherSystemMetrics(scope promutils.Scope) eventPublisherSystemMetrics {
return eventPublisherSystemMetrics{
Scope: scope,
PublishTotal: scope.MustNewCounter("event_publish_total", "overall count of publish messages"),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i know it's all additive, but can we add a success counter too?

PublishError: scope.MustNewCounter("event_publish_errors", "count of publish errors"),
}
}

// NewEventsPublisher wraps pub in an EventPublisher that only forwards the
// event types selected by eventTypes.
//
// eventTypes is a comma-separated list of the keys returned by
// getSupportedEvents. If it contains "*" or "all", every supported event type
// is enabled. Metrics are registered under the "events_publisher" sub-scope
// of scope.
//
// NOTE(review): the wildcard check uses strings.Contains, so any value that
// merely contains the substring "all" or "*" enables every event type.
// NOTE(review): list entries are not trimmed and unknown entries are dropped
// without a log or error, so "workflow, node" enables only "workflow" —
// confirm this is intended.
func NewEventsPublisher(pub pubsub.Publisher, scope promutils.Scope, eventTypes string) interfaces.Publisher {
supportedEvents := getSupportedEvents()

var eventList = make([]string, 0)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do you want to use a String set here instead since you only ever check for membership/

if strings.Contains(eventTypes, "*") || strings.Contains(eventTypes, "all") {
// Wildcard: enable every supported message name.
for _, e := range supportedEvents {
eventList = append(eventList, e)
}
} else {
// Translate each short name into its proto message name; names that are
// not in supportedEvents are skipped.
events := strings.Split(eventTypes, ",")
for _, event := range events {
if e, found := supportedEvents[event]; found {
eventList = append(eventList, e)
}
}
katrogan marked this conversation as resolved.
Show resolved Hide resolved
}

return &EventPublisher{
pub: pub,
systemMetrics: newEventPublisherSystemMetrics(scope.NewSubScope("events_publisher")),
events: eventList,
}
}
173 changes: 173 additions & 0 deletions pkg/async/notifications/implementations/event_publisher_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
package implementations

import (
"context"
"errors"
"testing"
"time"

"github.com/golang/protobuf/ptypes"
"github.com/lyft/flyteidl/gen/pb-go/flyteidl/admin"
"github.com/lyft/flyteidl/gen/pb-go/flyteidl/core"
"github.com/lyft/flyteidl/gen/pb-go/flyteidl/event"

"github.com/NYTimes/gizmo/pubsub"
"github.com/NYTimes/gizmo/pubsub/pubsubtest"
"github.com/golang/protobuf/proto"
"github.com/lyft/flytestdlib/promutils"
"github.com/stretchr/testify/assert"
)

// Shared fixtures for the event publisher tests.
var (
	// testEventPublisher is the shared test publisher; mockEventPublisher
	// exposes it through the pubsub.Publisher interface.
	testEventPublisher pubsubtest.TestPublisher
	mockEventPublisher pubsub.Publisher = &testEventPublisher

	// Identifiers referenced by the request fixtures below.
	executionID = core.WorkflowExecutionIdentifier{
		Project: "project",
		Domain:  "domain",
		Name:    "name",
	}
	nodeExecutionID = core.NodeExecutionIdentifier{
		NodeId:      "node id",
		ExecutionId: &executionID,
	}

	taskID = &core.Identifier{
		ResourceType: core.ResourceType_TASK,
		Project:      "p",
		Domain:       "d",
		Version:      "v",
		Name:         "n",
	}

	// Timestamp shared by the task and node events.
	occurredAt         = time.Now().UTC()
	occurredAtProto, _ = ptypes.TimestampProto(occurredAt)

	taskPhase = core.TaskExecution_RUNNING

	retryAttempt = uint32(1)
)

const requestID = "request id"

// One request fixture per event type handled by the publisher.
var (
	taskRequest = &admin.TaskExecutionEventRequest{
		RequestId: requestID,
		Event: &event.TaskExecutionEvent{
			TaskId:                taskID,
			ParentNodeExecutionId: &nodeExecutionID,
			RetryAttempt:          retryAttempt,
			Phase:                 taskPhase,
			OccurredAt:            occurredAtProto,
		},
	}

	nodeRequest = &admin.NodeExecutionEventRequest{
		RequestId: requestID,
		Event: &event.NodeExecutionEvent{
			ProducerId: "propeller",
			Id:         &nodeExecutionID,
			OccurredAt: occurredAtProto,
			Phase:      core.NodeExecution_RUNNING,
			InputUri:   "input uri",
		},
	}

	workflowRequest = &admin.WorkflowExecutionEventRequest{
		Event: &event.WorkflowExecutionEvent{
			Phase: core.WorkflowExecution_SUCCEEDED,
			OutputResult: &event.WorkflowExecutionEvent_OutputUri{
				OutputUri: "somestring",
			},
			ExecutionId: &executionID,
		},
	}
)

// initializeEventPublisher clears the Published, GivenError and FoundError
// fields of the shared test publisher. Call it at the start of every test
// that exercises the publisher.
func initializeEventPublisher() {
	testEventPublisher.Published, testEventPublisher.GivenError, testEventPublisher.FoundError = nil, nil, nil
}

// TestNewEventsPublisher_EventTypes is a table-driven test. For each
// eventTypes setting it publishes a sequence of events and checks that only
// the expected ones reach the underlying test publisher, with the message
// name as the key and the marshalled proto as the body.
func TestNewEventsPublisher_EventTypes(t *testing.T) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for this very exhaustive test 👍

{
tests := []struct {
name string
eventTypes string
events []proto.Message
// shouldSendEvent[i] says whether events[i] is expected to be published.
shouldSendEvent []bool
// expectedSendCnt is the total number of messages expected in Published.
expectedSendCnt int
}{
{"eventTypes as workflow,node", "workflow,node",
[]proto.Message{workflowRequest, nodeRequest, taskRequest},
[]bool{true, true, false},
2},
{"eventTypes as workflow,task", "workflow,task",
[]proto.Message{workflowRequest, nodeRequest, taskRequest},
[]bool{true, false, true},
2},
// NOTE(review): the name below says "workflow,task" but eventTypes is
// "node,task"; it also duplicates the previous case's name.
{"eventTypes as workflow,task", "node,task",
[]proto.Message{workflowRequest, nodeRequest, taskRequest},
[]bool{false, true, true},
2},
{"eventTypes as task", "task",
[]proto.Message{taskRequest},
[]bool{true},
1},
{"eventTypes as node", "node",
[]proto.Message{nodeRequest},
[]bool{true},
1},
{"eventTypes as workflow", "workflow",
[]proto.Message{workflowRequest},
[]bool{true},
1},
{"eventTypes as workflow", "workflow",
[]proto.Message{nodeRequest, taskRequest},
[]bool{false, false},
0},
{"eventTypes as task", "task",
[]proto.Message{workflowRequest, nodeRequest},
[]bool{false, false},
0},
{"eventTypes as node", "node",
[]proto.Message{workflowRequest, taskRequest},
[]bool{false, false},
0},
{"eventTypes as all", "all",
[]proto.Message{workflowRequest, nodeRequest, taskRequest},
[]bool{true, true, true},
3},
{"eventTypes as *", "*",
[]proto.Message{workflowRequest, nodeRequest, taskRequest},
[]bool{true, true, true},
3},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
// Reset the shared test publisher so Published starts empty for this case.
initializeEventPublisher()
var currentEventPublisher = NewEventsPublisher(mockEventPublisher, promutils.NewTestScope(), test.eventTypes)
// cnt indexes the next expected entry in testEventPublisher.Published.
var cnt = 0
for id, event := range test.events {
// Publish must return nil whether or not the event is forwarded.
assert.Nil(t, currentEventPublisher.Publish(context.Background(), proto.MessageName(event),
event))
if test.shouldSendEvent[id] {
assert.Equal(t, proto.MessageName(event), testEventPublisher.Published[cnt].Key)
marshalledData, err := proto.Marshal(event)
assert.Nil(t, err)
assert.Equal(t, marshalledData, testEventPublisher.Published[cnt].Body)
cnt++
}
}
assert.Equal(t, test.expectedSendCnt, len(testEventPublisher.Published))
})
}
}
}

// TestEventPublisher_PublishError verifies that an error raised by the
// underlying publisher is returned unchanged from Publish.
func TestEventPublisher_PublishError(t *testing.T) {
	initializeEventPublisher()
	eventPublisher := NewEventsPublisher(mockEventPublisher, promutils.NewTestScope(), "*")

	// Make the test publisher fail every Publish call with a known error.
	expectedErr := errors.New("publish() returns an error")
	testEventPublisher.GivenError = expectedErr

	actualErr := eventPublisher.Publish(context.Background(), proto.MessageName(taskRequest), taskRequest)
	assert.Equal(t, expectedErr, actualErr)
}
21 changes: 10 additions & 11 deletions pkg/manager/impl/execution_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ type executionSystemMetrics struct {
SpecSizeBytes prometheus.Summary
ClosureSizeBytes prometheus.Summary
AcceptanceDelay prometheus.Summary
PublishEventError prometheus.Counter
}

type executionUserMetrics struct {
Expand All @@ -89,6 +90,7 @@ type ExecutionManager struct {
namedEntityManager interfaces.NamedEntityInterface
resourceManager interfaces.ResourceInterface
qualityOfServiceAllocator executions.QualityOfServiceAllocator
eventPublisher notificationInterfaces.Publisher
}

func getExecutionContext(ctx context.Context, id *core.WorkflowExecutionIdentifier) context.Context {
Expand Down Expand Up @@ -1004,6 +1006,10 @@ func (m *ExecutionManager) CreateWorkflowEvent(ctx context.Context, request admi
return nil, err
}
}
if err := m.eventPublisher.Publish(ctx, proto.MessageName(&request), &request); err != nil {
m.systemMetrics.PublishEventError.Inc()
logger.Infof(ctx, "error publishing event [%+v] with err: [%v]", request.RequestId, err)
}

m.systemMetrics.ExecutionEventsCreated.Inc()
return &admin.WorkflowExecutionEventResponse{}, nil
Expand Down Expand Up @@ -1326,20 +1332,12 @@ func newExecutionSystemMetrics(scope promutils.Scope) executionSystemMetrics {
ClosureSizeBytes: scope.MustNewSummary("closure_size_bytes", "size in bytes of serialized execution closure"),
AcceptanceDelay: scope.MustNewSummary("acceptance_delay",
"delay in seconds from when an execution was requested to be created and when it actually was"),
PublishEventError: scope.MustNewCounter("publish_event_error",
"overall count of publish event errors when invoking publish()"),
}
}

func NewExecutionManager(
db repositories.RepositoryInterface,
config runtimeInterfaces.Configuration,
storageClient *storage.DataStore,
workflowExecutor workflowengineInterfaces.Executor,
systemScope promutils.Scope,
userScope promutils.Scope,
publisher notificationInterfaces.Publisher,
urlData dataInterfaces.RemoteURLInterface,
workflowManager interfaces.WorkflowInterface,
namedEntityManager interfaces.NamedEntityInterface) interfaces.ExecutionInterface {
func NewExecutionManager(db repositories.RepositoryInterface, config runtimeInterfaces.Configuration, storageClient *storage.DataStore, workflowExecutor workflowengineInterfaces.Executor, systemScope promutils.Scope, userScope promutils.Scope, publisher notificationInterfaces.Publisher, urlData dataInterfaces.RemoteURLInterface, workflowManager interfaces.WorkflowInterface, namedEntityManager interfaces.NamedEntityInterface, eventPublisher notificationInterfaces.Publisher) interfaces.ExecutionInterface {
queueAllocator := executions.NewQueueAllocator(config, db)
systemMetrics := newExecutionSystemMetrics(systemScope)

Expand Down Expand Up @@ -1369,6 +1367,7 @@ func NewExecutionManager(
namedEntityManager: namedEntityManager,
resourceManager: resourceManager,
qualityOfServiceAllocator: executions.NewQualityOfServiceAllocator(config, resourceManager),
eventPublisher: eventPublisher,
}
}

Expand Down
Loading