From af505fb48d1a66b1f8cafaf8ac8edc82196c5147 Mon Sep 17 00:00:00 2001 From: Katrina Rogan Date: Thu, 8 Apr 2021 09:54:36 -0700 Subject: [PATCH] Propagate nesting and principal for child executions (#177) --- pkg/manager/impl/execution_manager.go | 106 +++++++++++------- pkg/manager/impl/execution_manager_test.go | 16 ++- pkg/repositories/transformers/execution.go | 4 - .../transformers/execution_test.go | 6 +- 4 files changed, 83 insertions(+), 49 deletions(-) diff --git a/pkg/manager/impl/execution_manager.go b/pkg/manager/impl/execution_manager.go index 97d98959ac..f3f400043f 100644 --- a/pkg/manager/impl/execution_manager.go +++ b/pkg/manager/impl/execution_manager.go @@ -367,6 +367,43 @@ func (m *ExecutionManager) setCompiledTaskDefaults(ctx context.Context, task *co checkTaskRequestsLessThanLimits(ctx, task.Template.Id, task.Template.GetContainer().Resources) } +// Fetches inherited execution metadata including the parent node execution db model id and the source execution model id +// as well as sets request spec metadata with the inherited principal and adjusted nesting data. +func (m *ExecutionManager) getInheritedExecMetadata(ctx context.Context, requestSpec *admin.ExecutionSpec, + workflowExecutionID *core.WorkflowExecutionIdentifier) (parentNodeExecutionID uint, sourceExecutionID uint, err error) { + if requestSpec.Metadata == nil || requestSpec.Metadata.ParentNodeExecution == nil { + return parentNodeExecutionID, sourceExecutionID, nil + } + parentNodeExecutionModel, err := util.GetNodeExecutionModel(ctx, m.db, requestSpec.Metadata.ParentNodeExecution) + if err != nil { + logger.Errorf(ctx, "Failed to get node execution [%+v] that launched this execution [%+v] with error %v", + requestSpec.Metadata.ParentNodeExecution, workflowExecutionID, err) + return parentNodeExecutionID, sourceExecutionID, err + } + + parentNodeExecutionID = parentNodeExecutionModel.ID + + sourceExecutionModel, err := util.GetExecutionModel(ctx, m.db, *requestSpec.Metadata.ParentNodeExecution.ExecutionId) + if err != nil { + logger.Errorf(ctx, "Failed to get workflow execution [%+v] that launched this execution [%+v] with error %v", + requestSpec.Metadata.ParentNodeExecution, workflowExecutionID, err) + return parentNodeExecutionID, sourceExecutionID, err + } + sourceExecutionID = sourceExecutionModel.ID + requestSpec.Metadata.Principal = sourceExecutionModel.User + sourceExecution, err := transformers.FromExecutionModel(*sourceExecutionModel) + if err != nil { + logger.Errorf(ctx, "Failed transform parent execution model for child execution [%+v] with err: %v", workflowExecutionID, err) + return parentNodeExecutionID, sourceExecutionID, err + } + if sourceExecution.Spec.Metadata != nil { + requestSpec.Metadata.Nesting = sourceExecution.Spec.Metadata.Nesting + 1 + } else { + requestSpec.Metadata.Nesting = 1 + } + return parentNodeExecutionID, sourceExecutionID, nil +} + func (m *ExecutionManager) launchSingleTaskExecution( ctx context.Context, request admin.ExecutionCreateRequest, requestedAt time.Time) ( context.Context, *models.Execution, error) { @@ -418,17 +455,18 @@ func (m *ExecutionManager) launchSingleTaskExecution( } ctx = getExecutionContext(ctx, &workflowExecutionID) + requestSpec := request.Spec + if requestSpec.Metadata == nil { + requestSpec.Metadata = &admin.ExecutionMetadata{} + } + requestSpec.Metadata.Principal = getUser(ctx) + // Get the node execution (if any) that launched this execution var parentNodeExecutionID uint - if request.Spec.Metadata != nil && request.Spec.Metadata.ParentNodeExecution != nil { - parentNodeExecutionModel, err := util.GetNodeExecutionModel(ctx, m.db, request.Spec.Metadata.ParentNodeExecution) - if err != nil { - logger.Errorf(ctx, "Failed to get node execution [%+v] that launched this execution [%+v] with error %v", - request.Spec.Metadata.ParentNodeExecution, workflowExecutionID, err) - return nil, nil, err - } - - parentNodeExecutionID = parentNodeExecutionModel.ID + var sourceExecutionID uint + parentNodeExecutionID, sourceExecutionID, err = m.getInheritedExecMetadata(ctx, requestSpec, &workflowExecutionID) + if err != nil { + return nil, nil, err } // Dynamically assign task resource defaults. @@ -462,19 +500,19 @@ func (m *ExecutionManager) launchSingleTaskExecution( Inputs: request.Inputs, ReferenceName: taskIdentifier.Name, AcceptedAt: requestedAt, - Auth: request.Spec.AuthRole, + Auth: requestSpec.AuthRole, QueueingBudget: qualityOfService.QueuingBudget, } - if request.Spec.Labels != nil { - executeTaskInputs.Labels = request.Spec.Labels.Values + if requestSpec.Labels != nil { + executeTaskInputs.Labels = requestSpec.Labels.Values } executeTaskInputs.Labels, err = m.addProjectLabels(ctx, request.Project, executeTaskInputs.Labels) if err != nil { return nil, nil, err } - if request.Spec.Annotations != nil { - executeTaskInputs.Annotations = request.Spec.Annotations.Values + if requestSpec.Annotations != nil { + executeTaskInputs.Annotations = requestSpec.Annotations.Values } overrides, err := m.addPluginOverrides(ctx, &workflowExecutionID, workflowExecutionID.Name, "") @@ -511,7 +549,7 @@ func (m *ExecutionManager) launchSingleTaskExecution( executionModel, err := transformers.CreateExecutionModel(transformers.CreateExecutionModelInput{ WorkflowExecutionID: workflowExecutionID, - RequestSpec: request.Spec, + RequestSpec: requestSpec, TaskID: taskModel.ID, WorkflowID: workflowModel.ID, // The execution is not considered running until the propeller sends a specific event saying so. @@ -520,10 +558,10 @@ func (m *ExecutionManager) launchSingleTaskExecution( Notifications: notificationsSettings, WorkflowIdentifier: workflow.Id, ParentNodeExecutionID: parentNodeExecutionID, + SourceExecutionID: sourceExecutionID, Cluster: execInfo.Cluster, InputsURI: inputsURI, UserInputsURI: userInputsURI, - Principal: getUser(ctx), }) if err != nil { logger.Infof(ctx, "Failed to create execution model in transformer for id: [%+v] with err: %v", @@ -583,27 +621,18 @@ func (m *ExecutionManager) launchExecutionAndPrepareModel( Name: name, } ctx = getExecutionContext(ctx, &workflowExecutionID) + var requestSpec = request.Spec + if requestSpec.Metadata == nil { + requestSpec.Metadata = &admin.ExecutionMetadata{} + } + requestSpec.Metadata.Principal = getUser(ctx) // Get the node and parent execution (if any) that launched this execution var parentNodeExecutionID uint var sourceExecutionID uint - if request.Spec.Metadata != nil && request.Spec.Metadata.ParentNodeExecution != nil { - parentNodeExecutionModel, err := util.GetNodeExecutionModel(ctx, m.db, request.Spec.Metadata.ParentNodeExecution) - if err != nil { - logger.Errorf(ctx, "Failed to get node execution [%+v] that launched this execution [%+v] with error %v", - request.Spec.Metadata.ParentNodeExecution, workflowExecutionID, err) - return nil, nil, err - } - - parentNodeExecutionID = parentNodeExecutionModel.ID - - sourceExecutionModel, err := util.GetExecutionModel(ctx, m.db, *request.Spec.Metadata.ParentNodeExecution.ExecutionId) - if err != nil { - logger.Errorf(ctx, "Failed to get node execution [%+v] that launched this execution [%+v] with error %v", - request.Spec.Metadata.ParentNodeExecution, workflowExecutionID, err) - return nil, nil, err - } - sourceExecutionID = sourceExecutionModel.ID + parentNodeExecutionID, sourceExecutionID, err = m.getInheritedExecMetadata(ctx, requestSpec, &workflowExecutionID) + if err != nil { + return nil, nil, err } // Dynamically assign task resource defaults. @@ -676,16 +705,16 @@ func (m *ExecutionManager) launchExecutionAndPrepareModel( if launchPlan.Spec.GetEntityMetadata() != nil { notificationsSettings = launchPlan.Spec.EntityMetadata.GetNotifications() } - if request.Spec.GetNotifications() != nil && request.Spec.GetNotifications().Notifications != nil && - len(request.Spec.GetNotifications().Notifications) > 0 { - notificationsSettings = request.Spec.GetNotifications().Notifications - } else if request.Spec.GetDisableAll() { + if requestSpec.GetNotifications() != nil && requestSpec.GetNotifications().Notifications != nil && + len(requestSpec.GetNotifications().Notifications) > 0 { + notificationsSettings = requestSpec.GetNotifications().Notifications + } else if requestSpec.GetDisableAll() { notificationsSettings = make([]*admin.Notification, 0) } executionModel, err := transformers.CreateExecutionModel(transformers.CreateExecutionModelInput{ WorkflowExecutionID: workflowExecutionID, - RequestSpec: request.Spec, + RequestSpec: requestSpec, LaunchPlanID: launchPlanModel.ID, WorkflowID: launchPlanModel.WorkflowID, // The execution is not considered running until the propeller sends a specific event saying so. @@ -698,7 +727,6 @@ func (m *ExecutionManager) launchExecutionAndPrepareModel( Cluster: execInfo.Cluster, InputsURI: inputsURI, UserInputsURI: userInputsURI, - Principal: getUser(ctx), }) if err != nil { logger.Infof(ctx, "Failed to create execution model in transformer for id: [%+v] with err: %v", diff --git a/pkg/manager/impl/execution_manager_test.go b/pkg/manager/impl/execution_manager_test.go index 4e3cc65d42..ecd78ce82c 100644 --- a/pkg/manager/impl/execution_manager_test.go +++ b/pkg/manager/impl/execution_manager_test.go @@ -299,17 +299,27 @@ func TestCreateExecutionFromWorkflowNode(t *testing.T) { }, nil }, ) + + principal := "feeny" getExecutionCalled := false repository.ExecutionRepo().(*repositoryMocks.MockExecutionRepo).SetGetCallback( func(ctx context.Context, input interfaces.Identifier) (models.Execution, error) { assert.EqualValues(t, input.Project, parentNodeExecutionID.ExecutionId.Project) assert.EqualValues(t, input.Domain, parentNodeExecutionID.ExecutionId.Domain) assert.EqualValues(t, input.Name, parentNodeExecutionID.ExecutionId.Name) + spec := &admin.ExecutionSpec{ + Metadata: &admin.ExecutionMetadata{ + Nesting: 1, + }, + } + specBytes, _ := proto.Marshal(spec) getExecutionCalled = true return models.Execution{ BaseModel: models.BaseModel{ ID: 2, }, + Spec: specBytes, + User: principal, }, nil }, ) @@ -322,10 +332,12 @@ func TestCreateExecutionFromWorkflowNode(t *testing.T) { assert.NoError(t, err) assert.Equal(t, admin.ExecutionMetadata_CHILD_WORKFLOW, spec.Metadata.Mode) assert.Equal(t, "feeny", spec.Metadata.Principal) - assert.EqualValues(t, 1, spec.Metadata.Nesting) assert.True(t, proto.Equal(&parentNodeExecutionID, spec.Metadata.ParentNodeExecution)) assert.EqualValues(t, input.ParentNodeExecutionID, 1) assert.EqualValues(t, input.SourceExecutionID, 2) + assert.Equal(t, 2, int(spec.Metadata.Nesting)) + assert.Equal(t, principal, spec.Metadata.Principal) + assert.Equal(t, principal, input.User) return nil }, ) @@ -334,9 +346,7 @@ func TestCreateExecutionFromWorkflowNode(t *testing.T) { request := testutils.GetExecutionRequest() request.Spec.Metadata = &admin.ExecutionMetadata{ Mode: admin.ExecutionMetadata_CHILD_WORKFLOW, - Nesting: 1, ParentNodeExecution: &parentNodeExecutionID, - Principal: "feeny", } response, err := execManager.CreateExecution(context.Background(), request, requestedAt) assert.Nil(t, err) diff --git a/pkg/repositories/transformers/execution.go b/pkg/repositories/transformers/execution.go index 8c26b042f4..3d39ec172b 100644 --- a/pkg/repositories/transformers/execution.go +++ b/pkg/repositories/transformers/execution.go @@ -33,7 +33,6 @@ type CreateExecutionModelInput struct { Cluster string InputsURI storage.DataReference UserInputsURI storage.DataReference - Principal string } // Transforms a ExecutionCreateRequest to a Execution model @@ -42,9 +41,6 @@ func CreateExecutionModel(input CreateExecutionModelInput) (*models.Execution, e if requestSpec.Metadata == nil { requestSpec.Metadata = &admin.ExecutionMetadata{} } - if len(input.Principal) > 0 { - requestSpec.Metadata.Principal = input.Principal - } requestSpec.Metadata.SystemMetadata = &admin.SystemMetadata{ ExecutionCluster: input.Cluster, } diff --git a/pkg/repositories/transformers/execution_test.go b/pkg/repositories/transformers/execution_test.go index 118a6f295b..df613d3f7e 100644 --- a/pkg/repositories/transformers/execution_test.go +++ b/pkg/repositories/transformers/execution_test.go @@ -38,8 +38,10 @@ func getRunningExecutionModel(specBytes []byte, existingClosureBytes []byte, sta func TestCreateExecutionModel(t *testing.T) { execRequest := testutils.GetExecutionRequest() + principal := "principal" execRequest.Spec.Metadata = &admin.ExecutionMetadata{ - Mode: admin.ExecutionMetadata_SYSTEM, + Mode: admin.ExecutionMetadata_SYSTEM, + Principal: principal, } lpID := uint(33) wfID := uint(23) @@ -53,7 +55,6 @@ func TestCreateExecutionModel(t *testing.T) { Version: "version", } - principal := "principal" cluster := "cluster" execution, err := CreateExecutionModel(CreateExecutionModelInput{ WorkflowExecutionID: core.WorkflowExecutionIdentifier{ @@ -69,7 +70,6 @@ func TestCreateExecutionModel(t *testing.T) { WorkflowIdentifier: workflowIdentifier, ParentNodeExecutionID: nodeID, SourceExecutionID: sourceID, - Principal: principal, Cluster: cluster, }) assert.NoError(t, err)