From 76496836a1137576928689940f3ca111aa433c34 Mon Sep 17 00:00:00 2001 From: Katrina Rogan Date: Mon, 22 May 2023 12:33:40 -0700 Subject: [PATCH] Save execution namespace in system metadata (#568) --- flyteadmin/go.mod | 2 +- flyteadmin/go.sum | 4 +- .../pkg/manager/impl/execution_manager.go | 22 ++-- .../manager/impl/execution_manager_test.go | 117 ++++++++++++++---- .../repositories/transformers/execution.go | 22 +++- .../transformers/execution_test.go | 36 +++++- 6 files changed, 163 insertions(+), 40 deletions(-) diff --git a/flyteadmin/go.mod b/flyteadmin/go.mod index ddef8ccff..46ef4b0de 100644 --- a/flyteadmin/go.mod +++ b/flyteadmin/go.mod @@ -13,7 +13,7 @@ require ( github.com/cloudevents/sdk-go/v2 v2.8.0 github.com/coreos/go-oidc v2.2.1+incompatible github.com/evanphx/json-patch v4.12.0+incompatible - github.com/flyteorg/flyteidl v1.5.5 + github.com/flyteorg/flyteidl v1.5.7 github.com/flyteorg/flyteplugins v1.0.56 github.com/flyteorg/flytepropeller v1.1.87 github.com/flyteorg/flytestdlib v1.0.15 diff --git a/flyteadmin/go.sum b/flyteadmin/go.sum index 91807b3aa..255fa8893 100644 --- a/flyteadmin/go.sum +++ b/flyteadmin/go.sum @@ -312,8 +312,8 @@ github.com/fatih/structs v1.0.0/go.mod h1:9NiDSp5zOcgEDl+j00MP/WkGVPOlPRLejGD8Ga github.com/fatih/structs v1.1.0/go.mod h1:9NiDSp5zOcgEDl+j00MP/WkGVPOlPRLejGD8Ga6PJ7M= github.com/felixge/httpsnoop v1.0.1 h1:lvB5Jl89CsZtGIWuTcDM1E/vkVs49/Ml7JJe07l8SPQ= github.com/felixge/httpsnoop v1.0.1/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= -github.com/flyteorg/flyteidl v1.5.5 h1:tNNhuXPog4atAMSGE2kyAg6JzYy1TvjqrrQeh1EZVHs= -github.com/flyteorg/flyteidl v1.5.5/go.mod h1:EtE/muM2lHHgBabjYcxqe9TWeJSL0kXwbI0RgVwI4Og= +github.com/flyteorg/flyteidl v1.5.7 h1:voAxMMFsKOseNFSlCyRGlpegqtQXtJjyxgsQzZg4tts= +github.com/flyteorg/flyteidl v1.5.7/go.mod h1:EtE/muM2lHHgBabjYcxqe9TWeJSL0kXwbI0RgVwI4Og= github.com/flyteorg/flyteplugins v1.0.56 h1:kBTDgTpdSi7wcptk4cMwz5vfh1MU82VaUMMboe1InXw= github.com/flyteorg/flyteplugins v1.0.56/go.mod h1:aFCKSn8TPzxSAILIiogHtUnHlUCN9+y6Vf+r9R4KZDU= github.com/flyteorg/flytepropeller v1.1.87 h1:Px7ASDjrWyeVrUb15qXmhw9QK7xPcFjL5Yetr2P6iGM= diff --git a/flyteadmin/pkg/manager/impl/execution_manager.go b/flyteadmin/pkg/manager/impl/execution_manager.go index bbcc45a4a..8a6b7287b 100644 --- a/flyteadmin/pkg/manager/impl/execution_manager.go +++ b/flyteadmin/pkg/manager/impl/execution_manager.go @@ -323,7 +323,7 @@ func (m *ExecutionManager) getInheritedExecMetadata(ctx context.Context, request } sourceExecutionID = sourceExecutionModel.ID requestSpec.Metadata.Principal = sourceExecutionModel.User - sourceExecution, err := transformers.FromExecutionModel(*sourceExecutionModel, transformers.DefaultExecutionTransformerOptions) + sourceExecution, err := transformers.FromExecutionModel(ctx, *sourceExecutionModel, transformers.DefaultExecutionTransformerOptions) if err != nil { logger.Errorf(ctx, "Failed transform parent execution model for child execution [%+v] with err: %v", workflowExecutionID, err) return parentNodeExecutionID, sourceExecutionID, err @@ -650,6 +650,7 @@ func (m *ExecutionManager) launchSingleTaskExecution( UserInputsURI: userInputsURI, SecurityContext: executionConfig.SecurityContext, LaunchEntity: taskIdentifier.ResourceType, + Namespace: namespace, }) if err != nil { logger.Infof(ctx, "Failed to create execution model in transformer for id: [%+v] with err: %v", @@ -905,6 +906,7 @@ func (m *ExecutionManager) launchExecutionAndPrepareModel( UserInputsURI: userInputsURI, SecurityContext: executionConfig.SecurityContext, LaunchEntity: launchPlan.Id.ResourceType, + Namespace: namespace, }) if err != nil { logger.Infof(ctx, "Failed to create execution model in transformer for id: [%+v] with err: %v", @@ -965,7 +967,7 @@ func (m *ExecutionManager) RelaunchExecution( logger.Debugf(ctx, "Failed to get execution model for request [%+v] with err %v", request, err) return nil, err } - existingExecution, err := transformers.FromExecutionModel(*existingExecutionModel, transformers.DefaultExecutionTransformerOptions) + existingExecution, err := transformers.FromExecutionModel(ctx, *existingExecutionModel, transformers.DefaultExecutionTransformerOptions) if err != nil { return nil, err } @@ -1022,7 +1024,7 @@ func (m *ExecutionManager) RecoverExecution( logger.Debugf(ctx, "Failed to get execution model for request [%+v] with err %v", request, err) return nil, err } - existingExecution, err := transformers.FromExecutionModel(*existingExecutionModel, transformers.DefaultExecutionTransformerOptions) + existingExecution, err := transformers.FromExecutionModel(ctx, *existingExecutionModel, transformers.DefaultExecutionTransformerOptions) if err != nil { return nil, err } @@ -1073,7 +1075,7 @@ func (m *ExecutionManager) emitScheduledWorkflowMetrics( return } // Find the reference launch plan to get the kickoff time argument - execution, err := transformers.FromExecutionModel(*executionModel, transformers.DefaultExecutionTransformerOptions) + execution, err := transformers.FromExecutionModel(ctx, *executionModel, transformers.DefaultExecutionTransformerOptions) if err != nil { logger.Warningf(context.Background(), "failed to transform execution model when emitting scheduled workflow execution stats with for "+ @@ -1316,7 +1318,11 @@ func (m *ExecutionManager) GetExecution( logger.Debugf(ctx, "Failed to get execution model for request [%+v] with err: %v", request, err) return nil, err } - execution, transformerErr := transformers.FromExecutionModel(*executionModel, transformers.DefaultExecutionTransformerOptions) + namespace := common.GetNamespaceName( + m.config.NamespaceMappingConfiguration().GetNamespaceTemplate(), request.GetId().GetProject(), request.GetId().GetDomain()) + execution, transformerErr := transformers.FromExecutionModel(ctx, *executionModel, &transformers.ExecutionTransformerOptions{ + DefaultNamespace: namespace, + }) if transformerErr != nil { logger.Debugf(ctx, "Failed to transform execution model [%+v] to proto object with err: %v", request.Id, transformerErr) @@ -1359,7 +1365,7 @@ func (m *ExecutionManager) GetExecutionData( logger.Debugf(ctx, "Failed to get execution model for request [%+v] with err: %v", request, err) return nil, err } - execution, err := transformers.FromExecutionModel(*executionModel, transformers.DefaultExecutionTransformerOptions) + execution, err := transformers.FromExecutionModel(ctx, *executionModel, transformers.DefaultExecutionTransformerOptions) if err != nil { logger.Debugf(ctx, "Failed to transform execution model [%+v] to proto object with err: %v", request.Id, err) return nil, err @@ -1459,7 +1465,7 @@ func (m *ExecutionManager) ListExecutions( logger.Debugf(ctx, "Failed to list executions using input [%+v] with err %v", listExecutionsInput, err) return nil, err } - executionList, err := transformers.FromExecutionModels(output.Executions, transformers.ListExecutionTransformerOptions) + executionList, err := transformers.FromExecutionModels(ctx, output.Executions, transformers.ListExecutionTransformerOptions) if err != nil { logger.Errorf(ctx, "Failed to transform execution models [%+v] with err: %v", output.Executions, err) @@ -1489,7 +1495,7 @@ func (m *ExecutionManager) ListExecutions( func (m *ExecutionManager) publishNotifications(ctx context.Context, request admin.WorkflowExecutionEventRequest, execution models.Execution) error { // Notifications are stored in the Spec object of an admin.Execution object. - adminExecution, err := transformers.FromExecutionModel(execution, transformers.DefaultExecutionTransformerOptions) + adminExecution, err := transformers.FromExecutionModel(ctx, execution, transformers.DefaultExecutionTransformerOptions) if err != nil { // This shouldn't happen because execution manager marshaled the data into models.Execution. m.systemMetrics.TransformerError.Inc() diff --git a/flyteadmin/pkg/manager/impl/execution_manager_test.go b/flyteadmin/pkg/manager/impl/execution_manager_test.go index a0e674efc..73fabbc40 100644 --- a/flyteadmin/pkg/manager/impl/execution_manager_test.go +++ b/flyteadmin/pkg/manager/impl/execution_manager_test.go @@ -107,6 +107,37 @@ func getLegacySpecBytes() []byte { return b } +func getExpectedLegacySpec() *admin.ExecutionSpec { + expectedLegacySpec := getLegacySpec() + expectedLegacySpec.Metadata = &admin.ExecutionMetadata{ + SystemMetadata: &admin.SystemMetadata{ + Namespace: "project-domain", + }, + } + return expectedLegacySpec +} + +func getExpectedLegacySpecBytes() []byte { + expectedLegacySpec := getExpectedLegacySpec() + b, _ := proto.Marshal(expectedLegacySpec) + return b +} + +func getExpectedSpec() *admin.ExecutionSpec { + expectedSpec := testutils.GetExecutionRequest().Spec + expectedSpec.Metadata = &admin.ExecutionMetadata{ + SystemMetadata: &admin.SystemMetadata{ + Namespace: "project-domain", + }, + } + return expectedSpec +} + +func getExpectedSpecBytes() []byte { + specBytes, _ := proto.Marshal(getExpectedSpec()) + return specBytes +} + func getLegacyClosure() *admin.ExecutionClosure { return &admin.ExecutionClosure{ Phase: core.WorkflowExecution_RUNNING, @@ -287,6 +318,7 @@ func TestCreateExecution(t *testing.T) { assert.Equal(t, rawOutput, spec.RawOutputDataConfig.OutputLocationPrefix) assert.True(t, proto.Equal(spec.ClusterAssignment, &clusterAssignment)) assert.Equal(t, "launch_plan", input.LaunchEntity) + assert.Equal(t, spec.GetMetadata().GetSystemMetadata().Namespace, "project-domain") return nil }) setDefaultLpCallbackForExecTest(repository) @@ -1167,6 +1199,49 @@ func TestCreateExecutionWithEnvs(t *testing.T) { } } +func TestCreateExecution_CustomNamespaceMappingConfig(t *testing.T) { + request := testutils.GetExecutionRequest() + repository := getMockRepositoryForExecTest() + storageClient := getMockStorageForExecTest(context.Background()) + setDefaultLpCallbackForExecTest(repository) + mockClock := clock.NewMock() + createdAt := time.Now() + mockClock.Set(createdAt) + exCreateFunc := func(ctx context.Context, input models.Execution) error { + var spec admin.ExecutionSpec + err := proto.Unmarshal(input.Spec, &spec) + assert.NoError(t, err) + assert.Equal(t, spec.GetMetadata().GetSystemMetadata().Namespace, "project") + return nil + } + + repository.ExecutionRepo().(*repositoryMocks.MockExecutionRepo).SetCreateCallback(exCreateFunc) + mockExecutor := workflowengineMocks.WorkflowExecutor{} + mockExecutor.OnExecuteMatch(mock.Anything, mock.Anything, mock.Anything).Return(workflowengineInterfaces.ExecutionResponse{}, nil) + mockExecutor.OnID().Return("testMockExecutor") + r := plugins.NewRegistry() + r.RegisterDefault(plugins.PluginIDWorkflowExecutor, &mockExecutor) + + mockNs := runtimeMocks.NamespaceMappingConfiguration{} + mockNs.OnGetNamespaceTemplate().Return("{{ project }}") + mockExecutionsConfigProvider := runtimeMocks.NewMockConfigurationProvider( + testutils.GetApplicationConfigWithDefaultDomains(), + runtimeMocks.NewMockQueueConfigurationProvider( + []runtimeInterfaces.ExecutionQueue{}, []runtimeInterfaces.WorkflowConfig{}), + nil, + runtimeMocks.NewMockTaskResourceConfiguration(resourceDefaults, resourceLimits), nil, &mockNs) + mockExecutionsConfigProvider.(*runtimeMocks.MockConfigurationProvider).AddRegistrationValidationConfiguration( + runtimeMocks.NewMockRegistrationValidationProvider()) + + execManager := NewExecutionManager(repository, r, mockExecutionsConfigProvider, storageClient, mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil, nil, nil, &eventWriterMocks.WorkflowExecutionEventWriter{}) + + execManager.(*ExecutionManager)._clock = mockClock + + response, err := execManager.CreateExecution(context.Background(), request, requestedAt) + assert.Nil(t, err) + assert.True(t, proto.Equal(&executionIdentifier, response.Id)) +} + func makeExecutionGetFunc( t *testing.T, closureBytes []byte, startTime *time.Time) repositoryMocks.GetExecutionFunc { return func(ctx context.Context, input interfaces.Identifier) (models.Execution, error) { @@ -1182,7 +1257,7 @@ func makeExecutionGetFunc( BaseModel: models.BaseModel{ ID: uint(8), }, - Spec: specBytes, + Spec: getExpectedSpecBytes(), Phase: core.WorkflowExecution_QUEUED.String(), Closure: closureBytes, LaunchPlanID: uint(1), @@ -1769,7 +1844,7 @@ func TestRecoverExecution_RecoveredChildNode(t *testing.T) { switch input.Name { case "name": return models.Execution{ - Spec: specBytes, + Spec: getExpectedSpecBytes(), Closure: existingClosureBytes, BaseModel: models.BaseModel{ ID: referencedExecutionID, @@ -2148,7 +2223,7 @@ func TestCreateWorkflowEvent(t *testing.T) { assert.Equal(t, uint(2), execution.WorkflowID) assert.Equal(t, core.WorkflowExecution_FAILED.String(), execution.Phase) assert.Equal(t, closureBytes, execution.Closure) - assert.Equal(t, specBytes, execution.Spec) + assert.Equal(t, getExpectedSpecBytes(), execution.Spec) assert.Equal(t, startTime, *execution.StartedAt) assert.Equal(t, duration, execution.Duration) return nil @@ -2188,7 +2263,7 @@ func TestCreateWorkflowEvent_TerminalState(t *testing.T) { BaseModel: models.BaseModel{ ID: uint(8), }, - Spec: specBytes, + Spec: getExpectedSpecBytes(), Phase: core.WorkflowExecution_FAILED.String(), }, nil } @@ -2228,7 +2303,7 @@ func TestCreateWorkflowEvent_NoRunningToQueued(t *testing.T) { Domain: "domain", Name: "name", }, - Spec: specBytes, + Spec: getExpectedSpecBytes(), Phase: core.WorkflowExecution_RUNNING.String(), }, nil } @@ -2264,7 +2339,7 @@ func TestCreateWorkflowEvent_CurrentlyAborting(t *testing.T) { Domain: "domain", Name: "name", }, - Spec: specBytes, + Spec: getExpectedSpecBytes(), Phase: core.WorkflowExecution_ABORTING.String(), }, nil } @@ -2335,7 +2410,7 @@ func TestCreateWorkflowEvent_StartedRunning(t *testing.T) { assert.Equal(t, uint(2), execution.WorkflowID) assert.Equal(t, core.WorkflowExecution_RUNNING.String(), execution.Phase) assert.Equal(t, closureBytes, execution.Closure) - assert.Equal(t, specBytes, execution.Spec) + assert.Equal(t, getExpectedSpecBytes(), execution.Spec) assert.Equal(t, occurredAt, *execution.StartedAt) assert.Equal(t, time.Duration(0), execution.Duration) assert.Equal(t, occurredAt, *execution.ExecutionUpdatedAt) @@ -2377,7 +2452,7 @@ func TestCreateWorkflowEvent_DuplicateRunning(t *testing.T) { BaseModel: models.BaseModel{ ID: uint(8), }, - Spec: specBytes, + Spec: getExpectedSpecBytes(), Phase: core.WorkflowExecution_RUNNING.String(), Closure: closureBytes, LaunchPlanID: uint(1), @@ -2420,7 +2495,7 @@ func TestCreateWorkflowEvent_InvalidPhaseChange(t *testing.T) { BaseModel: models.BaseModel{ ID: uint(8), }, - Spec: specBytes, + Spec: getExpectedSpecBytes(), Phase: core.WorkflowExecution_SUCCEEDED.String(), Closure: closureBytes, LaunchPlanID: uint(1), @@ -2467,7 +2542,7 @@ func TestCreateWorkflowEvent_ClusterReassignmentOnQueued(t *testing.T) { BaseModel: models.BaseModel{ ID: uint(8), }, - Spec: specBytes, + Spec: getExpectedSpecBytes(), Phase: core.WorkflowExecution_UNDEFINED.String(), Closure: closureBytes, LaunchPlanID: uint(1), @@ -2655,7 +2730,7 @@ func TestCreateWorkflowEvent_IncompatibleCluster(t *testing.T) { BaseModel: models.BaseModel{ ID: uint(8), }, - Spec: specBytes, + Spec: getExpectedSpecBytes(), Phase: core.WorkflowExecution_RUNNING.String(), Closure: closureBytes, LaunchPlanID: uint(1), @@ -2714,7 +2789,7 @@ func TestGetExecution(t *testing.T) { Domain: "domain", Name: "name", }, - Spec: specBytes, + Spec: getExpectedSpecBytes(), Phase: phase, Closure: closureBytes, LaunchPlanID: uint(1), @@ -2732,7 +2807,7 @@ func TestGetExecution(t *testing.T) { }) assert.NoError(t, err) assert.True(t, proto.Equal(&executionIdentifier, execution.Id)) - assert.True(t, proto.Equal(spec, execution.Spec)) + assert.True(t, proto.Equal(getExpectedSpec(), execution.Spec)) assert.True(t, proto.Equal(&closure, execution.Closure)) } @@ -2920,7 +2995,7 @@ func TestListExecutions(t *testing.T) { Domain: domainValue, Name: "my awesome execution", }, - Spec: specBytes, + Spec: getExpectedSpecBytes(), Closure: closureBytes, }, { @@ -2933,7 +3008,7 @@ func TestListExecutions(t *testing.T) { Name: "my other execution", }, Phase: core.WorkflowExecution_SUCCEEDED.String(), - Spec: specBytes, + Spec: getExpectedSpecBytes(), Closure: closureBytes, }, }, @@ -2966,7 +3041,7 @@ func TestListExecutions(t *testing.T) { if idx == 0 { assert.Equal(t, "my awesome execution", execution.Id.Name) } - assert.True(t, proto.Equal(spec, execution.Spec)) + assert.True(t, proto.Equal(getExpectedSpec(), execution.Spec)) assert.True(t, proto.Equal(&closure, execution.Closure)) } assert.Empty(t, executionList.Token) @@ -3150,7 +3225,7 @@ func TestExecutionManager_PublishNotifications(t *testing.T) { LaunchPlanID: uint(1), WorkflowID: uint(2), Closure: execClosureBytes, - Spec: specBytes, + Spec: getExpectedSpecBytes(), } assert.Nil(t, myExecManager.publishNotifications(context.Background(), workflowRequest, executionModel)) } @@ -3259,7 +3334,7 @@ func TestExecutionManager_TestExecutionManager_PublishNotificationsTransformErro LaunchPlanID: uint(1), WorkflowID: uint(2), Closure: execClosureBytes, - Spec: specBytes, + Spec: getExpectedSpecBytes(), } assert.Nil(t, myExecManager.publishNotifications(context.Background(), workflowRequest, executionModel)) @@ -3486,7 +3561,7 @@ func TestGetExecutionData(t *testing.T) { Domain: "domain", Name: "name", }, - Spec: specBytes, + Spec: getExpectedSpecBytes(), Phase: phase, Closure: closureBytes, LaunchPlanID: uint(1), @@ -3661,7 +3736,7 @@ func TestGetExecution_Legacy(t *testing.T) { Domain: "domain", Name: "name", }, - Spec: getLegacySpecBytes(), + Spec: getExpectedLegacySpecBytes(), Phase: phase, Closure: getLegacyClosureBytes(), LaunchPlanID: uint(1), @@ -3678,7 +3753,7 @@ func TestGetExecution_Legacy(t *testing.T) { }) assert.NoError(t, err) assert.True(t, proto.Equal(&executionIdentifier, execution.Id)) - assert.True(t, proto.Equal(getLegacySpec(), execution.Spec)) + assert.True(t, proto.Equal(getExpectedLegacySpec(), execution.Spec)) assert.True(t, proto.Equal(getLegacyClosure(), execution.Closure)) } diff --git a/flyteadmin/pkg/repositories/transformers/execution.go b/flyteadmin/pkg/repositories/transformers/execution.go index e92fbc589..f9e626ebf 100644 --- a/flyteadmin/pkg/repositories/transformers/execution.go +++ b/flyteadmin/pkg/repositories/transformers/execution.go @@ -44,10 +44,12 @@ type CreateExecutionModelInput struct { UserInputsURI storage.DataReference SecurityContext *core.SecurityContext LaunchEntity core.ResourceType + Namespace string } type ExecutionTransformerOptions struct { TrimErrorMessage bool + DefaultNamespace string } var DefaultExecutionTransformerOptions = &ExecutionTransformerOptions{} @@ -63,6 +65,7 @@ func CreateExecutionModel(input CreateExecutionModelInput) (*models.Execution, e } requestSpec.Metadata.SystemMetadata = &admin.SystemMetadata{ ExecutionCluster: input.Cluster, + Namespace: input.Namespace, } requestSpec.SecurityContext = input.SecurityContext spec, err := proto.Marshal(requestSpec) @@ -316,12 +319,25 @@ func GetExecutionIdentifier(executionModel *models.Execution) core.WorkflowExecu } } -func FromExecutionModel(executionModel models.Execution, opts *ExecutionTransformerOptions) (*admin.Execution, error) { +func FromExecutionModel(ctx context.Context, executionModel models.Execution, opts *ExecutionTransformerOptions) (*admin.Execution, error) { var spec admin.ExecutionSpec var err error if err = proto.Unmarshal(executionModel.Spec, &spec); err != nil { return nil, errors.NewFlyteAdminErrorf(codes.Internal, "failed to unmarshal spec") } + if len(opts.DefaultNamespace) > 0 { + if spec.Metadata == nil { + spec.Metadata = &admin.ExecutionMetadata{} + } + if spec.Metadata.SystemMetadata == nil { + spec.Metadata.SystemMetadata = &admin.SystemMetadata{} + } + if len(spec.GetMetadata().GetSystemMetadata().Namespace) == 0 { + logger.Infof(ctx, "setting execution system metadata namespace to [%s]", opts.DefaultNamespace) + spec.Metadata.SystemMetadata.Namespace = opts.DefaultNamespace + } + } + var closure admin.ExecutionClosure if err = proto.Unmarshal(executionModel.Closure, &closure); err != nil { return nil, errors.NewFlyteAdminErrorf(codes.Internal, "failed to unmarshal closure") @@ -382,10 +398,10 @@ func PopulateDefaultStateChangeDetails(executionModel models.Execution) (*admin. }, nil } -func FromExecutionModels(executionModels []models.Execution, opts *ExecutionTransformerOptions) ([]*admin.Execution, error) { +func FromExecutionModels(ctx context.Context, executionModels []models.Execution, opts *ExecutionTransformerOptions) ([]*admin.Execution, error) { executions := make([]*admin.Execution, len(executionModels)) for idx, executionModel := range executionModels { - execution, err := FromExecutionModel(executionModel, opts) + execution, err := FromExecutionModel(ctx, executionModel, opts) if err != nil { return nil, err } diff --git a/flyteadmin/pkg/repositories/transformers/execution_test.go b/flyteadmin/pkg/repositories/transformers/execution_test.go index fc42a82cd..43073deb4 100644 --- a/flyteadmin/pkg/repositories/transformers/execution_test.go +++ b/flyteadmin/pkg/repositories/transformers/execution_test.go @@ -72,6 +72,7 @@ func TestCreateExecutionModel(t *testing.T) { IamRole: "iam_role", }, } + namespace := "ns" execution, err := CreateExecutionModel(CreateExecutionModelInput{ WorkflowExecutionID: core.WorkflowExecutionIdentifier{ Project: "project", @@ -89,6 +90,7 @@ func TestCreateExecutionModel(t *testing.T) { Cluster: cluster, SecurityContext: securityCtx, LaunchEntity: core.ResourceType_LAUNCH_PLAN, + Namespace: namespace, }) assert.NoError(t, err) assert.Equal(t, "project", execution.Project) @@ -106,6 +108,7 @@ func TestCreateExecutionModel(t *testing.T) { expectedSpec.Metadata.Principal = principal expectedSpec.Metadata.SystemMetadata = &admin.SystemMetadata{ ExecutionCluster: cluster, + Namespace: namespace, } expectedSpec.SecurityContext = securityCtx expectedSpecBytes, _ := proto.Marshal(expectedSpec) @@ -528,7 +531,7 @@ func TestFromExecutionModel(t *testing.T) { StartedAt: &startedAt, State: &stateInt, } - execution, err := FromExecutionModel(executionModel, DefaultExecutionTransformerOptions) + execution, err := FromExecutionModel(context.TODO(), executionModel, DefaultExecutionTransformerOptions) assert.Nil(t, err) assert.True(t, proto.Equal(&admin.Execution{ Id: &core.WorkflowExecutionIdentifier{ @@ -556,7 +559,7 @@ func TestFromExecutionModel_Aborted(t *testing.T) { AbortCause: abortCause, Closure: executionClosureBytes, } - execution, err := FromExecutionModel(executionModel, DefaultExecutionTransformerOptions) + execution, err := FromExecutionModel(context.TODO(), executionModel, DefaultExecutionTransformerOptions) assert.Nil(t, err) assert.Equal(t, core.WorkflowExecution_ABORTED, execution.Closure.Phase) assert.True(t, proto.Equal(&admin.AbortMetadata{ @@ -564,7 +567,7 @@ func TestFromExecutionModel_Aborted(t *testing.T) { }, execution.Closure.GetAbortMetadata())) executionModel.Phase = core.WorkflowExecution_RUNNING.String() - execution, err = FromExecutionModel(executionModel, DefaultExecutionTransformerOptions) + execution, err = FromExecutionModel(context.TODO(), executionModel, DefaultExecutionTransformerOptions) assert.Nil(t, err) assert.Empty(t, execution.Closure.GetAbortCause()) } @@ -589,7 +592,7 @@ func TestFromExecutionModel_Error(t *testing.T) { Phase: core.WorkflowExecution_FAILED.String(), Closure: executionClosureBytes, } - execution, err := FromExecutionModel(executionModel, &ExecutionTransformerOptions{ + execution, err := FromExecutionModel(context.TODO(), executionModel, &ExecutionTransformerOptions{ TrimErrorMessage: true, }) expectedExecErr := execErr @@ -599,6 +602,29 @@ func TestFromExecutionModel_Error(t *testing.T) { assert.True(t, proto.Equal(expectedExecErr, execution.Closure.GetError())) } +func TestFromExecutionModel_OverwriteNamespace(t *testing.T) { + abortCause := "abort cause" + executionClosureBytes, _ := proto.Marshal(&admin.ExecutionClosure{ + Phase: core.WorkflowExecution_RUNNING, + }) + executionModel := models.Execution{ + ExecutionKey: models.ExecutionKey{ + Project: "project", + Domain: "domain", + Name: "name", + }, + Phase: core.WorkflowExecution_RUNNING.String(), + AbortCause: abortCause, + Closure: executionClosureBytes, + } + overwrittenNamespace := "ns" + execution, err := FromExecutionModel(context.TODO(), executionModel, &ExecutionTransformerOptions{ + DefaultNamespace: overwrittenNamespace, + }) + assert.NoError(t, err) + assert.Equal(t, execution.GetSpec().GetMetadata().GetSystemMetadata().Namespace, overwrittenNamespace) +} + func TestFromExecutionModels(t *testing.T) { spec := testutils.GetExecutionRequest().Spec specBytes, _ := proto.Marshal(spec) @@ -641,7 +667,7 @@ func TestFromExecutionModels(t *testing.T) { State: &stateInt, }, } - executions, err := FromExecutionModels(executionModels, DefaultExecutionTransformerOptions) + executions, err := FromExecutionModels(context.TODO(), executionModels, DefaultExecutionTransformerOptions) assert.Nil(t, err) assert.Len(t, executions, 1) assert.True(t, proto.Equal(&admin.Execution{