Skip to content

Commit

Permalink
Publish raw events (flyteorg#151)
Browse files Browse the repository at this point in the history
  • Loading branch information
tnsetting authored Feb 9, 2021
1 parent 955dbef commit 659635f
Show file tree
Hide file tree
Showing 14 changed files with 530 additions and 262 deletions.
8 changes: 8 additions & 0 deletions flyteadmin_config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,14 @@ notifications:
Execution \"{{ name }}\" has {{ phase }} in \"{{ domain }}\". View details at
<a href=\http://example.com/projects/{{ project }}/domains/{{ domain }}/executions/{{ name }}>
http://example.com/projects/{{ project }}/domains/{{ domain }}/executions/{{ name }}</a>. {{ error }}
externalEvents:
Enable: false
type: gcp
gcp:
projectId: "foo"
eventsPublisher:
topicName: "bar"
eventTypes: all
Logger:
show-source: true
level: 6
Expand Down
50 changes: 50 additions & 0 deletions pkg/async/notifications/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,3 +148,53 @@ func NewNotificationsPublisher(config runtimeInterfaces.NotificationsConfig, sco
return implementations.NewNoopPublish()
}
}

func NewEventsPublisher(config runtimeInterfaces.ExternalEventsConfig, scope promutils.Scope) interfaces.Publisher {
if !config.Enable {
return implementations.NewNoopPublish()
}
reconnectAttempts := config.ReconnectAttempts
reconnectDelay := time.Duration(config.ReconnectDelaySeconds) * time.Second
switch config.Type {
case common.AWS:
snsConfig := gizmoAWS.SNSConfig{
Topic: config.EventsPublisherConfig.TopicName,
}
snsConfig.Region = config.AWSConfig.Region

var publisher pubsub.Publisher
var err error
err = async.Retry(reconnectAttempts, reconnectDelay, func() error {
publisher, err = gizmoAWS.NewPublisher(snsConfig)
return err
})

// Any persistent errors initiating Publisher with Amazon configurations results in a failed start up.
if err != nil {
panic(err)
}
return implementations.NewEventsPublisher(publisher, scope, config.EventsPublisherConfig.EventTypes)
case common.GCP:
pubsubConfig := gizmoGCP.Config{
Topic: config.EventsPublisherConfig.TopicName,
}
pubsubConfig.ProjectID = config.GCPConfig.ProjectID
var publisher pubsub.MultiPublisher
var err error
err = async.Retry(reconnectAttempts, reconnectDelay, func() error {
publisher, err = gizmoGCP.NewPublisher(context.TODO(), pubsubConfig)
return err
})

if err != nil {
panic(err)
}
return implementations.NewEventsPublisher(publisher, scope, config.EventsPublisherConfig.EventTypes)
case common.Local:
fallthrough
default:
logger.Infof(context.Background(),
"Using default noop events publisher implementation for config type [%s]", config.Type)
return implementations.NewNoopPublish()
}
}
105 changes: 105 additions & 0 deletions pkg/async/notifications/implementations/event_publisher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
package implementations

import (
"context"

"k8s.io/apimachinery/pkg/util/sets"

"github.com/lyft/flyteidl/gen/pb-go/flyteidl/admin"

"github.com/lyft/flyteadmin/pkg/async/notifications/interfaces"

"github.com/NYTimes/gizmo/pubsub"
"github.com/golang/protobuf/proto"
"github.com/lyft/flytestdlib/logger"
"github.com/lyft/flytestdlib/promutils"
"github.com/prometheus/client_golang/prometheus"
)

type eventPublisherSystemMetrics struct {
Scope promutils.Scope
PublishTotal prometheus.Counter
PublishSuccess prometheus.Counter
PublishError prometheus.Counter
}

// TODO: Add a counter that encompasses the publisher stats grouped by project and domain.
type EventPublisher struct {
pub pubsub.Publisher
systemMetrics eventPublisherSystemMetrics
events sets.String
}

var taskExecutionReq admin.TaskExecutionEventRequest
var nodeExecutionReq admin.NodeExecutionEventRequest
var workflowExecutionReq admin.WorkflowExecutionEventRequest

const (
Task = "task"
Node = "node"
Workflow = "workflow"
AllTypes = "all"
AllTypesShort = "*"
)

var supportedEvents = map[string]string{
Task: proto.MessageName(&taskExecutionReq),
Node: proto.MessageName(&nodeExecutionReq),
Workflow: proto.MessageName(&workflowExecutionReq),
}

// The key is the notification type as defined as an enum.
func (p *EventPublisher) Publish(ctx context.Context, notificationType string, msg proto.Message) error {
p.systemMetrics.PublishTotal.Inc()

if !p.shouldPublishEvent(notificationType) {
return nil
}
logger.Debugf(ctx, "Publishing the following message [%+v]", msg)

err := p.pub.Publish(ctx, notificationType, msg)
if err != nil {
p.systemMetrics.PublishError.Inc()
logger.Errorf(ctx, "Failed to publish a message with key [%s] and message [%s] and error: %v", notificationType, msg.String(), err)
} else {
p.systemMetrics.PublishSuccess.Inc()
}
return err
}

func (p *EventPublisher) shouldPublishEvent(notificationType string) bool {
return p.events.Has(notificationType)
}

func newEventPublisherSystemMetrics(scope promutils.Scope) eventPublisherSystemMetrics {
return eventPublisherSystemMetrics{
Scope: scope,
PublishTotal: scope.MustNewCounter("event_publish_total", "overall count of publish messages"),
PublishSuccess: scope.MustNewCounter("event_publish_success", "success count of publish messages"),
PublishError: scope.MustNewCounter("event_publish_errors", "count of publish errors"),
}
}

func NewEventsPublisher(pub pubsub.Publisher, scope promutils.Scope, eventTypes []string) interfaces.Publisher {
eventSet := sets.NewString()

for _, event := range eventTypes {
if event == AllTypes || event == AllTypesShort {
for _, e := range supportedEvents {
eventSet = eventSet.Insert(e)
}
break
}
if e, found := supportedEvents[event]; found {
eventSet = eventSet.Insert(e)
} else {
logger.Errorf(context.Background(), "Unsupported event type [%s] in the config")
}
}

return &EventPublisher{
pub: pub,
systemMetrics: newEventPublisherSystemMetrics(scope.NewSubScope("events_publisher")),
events: eventSet,
}
}
173 changes: 173 additions & 0 deletions pkg/async/notifications/implementations/event_publisher_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
package implementations

import (
"context"
"errors"
"testing"
"time"

"github.com/golang/protobuf/ptypes"
"github.com/lyft/flyteidl/gen/pb-go/flyteidl/admin"
"github.com/lyft/flyteidl/gen/pb-go/flyteidl/core"
"github.com/lyft/flyteidl/gen/pb-go/flyteidl/event"

"github.com/NYTimes/gizmo/pubsub"
"github.com/NYTimes/gizmo/pubsub/pubsubtest"
"github.com/golang/protobuf/proto"
"github.com/lyft/flytestdlib/promutils"
"github.com/stretchr/testify/assert"
)

var testEventPublisher pubsubtest.TestPublisher
var mockEventPublisher pubsub.Publisher = &testEventPublisher

var executionID = core.WorkflowExecutionIdentifier{
Project: "project",
Domain: "domain",
Name: "name",
}
var nodeExecutionID = core.NodeExecutionIdentifier{
NodeId: "node id",
ExecutionId: &executionID,
}

var taskID = &core.Identifier{
ResourceType: core.ResourceType_TASK,
Project: "p",
Domain: "d",
Version: "v",
Name: "n",
}

var occurredAt = time.Now().UTC()
var occurredAtProto, _ = ptypes.TimestampProto(occurredAt)

var taskPhase = core.TaskExecution_RUNNING

var retryAttempt = uint32(1)

const requestID = "request id"

var taskRequest = &admin.TaskExecutionEventRequest{
RequestId: requestID,
Event: &event.TaskExecutionEvent{
TaskId: taskID,
ParentNodeExecutionId: &nodeExecutionID,
RetryAttempt: retryAttempt,
Phase: taskPhase,
OccurredAt: occurredAtProto,
},
}

var nodeRequest = &admin.NodeExecutionEventRequest{
RequestId: requestID,
Event: &event.NodeExecutionEvent{
ProducerId: "propeller",
Id: &nodeExecutionID,
OccurredAt: occurredAtProto,
Phase: core.NodeExecution_RUNNING,
InputUri: "input uri",
},
}

var workflowRequest = &admin.WorkflowExecutionEventRequest{
Event: &event.WorkflowExecutionEvent{
Phase: core.WorkflowExecution_SUCCEEDED,
OutputResult: &event.WorkflowExecutionEvent_OutputUri{
OutputUri: "somestring",
},
ExecutionId: &executionID,
},
}

// This method should be invoked before every test around Publisher.
func initializeEventPublisher() {
testEventPublisher.Published = nil
testEventPublisher.GivenError = nil
testEventPublisher.FoundError = nil
}

func TestNewEventsPublisher_EventTypes(t *testing.T) {
{
tests := []struct {
name string
eventTypes []string
events []proto.Message
shouldSendEvent []bool
expectedSendCnt int
}{
{"eventTypes as workflow,node", []string{"workflow", "node"},
[]proto.Message{workflowRequest, nodeRequest, taskRequest},
[]bool{true, true, false},
2},
{"eventTypes as workflow,task", []string{"workflow", "task"},
[]proto.Message{workflowRequest, nodeRequest, taskRequest},
[]bool{true, false, true},
2},
{"eventTypes as workflow,task", []string{"node", "task"},
[]proto.Message{workflowRequest, nodeRequest, taskRequest},
[]bool{false, true, true},
2},
{"eventTypes as task", []string{"task"},
[]proto.Message{taskRequest},
[]bool{true},
1},
{"eventTypes as node", []string{"node"},
[]proto.Message{nodeRequest},
[]bool{true},
1},
{"eventTypes as workflow", []string{"workflow"},
[]proto.Message{workflowRequest},
[]bool{true},
1},
{"eventTypes as workflow", []string{"workflow"},
[]proto.Message{nodeRequest, taskRequest},
[]bool{false, false},
0},
{"eventTypes as task", []string{"task"},
[]proto.Message{workflowRequest, nodeRequest},
[]bool{false, false},
0},
{"eventTypes as node", []string{"node"},
[]proto.Message{workflowRequest, taskRequest},
[]bool{false, false},
0},
{"eventTypes as all", []string{"all"},
[]proto.Message{workflowRequest, nodeRequest, taskRequest},
[]bool{true, true, true},
3},
{"eventTypes as *", []string{"*"},
[]proto.Message{workflowRequest, nodeRequest, taskRequest},
[]bool{true, true, true},
3},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
initializeEventPublisher()
var currentEventPublisher = NewEventsPublisher(mockEventPublisher, promutils.NewTestScope(), test.eventTypes)
var cnt = 0
for id, event := range test.events {
assert.Nil(t, currentEventPublisher.Publish(context.Background(), proto.MessageName(event),
event))
if test.shouldSendEvent[id] {
assert.Equal(t, proto.MessageName(event), testEventPublisher.Published[cnt].Key)
marshalledData, err := proto.Marshal(event)
assert.Nil(t, err)
assert.Equal(t, marshalledData, testEventPublisher.Published[cnt].Body)
cnt++
}
}
assert.Equal(t, test.expectedSendCnt, len(testEventPublisher.Published))
})
}
}
}

func TestEventPublisher_PublishError(t *testing.T) {
initializeEventPublisher()
currentEventPublisher := NewEventsPublisher(mockEventPublisher, promutils.NewTestScope(), []string{"*"})
var publishError = errors.New("publish() returns an error")
testEventPublisher.GivenError = publishError
assert.Equal(t, publishError, currentEventPublisher.Publish(context.Background(),
proto.MessageName(taskRequest), taskRequest))
}
Loading

0 comments on commit 659635f

Please sign in to comment.