Skip to content
This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

Commit

Permalink
Save execution namespace in system metadata (#568)
Browse files Browse the repository at this point in the history
  • Loading branch information
katrogan authored May 22, 2023
1 parent d088886 commit 7649683
Show file tree
Hide file tree
Showing 6 changed files with 163 additions and 40 deletions.
2 changes: 1 addition & 1 deletion flyteadmin/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ require (
github.com/cloudevents/sdk-go/v2 v2.8.0
github.com/coreos/go-oidc v2.2.1+incompatible
github.com/evanphx/json-patch v4.12.0+incompatible
github.com/flyteorg/flyteidl v1.5.5
github.com/flyteorg/flyteidl v1.5.7
github.com/flyteorg/flyteplugins v1.0.56
github.com/flyteorg/flytepropeller v1.1.87
github.com/flyteorg/flytestdlib v1.0.15
Expand Down
4 changes: 2 additions & 2 deletions flyteadmin/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -312,8 +312,8 @@ github.com/fatih/structs v1.0.0/go.mod h1:9NiDSp5zOcgEDl+j00MP/WkGVPOlPRLejGD8Ga
github.com/fatih/structs v1.1.0/go.mod h1:9NiDSp5zOcgEDl+j00MP/WkGVPOlPRLejGD8Ga6PJ7M=
github.com/felixge/httpsnoop v1.0.1 h1:lvB5Jl89CsZtGIWuTcDM1E/vkVs49/Ml7JJe07l8SPQ=
github.com/felixge/httpsnoop v1.0.1/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U=
github.com/flyteorg/flyteidl v1.5.5 h1:tNNhuXPog4atAMSGE2kyAg6JzYy1TvjqrrQeh1EZVHs=
github.com/flyteorg/flyteidl v1.5.5/go.mod h1:EtE/muM2lHHgBabjYcxqe9TWeJSL0kXwbI0RgVwI4Og=
github.com/flyteorg/flyteidl v1.5.7 h1:voAxMMFsKOseNFSlCyRGlpegqtQXtJjyxgsQzZg4tts=
github.com/flyteorg/flyteidl v1.5.7/go.mod h1:EtE/muM2lHHgBabjYcxqe9TWeJSL0kXwbI0RgVwI4Og=
github.com/flyteorg/flyteplugins v1.0.56 h1:kBTDgTpdSi7wcptk4cMwz5vfh1MU82VaUMMboe1InXw=
github.com/flyteorg/flyteplugins v1.0.56/go.mod h1:aFCKSn8TPzxSAILIiogHtUnHlUCN9+y6Vf+r9R4KZDU=
github.com/flyteorg/flytepropeller v1.1.87 h1:Px7ASDjrWyeVrUb15qXmhw9QK7xPcFjL5Yetr2P6iGM=
Expand Down
22 changes: 14 additions & 8 deletions flyteadmin/pkg/manager/impl/execution_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -323,7 +323,7 @@ func (m *ExecutionManager) getInheritedExecMetadata(ctx context.Context, request
}
sourceExecutionID = sourceExecutionModel.ID
requestSpec.Metadata.Principal = sourceExecutionModel.User
sourceExecution, err := transformers.FromExecutionModel(*sourceExecutionModel, transformers.DefaultExecutionTransformerOptions)
sourceExecution, err := transformers.FromExecutionModel(ctx, *sourceExecutionModel, transformers.DefaultExecutionTransformerOptions)
if err != nil {
logger.Errorf(ctx, "Failed transform parent execution model for child execution [%+v] with err: %v", workflowExecutionID, err)
return parentNodeExecutionID, sourceExecutionID, err
Expand Down Expand Up @@ -650,6 +650,7 @@ func (m *ExecutionManager) launchSingleTaskExecution(
UserInputsURI: userInputsURI,
SecurityContext: executionConfig.SecurityContext,
LaunchEntity: taskIdentifier.ResourceType,
Namespace: namespace,
})
if err != nil {
logger.Infof(ctx, "Failed to create execution model in transformer for id: [%+v] with err: %v",
Expand Down Expand Up @@ -905,6 +906,7 @@ func (m *ExecutionManager) launchExecutionAndPrepareModel(
UserInputsURI: userInputsURI,
SecurityContext: executionConfig.SecurityContext,
LaunchEntity: launchPlan.Id.ResourceType,
Namespace: namespace,
})
if err != nil {
logger.Infof(ctx, "Failed to create execution model in transformer for id: [%+v] with err: %v",
Expand Down Expand Up @@ -965,7 +967,7 @@ func (m *ExecutionManager) RelaunchExecution(
logger.Debugf(ctx, "Failed to get execution model for request [%+v] with err %v", request, err)
return nil, err
}
existingExecution, err := transformers.FromExecutionModel(*existingExecutionModel, transformers.DefaultExecutionTransformerOptions)
existingExecution, err := transformers.FromExecutionModel(ctx, *existingExecutionModel, transformers.DefaultExecutionTransformerOptions)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -1022,7 +1024,7 @@ func (m *ExecutionManager) RecoverExecution(
logger.Debugf(ctx, "Failed to get execution model for request [%+v] with err %v", request, err)
return nil, err
}
existingExecution, err := transformers.FromExecutionModel(*existingExecutionModel, transformers.DefaultExecutionTransformerOptions)
existingExecution, err := transformers.FromExecutionModel(ctx, *existingExecutionModel, transformers.DefaultExecutionTransformerOptions)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -1073,7 +1075,7 @@ func (m *ExecutionManager) emitScheduledWorkflowMetrics(
return
}
// Find the reference launch plan to get the kickoff time argument
execution, err := transformers.FromExecutionModel(*executionModel, transformers.DefaultExecutionTransformerOptions)
execution, err := transformers.FromExecutionModel(ctx, *executionModel, transformers.DefaultExecutionTransformerOptions)
if err != nil {
logger.Warningf(context.Background(),
"failed to transform execution model when emitting scheduled workflow execution stats with for "+
Expand Down Expand Up @@ -1316,7 +1318,11 @@ func (m *ExecutionManager) GetExecution(
logger.Debugf(ctx, "Failed to get execution model for request [%+v] with err: %v", request, err)
return nil, err
}
execution, transformerErr := transformers.FromExecutionModel(*executionModel, transformers.DefaultExecutionTransformerOptions)
namespace := common.GetNamespaceName(
m.config.NamespaceMappingConfiguration().GetNamespaceTemplate(), request.GetId().GetProject(), request.GetId().GetDomain())
execution, transformerErr := transformers.FromExecutionModel(ctx, *executionModel, &transformers.ExecutionTransformerOptions{
DefaultNamespace: namespace,
})
if transformerErr != nil {
logger.Debugf(ctx, "Failed to transform execution model [%+v] to proto object with err: %v", request.Id,
transformerErr)
Expand Down Expand Up @@ -1359,7 +1365,7 @@ func (m *ExecutionManager) GetExecutionData(
logger.Debugf(ctx, "Failed to get execution model for request [%+v] with err: %v", request, err)
return nil, err
}
execution, err := transformers.FromExecutionModel(*executionModel, transformers.DefaultExecutionTransformerOptions)
execution, err := transformers.FromExecutionModel(ctx, *executionModel, transformers.DefaultExecutionTransformerOptions)
if err != nil {
logger.Debugf(ctx, "Failed to transform execution model [%+v] to proto object with err: %v", request.Id, err)
return nil, err
Expand Down Expand Up @@ -1459,7 +1465,7 @@ func (m *ExecutionManager) ListExecutions(
logger.Debugf(ctx, "Failed to list executions using input [%+v] with err %v", listExecutionsInput, err)
return nil, err
}
executionList, err := transformers.FromExecutionModels(output.Executions, transformers.ListExecutionTransformerOptions)
executionList, err := transformers.FromExecutionModels(ctx, output.Executions, transformers.ListExecutionTransformerOptions)
if err != nil {
logger.Errorf(ctx,
"Failed to transform execution models [%+v] with err: %v", output.Executions, err)
Expand Down Expand Up @@ -1489,7 +1495,7 @@ func (m *ExecutionManager) ListExecutions(
func (m *ExecutionManager) publishNotifications(ctx context.Context, request admin.WorkflowExecutionEventRequest,
execution models.Execution) error {
// Notifications are stored in the Spec object of an admin.Execution object.
adminExecution, err := transformers.FromExecutionModel(execution, transformers.DefaultExecutionTransformerOptions)
adminExecution, err := transformers.FromExecutionModel(ctx, execution, transformers.DefaultExecutionTransformerOptions)
if err != nil {
// This shouldn't happen because execution manager marshaled the data into models.Execution.
m.systemMetrics.TransformerError.Inc()
Expand Down
117 changes: 96 additions & 21 deletions flyteadmin/pkg/manager/impl/execution_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,37 @@ func getLegacySpecBytes() []byte {
return b
}

func getExpectedLegacySpec() *admin.ExecutionSpec {
expectedLegacySpec := getLegacySpec()
expectedLegacySpec.Metadata = &admin.ExecutionMetadata{
SystemMetadata: &admin.SystemMetadata{
Namespace: "project-domain",
},
}
return expectedLegacySpec
}

func getExpectedLegacySpecBytes() []byte {
expectedLegacySpec := getExpectedLegacySpec()
b, _ := proto.Marshal(expectedLegacySpec)
return b
}

func getExpectedSpec() *admin.ExecutionSpec {
expectedSpec := testutils.GetExecutionRequest().Spec
expectedSpec.Metadata = &admin.ExecutionMetadata{
SystemMetadata: &admin.SystemMetadata{
Namespace: "project-domain",
},
}
return expectedSpec
}

func getExpectedSpecBytes() []byte {
specBytes, _ := proto.Marshal(getExpectedSpec())
return specBytes
}

func getLegacyClosure() *admin.ExecutionClosure {
return &admin.ExecutionClosure{
Phase: core.WorkflowExecution_RUNNING,
Expand Down Expand Up @@ -287,6 +318,7 @@ func TestCreateExecution(t *testing.T) {
assert.Equal(t, rawOutput, spec.RawOutputDataConfig.OutputLocationPrefix)
assert.True(t, proto.Equal(spec.ClusterAssignment, &clusterAssignment))
assert.Equal(t, "launch_plan", input.LaunchEntity)
assert.Equal(t, spec.GetMetadata().GetSystemMetadata().Namespace, "project-domain")
return nil
})
setDefaultLpCallbackForExecTest(repository)
Expand Down Expand Up @@ -1167,6 +1199,49 @@ func TestCreateExecutionWithEnvs(t *testing.T) {
}
}

func TestCreateExecution_CustomNamespaceMappingConfig(t *testing.T) {
request := testutils.GetExecutionRequest()
repository := getMockRepositoryForExecTest()
storageClient := getMockStorageForExecTest(context.Background())
setDefaultLpCallbackForExecTest(repository)
mockClock := clock.NewMock()
createdAt := time.Now()
mockClock.Set(createdAt)
exCreateFunc := func(ctx context.Context, input models.Execution) error {
var spec admin.ExecutionSpec
err := proto.Unmarshal(input.Spec, &spec)
assert.NoError(t, err)
assert.Equal(t, spec.GetMetadata().GetSystemMetadata().Namespace, "project")
return nil
}

repository.ExecutionRepo().(*repositoryMocks.MockExecutionRepo).SetCreateCallback(exCreateFunc)
mockExecutor := workflowengineMocks.WorkflowExecutor{}
mockExecutor.OnExecuteMatch(mock.Anything, mock.Anything, mock.Anything).Return(workflowengineInterfaces.ExecutionResponse{}, nil)
mockExecutor.OnID().Return("testMockExecutor")
r := plugins.NewRegistry()
r.RegisterDefault(plugins.PluginIDWorkflowExecutor, &mockExecutor)

mockNs := runtimeMocks.NamespaceMappingConfiguration{}
mockNs.OnGetNamespaceTemplate().Return("{{ project }}")
mockExecutionsConfigProvider := runtimeMocks.NewMockConfigurationProvider(
testutils.GetApplicationConfigWithDefaultDomains(),
runtimeMocks.NewMockQueueConfigurationProvider(
[]runtimeInterfaces.ExecutionQueue{}, []runtimeInterfaces.WorkflowConfig{}),
nil,
runtimeMocks.NewMockTaskResourceConfiguration(resourceDefaults, resourceLimits), nil, &mockNs)
mockExecutionsConfigProvider.(*runtimeMocks.MockConfigurationProvider).AddRegistrationValidationConfiguration(
runtimeMocks.NewMockRegistrationValidationProvider())

execManager := NewExecutionManager(repository, r, mockExecutionsConfigProvider, storageClient, mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil, nil, nil, &eventWriterMocks.WorkflowExecutionEventWriter{})

execManager.(*ExecutionManager)._clock = mockClock

response, err := execManager.CreateExecution(context.Background(), request, requestedAt)
assert.Nil(t, err)
assert.True(t, proto.Equal(&executionIdentifier, response.Id))
}

func makeExecutionGetFunc(
t *testing.T, closureBytes []byte, startTime *time.Time) repositoryMocks.GetExecutionFunc {
return func(ctx context.Context, input interfaces.Identifier) (models.Execution, error) {
Expand All @@ -1182,7 +1257,7 @@ func makeExecutionGetFunc(
BaseModel: models.BaseModel{
ID: uint(8),
},
Spec: specBytes,
Spec: getExpectedSpecBytes(),
Phase: core.WorkflowExecution_QUEUED.String(),
Closure: closureBytes,
LaunchPlanID: uint(1),
Expand Down Expand Up @@ -1769,7 +1844,7 @@ func TestRecoverExecution_RecoveredChildNode(t *testing.T) {
switch input.Name {
case "name":
return models.Execution{
Spec: specBytes,
Spec: getExpectedSpecBytes(),
Closure: existingClosureBytes,
BaseModel: models.BaseModel{
ID: referencedExecutionID,
Expand Down Expand Up @@ -2148,7 +2223,7 @@ func TestCreateWorkflowEvent(t *testing.T) {
assert.Equal(t, uint(2), execution.WorkflowID)
assert.Equal(t, core.WorkflowExecution_FAILED.String(), execution.Phase)
assert.Equal(t, closureBytes, execution.Closure)
assert.Equal(t, specBytes, execution.Spec)
assert.Equal(t, getExpectedSpecBytes(), execution.Spec)
assert.Equal(t, startTime, *execution.StartedAt)
assert.Equal(t, duration, execution.Duration)
return nil
Expand Down Expand Up @@ -2188,7 +2263,7 @@ func TestCreateWorkflowEvent_TerminalState(t *testing.T) {
BaseModel: models.BaseModel{
ID: uint(8),
},
Spec: specBytes,
Spec: getExpectedSpecBytes(),
Phase: core.WorkflowExecution_FAILED.String(),
}, nil
}
Expand Down Expand Up @@ -2228,7 +2303,7 @@ func TestCreateWorkflowEvent_NoRunningToQueued(t *testing.T) {
Domain: "domain",
Name: "name",
},
Spec: specBytes,
Spec: getExpectedSpecBytes(),
Phase: core.WorkflowExecution_RUNNING.String(),
}, nil
}
Expand Down Expand Up @@ -2264,7 +2339,7 @@ func TestCreateWorkflowEvent_CurrentlyAborting(t *testing.T) {
Domain: "domain",
Name: "name",
},
Spec: specBytes,
Spec: getExpectedSpecBytes(),
Phase: core.WorkflowExecution_ABORTING.String(),
}, nil
}
Expand Down Expand Up @@ -2335,7 +2410,7 @@ func TestCreateWorkflowEvent_StartedRunning(t *testing.T) {
assert.Equal(t, uint(2), execution.WorkflowID)
assert.Equal(t, core.WorkflowExecution_RUNNING.String(), execution.Phase)
assert.Equal(t, closureBytes, execution.Closure)
assert.Equal(t, specBytes, execution.Spec)
assert.Equal(t, getExpectedSpecBytes(), execution.Spec)
assert.Equal(t, occurredAt, *execution.StartedAt)
assert.Equal(t, time.Duration(0), execution.Duration)
assert.Equal(t, occurredAt, *execution.ExecutionUpdatedAt)
Expand Down Expand Up @@ -2377,7 +2452,7 @@ func TestCreateWorkflowEvent_DuplicateRunning(t *testing.T) {
BaseModel: models.BaseModel{
ID: uint(8),
},
Spec: specBytes,
Spec: getExpectedSpecBytes(),
Phase: core.WorkflowExecution_RUNNING.String(),
Closure: closureBytes,
LaunchPlanID: uint(1),
Expand Down Expand Up @@ -2420,7 +2495,7 @@ func TestCreateWorkflowEvent_InvalidPhaseChange(t *testing.T) {
BaseModel: models.BaseModel{
ID: uint(8),
},
Spec: specBytes,
Spec: getExpectedSpecBytes(),
Phase: core.WorkflowExecution_SUCCEEDED.String(),
Closure: closureBytes,
LaunchPlanID: uint(1),
Expand Down Expand Up @@ -2467,7 +2542,7 @@ func TestCreateWorkflowEvent_ClusterReassignmentOnQueued(t *testing.T) {
BaseModel: models.BaseModel{
ID: uint(8),
},
Spec: specBytes,
Spec: getExpectedSpecBytes(),
Phase: core.WorkflowExecution_UNDEFINED.String(),
Closure: closureBytes,
LaunchPlanID: uint(1),
Expand Down Expand Up @@ -2655,7 +2730,7 @@ func TestCreateWorkflowEvent_IncompatibleCluster(t *testing.T) {
BaseModel: models.BaseModel{
ID: uint(8),
},
Spec: specBytes,
Spec: getExpectedSpecBytes(),
Phase: core.WorkflowExecution_RUNNING.String(),
Closure: closureBytes,
LaunchPlanID: uint(1),
Expand Down Expand Up @@ -2714,7 +2789,7 @@ func TestGetExecution(t *testing.T) {
Domain: "domain",
Name: "name",
},
Spec: specBytes,
Spec: getExpectedSpecBytes(),
Phase: phase,
Closure: closureBytes,
LaunchPlanID: uint(1),
Expand All @@ -2732,7 +2807,7 @@ func TestGetExecution(t *testing.T) {
})
assert.NoError(t, err)
assert.True(t, proto.Equal(&executionIdentifier, execution.Id))
assert.True(t, proto.Equal(spec, execution.Spec))
assert.True(t, proto.Equal(getExpectedSpec(), execution.Spec))
assert.True(t, proto.Equal(&closure, execution.Closure))
}

Expand Down Expand Up @@ -2920,7 +2995,7 @@ func TestListExecutions(t *testing.T) {
Domain: domainValue,
Name: "my awesome execution",
},
Spec: specBytes,
Spec: getExpectedSpecBytes(),
Closure: closureBytes,
},
{
Expand All @@ -2933,7 +3008,7 @@ func TestListExecutions(t *testing.T) {
Name: "my other execution",
},
Phase: core.WorkflowExecution_SUCCEEDED.String(),
Spec: specBytes,
Spec: getExpectedSpecBytes(),
Closure: closureBytes,
},
},
Expand Down Expand Up @@ -2966,7 +3041,7 @@ func TestListExecutions(t *testing.T) {
if idx == 0 {
assert.Equal(t, "my awesome execution", execution.Id.Name)
}
assert.True(t, proto.Equal(spec, execution.Spec))
assert.True(t, proto.Equal(getExpectedSpec(), execution.Spec))
assert.True(t, proto.Equal(&closure, execution.Closure))
}
assert.Empty(t, executionList.Token)
Expand Down Expand Up @@ -3150,7 +3225,7 @@ func TestExecutionManager_PublishNotifications(t *testing.T) {
LaunchPlanID: uint(1),
WorkflowID: uint(2),
Closure: execClosureBytes,
Spec: specBytes,
Spec: getExpectedSpecBytes(),
}
assert.Nil(t, myExecManager.publishNotifications(context.Background(), workflowRequest, executionModel))
}
Expand Down Expand Up @@ -3259,7 +3334,7 @@ func TestExecutionManager_TestExecutionManager_PublishNotificationsTransformErro
LaunchPlanID: uint(1),
WorkflowID: uint(2),
Closure: execClosureBytes,
Spec: specBytes,
Spec: getExpectedSpecBytes(),
}
assert.Nil(t, myExecManager.publishNotifications(context.Background(), workflowRequest, executionModel))

Expand Down Expand Up @@ -3486,7 +3561,7 @@ func TestGetExecutionData(t *testing.T) {
Domain: "domain",
Name: "name",
},
Spec: specBytes,
Spec: getExpectedSpecBytes(),
Phase: phase,
Closure: closureBytes,
LaunchPlanID: uint(1),
Expand Down Expand Up @@ -3661,7 +3736,7 @@ func TestGetExecution_Legacy(t *testing.T) {
Domain: "domain",
Name: "name",
},
Spec: getLegacySpecBytes(),
Spec: getExpectedLegacySpecBytes(),
Phase: phase,
Closure: getLegacyClosureBytes(),
LaunchPlanID: uint(1),
Expand All @@ -3678,7 +3753,7 @@ func TestGetExecution_Legacy(t *testing.T) {
})
assert.NoError(t, err)
assert.True(t, proto.Equal(&executionIdentifier, execution.Id))
assert.True(t, proto.Equal(getLegacySpec(), execution.Spec))
assert.True(t, proto.Equal(getExpectedLegacySpec(), execution.Spec))
assert.True(t, proto.Equal(getLegacyClosure(), execution.Closure))
}

Expand Down
Loading

0 comments on commit 7649683

Please sign in to comment.