This repository has been archived by the owner on Oct 9, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 63
Publish raw events #151
Merged
Merged
Publish raw events #151
Changes from 7 commits
Commits
Show all changes
9 commits
Select commit
Hold shift + click to select a range
9306023
Add publisher for events
tnsetting 6fa0eab
sending events to pubsub
tnsetting 3e44470
Fix test and lint
tnsetting 33ed060
More fixes for testcases
tnsetting 7c2a395
Address comments and use new config section
tnsetting 3b170a1
Fix goimports
tnsetting 9224c6c
just for trigger the build
tnsetting 1f91d61
more comments fix
tnsetting e242acb
Add config example
tnsetting File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
98 changes: 98 additions & 0 deletions
98
pkg/async/notifications/implementations/event_publisher.go
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,98 @@ | ||
package implementations | ||
|
||
import ( | ||
"context" | ||
"strings" | ||
|
||
"k8s.io/apimachinery/pkg/util/sets" | ||
|
||
"github.com/lyft/flyteidl/gen/pb-go/flyteidl/admin" | ||
|
||
"github.com/lyft/flyteadmin/pkg/async/notifications/interfaces" | ||
|
||
"github.com/NYTimes/gizmo/pubsub" | ||
"github.com/golang/protobuf/proto" | ||
"github.com/lyft/flytestdlib/logger" | ||
"github.com/lyft/flytestdlib/promutils" | ||
"github.com/prometheus/client_golang/prometheus" | ||
) | ||
|
||
type eventPublisherSystemMetrics struct { | ||
Scope promutils.Scope | ||
PublishTotal prometheus.Counter | ||
PublishSuccess prometheus.Counter | ||
PublishError prometheus.Counter | ||
} | ||
|
||
// TODO: Add a counter that encompasses the publisher stats grouped by project and domain. | ||
type EventPublisher struct { | ||
pub pubsub.Publisher | ||
systemMetrics eventPublisherSystemMetrics | ||
events sets.String | ||
} | ||
|
||
var taskExecutionReq admin.TaskExecutionEventRequest | ||
var nodeExecutionReq admin.NodeExecutionEventRequest | ||
var workflowExecutionReq admin.WorkflowExecutionEventRequest | ||
|
||
var supportedEvents = map[string]string{ | ||
"task": proto.MessageName(&taskExecutionReq), | ||
"node": proto.MessageName(&nodeExecutionReq), | ||
"workflow": proto.MessageName(&workflowExecutionReq), | ||
} | ||
|
||
// The key is the notification type as defined as an enum. | ||
func (p *EventPublisher) Publish(ctx context.Context, notificationType string, msg proto.Message) error { | ||
p.systemMetrics.PublishTotal.Inc() | ||
logger.Debugf(ctx, "Publishing the following message [%+v]", msg) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: do you want to move this log statement below the check to shouldPublishEvent? |
||
|
||
if !p.shouldPublishEvent(notificationType) { | ||
return nil | ||
} | ||
|
||
err := p.pub.Publish(ctx, notificationType, msg) | ||
if err != nil { | ||
p.systemMetrics.PublishError.Inc() | ||
logger.Errorf(ctx, "Failed to publish a message with key [%s] and message [%s] and error: %v", notificationType, msg.String(), err) | ||
} else { | ||
p.systemMetrics.PublishSuccess.Inc() | ||
} | ||
return err | ||
} | ||
|
||
func (p *EventPublisher) shouldPublishEvent(notificationType string) bool { | ||
return p.events.Has(notificationType) | ||
} | ||
|
||
func newEventPublisherSystemMetrics(scope promutils.Scope) eventPublisherSystemMetrics { | ||
return eventPublisherSystemMetrics{ | ||
Scope: scope, | ||
PublishTotal: scope.MustNewCounter("event_publish_total", "overall count of publish messages"), | ||
PublishSuccess: scope.MustNewCounter("event_publish_success", "success count of publish messages"), | ||
PublishError: scope.MustNewCounter("event_publish_errors", "count of publish errors"), | ||
} | ||
} | ||
|
||
func NewEventsPublisher(pub pubsub.Publisher, scope promutils.Scope, eventTypes string) interfaces.Publisher { | ||
eventSet := sets.NewString() | ||
if strings.Contains(eventTypes, "*") || strings.Contains(eventTypes, "all") { | ||
for _, e := range supportedEvents { | ||
eventSet = eventSet.Insert(e) | ||
} | ||
} else { | ||
events := strings.Split(eventTypes, ",") | ||
for _, event := range events { | ||
if e, found := supportedEvents[event]; found { | ||
eventSet = eventSet.Insert(e) | ||
} else { | ||
logger.Errorf(context.Background(), "Unsupported event type [%s] in the config") | ||
} | ||
} | ||
katrogan marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
|
||
return &EventPublisher{ | ||
pub: pub, | ||
systemMetrics: newEventPublisherSystemMetrics(scope.NewSubScope("events_publisher")), | ||
events: eventSet, | ||
} | ||
} |
173 changes: 173 additions & 0 deletions
173
pkg/async/notifications/implementations/event_publisher_test.go
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,173 @@ | ||
package implementations | ||
|
||
import ( | ||
"context" | ||
"errors" | ||
"testing" | ||
"time" | ||
|
||
"github.com/golang/protobuf/ptypes" | ||
"github.com/lyft/flyteidl/gen/pb-go/flyteidl/admin" | ||
"github.com/lyft/flyteidl/gen/pb-go/flyteidl/core" | ||
"github.com/lyft/flyteidl/gen/pb-go/flyteidl/event" | ||
|
||
"github.com/NYTimes/gizmo/pubsub" | ||
"github.com/NYTimes/gizmo/pubsub/pubsubtest" | ||
"github.com/golang/protobuf/proto" | ||
"github.com/lyft/flytestdlib/promutils" | ||
"github.com/stretchr/testify/assert" | ||
) | ||
|
||
var testEventPublisher pubsubtest.TestPublisher | ||
var mockEventPublisher pubsub.Publisher = &testEventPublisher | ||
|
||
var executionID = core.WorkflowExecutionIdentifier{ | ||
Project: "project", | ||
Domain: "domain", | ||
Name: "name", | ||
} | ||
var nodeExecutionID = core.NodeExecutionIdentifier{ | ||
NodeId: "node id", | ||
ExecutionId: &executionID, | ||
} | ||
|
||
var taskID = &core.Identifier{ | ||
ResourceType: core.ResourceType_TASK, | ||
Project: "p", | ||
Domain: "d", | ||
Version: "v", | ||
Name: "n", | ||
} | ||
|
||
var occurredAt = time.Now().UTC() | ||
var occurredAtProto, _ = ptypes.TimestampProto(occurredAt) | ||
|
||
var taskPhase = core.TaskExecution_RUNNING | ||
|
||
var retryAttempt = uint32(1) | ||
|
||
const requestID = "request id" | ||
|
||
var taskRequest = &admin.TaskExecutionEventRequest{ | ||
RequestId: requestID, | ||
Event: &event.TaskExecutionEvent{ | ||
TaskId: taskID, | ||
ParentNodeExecutionId: &nodeExecutionID, | ||
RetryAttempt: retryAttempt, | ||
Phase: taskPhase, | ||
OccurredAt: occurredAtProto, | ||
}, | ||
} | ||
|
||
var nodeRequest = &admin.NodeExecutionEventRequest{ | ||
RequestId: requestID, | ||
Event: &event.NodeExecutionEvent{ | ||
ProducerId: "propeller", | ||
Id: &nodeExecutionID, | ||
OccurredAt: occurredAtProto, | ||
Phase: core.NodeExecution_RUNNING, | ||
InputUri: "input uri", | ||
}, | ||
} | ||
|
||
var workflowRequest = &admin.WorkflowExecutionEventRequest{ | ||
Event: &event.WorkflowExecutionEvent{ | ||
Phase: core.WorkflowExecution_SUCCEEDED, | ||
OutputResult: &event.WorkflowExecutionEvent_OutputUri{ | ||
OutputUri: "somestring", | ||
}, | ||
ExecutionId: &executionID, | ||
}, | ||
} | ||
|
||
// This method should be invoked before every test around Publisher. | ||
func initializeEventPublisher() { | ||
testEventPublisher.Published = nil | ||
testEventPublisher.GivenError = nil | ||
testEventPublisher.FoundError = nil | ||
} | ||
|
||
func TestNewEventsPublisher_EventTypes(t *testing.T) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thank you for this very exhaustive test 👍 |
||
{ | ||
tests := []struct { | ||
name string | ||
eventTypes string | ||
events []proto.Message | ||
shouldSendEvent []bool | ||
expectedSendCnt int | ||
}{ | ||
{"eventTypes as workflow,node", "workflow,node", | ||
[]proto.Message{workflowRequest, nodeRequest, taskRequest}, | ||
[]bool{true, true, false}, | ||
2}, | ||
{"eventTypes as workflow,task", "workflow,task", | ||
[]proto.Message{workflowRequest, nodeRequest, taskRequest}, | ||
[]bool{true, false, true}, | ||
2}, | ||
{"eventTypes as workflow,task", "node,task", | ||
[]proto.Message{workflowRequest, nodeRequest, taskRequest}, | ||
[]bool{false, true, true}, | ||
2}, | ||
{"eventTypes as task", "task", | ||
[]proto.Message{taskRequest}, | ||
[]bool{true}, | ||
1}, | ||
{"eventTypes as node", "node", | ||
[]proto.Message{nodeRequest}, | ||
[]bool{true}, | ||
1}, | ||
{"eventTypes as workflow", "workflow", | ||
[]proto.Message{workflowRequest}, | ||
[]bool{true}, | ||
1}, | ||
{"eventTypes as workflow", "workflow", | ||
[]proto.Message{nodeRequest, taskRequest}, | ||
[]bool{false, false}, | ||
0}, | ||
{"eventTypes as task", "task", | ||
[]proto.Message{workflowRequest, nodeRequest}, | ||
[]bool{false, false}, | ||
0}, | ||
{"eventTypes as node", "node", | ||
[]proto.Message{workflowRequest, taskRequest}, | ||
[]bool{false, false}, | ||
0}, | ||
{"eventTypes as all", "all", | ||
[]proto.Message{workflowRequest, nodeRequest, taskRequest}, | ||
[]bool{true, true, true}, | ||
3}, | ||
{"eventTypes as *", "*", | ||
[]proto.Message{workflowRequest, nodeRequest, taskRequest}, | ||
[]bool{true, true, true}, | ||
3}, | ||
} | ||
for _, test := range tests { | ||
t.Run(test.name, func(t *testing.T) { | ||
initializeEventPublisher() | ||
var currentEventPublisher = NewEventsPublisher(mockEventPublisher, promutils.NewTestScope(), test.eventTypes) | ||
var cnt = 0 | ||
for id, event := range test.events { | ||
assert.Nil(t, currentEventPublisher.Publish(context.Background(), proto.MessageName(event), | ||
event)) | ||
if test.shouldSendEvent[id] { | ||
assert.Equal(t, proto.MessageName(event), testEventPublisher.Published[cnt].Key) | ||
marshalledData, err := proto.Marshal(event) | ||
assert.Nil(t, err) | ||
assert.Equal(t, marshalledData, testEventPublisher.Published[cnt].Body) | ||
cnt++ | ||
} | ||
} | ||
assert.Equal(t, test.expectedSendCnt, len(testEventPublisher.Published)) | ||
}) | ||
} | ||
} | ||
} | ||
|
||
func TestEventPublisher_PublishError(t *testing.T) { | ||
initializeEventPublisher() | ||
currentEventPublisher := NewEventsPublisher(mockEventPublisher, promutils.NewTestScope(), "*") | ||
var publishError = errors.New("publish() returns an error") | ||
testEventPublisher.GivenError = publishError | ||
assert.Equal(t, publishError, currentEventPublisher.Publish(context.Background(), | ||
proto.MessageName(taskRequest), taskRequest)) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sorry should have brought this up in the last round, but can we use shared constants here as map keys? perhaps https://github.com/flyteorg/flyteadmin/blob/master/pkg/common/entity.go could work