Skip to content

Commit

Permalink
Write workflow and node execution events asynchronously (flyteorg#174)
Browse files Browse the repository at this point in the history
  • Loading branch information
Katrina Rogan authored Apr 9, 2021
1 parent af505fb commit 40d6d5c
Show file tree
Hide file tree
Showing 34 changed files with 746 additions and 347 deletions.
45 changes: 45 additions & 0 deletions pkg/async/events/implementations/node_execution_event_writer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package implementations

import (
"context"

"github.com/flyteorg/flyteadmin/pkg/async/events/interfaces"
"github.com/flyteorg/flyteadmin/pkg/repositories"
"github.com/flyteorg/flyteadmin/pkg/repositories/transformers"
"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/admin"
"github.com/flyteorg/flytestdlib/logger"
)

// nodeExecutionEventWriter asynchronously persists node execution events. As flytepropeller sends node
// events, node execution processing doesn't have to wait on these to be committed: Write enqueues a request
// on the events channel and Run drains that channel, writing each event to the database.
type nodeExecutionEventWriter struct {
// db supplies the NodeExecutionEventRepo that Run writes event models to.
db repositories.RepositoryInterface
// events carries requests from Write (the sender) to Run (the receiver). Its capacity is the bufferSize
// passed to NewNodeExecutionEventWriter.
events chan admin.NodeExecutionEventRequest
}

// Write enqueues event on the events channel for Run to persist.
//
// The send blocks while the channel is full (or, for a zero-capacity channel, until Run receives), so callers
// are only decoupled from the database write while buffer space remains. Write must not be called once the
// events channel has been closed, because sending on a closed channel panics.
func (w *nodeExecutionEventWriter) Write(event admin.NodeExecutionEventRequest) {
w.events <- event
}

// Run consumes the events channel until it is closed. Each request is converted into its database model and
// handed to the node execution event repository. Nothing is returned to the caller: a request that cannot be
// transformed or stored is logged at warning level and dropped.
func (w *nodeExecutionEventWriter) Run() {
	// No caller-supplied context reaches this loop, so one placeholder context serves every iteration.
	ctx := context.TODO()
	for request := range w.events {
		model, err := transformers.CreateNodeExecutionEventModel(request)
		if err != nil {
			logger.Warnf(ctx, "Failed to transform event [%+v] to database model with err [%+v]", request, err)
			continue
		}
		if err = w.db.NodeExecutionEventRepo().Create(ctx, *model); err != nil {
			// Losing an event here is acceptable: these rows are not consulted to determine execution state,
			// they only exist to replay and inspect the event timeline after the fact.
			logger.Warnf(ctx, "Failed to write event [%+v] to database with err [%+v]", request, err)
		}
	}
}

// NewNodeExecutionEventWriter builds a node execution event writer backed by db whose internal queue holds up
// to bufferSize pending requests. The constructor does not start the consumer; Run has to be invoked
// separately for queued events to be persisted.
func NewNodeExecutionEventWriter(db repositories.RepositoryInterface, bufferSize int) interfaces.NodeExecutionEventWriter {
	writer := new(nodeExecutionEventWriter)
	writer.db = db
	writer.events = make(chan admin.NodeExecutionEventRequest, bufferSize)
	return writer
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package implementations

import (
"testing"

"github.com/flyteorg/flyteadmin/pkg/repositories/mocks"
"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/admin"
"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core"
event2 "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/event"
)

// TestNodeExecutionEventWriter checks that Write returns without blocking when the buffered channel has
// spare capacity, then starts Run in a goroutine and closes the channel.
//
// NOTE(review): the test returns without waiting for the Run goroutine and makes no assertion on the mock, so
// whether the queued event is ever transformed or stored is not verified and depends on scheduling.
func TestNodeExecutionEventWriter(t *testing.T) {
db := mocks.NewMockRepository()

event := admin.NodeExecutionEventRequest{
RequestId: "request_id",
Event: &event2.NodeExecutionEvent{
Id: &core.NodeExecutionIdentifier{
NodeId: "node_id",
ExecutionId: &core.WorkflowExecutionIdentifier{
Project: "project",
Domain: "domain",
Name: "exec_name",
},
},
},
}

nodeExecEventRepo := mocks.NodeExecutionEventRepoInterface{}
// NOTE(review): Run calls Create with a context and the transformed model, whereas this expectation is
// registered with the raw request as its only argument; it presumably never matches. Confirm against the
// mock's argument-matching rules.
nodeExecEventRepo.On("Create", event).Return(nil)
db.(*mocks.MockRepository).NodeExecutionEventRepoIface = &nodeExecEventRepo
writer := NewNodeExecutionEventWriter(db, 100)
// Assert we can write an event using the buffered channel without holding up this process.
writer.Write(event)
// The consumer is started only after the event is queued; closing the channel lets its range loop end once
// the buffered event has been drained.
go func() { writer.Run() }()
close(writer.(*nodeExecutionEventWriter).events)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package implementations

import (
"context"

"github.com/flyteorg/flyteadmin/pkg/async/events/interfaces"
"github.com/flyteorg/flyteadmin/pkg/repositories"
"github.com/flyteorg/flyteadmin/pkg/repositories/transformers"
"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/admin"
"github.com/flyteorg/flytestdlib/logger"
)

// workflowExecutionEventWriter asynchronously persists workflow execution events. As flytepropeller sends
// workflow events, workflow execution processing doesn't have to wait on these to be committed: Write enqueues
// a request on the events channel and Run drains that channel, writing each event to the database.
type workflowExecutionEventWriter struct {
// db supplies the ExecutionEventRepo that Run writes event models to.
db repositories.RepositoryInterface
// events carries requests from Write (the sender) to Run (the receiver). Its capacity is the bufferSize
// passed to NewWorkflowExecutionEventWriter.
events chan admin.WorkflowExecutionEventRequest
}

// Write enqueues event on the events channel for Run to persist.
//
// The send blocks while the channel is full (or, for a zero-capacity channel, until Run receives), so callers
// are only decoupled from the database write while buffer space remains. Write must not be called once the
// events channel has been closed, because sending on a closed channel panics.
func (w *workflowExecutionEventWriter) Write(event admin.WorkflowExecutionEventRequest) {
w.events <- event
}

// Run consumes the events channel until it is closed. Each request is converted into its database model and
// handed to the execution event repository. Nothing is returned to the caller: a request that cannot be
// transformed or stored is logged at warning level and dropped.
func (w *workflowExecutionEventWriter) Run() {
	// No caller-supplied context reaches this loop, so one placeholder context serves every iteration.
	ctx := context.TODO()
	for request := range w.events {
		model, err := transformers.CreateExecutionEventModel(request)
		if err != nil {
			logger.Warnf(ctx, "Failed to transform event [%+v] to database model with err [%+v]", request, err)
			continue
		}
		if err = w.db.ExecutionEventRepo().Create(ctx, *model); err != nil {
			// Losing an event here is acceptable: these rows are not consulted to determine execution state,
			// they only exist to replay and inspect the event timeline after the fact.
			logger.Warnf(ctx, "Failed to write event [%+v] to database with err [%+v]", request, err)
		}
	}
}

// NewWorkflowExecutionEventWriter builds a workflow execution event writer backed by db whose internal queue
// holds up to bufferSize pending requests. The constructor does not start the consumer; Run has to be invoked
// separately for queued events to be persisted.
func NewWorkflowExecutionEventWriter(db repositories.RepositoryInterface, bufferSize int) interfaces.WorkflowExecutionEventWriter {
	writer := new(workflowExecutionEventWriter)
	writer.db = db
	writer.events = make(chan admin.WorkflowExecutionEventRequest, bufferSize)
	return writer
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package implementations

import (
"testing"

"github.com/flyteorg/flyteadmin/pkg/repositories/mocks"
"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/admin"
"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core"
event2 "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/event"
)

// TestWorkflowExecutionEventWriter checks that Write returns without blocking when the buffered channel has
// spare capacity, then starts Run in a goroutine and closes the channel.
//
// NOTE(review): the test returns without waiting for the Run goroutine and makes no assertion on the mock, so
// whether the queued event is ever transformed or stored is not verified and depends on scheduling.
func TestWorkflowExecutionEventWriter(t *testing.T) {
db := mocks.NewMockRepository()

event := admin.WorkflowExecutionEventRequest{
RequestId: "request_id",
Event: &event2.WorkflowExecutionEvent{
ExecutionId: &core.WorkflowExecutionIdentifier{
Project: "project",
Domain: "domain",
Name: "exec_name",
},
},
}

workflowExecEventRepo := mocks.ExecutionEventRepoInterface{}
// NOTE(review): Run calls Create with a context and the transformed model, whereas this expectation is
// registered with the raw request as its only argument; it presumably never matches. Confirm against the
// mock's argument-matching rules.
workflowExecEventRepo.On("Create", event).Return(nil)
db.(*mocks.MockRepository).ExecutionEventRepoIface = &workflowExecEventRepo
writer := NewWorkflowExecutionEventWriter(db, 100)
// Assert we can write an event using the buffered channel without holding up this process.
writer.Write(event)
// The consumer is started only after the event is queued; closing the channel lets its range loop end once
// the buffered event has been drained.
go func() { writer.Run() }()
close(writer.(*workflowExecutionEventWriter).events)
}
12 changes: 12 additions & 0 deletions pkg/async/events/interfaces/node_execution.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package interfaces

import (
"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/admin"
)

//go:generate mockery -name=NodeExecutionEventWriter -output=../mocks -case=underscore

// NodeExecutionEventWriter accepts node execution event requests through Write and processes them in Run.
// NOTE(review): the implementation in pkg/async/events/implementations queues requests on a channel in Write
// and persists them to the database from Run; Run is presumably meant to be started in its own goroutine.
// Confirm against the call sites.
type NodeExecutionEventWriter interface {
// Run processes the events submitted through Write.
Run()
// Write submits a node execution event request for processing by Run.
Write(nodeExecutionEvent admin.NodeExecutionEventRequest)
}
12 changes: 12 additions & 0 deletions pkg/async/events/interfaces/workflow_execution.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package interfaces

import (
"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/admin"
)

//go:generate mockery -name=WorkflowExecutionEventWriter -output=../mocks -case=underscore

// WorkflowExecutionEventWriter accepts workflow execution event requests through Write and processes them in
// Run.
// NOTE(review): the implementation in pkg/async/events/implementations queues requests on a channel in Write
// and persists them to the database from Run; Run is presumably meant to be started in its own goroutine.
// Confirm against the call sites.
type WorkflowExecutionEventWriter interface {
// Run processes the events submitted through Write.
Run()
// Write submits a workflow execution event request for processing by Run.
Write(workflowExecutionEvent admin.WorkflowExecutionEventRequest)
}
24 changes: 24 additions & 0 deletions pkg/async/events/mocks/node_execution_event_writer.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

24 changes: 24 additions & 0 deletions pkg/async/events/mocks/workflow_execution_event_writer.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

22 changes: 12 additions & 10 deletions pkg/manager/impl/execution_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/flyteorg/flytestdlib/logger"
"github.com/flyteorg/flytestdlib/storage"

eventWriter "github.com/flyteorg/flyteadmin/pkg/async/events/interfaces"
"github.com/flyteorg/flyteadmin/pkg/async/notifications"
notificationInterfaces "github.com/flyteorg/flyteadmin/pkg/async/notifications/interfaces"
"github.com/flyteorg/flyteadmin/pkg/errors"
Expand Down Expand Up @@ -90,6 +91,7 @@ type ExecutionManager struct {
resourceManager interfaces.ResourceInterface
qualityOfServiceAllocator executions.QualityOfServiceAllocator
eventPublisher notificationInterfaces.Publisher
dbEventWriter eventWriter.WorkflowExecutionEventWriter
}

func getExecutionContext(ctx context.Context, id *core.WorkflowExecutionIdentifier) context.Context {
Expand Down Expand Up @@ -1001,18 +1003,13 @@ func (m *ExecutionManager) CreateWorkflowEvent(ctx context.Context, request admi
request.Event.ExecutionId, err)
return nil, err
}
executionEventModel, err := transformers.CreateExecutionEventModel(request)
if err != nil {
logger.Debugf(ctx, "failed to transform workflow execution event %s for [%+v] after receiving event with err: %v",
request.RequestId, request.Event.ExecutionId, err)
return nil, err
}
err = m.db.ExecutionRepo().Update(ctx, *executionEventModel, *executionModel)
err = m.db.ExecutionRepo().Update(ctx, *executionModel)
if err != nil {
logger.Debugf(ctx, "Failed to update execution with CreateWorkflowEvent [%+v] with err %v",
request, err)
return nil, err
}
m.dbEventWriter.Write(request)

if request.Event.Phase == core.WorkflowExecution_RUNNING {
// Workflow executions are created in state "UNDEFINED". All the time up until a RUNNING event is received is
Expand Down Expand Up @@ -1098,7 +1095,7 @@ func (m *ExecutionManager) GetExecutionData(
}
// Update model so as not to offload again.
executionModel.InputsURI = newInputsURI
if err := m.db.ExecutionRepo().UpdateExecution(ctx, *executionModel); err != nil {
if err := m.db.ExecutionRepo().Update(ctx, *executionModel); err != nil {
return nil, err
}
}
Expand Down Expand Up @@ -1296,7 +1293,7 @@ func (m *ExecutionManager) TerminateExecution(
logger.Debugf(ctx, "failed to add abort metadata for execution [%+v] with err: %v", request.Id, err)
return nil, err
}
err = m.db.ExecutionRepo().UpdateExecution(ctx, executionModel)
err = m.db.ExecutionRepo().Update(ctx, executionModel)
if err != nil {
logger.Debugf(ctx, "failed to save abort cause for terminated execution: %+v with err: %v", request.Id, err)
return nil, err
Expand Down Expand Up @@ -1332,7 +1329,11 @@ func newExecutionSystemMetrics(scope promutils.Scope) executionSystemMetrics {
}
}

func NewExecutionManager(db repositories.RepositoryInterface, config runtimeInterfaces.Configuration, storageClient *storage.DataStore, workflowExecutor workflowengineInterfaces.Executor, systemScope promutils.Scope, userScope promutils.Scope, publisher notificationInterfaces.Publisher, urlData dataInterfaces.RemoteURLInterface, workflowManager interfaces.WorkflowInterface, namedEntityManager interfaces.NamedEntityInterface, eventPublisher notificationInterfaces.Publisher) interfaces.ExecutionInterface {
func NewExecutionManager(db repositories.RepositoryInterface, config runtimeInterfaces.Configuration,
storageClient *storage.DataStore, workflowExecutor workflowengineInterfaces.Executor, systemScope promutils.Scope,
userScope promutils.Scope, publisher notificationInterfaces.Publisher, urlData dataInterfaces.RemoteURLInterface,
workflowManager interfaces.WorkflowInterface, namedEntityManager interfaces.NamedEntityInterface,
eventPublisher notificationInterfaces.Publisher, eventWriter eventWriter.WorkflowExecutionEventWriter) interfaces.ExecutionInterface {
queueAllocator := executions.NewQueueAllocator(config, db)
systemMetrics := newExecutionSystemMetrics(systemScope)

Expand Down Expand Up @@ -1363,6 +1364,7 @@ func NewExecutionManager(db repositories.RepositoryInterface, config runtimeInte
resourceManager: resourceManager,
qualityOfServiceAllocator: executions.NewQualityOfServiceAllocator(config, resourceManager),
eventPublisher: eventPublisher,
dbEventWriter: eventWriter,
}
}

Expand Down
Loading

0 comments on commit 40d6d5c

Please sign in to comment.