Skip to content
This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

Single task execution (admin-generated workflow implementation) #96

Merged
merged 24 commits into from
May 21, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ require (
github.com/jmespath/go-jmespath v0.3.0 // indirect
github.com/kelseyhightower/envconfig v1.4.0 // indirect
github.com/lib/pq v1.3.0
github.com/lyft/flyteidl v0.17.27
github.com/lyft/flyteidl v0.17.30
github.com/lyft/flytepropeller v0.2.13
github.com/lyft/flytestdlib v0.3.2
github.com/magiconair/properties v1.8.1
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -457,6 +457,10 @@ github.com/lyft/flyteidl v0.17.8 h1:/bZS1K3FO45EMamNrs4Eo6WYQf1TO5bNyNTIUO6cXM0=
github.com/lyft/flyteidl v0.17.8/go.mod h1:/zQXxuHO11u/saxTTZc8oYExIGEShXB+xCB1/F1Cu20=
github.com/lyft/flyteidl v0.17.27 h1:0EdSHauzdPEYmubYib/XC6fLb+srzP4yDRN1P9o4W/I=
github.com/lyft/flyteidl v0.17.27/go.mod h1:/zQXxuHO11u/saxTTZc8oYExIGEShXB+xCB1/F1Cu20=
github.com/lyft/flyteidl v0.17.28-0.20200427212215-1d2c77c864b2 h1:+VDdSC4HMWBhlrjMcAS+1kGp+bK1e5OJR81tjzYJS7M=
github.com/lyft/flyteidl v0.17.28-0.20200427212215-1d2c77c864b2/go.mod h1:/zQXxuHO11u/saxTTZc8oYExIGEShXB+xCB1/F1Cu20=
github.com/lyft/flyteidl v0.17.30 h1:mL767VAPI6+bCRuVE6L6fl0uQXeu1TcnQf02QMQ0fQc=
github.com/lyft/flyteidl v0.17.30/go.mod h1:/zQXxuHO11u/saxTTZc8oYExIGEShXB+xCB1/F1Cu20=
github.com/lyft/flyteplugins v0.3.11/go.mod h1:FOSo04q4EheU6lm0oZFvfYAWgjrum/BDUK+mUT7qDFA=
github.com/lyft/flytepropeller v0.2.13 h1:RDFM8ps5bHWdHYK87NLyYX4iyF16ahkxerI0X9DZSfM=
github.com/lyft/flytepropeller v0.2.13/go.mod h1:QJ9txCCxHnzvwQoG4TbcldVs1in4+C943prLZVDmmIA=
Expand Down
159 changes: 158 additions & 1 deletion pkg/manager/impl/execution_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ type ExecutionManager struct {
userMetrics executionUserMetrics
notificationClient notificationInterfaces.Publisher
urlData dataInterfaces.RemoteURLInterface
workflowManager interfaces.WorkflowInterface
namedEntityManager interfaces.NamedEntityInterface
}

func getExecutionContext(ctx context.Context, id *core.WorkflowExecutionIdentifier) context.Context {
Expand Down Expand Up @@ -303,6 +305,150 @@ func setCompiledTaskDefaults(ctx context.Context, config runtimeInterfaces.Confi
taskResourceSpec)
}

func (m *ExecutionManager) launchSingleTaskExecution(
ctx context.Context, request admin.ExecutionCreateRequest, requestedAt time.Time) (
context.Context, *models.Execution, error) {

taskModel, err := m.db.TaskRepo().Get(ctx, repositoryInterfaces.GetResourceInput{
Project: request.Spec.LaunchPlan.Project,
Domain: request.Spec.LaunchPlan.Domain,
Name: request.Spec.LaunchPlan.Name,
Version: request.Spec.LaunchPlan.Version,
})
if err != nil {
return nil, nil, err
}
task, err := transformers.FromTaskModel(taskModel)
if err != nil {
return nil, nil, err
}

// Prepare a skeleton workflow
taskIdentifier := request.Spec.LaunchPlan
workflowModel, err :=
util.CreateOrGetWorkflowModel(ctx, request, m.db, m.workflowManager, m.namedEntityManager, taskIdentifier, &task)
if err != nil {
logger.Debugf(ctx, "Failed to created skeleton workflow for [%+v] with err: %v", taskIdentifier, err)
return nil, nil, err
}
workflow, err := transformers.FromWorkflowModel(*workflowModel)
if err != nil {
return nil, nil, err
}
closure, err := util.FetchAndGetWorkflowClosure(ctx, m.storageClient, workflowModel.RemoteClosureIdentifier)
if err != nil {
return nil, nil, err
}
closure.CreatedAt = workflow.Closure.CreatedAt
workflow.Closure = closure
// Also prepare a skeleton launch plan.
launchPlan, err := util.CreateOrGetLaunchPlan(ctx, m.db, m.config, taskIdentifier,
workflow.Closure.CompiledWorkflow.Primary.Template.Interface, workflowModel.ID, request.Spec)
if err != nil {
return nil, nil, err
}

name := util.GetExecutionName(request)
workflowExecutionID := core.WorkflowExecutionIdentifier{
Project: request.Project,
Domain: request.Domain,
Name: name,
}
ctx = getExecutionContext(ctx, &workflowExecutionID)

// Get the node execution (if any) that launched this execution
var parentNodeExecutionID uint
if request.Spec.Metadata != nil && request.Spec.Metadata.ParentNodeExecution != nil {
parentNodeExecutionModel, err := util.GetNodeExecutionModel(ctx, m.db, request.Spec.Metadata.ParentNodeExecution)
if err != nil {
logger.Errorf(ctx, "Failed to get node execution [%+v] that launched this execution [%+v] with error %v",
request.Spec.Metadata.ParentNodeExecution, workflowExecutionID, err)
return nil, nil, err
}

parentNodeExecutionID = parentNodeExecutionModel.ID
}

// Dynamically assign task resource defaults.
for _, task := range workflow.Closure.CompiledWorkflow.Tasks {
setCompiledTaskDefaults(ctx, m.config, task, m.db, name)
}

// Dynamically assign execution queues.
m.populateExecutionQueue(ctx, *workflow.Id, workflow.Closure.CompiledWorkflow)

inputsURI, err := m.offloadInputs(ctx, request.Inputs, &workflowExecutionID, shared.Inputs)
if err != nil {
return nil, nil, err
}
userInputsURI, err := m.offloadInputs(ctx, request.Inputs, &workflowExecutionID, shared.UserInputs)
if err != nil {
return nil, nil, err
}
executeTaskInputs := workflowengineInterfaces.ExecuteTaskInput{
ExecutionID: &workflowExecutionID,
WfClosure: *workflow.Closure.CompiledWorkflow,
Inputs: request.Inputs,
ReferenceName: taskIdentifier.Name,
AcceptedAt: requestedAt,
Auth: request.Spec.AuthRole,
}
if request.Spec.Labels != nil {
executeTaskInputs.Labels = request.Spec.Labels.Values
}
if request.Spec.Annotations != nil {
executeTaskInputs.Annotations = request.Spec.Annotations.Values
}

execInfo, err := m.workflowExecutor.ExecuteTask(ctx, executeTaskInputs)
if err != nil {
m.systemMetrics.PropellerFailures.Inc()
logger.Infof(ctx, "Failed to execute workflow %+v with execution id %+v and inputs %+v with err %v",
request, workflowExecutionID, request.Inputs, err)
return nil, nil, err
}
executionCreatedAt := time.Now()
acceptanceDelay := executionCreatedAt.Sub(requestedAt)
m.systemMetrics.AcceptanceDelay.Observe(acceptanceDelay.Seconds())

// Request notification settings takes precedence over the launch plan settings.
// If there is no notification in the request and DisableAll is not true, use the settings from the launch plan.
var notificationsSettings []*admin.Notification
if launchPlan.Spec.GetEntityMetadata() != nil {
notificationsSettings = launchPlan.Spec.EntityMetadata.GetNotifications()
}
if request.Spec.GetNotifications() != nil && request.Spec.GetNotifications().Notifications != nil &&
len(request.Spec.GetNotifications().Notifications) > 0 {
notificationsSettings = request.Spec.GetNotifications().Notifications
} else if request.Spec.GetDisableAll() {
notificationsSettings = make([]*admin.Notification, 0)
}

executionModel, err := transformers.CreateExecutionModel(transformers.CreateExecutionModelInput{
WorkflowExecutionID: workflowExecutionID,
RequestSpec: request.Spec,
TaskID: taskModel.ID,
WorkflowID: workflowModel.ID,
// The execution is not considered running until the propeller sends a specific event saying so.
Phase: core.WorkflowExecution_UNDEFINED,
CreatedAt: m._clock.Now(),
Notifications: notificationsSettings,
WorkflowIdentifier: workflow.Id,
ParentNodeExecutionID: parentNodeExecutionID,
Cluster: execInfo.Cluster,
InputsURI: inputsURI,
UserInputsURI: userInputsURI,
Principal: getUser(ctx),
})
if err != nil {
logger.Infof(ctx, "Failed to create execution model in transformer for id: [%+v] with err: %v",
workflowExecutionID, err)
return nil, nil, err
}
return ctx, executionModel, nil

}

func (m *ExecutionManager) launchExecutionAndPrepareModel(
ctx context.Context, request admin.ExecutionCreateRequest, requestedAt time.Time) (
context.Context, *models.Execution, error) {
Expand All @@ -311,6 +457,11 @@ func (m *ExecutionManager) launchExecutionAndPrepareModel(
logger.Debugf(ctx, "Failed to validate ExecutionCreateRequest %+v with err %v", request, err)
return nil, nil, err
}
if request.Spec.LaunchPlan.ResourceType == core.ResourceType_TASK {
logger.Debugf(ctx, "Launching single task execution with [%+v]", request.Spec.LaunchPlan)
return m.launchSingleTaskExecution(ctx, request, requestedAt)
}

launchPlanModel, err := util.GetLaunchPlanModel(ctx, m.db, *request.Spec.LaunchPlan)
if err != nil {
logger.Debugf(ctx, "Failed to get launch plan model for ExecutionCreateRequest %+v with err %v", request, err)
Expand All @@ -333,7 +484,9 @@ func (m *ExecutionManager) launchExecutionAndPrepareModel(
request.Inputs, launchPlan.Spec.FixedInputs, launchPlan.Closure.ExpectedInputs, err)
return nil, nil, err
}

workflow, err := util.GetWorkflow(ctx, m.db, m.storageClient, *launchPlan.Spec.WorkflowId)

if err != nil {
logger.Debugf(ctx, "Failed to get workflow with id %+v with err %v", launchPlan.Spec.WorkflowId, err)
return nil, nil, err
Expand Down Expand Up @@ -1040,7 +1193,9 @@ func NewExecutionManager(
systemScope promutils.Scope,
userScope promutils.Scope,
publisher notificationInterfaces.Publisher,
urlData dataInterfaces.RemoteURLInterface) interfaces.ExecutionInterface {
urlData dataInterfaces.RemoteURLInterface,
workflowManager interfaces.WorkflowInterface,
namedEntityManager interfaces.NamedEntityInterface) interfaces.ExecutionInterface {
queueAllocator := executions.NewQueueAllocator(config, db)
systemMetrics := newExecutionSystemMetrics(systemScope)

Expand All @@ -1060,5 +1215,7 @@ func NewExecutionManager(
userMetrics: userMetrics,
notificationClient: publisher,
urlData: urlData,
workflowManager: workflowManager,
namedEntityManager: namedEntityManager,
}
}
Loading