Skip to content

Commit

Permalink
Propagate nesting and principal for child executions (flyteorg#177)
Browse files Browse the repository at this point in the history
  • Loading branch information
Katrina Rogan authored Apr 8, 2021
1 parent 19af628 commit af505fb
Show file tree
Hide file tree
Showing 4 changed files with 83 additions and 49 deletions.
106 changes: 67 additions & 39 deletions pkg/manager/impl/execution_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -367,6 +367,43 @@ func (m *ExecutionManager) setCompiledTaskDefaults(ctx context.Context, task *co
checkTaskRequestsLessThanLimits(ctx, task.Template.Id, task.Template.GetContainer().Resources)
}

// Fetches inherited execution metadata including the parent node execution db model id and the source execution model id
// as well as sets request spec metadata with the inherited principal and adjusted nesting data.
func (m *ExecutionManager) getInheritedExecMetadata(ctx context.Context, requestSpec *admin.ExecutionSpec,
workflowExecutionID *core.WorkflowExecutionIdentifier) (parentNodeExecutionID uint, sourceExecutionID uint, err error) {
if requestSpec.Metadata == nil || requestSpec.Metadata.ParentNodeExecution == nil {
return parentNodeExecutionID, sourceExecutionID, nil
}
parentNodeExecutionModel, err := util.GetNodeExecutionModel(ctx, m.db, requestSpec.Metadata.ParentNodeExecution)
if err != nil {
logger.Errorf(ctx, "Failed to get node execution [%+v] that launched this execution [%+v] with error %v",
requestSpec.Metadata.ParentNodeExecution, workflowExecutionID, err)
return parentNodeExecutionID, sourceExecutionID, err
}

parentNodeExecutionID = parentNodeExecutionModel.ID

sourceExecutionModel, err := util.GetExecutionModel(ctx, m.db, *requestSpec.Metadata.ParentNodeExecution.ExecutionId)
if err != nil {
logger.Errorf(ctx, "Failed to get workflow execution [%+v] that launched this execution [%+v] with error %v",
requestSpec.Metadata.ParentNodeExecution, workflowExecutionID, err)
return parentNodeExecutionID, sourceExecutionID, err
}
sourceExecutionID = sourceExecutionModel.ID
requestSpec.Metadata.Principal = sourceExecutionModel.User
sourceExecution, err := transformers.FromExecutionModel(*sourceExecutionModel)
if err != nil {
logger.Errorf(ctx, "Failed transform parent execution model for child execution [%+v] with err: %v", workflowExecutionID, err)
return parentNodeExecutionID, sourceExecutionID, err
}
if sourceExecution.Spec.Metadata != nil {
requestSpec.Metadata.Nesting = sourceExecution.Spec.Metadata.Nesting + 1
} else {
requestSpec.Metadata.Nesting = 1
}
return parentNodeExecutionID, sourceExecutionID, nil
}

func (m *ExecutionManager) launchSingleTaskExecution(
ctx context.Context, request admin.ExecutionCreateRequest, requestedAt time.Time) (
context.Context, *models.Execution, error) {
Expand Down Expand Up @@ -418,17 +455,18 @@ func (m *ExecutionManager) launchSingleTaskExecution(
}
ctx = getExecutionContext(ctx, &workflowExecutionID)

requestSpec := request.Spec
if requestSpec.Metadata == nil {
requestSpec.Metadata = &admin.ExecutionMetadata{}
}
requestSpec.Metadata.Principal = getUser(ctx)

// Get the node execution (if any) that launched this execution
var parentNodeExecutionID uint
if request.Spec.Metadata != nil && request.Spec.Metadata.ParentNodeExecution != nil {
parentNodeExecutionModel, err := util.GetNodeExecutionModel(ctx, m.db, request.Spec.Metadata.ParentNodeExecution)
if err != nil {
logger.Errorf(ctx, "Failed to get node execution [%+v] that launched this execution [%+v] with error %v",
request.Spec.Metadata.ParentNodeExecution, workflowExecutionID, err)
return nil, nil, err
}

parentNodeExecutionID = parentNodeExecutionModel.ID
var sourceExecutionID uint
parentNodeExecutionID, sourceExecutionID, err = m.getInheritedExecMetadata(ctx, requestSpec, &workflowExecutionID)
if err != nil {
return nil, nil, err
}

// Dynamically assign task resource defaults.
Expand Down Expand Up @@ -462,19 +500,19 @@ func (m *ExecutionManager) launchSingleTaskExecution(
Inputs: request.Inputs,
ReferenceName: taskIdentifier.Name,
AcceptedAt: requestedAt,
Auth: request.Spec.AuthRole,
Auth: requestSpec.AuthRole,
QueueingBudget: qualityOfService.QueuingBudget,
}
if request.Spec.Labels != nil {
executeTaskInputs.Labels = request.Spec.Labels.Values
if requestSpec.Labels != nil {
executeTaskInputs.Labels = requestSpec.Labels.Values
}
executeTaskInputs.Labels, err = m.addProjectLabels(ctx, request.Project, executeTaskInputs.Labels)
if err != nil {
return nil, nil, err
}

if request.Spec.Annotations != nil {
executeTaskInputs.Annotations = request.Spec.Annotations.Values
if requestSpec.Annotations != nil {
executeTaskInputs.Annotations = requestSpec.Annotations.Values
}

overrides, err := m.addPluginOverrides(ctx, &workflowExecutionID, workflowExecutionID.Name, "")
Expand Down Expand Up @@ -511,7 +549,7 @@ func (m *ExecutionManager) launchSingleTaskExecution(

executionModel, err := transformers.CreateExecutionModel(transformers.CreateExecutionModelInput{
WorkflowExecutionID: workflowExecutionID,
RequestSpec: request.Spec,
RequestSpec: requestSpec,
TaskID: taskModel.ID,
WorkflowID: workflowModel.ID,
// The execution is not considered running until the propeller sends a specific event saying so.
Expand All @@ -520,10 +558,10 @@ func (m *ExecutionManager) launchSingleTaskExecution(
Notifications: notificationsSettings,
WorkflowIdentifier: workflow.Id,
ParentNodeExecutionID: parentNodeExecutionID,
SourceExecutionID: sourceExecutionID,
Cluster: execInfo.Cluster,
InputsURI: inputsURI,
UserInputsURI: userInputsURI,
Principal: getUser(ctx),
})
if err != nil {
logger.Infof(ctx, "Failed to create execution model in transformer for id: [%+v] with err: %v",
Expand Down Expand Up @@ -583,27 +621,18 @@ func (m *ExecutionManager) launchExecutionAndPrepareModel(
Name: name,
}
ctx = getExecutionContext(ctx, &workflowExecutionID)
var requestSpec = request.Spec
if requestSpec.Metadata == nil {
requestSpec.Metadata = &admin.ExecutionMetadata{}
}
requestSpec.Metadata.Principal = getUser(ctx)

// Get the node and parent execution (if any) that launched this execution
var parentNodeExecutionID uint
var sourceExecutionID uint
if request.Spec.Metadata != nil && request.Spec.Metadata.ParentNodeExecution != nil {
parentNodeExecutionModel, err := util.GetNodeExecutionModel(ctx, m.db, request.Spec.Metadata.ParentNodeExecution)
if err != nil {
logger.Errorf(ctx, "Failed to get node execution [%+v] that launched this execution [%+v] with error %v",
request.Spec.Metadata.ParentNodeExecution, workflowExecutionID, err)
return nil, nil, err
}

parentNodeExecutionID = parentNodeExecutionModel.ID

sourceExecutionModel, err := util.GetExecutionModel(ctx, m.db, *request.Spec.Metadata.ParentNodeExecution.ExecutionId)
if err != nil {
logger.Errorf(ctx, "Failed to get node execution [%+v] that launched this execution [%+v] with error %v",
request.Spec.Metadata.ParentNodeExecution, workflowExecutionID, err)
return nil, nil, err
}
sourceExecutionID = sourceExecutionModel.ID
parentNodeExecutionID, sourceExecutionID, err = m.getInheritedExecMetadata(ctx, requestSpec, &workflowExecutionID)
if err != nil {
return nil, nil, err
}

// Dynamically assign task resource defaults.
Expand Down Expand Up @@ -676,16 +705,16 @@ func (m *ExecutionManager) launchExecutionAndPrepareModel(
if launchPlan.Spec.GetEntityMetadata() != nil {
notificationsSettings = launchPlan.Spec.EntityMetadata.GetNotifications()
}
if request.Spec.GetNotifications() != nil && request.Spec.GetNotifications().Notifications != nil &&
len(request.Spec.GetNotifications().Notifications) > 0 {
notificationsSettings = request.Spec.GetNotifications().Notifications
} else if request.Spec.GetDisableAll() {
if requestSpec.GetNotifications() != nil && requestSpec.GetNotifications().Notifications != nil &&
len(requestSpec.GetNotifications().Notifications) > 0 {
notificationsSettings = requestSpec.GetNotifications().Notifications
} else if requestSpec.GetDisableAll() {
notificationsSettings = make([]*admin.Notification, 0)
}

executionModel, err := transformers.CreateExecutionModel(transformers.CreateExecutionModelInput{
WorkflowExecutionID: workflowExecutionID,
RequestSpec: request.Spec,
RequestSpec: requestSpec,
LaunchPlanID: launchPlanModel.ID,
WorkflowID: launchPlanModel.WorkflowID,
// The execution is not considered running until the propeller sends a specific event saying so.
Expand All @@ -698,7 +727,6 @@ func (m *ExecutionManager) launchExecutionAndPrepareModel(
Cluster: execInfo.Cluster,
InputsURI: inputsURI,
UserInputsURI: userInputsURI,
Principal: getUser(ctx),
})
if err != nil {
logger.Infof(ctx, "Failed to create execution model in transformer for id: [%+v] with err: %v",
Expand Down
16 changes: 13 additions & 3 deletions pkg/manager/impl/execution_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -299,17 +299,27 @@ func TestCreateExecutionFromWorkflowNode(t *testing.T) {
}, nil
},
)

principal := "feeny"
getExecutionCalled := false
repository.ExecutionRepo().(*repositoryMocks.MockExecutionRepo).SetGetCallback(
func(ctx context.Context, input interfaces.Identifier) (models.Execution, error) {
assert.EqualValues(t, input.Project, parentNodeExecutionID.ExecutionId.Project)
assert.EqualValues(t, input.Domain, parentNodeExecutionID.ExecutionId.Domain)
assert.EqualValues(t, input.Name, parentNodeExecutionID.ExecutionId.Name)
spec := &admin.ExecutionSpec{
Metadata: &admin.ExecutionMetadata{
Nesting: 1,
},
}
specBytes, _ := proto.Marshal(spec)
getExecutionCalled = true
return models.Execution{
BaseModel: models.BaseModel{
ID: 2,
},
Spec: specBytes,
User: principal,
}, nil
},
)
Expand All @@ -322,10 +332,12 @@ func TestCreateExecutionFromWorkflowNode(t *testing.T) {
assert.NoError(t, err)
assert.Equal(t, admin.ExecutionMetadata_CHILD_WORKFLOW, spec.Metadata.Mode)
assert.Equal(t, "feeny", spec.Metadata.Principal)
assert.EqualValues(t, 1, spec.Metadata.Nesting)
assert.True(t, proto.Equal(&parentNodeExecutionID, spec.Metadata.ParentNodeExecution))
assert.EqualValues(t, input.ParentNodeExecutionID, 1)
assert.EqualValues(t, input.SourceExecutionID, 2)
assert.Equal(t, 2, int(spec.Metadata.Nesting))
assert.Equal(t, principal, spec.Metadata.Principal)
assert.Equal(t, principal, input.User)
return nil
},
)
Expand All @@ -334,9 +346,7 @@ func TestCreateExecutionFromWorkflowNode(t *testing.T) {
request := testutils.GetExecutionRequest()
request.Spec.Metadata = &admin.ExecutionMetadata{
Mode: admin.ExecutionMetadata_CHILD_WORKFLOW,
Nesting: 1,
ParentNodeExecution: &parentNodeExecutionID,
Principal: "feeny",
}
response, err := execManager.CreateExecution(context.Background(), request, requestedAt)
assert.Nil(t, err)
Expand Down
4 changes: 0 additions & 4 deletions pkg/repositories/transformers/execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ type CreateExecutionModelInput struct {
Cluster string
InputsURI storage.DataReference
UserInputsURI storage.DataReference
Principal string
}

// Transforms a ExecutionCreateRequest to a Execution model
Expand All @@ -42,9 +41,6 @@ func CreateExecutionModel(input CreateExecutionModelInput) (*models.Execution, e
if requestSpec.Metadata == nil {
requestSpec.Metadata = &admin.ExecutionMetadata{}
}
if len(input.Principal) > 0 {
requestSpec.Metadata.Principal = input.Principal
}
requestSpec.Metadata.SystemMetadata = &admin.SystemMetadata{
ExecutionCluster: input.Cluster,
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/repositories/transformers/execution_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,10 @@ func getRunningExecutionModel(specBytes []byte, existingClosureBytes []byte, sta

func TestCreateExecutionModel(t *testing.T) {
execRequest := testutils.GetExecutionRequest()
principal := "principal"
execRequest.Spec.Metadata = &admin.ExecutionMetadata{
Mode: admin.ExecutionMetadata_SYSTEM,
Mode: admin.ExecutionMetadata_SYSTEM,
Principal: principal,
}
lpID := uint(33)
wfID := uint(23)
Expand All @@ -53,7 +55,6 @@ func TestCreateExecutionModel(t *testing.T) {
Version: "version",
}

principal := "principal"
cluster := "cluster"
execution, err := CreateExecutionModel(CreateExecutionModelInput{
WorkflowExecutionID: core.WorkflowExecutionIdentifier{
Expand All @@ -69,7 +70,6 @@ func TestCreateExecutionModel(t *testing.T) {
WorkflowIdentifier: workflowIdentifier,
ParentNodeExecutionID: nodeID,
SourceExecutionID: sourceID,
Principal: principal,
Cluster: cluster,
})
assert.NoError(t, err)
Expand Down

0 comments on commit af505fb

Please sign in to comment.