-
Notifications
You must be signed in to change notification settings - Fork 63
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
looks good! just a few suggestions
pkg/async/notifications/factory.go
Outdated
@@ -148,3 +148,57 @@ func NewNotificationsPublisher(config runtimeInterfaces.NotificationsConfig, sco | |||
return implementations.NewNoopPublish() | |||
} | |||
} | |||
|
|||
func NewEventsPublisher(config runtimeInterfaces.NotificationsConfig, scope promutils.Scope) interfaces.Publisher { | |||
if config.ExternalEvent.EventPublisherConfig.TopicName == "" { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we explicitly just use the Type
of the Notificationsconfig. and if it's not no-op, error when Topic Name is empty?
events []string | ||
} | ||
|
||
func getSupportedEvents() map[string]string { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: instead of using a func you can actually initialize the map in golang as a var. see https://stackoverflow.com/questions/41078272/initializing-a-go-map-in-a-single-statement for an example
func NewEventsPublisher(pub pubsub.Publisher, scope promutils.Scope, eventTypes string) interfaces.Publisher { | ||
supportedEvents := getSupportedEvents() | ||
|
||
var eventList = make([]string, 0) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
do you want to use a String set here instead since you only ever check for membership/
// The key is the notification type as defined as an enum. | ||
func (p *EventPublisher) Publish(ctx context.Context, notificationType string, msg proto.Message) error { | ||
p.systemMetrics.PublishTotal.Inc() | ||
logger.Debugf(ctx, "Publishing the following message [%s]", msg.String()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
does String() return a human readable message? if not can we use ...[%+v]", msg)
instead?
func newEventPublisherSystemMetrics(scope promutils.Scope) eventPublisherSystemMetrics { | ||
return eventPublisherSystemMetrics{ | ||
Scope: scope, | ||
PublishTotal: scope.MustNewCounter("event_publish_total", "overall count of publish messages"), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i know it's all additive, but can we add a success counter too?
// The topic which events should be published, e.g. node, task, workflow | ||
TopicName string `json:"topicName"` | ||
// Event types: task, node, workflow executions | ||
EventTypes string `json:"eventTypes"` |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why not an array type?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Because I want to support the option to use all
or *
and using a single string makes it easier to filter on such cases.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
but you can still do that with a single entry, no? I'd prefer to use conventional yaml list formatting vs our own string-split scheme
Yes let's please do that! I think we can worry about refactoring the existing email notifications implementation separately from the egress event work you're doing. Eventually notifications can be handled only by a separately configured subscriber so it make sense to separate out the events config now! |
d10d5a0
to
3b170a1
Compare
hey @tnsetting is this ready for re-review? |
Yes, it is ready. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sorry just a few more comments
// The topic which events should be published, e.g. node, task, workflow | ||
TopicName string `json:"topicName"` | ||
// Event types: task, node, workflow executions | ||
EventTypes string `json:"eventTypes"` |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
but you can still do that with a single entry, no? I'd prefer to use conventional yaml list formatting vs our own string-split scheme
var workflowExecutionReq admin.WorkflowExecutionEventRequest | ||
|
||
var supportedEvents = map[string]string{ | ||
"task": proto.MessageName(&taskExecutionReq), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sorry should have brought this up in the last round, but can we use shared constants here as map keys? perhaps https://github.com/flyteorg/flyteadmin/blob/master/pkg/common/entity.go could work
// The key is the notification type as defined as an enum. | ||
func (p *EventPublisher) Publish(ctx context.Context, notificationType string, msg proto.Message) error { | ||
p.systemMetrics.PublishTotal.Inc() | ||
logger.Debugf(ctx, "Publishing the following message [%+v]", msg) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: do you want to move this log statement below the check to shouldPublishEvent?
testEventPublisher.FoundError = nil | ||
} | ||
|
||
func TestNewEventsPublisher_EventTypes(t *testing.T) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you for this very exhaustive test 👍
@@ -4,6 +4,9 @@ import ( | |||
"context" | |||
"strconv" | |||
|
|||
"github.com/gogo/protobuf/proto" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we use github.com/golang/protobuf/proto
instead? gogo behaves weirdly with oneofs
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure. Never paid attention for that as intellij did that for me.
var workflowExecutionReq admin.WorkflowExecutionEventRequest | ||
|
||
const ( | ||
Task = "task" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm still concerned this is a little brittle because here we define the strings statically but in the manager methods we use the proto message name as the event type. does that correspond to these raw strings like "task" or "node"?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
do you mind updating https://github.com/flyteorg/flyteadmin/blob/master/flyteadmin_config.yaml with an example of the config changes too?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
THANK YOU so much for this change
Signed-off-by: Haytham Abuelfutuh <[email protected]>
* Move scopes to config Signed-off-by: Haytham Abuelfutuh <[email protected]> * missed Signed-off-by: Haytham Abuelfutuh <[email protected]> * wip Signed-off-by: Haytham Abuelfutuh <[email protected]> * Update handlers.go Signed-off-by: Haytham Abuelfutuh <[email protected]> * go get propeller at v0.5.12 (#146) Signed-off-by: Haytham Abuelfutuh <[email protected]> * Update to using latest flytepropeller v0.5.13 (#148) Signed-off-by: Haytham Abuelfutuh <[email protected]> * Update propeller to 0.5.14 (#149) * Update propeller * cleanup Signed-off-by: Haytham Abuelfutuh <[email protected]> * Filter executions by user (#150) Signed-off-by: Haytham Abuelfutuh <[email protected]> * Update CI post migration Signed-off-by: Haytham Abuelfutuh <[email protected]> * Update codecov link Signed-off-by: Haytham Abuelfutuh <[email protected]> * Publish raw events (#151) Signed-off-by: Haytham Abuelfutuh <[email protected]> * fix test Signed-off-by: Haytham Abuelfutuh <[email protected]> * wip Signed-off-by: Haytham Abuelfutuh <[email protected]> * Fix token retrieval from cookies Signed-off-by: Haytham Abuelfutuh <[email protected]> * Fix unit tests and lint Signed-off-by: Haytham Abuelfutuh <[email protected]> * Move to const Signed-off-by: Haytham Abuelfutuh <[email protected]> * Revert Auth config Signed-off-by: Haytham Abuelfutuh <[email protected]> * Revert kube config path Signed-off-by: Haytham Abuelfutuh <[email protected]> * Use access token when posting to IdP Signed-off-by: Haytham Abuelfutuh <[email protected]> * Ignore refresh token error Signed-off-by: Haytham Abuelfutuh <[email protected]> * Fix tests Signed-off-by: Haytham Abuelfutuh <[email protected]> * Expose openId metadata endpoint and expose scopes in /config endpoint Signed-off-by: Haytham Abuelfutuh <[email protected]> * mod tidy Signed-off-by: Haytham Abuelfutuh <[email protected]> * rename Signed-off-by: Haytham Abuelfutuh <[email protected]> * lint Signed-off-by: Haytham Abuelfutuh <[email protected]> * unit tests Signed-off-by: Haytham Abuelfutuh <[email protected]> Co-authored-by: Yee Hing Tong <[email protected]> Co-authored-by: Katrina Rogan <[email protected]> Co-authored-by: tnsetting <[email protected]>
* Move scopes to config Signed-off-by: Haytham Abuelfutuh <[email protected]> * missed Signed-off-by: Haytham Abuelfutuh <[email protected]> * wip Signed-off-by: Haytham Abuelfutuh <[email protected]> * Update handlers.go Signed-off-by: Haytham Abuelfutuh <[email protected]> * go get propeller at v0.5.12 (#146) Signed-off-by: Haytham Abuelfutuh <[email protected]> * Update to using latest flytepropeller v0.5.13 (#148) Signed-off-by: Haytham Abuelfutuh <[email protected]> * Update propeller to 0.5.14 (#149) * Update propeller * cleanup Signed-off-by: Haytham Abuelfutuh <[email protected]> * Filter executions by user (#150) Signed-off-by: Haytham Abuelfutuh <[email protected]> * Update CI post migration Signed-off-by: Haytham Abuelfutuh <[email protected]> * Update codecov link Signed-off-by: Haytham Abuelfutuh <[email protected]> * Publish raw events (#151) Signed-off-by: Haytham Abuelfutuh <[email protected]> * fix test Signed-off-by: Haytham Abuelfutuh <[email protected]> * wip Signed-off-by: Haytham Abuelfutuh <[email protected]> * Fix token retrieval from cookies Signed-off-by: Haytham Abuelfutuh <[email protected]> * Fix unit tests and lint Signed-off-by: Haytham Abuelfutuh <[email protected]> * Move to const Signed-off-by: Haytham Abuelfutuh <[email protected]> * Revert Auth config Signed-off-by: Haytham Abuelfutuh <[email protected]> * Revert kube config path Signed-off-by: Haytham Abuelfutuh <[email protected]> * Use access token when posting to IdP Signed-off-by: Haytham Abuelfutuh <[email protected]> * Ignore refresh token error Signed-off-by: Haytham Abuelfutuh <[email protected]> * Fix tests Signed-off-by: Haytham Abuelfutuh <[email protected]> * Expose openId metadata endpoint and expose scopes in /config endpoint Signed-off-by: Haytham Abuelfutuh <[email protected]> * mod tidy Signed-off-by: Haytham Abuelfutuh <[email protected]> * rename Signed-off-by: Haytham Abuelfutuh <[email protected]> * lint Signed-off-by: Haytham Abuelfutuh <[email protected]> * unit tests Signed-off-by: Haytham Abuelfutuh <[email protected]> Co-authored-by: Yee Hing Tong <[email protected]> Co-authored-by: Katrina Rogan <[email protected]> Co-authored-by: tnsetting <[email protected]>
TL;DR
Send raw flyte events to a pubsub topic. This is the first step for flyte lineage events. These events will be consumed and enriched to external by another thread or process. If the
externalEvent
part is not configured, a noop publisher will be created.Example of config (the notifications bit is orthogonal):
Type
Are all requirements met?
Complete description
How did you fix the bug, make the feature etc. Link to any design docs etc
Tracking Issue
https://github.com/lyft/flyte/issues/
Follow-up issue
NA
OR
https://github.com/lyft/flyte/issues/