Skip to content
This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

Publish raw events #151

Merged
merged 9 commits into from
Feb 9, 2021
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 50 additions & 0 deletions pkg/async/notifications/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,3 +148,53 @@ func NewNotificationsPublisher(config runtimeInterfaces.NotificationsConfig, sco
return implementations.NewNoopPublish()
}
}

func NewEventsPublisher(config runtimeInterfaces.ExternalEventsConfig, scope promutils.Scope) interfaces.Publisher {
if !config.Enable {
return implementations.NewNoopPublish()
}
reconnectAttempts := config.ReconnectAttempts
reconnectDelay := time.Duration(config.ReconnectDelaySeconds) * time.Second
switch config.Type {
case common.AWS:
snsConfig := gizmoAWS.SNSConfig{
Topic: config.EventsPublisherConfig.TopicName,
}
snsConfig.Region = config.AWSConfig.Region

var publisher pubsub.Publisher
var err error
err = async.Retry(reconnectAttempts, reconnectDelay, func() error {
publisher, err = gizmoAWS.NewPublisher(snsConfig)
return err
})

// Any persistent errors initiating Publisher with Amazon configurations results in a failed start up.
if err != nil {
panic(err)
}
return implementations.NewEventsPublisher(publisher, scope, config.EventsPublisherConfig.EventTypes)
case common.GCP:
pubsubConfig := gizmoGCP.Config{
Topic: config.EventsPublisherConfig.TopicName,
}
pubsubConfig.ProjectID = config.GCPConfig.ProjectID
var publisher pubsub.MultiPublisher
var err error
err = async.Retry(reconnectAttempts, reconnectDelay, func() error {
publisher, err = gizmoGCP.NewPublisher(context.TODO(), pubsubConfig)
return err
})

if err != nil {
panic(err)
}
return implementations.NewEventsPublisher(publisher, scope, config.EventsPublisherConfig.EventTypes)
case common.Local:
fallthrough
default:
logger.Infof(context.Background(),
"Using default noop events publisher implementation for config type [%s]", config.Type)
return implementations.NewNoopPublish()
}
}
105 changes: 105 additions & 0 deletions pkg/async/notifications/implementations/event_publisher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
package implementations

import (
"context"

"k8s.io/apimachinery/pkg/util/sets"

"github.com/lyft/flyteidl/gen/pb-go/flyteidl/admin"

"github.com/lyft/flyteadmin/pkg/async/notifications/interfaces"

"github.com/NYTimes/gizmo/pubsub"
"github.com/golang/protobuf/proto"
"github.com/lyft/flytestdlib/logger"
"github.com/lyft/flytestdlib/promutils"
"github.com/prometheus/client_golang/prometheus"
)

type eventPublisherSystemMetrics struct {
Scope promutils.Scope
PublishTotal prometheus.Counter
PublishSuccess prometheus.Counter
PublishError prometheus.Counter
}

// TODO: Add a counter that encompasses the publisher stats grouped by project and domain.
type EventPublisher struct {
pub pubsub.Publisher
systemMetrics eventPublisherSystemMetrics
events sets.String
}

var taskExecutionReq admin.TaskExecutionEventRequest
var nodeExecutionReq admin.NodeExecutionEventRequest
var workflowExecutionReq admin.WorkflowExecutionEventRequest

const (
Task = "task"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm still concerned this is a little brittle because here we define the strings statically but in the manager methods we use the proto message name as the event type. does that correspond to these raw strings like "task" or "node"?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Node = "node"
Workflow = "workflow"
AllTypes = "all"
AllTypesShort = "*"
)

var supportedEvents = map[string]string{
Task: proto.MessageName(&taskExecutionReq),
Node: proto.MessageName(&nodeExecutionReq),
Workflow: proto.MessageName(&workflowExecutionReq),
}

// The key is the notification type as defined as an enum.
func (p *EventPublisher) Publish(ctx context.Context, notificationType string, msg proto.Message) error {
p.systemMetrics.PublishTotal.Inc()

if !p.shouldPublishEvent(notificationType) {
return nil
}
logger.Debugf(ctx, "Publishing the following message [%+v]", msg)

err := p.pub.Publish(ctx, notificationType, msg)
if err != nil {
p.systemMetrics.PublishError.Inc()
logger.Errorf(ctx, "Failed to publish a message with key [%s] and message [%s] and error: %v", notificationType, msg.String(), err)
} else {
p.systemMetrics.PublishSuccess.Inc()
}
return err
}

func (p *EventPublisher) shouldPublishEvent(notificationType string) bool {
return p.events.Has(notificationType)
}

func newEventPublisherSystemMetrics(scope promutils.Scope) eventPublisherSystemMetrics {
return eventPublisherSystemMetrics{
Scope: scope,
PublishTotal: scope.MustNewCounter("event_publish_total", "overall count of publish messages"),
PublishSuccess: scope.MustNewCounter("event_publish_success", "success count of publish messages"),
PublishError: scope.MustNewCounter("event_publish_errors", "count of publish errors"),
}
}

func NewEventsPublisher(pub pubsub.Publisher, scope promutils.Scope, eventTypes []string) interfaces.Publisher {
eventSet := sets.NewString()

for _, event := range eventTypes {
if event == AllTypes || event == AllTypesShort {
for _, e := range supportedEvents {
eventSet = eventSet.Insert(e)
}
break
}
if e, found := supportedEvents[event]; found {
eventSet = eventSet.Insert(e)
} else {
logger.Errorf(context.Background(), "Unsupported event type [%s] in the config")
}
katrogan marked this conversation as resolved.
Show resolved Hide resolved
}

return &EventPublisher{
pub: pub,
systemMetrics: newEventPublisherSystemMetrics(scope.NewSubScope("events_publisher")),
events: eventSet,
}
}
173 changes: 173 additions & 0 deletions pkg/async/notifications/implementations/event_publisher_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
package implementations

import (
"context"
"errors"
"testing"
"time"

"github.com/golang/protobuf/ptypes"
"github.com/lyft/flyteidl/gen/pb-go/flyteidl/admin"
"github.com/lyft/flyteidl/gen/pb-go/flyteidl/core"
"github.com/lyft/flyteidl/gen/pb-go/flyteidl/event"

"github.com/NYTimes/gizmo/pubsub"
"github.com/NYTimes/gizmo/pubsub/pubsubtest"
"github.com/golang/protobuf/proto"
"github.com/lyft/flytestdlib/promutils"
"github.com/stretchr/testify/assert"
)

var testEventPublisher pubsubtest.TestPublisher
var mockEventPublisher pubsub.Publisher = &testEventPublisher

var executionID = core.WorkflowExecutionIdentifier{
Project: "project",
Domain: "domain",
Name: "name",
}
var nodeExecutionID = core.NodeExecutionIdentifier{
NodeId: "node id",
ExecutionId: &executionID,
}

var taskID = &core.Identifier{
ResourceType: core.ResourceType_TASK,
Project: "p",
Domain: "d",
Version: "v",
Name: "n",
}

var occurredAt = time.Now().UTC()
var occurredAtProto, _ = ptypes.TimestampProto(occurredAt)

var taskPhase = core.TaskExecution_RUNNING

var retryAttempt = uint32(1)

const requestID = "request id"

var taskRequest = &admin.TaskExecutionEventRequest{
RequestId: requestID,
Event: &event.TaskExecutionEvent{
TaskId: taskID,
ParentNodeExecutionId: &nodeExecutionID,
RetryAttempt: retryAttempt,
Phase: taskPhase,
OccurredAt: occurredAtProto,
},
}

var nodeRequest = &admin.NodeExecutionEventRequest{
RequestId: requestID,
Event: &event.NodeExecutionEvent{
ProducerId: "propeller",
Id: &nodeExecutionID,
OccurredAt: occurredAtProto,
Phase: core.NodeExecution_RUNNING,
InputUri: "input uri",
},
}

var workflowRequest = &admin.WorkflowExecutionEventRequest{
Event: &event.WorkflowExecutionEvent{
Phase: core.WorkflowExecution_SUCCEEDED,
OutputResult: &event.WorkflowExecutionEvent_OutputUri{
OutputUri: "somestring",
},
ExecutionId: &executionID,
},
}

// This method should be invoked before every test around Publisher.
func initializeEventPublisher() {
testEventPublisher.Published = nil
testEventPublisher.GivenError = nil
testEventPublisher.FoundError = nil
}

func TestNewEventsPublisher_EventTypes(t *testing.T) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for this very exhaustive test 👍

{
tests := []struct {
name string
eventTypes []string
events []proto.Message
shouldSendEvent []bool
expectedSendCnt int
}{
{"eventTypes as workflow,node", []string{"workflow", "node"},
[]proto.Message{workflowRequest, nodeRequest, taskRequest},
[]bool{true, true, false},
2},
{"eventTypes as workflow,task", []string{"workflow", "task"},
[]proto.Message{workflowRequest, nodeRequest, taskRequest},
[]bool{true, false, true},
2},
{"eventTypes as workflow,task", []string{"node", "task"},
[]proto.Message{workflowRequest, nodeRequest, taskRequest},
[]bool{false, true, true},
2},
{"eventTypes as task", []string{"task"},
[]proto.Message{taskRequest},
[]bool{true},
1},
{"eventTypes as node", []string{"node"},
[]proto.Message{nodeRequest},
[]bool{true},
1},
{"eventTypes as workflow", []string{"workflow"},
[]proto.Message{workflowRequest},
[]bool{true},
1},
{"eventTypes as workflow", []string{"workflow"},
[]proto.Message{nodeRequest, taskRequest},
[]bool{false, false},
0},
{"eventTypes as task", []string{"task"},
[]proto.Message{workflowRequest, nodeRequest},
[]bool{false, false},
0},
{"eventTypes as node", []string{"node"},
[]proto.Message{workflowRequest, taskRequest},
[]bool{false, false},
0},
{"eventTypes as all", []string{"all"},
[]proto.Message{workflowRequest, nodeRequest, taskRequest},
[]bool{true, true, true},
3},
{"eventTypes as *", []string{"*"},
[]proto.Message{workflowRequest, nodeRequest, taskRequest},
[]bool{true, true, true},
3},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
initializeEventPublisher()
var currentEventPublisher = NewEventsPublisher(mockEventPublisher, promutils.NewTestScope(), test.eventTypes)
var cnt = 0
for id, event := range test.events {
assert.Nil(t, currentEventPublisher.Publish(context.Background(), proto.MessageName(event),
event))
if test.shouldSendEvent[id] {
assert.Equal(t, proto.MessageName(event), testEventPublisher.Published[cnt].Key)
marshalledData, err := proto.Marshal(event)
assert.Nil(t, err)
assert.Equal(t, marshalledData, testEventPublisher.Published[cnt].Body)
cnt++
}
}
assert.Equal(t, test.expectedSendCnt, len(testEventPublisher.Published))
})
}
}
}

func TestEventPublisher_PublishError(t *testing.T) {
initializeEventPublisher()
currentEventPublisher := NewEventsPublisher(mockEventPublisher, promutils.NewTestScope(), []string{"*"})
var publishError = errors.New("publish() returns an error")
testEventPublisher.GivenError = publishError
assert.Equal(t, publishError, currentEventPublisher.Publish(context.Background(),
proto.MessageName(taskRequest), taskRequest))
}
21 changes: 10 additions & 11 deletions pkg/manager/impl/execution_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ type executionSystemMetrics struct {
SpecSizeBytes prometheus.Summary
ClosureSizeBytes prometheus.Summary
AcceptanceDelay prometheus.Summary
PublishEventError prometheus.Counter
}

type executionUserMetrics struct {
Expand All @@ -89,6 +90,7 @@ type ExecutionManager struct {
namedEntityManager interfaces.NamedEntityInterface
resourceManager interfaces.ResourceInterface
qualityOfServiceAllocator executions.QualityOfServiceAllocator
eventPublisher notificationInterfaces.Publisher
}

func getExecutionContext(ctx context.Context, id *core.WorkflowExecutionIdentifier) context.Context {
Expand Down Expand Up @@ -1004,6 +1006,10 @@ func (m *ExecutionManager) CreateWorkflowEvent(ctx context.Context, request admi
return nil, err
}
}
if err := m.eventPublisher.Publish(ctx, proto.MessageName(&request), &request); err != nil {
m.systemMetrics.PublishEventError.Inc()
logger.Infof(ctx, "error publishing event [%+v] with err: [%v]", request.RequestId, err)
}

m.systemMetrics.ExecutionEventsCreated.Inc()
return &admin.WorkflowExecutionEventResponse{}, nil
Expand Down Expand Up @@ -1326,20 +1332,12 @@ func newExecutionSystemMetrics(scope promutils.Scope) executionSystemMetrics {
ClosureSizeBytes: scope.MustNewSummary("closure_size_bytes", "size in bytes of serialized execution closure"),
AcceptanceDelay: scope.MustNewSummary("acceptance_delay",
"delay in seconds from when an execution was requested to be created and when it actually was"),
PublishEventError: scope.MustNewCounter("publish_event_error",
"overall count of publish event errors when invoking publish()"),
}
}

func NewExecutionManager(
db repositories.RepositoryInterface,
config runtimeInterfaces.Configuration,
storageClient *storage.DataStore,
workflowExecutor workflowengineInterfaces.Executor,
systemScope promutils.Scope,
userScope promutils.Scope,
publisher notificationInterfaces.Publisher,
urlData dataInterfaces.RemoteURLInterface,
workflowManager interfaces.WorkflowInterface,
namedEntityManager interfaces.NamedEntityInterface) interfaces.ExecutionInterface {
func NewExecutionManager(db repositories.RepositoryInterface, config runtimeInterfaces.Configuration, storageClient *storage.DataStore, workflowExecutor workflowengineInterfaces.Executor, systemScope promutils.Scope, userScope promutils.Scope, publisher notificationInterfaces.Publisher, urlData dataInterfaces.RemoteURLInterface, workflowManager interfaces.WorkflowInterface, namedEntityManager interfaces.NamedEntityInterface, eventPublisher notificationInterfaces.Publisher) interfaces.ExecutionInterface {
queueAllocator := executions.NewQueueAllocator(config, db)
systemMetrics := newExecutionSystemMetrics(systemScope)

Expand Down Expand Up @@ -1369,6 +1367,7 @@ func NewExecutionManager(
namedEntityManager: namedEntityManager,
resourceManager: resourceManager,
qualityOfServiceAllocator: executions.NewQualityOfServiceAllocator(config, resourceManager),
eventPublisher: eventPublisher,
}
}

Expand Down
Loading