Skip to content

Commit

Permalink
Store dynamic workflows (flyteorg#182)
Browse files Browse the repository at this point in the history
  • Loading branch information
Katrina Rogan authored Apr 23, 2021
1 parent a3b74e3 commit 8ec2d55
Show file tree
Hide file tree
Showing 14 changed files with 392 additions and 68 deletions.
4 changes: 2 additions & 2 deletions flyteadmin/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ require (
github.com/bradfitz/gomemcache v0.0.0-20190913173617-a41fca850d0b // indirect
github.com/coreos/go-oidc v2.2.1+incompatible
github.com/evanphx/json-patch v4.9.0+incompatible
github.com/flyteorg/flyteidl v0.18.32
github.com/flyteorg/flyteidl v0.18.38
github.com/flyteorg/flytepropeller v0.7.8
github.com/flyteorg/flytestdlib v0.3.15
github.com/gofrs/uuid v4.0.0+incompatible // indirect
Expand All @@ -40,7 +40,7 @@ require (
github.com/pquerna/cachecontrol v0.0.0-20201205024021-ac21108117ac // indirect
github.com/prometheus/client_golang v1.9.0
github.com/prometheus/common v0.19.0 // indirect
github.com/sirupsen/logrus v1.8.1
github.com/sirupsen/logrus v1.8.1 // indirect
github.com/spf13/cobra v1.1.3
github.com/spf13/pflag v1.0.5
github.com/stretchr/testify v1.7.0
Expand Down
6 changes: 2 additions & 4 deletions flyteadmin/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -258,10 +258,8 @@ github.com/felixge/httpsnoop v1.0.1/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSw
github.com/flyteorg/flyteidl v0.18.17/go.mod h1:b5Fq4Z8a5b0mF6pEwTd48ufvikUGVkWSjZiMT0ZtqKI=
github.com/flyteorg/flyteidl v0.18.24 h1:Y4+y/tu6Qsb3jNXxuVsflycfSocfthUi6XsMgJTfGuc=
github.com/flyteorg/flyteidl v0.18.24/go.mod h1:b5Fq4Z8a5b0mF6pEwTd48ufvikUGVkWSjZiMT0ZtqKI=
github.com/flyteorg/flyteidl v0.18.26 h1:hcx/bR5EJSRfAmUmhjiEGHM22eC7APeXGT4MljR78EY=
github.com/flyteorg/flyteidl v0.18.26/go.mod h1:b5Fq4Z8a5b0mF6pEwTd48ufvikUGVkWSjZiMT0ZtqKI=
github.com/flyteorg/flyteidl v0.18.32 h1:Z+DeBh4i+mZK75lfJwmsHPf23nbsp2Qiv+kCnGMY9Ds=
github.com/flyteorg/flyteidl v0.18.32/go.mod h1:b5Fq4Z8a5b0mF6pEwTd48ufvikUGVkWSjZiMT0ZtqKI=
github.com/flyteorg/flyteidl v0.18.38 h1:XgAw9d2Q/UjWQyXbnZz/j4N6OVGDxr7jceden6PdCgY=
github.com/flyteorg/flyteidl v0.18.38/go.mod h1:b5Fq4Z8a5b0mF6pEwTd48ufvikUGVkWSjZiMT0ZtqKI=
github.com/flyteorg/flyteplugins v0.5.38/go.mod h1:CxerBGWWEmNYmPxSMHnwQEr9cc1Fbo/g5fcABazU6Jo=
github.com/flyteorg/flytepropeller v0.7.8 h1:O441kDHJUayS/2rebTj7VG4e1LowrweazQhzTaZ97m4=
github.com/flyteorg/flytepropeller v0.7.8/go.mod h1:2SPJtYS0oM5lN4OCqBDbSRozRWvobFTXXlAC2ygbbWk=
Expand Down
6 changes: 5 additions & 1 deletion flyteadmin/pkg/common/mocks/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ type TestDataStore struct {
ReadProtobufCb func(ctx context.Context, reference storage.DataReference, msg proto.Message) error
WriteProtobufCb func(
ctx context.Context, reference storage.DataReference, opts storage.Options, msg proto.Message) error

ConstructReferenceCb func(
ctx context.Context, reference storage.DataReference, nestedKeys ...string) (storage.DataReference, error)
Store map[storage.DataReference][]byte
}

Expand Down Expand Up @@ -60,6 +61,9 @@ func (t *TestDataStore) CopyRaw(ctx context.Context, source, destination storage

func (t *TestDataStore) ConstructReference(
ctx context.Context, reference storage.DataReference, nestedKeys ...string) (storage.DataReference, error) {
if t.ConstructReferenceCb != nil {
return t.ConstructReferenceCb(ctx, reference, nestedKeys...)
}
nestedPath := strings.Join(nestedKeys, "/")
return storage.DataReference(fmt.Sprintf("%s/%v", reference, nestedPath)), nil
}
Expand Down
83 changes: 71 additions & 12 deletions flyteadmin/pkg/manager/impl/node_execution_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ type nodeExecutionMetrics struct {
type NodeExecutionManager struct {
db repositories.RepositoryInterface
config runtimeInterfaces.Configuration
storagePrefix []string
storageClient *storage.DataStore
metrics nodeExecutionMetrics
urlData dataInterfaces.RemoteURLInterface
Expand Down Expand Up @@ -81,7 +82,7 @@ func getNodeExecutionContext(ctx context.Context, identifier *core.NodeExecution
}

func (m *NodeExecutionManager) createNodeExecutionWithEvent(
ctx context.Context, request *admin.NodeExecutionEventRequest) error {
ctx context.Context, request *admin.NodeExecutionEventRequest, dynamicWorkflowRemoteClosureReference string) error {

executionID := request.Event.Id.ExecutionId
workflowExecutionExists, err := m.db.ExecutionRepo().Exists(ctx, repoInterfaces.Identifier{
Expand Down Expand Up @@ -123,9 +124,10 @@ func (m *NodeExecutionManager) createNodeExecutionWithEvent(
parentID = &parentNodeExecutionModel.ID
}
nodeExecutionModel, err := transformers.CreateNodeExecutionModel(transformers.ToNodeExecutionModelInput{
Request: request,
ParentTaskExecutionID: parentTaskExecutionID,
ParentID: parentID,
Request: request,
ParentTaskExecutionID: parentTaskExecutionID,
ParentID: parentID,
DynamicWorkflowRemoteClosure: dynamicWorkflowRemoteClosureReference,
})
if err != nil {
logger.Debugf(ctx, "failed to create node execution model for event request: %s with err: %v",
Expand All @@ -142,7 +144,8 @@ func (m *NodeExecutionManager) createNodeExecutionWithEvent(
}

func (m *NodeExecutionManager) updateNodeExecutionWithEvent(
ctx context.Context, request *admin.NodeExecutionEventRequest, nodeExecutionModel *models.NodeExecution) (updateNodeExecutionStatus, error) {
ctx context.Context, request *admin.NodeExecutionEventRequest, nodeExecutionModel *models.NodeExecution,
dynamicWorkflowRemoteClosureReference string) (updateNodeExecutionStatus, error) {
// If we have an existing execution, check if the phase change is valid
nodeExecPhase := core.NodeExecution_Phase(core.NodeExecution_Phase_value[nodeExecutionModel.Phase])
if nodeExecPhase == request.Event.Phase {
Expand Down Expand Up @@ -172,7 +175,7 @@ func (m *NodeExecutionManager) updateNodeExecutionWithEvent(
return updateFailed, err
}
}
err := transformers.UpdateNodeExecutionModel(request, nodeExecutionModel, childExecutionID)
err := transformers.UpdateNodeExecutionModel(request, nodeExecutionModel, childExecutionID, dynamicWorkflowRemoteClosureReference)
if err != nil {
logger.Debugf(ctx, "failed to update node execution model: %+v with err: %v", request.Event.Id, err)
return updateFailed, err
Expand All @@ -187,15 +190,56 @@ func (m *NodeExecutionManager) updateNodeExecutionWithEvent(
return updateSucceeded, nil
}

func formatDynamicWorkflowID(identifier *core.Identifier) string {
return fmt.Sprintf("%s_%s_%s_%s", identifier.Project, identifier.Domain, identifier.Name, identifier.Version)
}

func (m *NodeExecutionManager) uploadDynamicWorkflowClosure(
ctx context.Context, nodeID *core.NodeExecutionIdentifier, workflowID *core.Identifier,
compiledWorkflowClosure *core.CompiledWorkflowClosure) (storage.DataReference, error) {
nestedSubKeys := []string{
nodeID.ExecutionId.Project,
nodeID.ExecutionId.Domain,
nodeID.ExecutionId.Name,
nodeID.NodeId,
formatDynamicWorkflowID(workflowID),
}
nestedKeys := append(m.storagePrefix, nestedSubKeys...)
remoteClosureDataRef, err := m.storageClient.ConstructReference(ctx, m.storageClient.GetBaseContainerFQN(ctx), nestedKeys...)

if err != nil {
return "", errors.NewFlyteAdminErrorf(codes.Internal,
"Failed to produce remote closure data reference for dynamic workflow yielded by node id [%+v] with workflow id [%+v]; err: %v", nodeID, workflowID, err)
}

err = m.storageClient.WriteProtobuf(ctx, remoteClosureDataRef, defaultStorageOptions, compiledWorkflowClosure)
if err != nil {
return "", errors.NewFlyteAdminErrorf(codes.Internal,
"Failed to upload dynamic workflow closure for node id [%+v] and workflow id [%+v] with err: %v", nodeID, workflowID, err)
}
return remoteClosureDataRef, nil
}

func (m *NodeExecutionManager) CreateNodeEvent(ctx context.Context, request admin.NodeExecutionEventRequest) (
*admin.NodeExecutionEventResponse, error) {
if err := validation.ValidateNodeExecutionIdentifier(request.Event.Id); err != nil {
if err := validation.ValidateNodeExecutionEventRequest(&request); err != nil {
logger.Debugf(ctx, "CreateNodeEvent called with invalid identifier [%+v]: %v", request.Event.Id, err)
}
ctx = getNodeExecutionContext(ctx, request.Event.Id)
logger.Debugf(ctx, "Received node execution event for Node Exec Id [%+v] transitioning to phase [%v], w/ Metadata [%v]",
request.Event.Id, request.Event.Phase, request.Event.ParentTaskMetadata)

var dynamicWorkflowRemoteClosureReference string
if request.Event.GetTaskNodeMetadata() != nil && request.Event.GetTaskNodeMetadata().DynamicWorkflow != nil {
dynamicWorkflowRemoteClosureDataReference, err := m.uploadDynamicWorkflowClosure(
ctx, request.Event.Id, request.Event.GetTaskNodeMetadata().DynamicWorkflow.Id,
request.Event.GetTaskNodeMetadata().DynamicWorkflow.CompiledWorkflow)
if err != nil {
return nil, err
}
dynamicWorkflowRemoteClosureReference = dynamicWorkflowRemoteClosureDataReference.String()
}

nodeExecutionModel, err := m.db.NodeExecutionRepo().Get(ctx, repoInterfaces.NodeExecutionResource{
NodeExecutionIdentifier: *request.Event.Id,
})
Expand All @@ -205,14 +249,14 @@ func (m *NodeExecutionManager) CreateNodeEvent(ctx context.Context, request admi
request.Event.Id, err)
return nil, err
}
err = m.createNodeExecutionWithEvent(ctx, &request)
err = m.createNodeExecutionWithEvent(ctx, &request, dynamicWorkflowRemoteClosureReference)
if err != nil {
return nil, err
}
m.metrics.NodeExecutionsCreated.Inc()
} else {
phase := core.NodeExecution_Phase(core.NodeExecution_Phase_value[nodeExecutionModel.Phase])
updateStatus, err := m.updateNodeExecutionWithEvent(ctx, &request, &nodeExecutionModel)
updateStatus, err := m.updateNodeExecutionWithEvent(ctx, &request, &nodeExecutionModel, dynamicWorkflowRemoteClosureReference)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -427,14 +471,27 @@ func (m *NodeExecutionManager) GetNodeExecutionData(
response.FullOutputs = &fullOutputs
}

if len(nodeExecutionModel.DynamicWorkflowRemoteClosureReference) > 0 {
closure := &core.CompiledWorkflowClosure{}
err := m.storageClient.ReadProtobuf(ctx, storage.DataReference(nodeExecutionModel.DynamicWorkflowRemoteClosureReference), closure)
if err != nil {
return nil, errors.NewFlyteAdminErrorf(codes.Internal,
"Unable to read WorkflowClosure from location %s : %v", nodeExecutionModel.DynamicWorkflowRemoteClosureReference, err)
}
response.DynamicWorkflow = &admin.DynamicWorkflowNodeMetadata{
Id: closure.Primary.Template.Id,
CompiledWorkflow: closure,
}
}

m.metrics.NodeExecutionInputBytes.Observe(float64(response.Inputs.Bytes))
m.metrics.NodeExecutionOutputBytes.Observe(float64(response.Outputs.Bytes))

return response, nil
}

func NewNodeExecutionManager(db repositories.RepositoryInterface, config runtimeInterfaces.Configuration,
storageClient *storage.DataStore, scope promutils.Scope, urlData dataInterfaces.RemoteURLInterface,
storagePrefix []string, storageClient *storage.DataStore, scope promutils.Scope, urlData dataInterfaces.RemoteURLInterface,
eventPublisher notificationInterfaces.Publisher, eventWriter eventWriter.NodeExecutionEventWriter) interfaces.NodeExecutionInterface {
metrics := nodeExecutionMetrics{
Scope: scope,
Expand All @@ -458,8 +515,10 @@ func NewNodeExecutionManager(db repositories.RepositoryInterface, config runtime
"overall count of publish event errors when invoking publish()"),
}
return &NodeExecutionManager{
db: db,
config: config,
db: db,
config: config,

storagePrefix: storagePrefix,
storageClient: storageClient,
metrics: metrics,
urlData: urlData,
Expand Down
Loading

0 comments on commit 8ec2d55

Please sign in to comment.