From eeecf99af9584fa9635b08465be0559f8459682f Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Thu, 4 May 2023 19:57:12 -0700 Subject: [PATCH 1/8] Add envs to execution spec Signed-off-by: Kevin Su --- go.mod | 8 +++++--- go.sum | 12 ++++++------ pkg/manager/impl/execution_manager_test.go | 18 ++++++++++++++++++ pkg/manager/impl/shared/iface.go | 2 ++ pkg/manager/impl/util/shared.go | 5 +++++ .../interfaces/application_configuration.go | 9 +++++++++ 6 files changed, 45 insertions(+), 9 deletions(-) diff --git a/go.mod b/go.mod index 6710d33bb..b33cc30e4 100644 --- a/go.mod +++ b/go.mod @@ -13,9 +13,9 @@ require ( github.com/cloudevents/sdk-go/v2 v2.8.0 github.com/coreos/go-oidc v2.2.1+incompatible github.com/evanphx/json-patch v4.12.0+incompatible - github.com/flyteorg/flyteidl v1.3.14 - github.com/flyteorg/flyteplugins v1.0.40 - github.com/flyteorg/flytepropeller v1.1.70 + github.com/flyteorg/flyteidl v1.3.19 + github.com/flyteorg/flyteplugins v1.0.56 + github.com/flyteorg/flytepropeller v1.1.87 github.com/flyteorg/flytestdlib v1.0.15 github.com/flyteorg/stow v0.3.6 github.com/ghodss/yaml v1.0.0 @@ -209,3 +209,5 @@ require ( ) replace github.com/robfig/cron/v3 => github.com/unionai/cron/v3 v3.0.2-0.20210825070134-bfc34418fe84 + +replace github.com/flyteorg/flyteidl => github.com/flyteorg/flyteidl v1.5.2-0.20230504203702-1f2dc1fbedb0 diff --git a/go.sum b/go.sum index f2fe7c70b..090791b7d 100644 --- a/go.sum +++ b/go.sum @@ -312,12 +312,12 @@ github.com/fatih/structs v1.0.0/go.mod h1:9NiDSp5zOcgEDl+j00MP/WkGVPOlPRLejGD8Ga github.com/fatih/structs v1.1.0/go.mod h1:9NiDSp5zOcgEDl+j00MP/WkGVPOlPRLejGD8Ga6PJ7M= github.com/felixge/httpsnoop v1.0.1 h1:lvB5Jl89CsZtGIWuTcDM1E/vkVs49/Ml7JJe07l8SPQ= github.com/felixge/httpsnoop v1.0.1/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= -github.com/flyteorg/flyteidl v1.3.14 h1:o5M0g/r6pXTPu5PEurbYxbQmuOu3hqqsaI2M6uvK0N8= -github.com/flyteorg/flyteidl v1.3.14/go.mod h1:Pkt2skI1LiHs/2ZoekBnyPhuGOFMiuul6HHcKGZBsbM= -github.com/flyteorg/flyteplugins v1.0.40 h1:RTsYingqmqr13qBbi4CB2ArXDHNHUOkAF+HTLJQiQ/s= -github.com/flyteorg/flyteplugins v1.0.40/go.mod h1:qyUPqVspLcLGJpKxVwHDWf+kBpOGuItOxCaF6zAmDio= -github.com/flyteorg/flytepropeller v1.1.70 h1:/d1qqz13rdVADM85ST70eerAdBstJJz9UUB/mNSZi0w= -github.com/flyteorg/flytepropeller v1.1.70/go.mod h1:MezHUJmgPzm4Pu8nIy6LLiEkxNA6buTQ7hInSqCViTY= +github.com/flyteorg/flyteidl v1.5.2-0.20230504203702-1f2dc1fbedb0 h1:0jWt1RKWaNWh8DHyRLex+WaZaotn6zLjMCAjU8xkcn4= +github.com/flyteorg/flyteidl v1.5.2-0.20230504203702-1f2dc1fbedb0/go.mod h1:ckLjB51moX4L0oQml+WTCrPK50zrJf6IZJ6LPC0RB4I= +github.com/flyteorg/flyteplugins v1.0.56 h1:kBTDgTpdSi7wcptk4cMwz5vfh1MU82VaUMMboe1InXw= +github.com/flyteorg/flyteplugins v1.0.56/go.mod h1:aFCKSn8TPzxSAILIiogHtUnHlUCN9+y6Vf+r9R4KZDU= +github.com/flyteorg/flytepropeller v1.1.87 h1:Px7ASDjrWyeVrUb15qXmhw9QK7xPcFjL5Yetr2P6iGM= +github.com/flyteorg/flytepropeller v1.1.87/go.mod h1:rBTB2jJpSZL1SvbgyiVh5Cobh3Azi/FvawXMxqB/uvo= github.com/flyteorg/flytestdlib v1.0.15 h1:kv9jDQmytbE84caY+pkZN8trJU2ouSAmESzpTEhfTt0= github.com/flyteorg/flytestdlib v1.0.15/go.mod h1:ghw/cjY0sEWIIbyCtcJnL/Gt7ZS7gf9SUi0CCPhbz3s= github.com/flyteorg/stow v0.3.6 h1:jt50ciM14qhKBaIrB+ppXXY+SXB59FNREFgTJqCyqIk= diff --git a/pkg/manager/impl/execution_manager_test.go b/pkg/manager/impl/execution_manager_test.go index a3c384312..59d47fae8 100644 --- a/pkg/manager/impl/execution_manager_test.go +++ b/pkg/manager/impl/execution_manager_test.go @@ -4208,6 +4208,7 @@ func TestGetExecutionConfigOverrides(t *testing.T) { requestMaxParallelism := int32(10) requestInterruptible := false requestOverwriteCache := false + requestEnvironmentVariables := map[string]string{"hello": "world"} launchPlanLabels := map[string]string{"launchPlanLabelKey": "launchPlanLabelValue"} launchPlanAnnotations := map[string]string{"launchPlanAnnotationKey": "launchPlanAnnotationValue"} @@ -4217,6 +4218,7 @@ func TestGetExecutionConfigOverrides(t *testing.T) { launchPlanMaxParallelism := int32(50) launchPlanInterruptible := true launchPlanOverwriteCache := true + launchPlanEnvironmentVariables := map[string]string{"foo": "bar"} applicationConfig := runtime.NewConfigurationProvider() @@ -4305,6 +4307,7 @@ func TestGetExecutionConfigOverrides(t *testing.T) { MaxParallelism: requestMaxParallelism, Interruptible: &wrappers.BoolValue{Value: requestInterruptible}, OverwriteCache: requestOverwriteCache, + Envs: &admin.Envs{Values: requestEnvironmentVariables}, }, } execConfig, err := executionManager.getExecutionConfig(context.TODO(), request, nil) @@ -4316,6 +4319,7 @@ func TestGetExecutionConfigOverrides(t *testing.T) { assert.Equal(t, requestOutputLocationPrefix, execConfig.RawOutputDataConfig.OutputLocationPrefix) assert.Equal(t, requestLabels, execConfig.GetLabels().Values) assert.Equal(t, requestAnnotations, execConfig.GetAnnotations().Values) + assert.Equal(t, requestEnvironmentVariables, execConfig.GetEnvs().Values) }) t.Run("request with partial config", func(t *testing.T) { request := &admin.ExecutionCreateRequest{ @@ -4343,6 +4347,7 @@ func TestGetExecutionConfigOverrides(t *testing.T) { MaxParallelism: launchPlanMaxParallelism, Interruptible: &wrappers.BoolValue{Value: launchPlanInterruptible}, OverwriteCache: launchPlanOverwriteCache, + Envs: &admin.Envs{Values: launchPlanEnvironmentVariables}, }, } execConfig, err := executionManager.getExecutionConfig(context.TODO(), request, launchPlan) @@ -4354,6 +4359,7 @@ func TestGetExecutionConfigOverrides(t *testing.T) { assert.True(t, proto.Equal(launchPlan.Spec.Annotations, execConfig.Annotations)) assert.Equal(t, requestOutputLocationPrefix, execConfig.RawOutputDataConfig.OutputLocationPrefix) assert.Equal(t, requestLabels, execConfig.GetLabels().Values) + assert.Equal(t, launchPlanEnvironmentVariables, execConfig.GetEnvs().Values) }) t.Run("request with empty security context", func(t *testing.T) { request := &admin.ExecutionCreateRequest{ @@ -4381,6 +4387,7 @@ func TestGetExecutionConfigOverrides(t *testing.T) { MaxParallelism: launchPlanMaxParallelism, Interruptible: &wrappers.BoolValue{Value: launchPlanInterruptible}, OverwriteCache: launchPlanOverwriteCache, + Envs: &admin.Envs{Values: launchPlanEnvironmentVariables}, }, } execConfig, err := executionManager.getExecutionConfig(context.TODO(), request, launchPlan) @@ -4391,6 +4398,7 @@ func TestGetExecutionConfigOverrides(t *testing.T) { assert.Equal(t, launchPlanK8sServiceAccount, execConfig.SecurityContext.RunAs.K8SServiceAccount) assert.Equal(t, launchPlanOutputLocationPrefix, execConfig.RawOutputDataConfig.OutputLocationPrefix) assert.Equal(t, launchPlanLabels, execConfig.GetLabels().Values) + assert.Equal(t, launchPlanEnvironmentVariables, execConfig.GetEnvs().Values) }) t.Run("request with no config", func(t *testing.T) { request := &admin.ExecutionCreateRequest{ @@ -4413,6 +4421,7 @@ func TestGetExecutionConfigOverrides(t *testing.T) { MaxParallelism: launchPlanMaxParallelism, Interruptible: &wrappers.BoolValue{Value: launchPlanInterruptible}, OverwriteCache: launchPlanOverwriteCache, + Envs: &admin.Envs{Values: launchPlanEnvironmentVariables}, }, } execConfig, err := executionManager.getExecutionConfig(context.TODO(), request, launchPlan) @@ -4424,6 +4433,7 @@ func TestGetExecutionConfigOverrides(t *testing.T) { assert.Equal(t, launchPlanOutputLocationPrefix, execConfig.RawOutputDataConfig.OutputLocationPrefix) assert.Equal(t, launchPlanLabels, execConfig.GetLabels().Values) assert.Equal(t, launchPlanAnnotations, execConfig.GetAnnotations().Values) + assert.Equal(t, launchPlanEnvironmentVariables, execConfig.GetEnvs().Values) }) t.Run("launchplan with partial config", func(t *testing.T) { request := &admin.ExecutionCreateRequest{ @@ -4474,6 +4484,7 @@ func TestGetExecutionConfigOverrides(t *testing.T) { assert.Equal(t, rmOutputLocationPrefix, execConfig.RawOutputDataConfig.OutputLocationPrefix) assert.Equal(t, rmLabels, execConfig.GetLabels().Values) assert.Equal(t, rmAnnotations, execConfig.GetAnnotations().Values) + assert.Nil(t, execConfig.GetEnvs()) }) t.Run("matchable resource partial config", func(t *testing.T) { resourceManager.GetResourceFunc = func(ctx context.Context, @@ -4520,6 +4531,7 @@ func TestGetExecutionConfigOverrides(t *testing.T) { assert.Nil(t, execConfig.GetRawOutputDataConfig()) assert.Nil(t, execConfig.GetLabels()) assert.Equal(t, rmAnnotations, execConfig.GetAnnotations().Values) + assert.Nil(t, execConfig.GetEnvs()) }) t.Run("matchable resource with no config", func(t *testing.T) { resourceManager.GetResourceFunc = func(ctx context.Context, @@ -4557,6 +4569,7 @@ func TestGetExecutionConfigOverrides(t *testing.T) { assert.Nil(t, execConfig.GetRawOutputDataConfig()) assert.Nil(t, execConfig.GetLabels()) assert.Nil(t, execConfig.GetAnnotations()) + assert.Nil(t, execConfig.GetEnvs()) }) t.Run("fetch security context from deprecated config", func(t *testing.T) { resourceManager.GetResourceFunc = func(ctx context.Context, @@ -4599,6 +4612,7 @@ func TestGetExecutionConfigOverrides(t *testing.T) { assert.Nil(t, execConfig.GetRawOutputDataConfig()) assert.Nil(t, execConfig.GetLabels()) assert.Nil(t, execConfig.GetAnnotations()) + assert.Nil(t, execConfig.GetEnvs()) }) t.Run("matchable resource workflow resource", func(t *testing.T) { resourceManager.GetResourceFunc = func(ctx context.Context, @@ -4652,6 +4666,7 @@ func TestGetExecutionConfigOverrides(t *testing.T) { assert.Nil(t, execConfig.GetRawOutputDataConfig()) assert.Nil(t, execConfig.GetLabels()) assert.Nil(t, execConfig.GetAnnotations()) + assert.Nil(t, execConfig.GetEnvs()) }) t.Run("matchable resource failure", func(t *testing.T) { resourceManager.GetResourceFunc = func(ctx context.Context, @@ -4682,6 +4697,7 @@ func TestGetExecutionConfigOverrides(t *testing.T) { assert.Nil(t, execConfig.GetRawOutputDataConfig()) assert.Nil(t, execConfig.GetLabels()) assert.Nil(t, execConfig.GetAnnotations()) + assert.Nil(t, execConfig.GetEnvs()) }) t.Run("application configuration", func(t *testing.T) { resourceManager.GetResourceFunc = func(ctx context.Context, @@ -4790,6 +4806,7 @@ func TestGetExecutionConfigOverrides(t *testing.T) { launchPlan := &admin.LaunchPlan{ Spec: &admin.LaunchPlanSpec{ Interruptible: &wrappers.BoolValue{Value: true}, + Envs: &admin.Envs{Values: map[string]string{"foo": "bar"}}, }, } @@ -4801,6 +4818,7 @@ func TestGetExecutionConfigOverrides(t *testing.T) { assert.Nil(t, execConfig.GetRawOutputDataConfig()) assert.Nil(t, execConfig.GetLabels()) assert.Nil(t, execConfig.GetAnnotations()) + assert.Equal(t, "bar", execConfig.Envs.Values["foo"]) }) t.Run("launch plan with no interruptible override specified", func(t *testing.T) { request := &admin.ExecutionCreateRequest{ diff --git a/pkg/manager/impl/shared/iface.go b/pkg/manager/impl/shared/iface.go index 7baae65a1..98609a5ff 100644 --- a/pkg/manager/impl/shared/iface.go +++ b/pkg/manager/impl/shared/iface.go @@ -24,4 +24,6 @@ type WorkflowExecutionConfigInterface interface { GetInterruptible() *wrappers.BoolValue // GetOverwriteCache indicates a workflow should skip all its cached results and re-compute its output, overwriting any already stored data. GetOverwriteCache() bool + // GetEnvironmentVariables defines environment variables to be set for the execution. + GetEnvs() *admin.Envs } diff --git a/pkg/manager/impl/util/shared.go b/pkg/manager/impl/util/shared.go index bf9490473..31cf89608 100644 --- a/pkg/manager/impl/util/shared.go +++ b/pkg/manager/impl/util/shared.go @@ -325,5 +325,10 @@ func MergeIntoExecConfig(workflowExecConfig admin.WorkflowExecutionConfig, spec workflowExecConfig.OverwriteCache = spec.GetOverwriteCache() } + if (workflowExecConfig.GetEnvs() == nil || len(workflowExecConfig.GetEnvs().Values) == 0) && + (spec.GetEnvs() != nil && len(spec.GetEnvs().Values) > 0) { + workflowExecConfig.Envs = spec.GetEnvs() + } + return workflowExecConfig } diff --git a/pkg/runtime/interfaces/application_configuration.go b/pkg/runtime/interfaces/application_configuration.go index 16b1f921d..7b25e7143 100644 --- a/pkg/runtime/interfaces/application_configuration.go +++ b/pkg/runtime/interfaces/application_configuration.go @@ -94,6 +94,9 @@ type ApplicationConfig struct { // Enabling will use Storage (s3/gcs/etc) to offload static parts of CRDs. UseOffloadedWorkflowClosure bool `json:"useOffloadedWorkflowClosure"` + + // Environment variables to be set for the execution. + Envs map[string]string `json:"envs,omitempty"` } func (a *ApplicationConfig) GetRoleNameKey() string { @@ -166,6 +169,12 @@ func (a *ApplicationConfig) GetOverwriteCache() bool { return a.OverwriteCache } +func (a *ApplicationConfig) GetEnvs() *admin.Envs { + return &admin.Envs{ + Values: a.Envs, + } +} + // GetAsWorkflowExecutionConfig returns the WorkflowExecutionConfig as extracted from this object func (a *ApplicationConfig) GetAsWorkflowExecutionConfig() admin.WorkflowExecutionConfig { // These values should always be set as their fallback values equals to their zero value or nil, From cf4d2c3018bbc8cb6adc2886d6578441bfbd5bc4 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Fri, 5 May 2023 01:15:21 -0700 Subject: [PATCH 2/8] update Signed-off-by: Kevin Su --- pkg/manager/impl/execution_manager_test.go | 224 ++++++++++++++++++ pkg/manager/impl/shared/iface.go | 2 +- pkg/manager/impl/testutils/mock_requests.go | 1 + pkg/workflowengine/impl/prepare_execution.go | 1 + .../impl/prepare_execution_test.go | 8 + 5 files changed, 235 insertions(+), 1 deletion(-) diff --git a/pkg/manager/impl/execution_manager_test.go b/pkg/manager/impl/execution_manager_test.go index 59d47fae8..10962b9dc 100644 --- a/pkg/manager/impl/execution_manager_test.go +++ b/pkg/manager/impl/execution_manager_test.go @@ -1083,6 +1083,86 @@ func TestCreateExecutionOverwriteCache(t *testing.T) { } } +func TestCreateExecutionOverwriteEnvs(t *testing.T) { + tests := []struct { + name string + task bool + overwriteEnvs map[string]string + want map[string]string + }{ + { + name: "LaunchPlanDefault", + task: false, + overwriteEnvs: nil, + want: nil, + }, + { + name: "LaunchPlanEnable", + task: false, + overwriteEnvs: map[string]string{"foo": "bar"}, + want: map[string]string{"foo": "bar"}, + }, + { + name: "TaskDefault", + task: false, + overwriteEnvs: nil, + want: nil, + }, + { + name: "TaskEnable", + task: true, + overwriteEnvs: map[string]string{"foo": "bar"}, + want: map[string]string{"foo": "bar"}, + }, + } + + for _, tt := range tests { + tt := tt + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + + request := testutils.GetExecutionRequest() + if tt.task { + request.Spec.LaunchPlan.ResourceType = core.ResourceType_TASK + } + request.Spec.Envs.Values = tt.overwriteEnvs + + repository := getMockRepositoryForExecTest() + setDefaultLpCallbackForExecTest(repository) + setDefaultTaskCallbackForExecTest(repository) + + exCreateFunc := func(ctx context.Context, input models.Execution) error { + var spec admin.ExecutionSpec + err := proto.Unmarshal(input.Spec, &spec) + assert.Nil(t, err) + + if tt.task { + assert.Equal(t, uint(0), input.LaunchPlanID) + assert.NotEqual(t, uint(0), input.TaskID) + } else { + assert.NotEqual(t, uint(0), input.LaunchPlanID) + assert.Equal(t, uint(0), input.TaskID) + } + + assert.Equal(t, tt.overwriteEnvs, spec.GetEnvs().Values) + + return nil + } + + repository.ExecutionRepo().(*repositoryMocks.MockExecutionRepo).SetCreateCallback(exCreateFunc) + mockExecutor := workflowengineMocks.WorkflowExecutor{} + mockExecutor.OnExecuteMatch(mock.Anything, mock.Anything, mock.Anything).Return(workflowengineInterfaces.ExecutionResponse{}, nil) + mockExecutor.OnID().Return("testMockExecutor") + r := plugins.NewRegistry() + r.RegisterDefault(plugins.PluginIDWorkflowExecutor, &mockExecutor) + execManager := NewExecutionManager(repository, r, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil, nil, nil, &eventWriterMocks.WorkflowExecutionEventWriter{}) + + _, err := execManager.CreateExecution(context.Background(), request, requestedAt) + assert.Nil(t, err) + }) + } +} + func makeExecutionGetFunc( t *testing.T, closureBytes []byte, startTime *time.Time) repositoryMocks.GetExecutionFunc { return func(ctx context.Context, input interfaces.Identifier) (models.Execution, error) { @@ -1205,6 +1285,39 @@ func makeExecutionOverwriteCacheGetFunc( } } +func makeExecutionOverwriteEnvs( + t *testing.T, closureBytes []byte, startTime *time.Time, overwriteEnvs map[string]string) repositoryMocks.GetExecutionFunc { + return func(ctx context.Context, input interfaces.Identifier) (models.Execution, error) { + assert.Equal(t, "project", input.Project) + assert.Equal(t, "domain", input.Domain) + assert.Equal(t, "name", input.Name) + + request := testutils.GetExecutionRequest() + request.Spec.Envs.Values = overwriteEnvs + + specBytes, err := proto.Marshal(request.Spec) + assert.Nil(t, err) + + return models.Execution{ + ExecutionKey: models.ExecutionKey{ + Project: "project", + Domain: "domain", + Name: "name", + }, + BaseModel: models.BaseModel{ + ID: uint(8), + }, + Spec: specBytes, + Phase: core.WorkflowExecution_QUEUED.String(), + Closure: closureBytes, + LaunchPlanID: uint(1), + WorkflowID: uint(2), + StartedAt: startTime, + Cluster: testCluster, + }, nil + } +} + func TestRelaunchExecution(t *testing.T) { // Set up mocks. repository := getMockRepositoryForExecTest() @@ -1518,6 +1631,57 @@ func TestRelaunchExecutionOverwriteCacheOverride(t *testing.T) { }) } +func TestRelaunchExecutionEnvsOverride(t *testing.T) { + // Set up mocks. + repository := getMockRepositoryForExecTest() + setDefaultLpCallbackForExecTest(repository) + mockExecutor := workflowengineMocks.WorkflowExecutor{} + mockExecutor.OnExecuteMatch(mock.Anything, mock.Anything, mock.Anything).Return(workflowengineInterfaces.ExecutionResponse{}, nil) + mockExecutor.OnID().Return("testMockExecutor") + r := plugins.NewRegistry() + r.RegisterDefault(plugins.PluginIDWorkflowExecutor, &mockExecutor) + execManager := NewExecutionManager(repository, r, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil, nil, nil, &eventWriterMocks.WorkflowExecutionEventWriter{}) + startTime := time.Now() + startTimeProto, _ := ptypes.TimestampProto(startTime) + existingClosure := admin.ExecutionClosure{ + Phase: core.WorkflowExecution_RUNNING, + StartedAt: startTimeProto, + } + existingClosureBytes, _ := proto.Marshal(&existingClosure) + env := map[string]string{"foo": "bar"} + executionGetFunc := makeExecutionOverwriteEnvs(t, existingClosureBytes, &startTime, env) + repository.ExecutionRepo().(*repositoryMocks.MockExecutionRepo).SetGetCallback(executionGetFunc) + + var createCalled bool + exCreateFunc := func(ctx context.Context, input models.Execution) error { + createCalled = true + assert.Equal(t, "relaunchy", input.Name) + assert.Equal(t, "domain", input.Domain) + assert.Equal(t, "project", input.Project) + assert.Equal(t, uint(8), input.SourceExecutionID) + var spec admin.ExecutionSpec + err := proto.Unmarshal(input.Spec, &spec) + assert.Nil(t, err) + assert.Equal(t, admin.ExecutionMetadata_RELAUNCH, spec.Metadata.Mode) + assert.Equal(t, int32(admin.ExecutionMetadata_RELAUNCH), input.Mode) + assert.NotNil(t, spec.GetEnvs()) + assert.Equal(t, spec.GetEnvs().Values, env) + return nil + } + repository.ExecutionRepo().(*repositoryMocks.MockExecutionRepo).SetCreateCallback(exCreateFunc) + + _, err := execManager.RelaunchExecution(context.Background(), admin.ExecutionRelaunchRequest{ + Id: &core.WorkflowExecutionIdentifier{ + Project: "project", + Domain: "domain", + Name: "name", + }, + Name: "relaunchy", + }, requestedAt) + assert.Nil(t, err) + assert.True(t, createCalled) +} + func TestRecoverExecution(t *testing.T) { // Set up mocks. repository := getMockRepositoryForExecTest() @@ -1879,6 +2043,66 @@ func TestRecoverExecutionOverwriteCacheOverride(t *testing.T) { assert.True(t, proto.Equal(expectedResponse, response)) } +func TestRecoverExecutionEnvsOverride(t *testing.T) { + // Set up mocks. + repository := getMockRepositoryForExecTest() + setDefaultLpCallbackForExecTest(repository) + mockExecutor := workflowengineMocks.WorkflowExecutor{} + mockExecutor.OnExecuteMatch(mock.Anything, mock.Anything, mock.Anything).Return(workflowengineInterfaces.ExecutionResponse{}, nil) + mockExecutor.OnID().Return("testMockExecutor") + r := plugins.NewRegistry() + r.RegisterDefault(plugins.PluginIDWorkflowExecutor, &mockExecutor) + execManager := NewExecutionManager(repository, r, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil, nil, nil, &eventWriterMocks.WorkflowExecutionEventWriter{}) + startTime := time.Now() + startTimeProto, _ := ptypes.TimestampProto(startTime) + existingClosure := admin.ExecutionClosure{ + Phase: core.WorkflowExecution_SUCCEEDED, + StartedAt: startTimeProto, + } + existingClosureBytes, _ := proto.Marshal(&existingClosure) + env := map[string]string{"foo": "bar"} + executionGetFunc := makeExecutionOverwriteEnvs(t, existingClosureBytes, &startTime, env) + repository.ExecutionRepo().(*repositoryMocks.MockExecutionRepo).SetGetCallback(executionGetFunc) + + exCreateFunc := func(ctx context.Context, input models.Execution) error { + assert.Equal(t, "recovered", input.Name) + assert.Equal(t, "domain", input.Domain) + assert.Equal(t, "project", input.Project) + assert.Equal(t, uint(8), input.SourceExecutionID) + var spec admin.ExecutionSpec + err := proto.Unmarshal(input.Spec, &spec) + assert.Nil(t, err) + assert.Equal(t, admin.ExecutionMetadata_RECOVERED, spec.Metadata.Mode) + assert.Equal(t, int32(admin.ExecutionMetadata_RECOVERED), input.Mode) + assert.NotNil(t, spec.GetEnvs()) + assert.Equal(t, spec.GetEnvs().GetValues(), env) + return nil + } + repository.ExecutionRepo().(*repositoryMocks.MockExecutionRepo).SetCreateCallback(exCreateFunc) + + // Issue request. + response, err := execManager.RecoverExecution(context.Background(), admin.ExecutionRecoverRequest{ + Id: &core.WorkflowExecutionIdentifier{ + Project: "project", + Domain: "domain", + Name: "name", + }, + Name: "recovered", + }, requestedAt) + + // And verify response. + assert.Nil(t, err) + + expectedResponse := &admin.ExecutionCreateResponse{ + Id: &core.WorkflowExecutionIdentifier{ + Project: "project", + Domain: "domain", + Name: "recovered", + }, + } + assert.True(t, proto.Equal(expectedResponse, response)) +} + func TestCreateWorkflowEvent(t *testing.T) { repository := repositoryMocks.NewMockRepository() startTime := time.Now() diff --git a/pkg/manager/impl/shared/iface.go b/pkg/manager/impl/shared/iface.go index 98609a5ff..0caba1d50 100644 --- a/pkg/manager/impl/shared/iface.go +++ b/pkg/manager/impl/shared/iface.go @@ -24,6 +24,6 @@ type WorkflowExecutionConfigInterface interface { GetInterruptible() *wrappers.BoolValue // GetOverwriteCache indicates a workflow should skip all its cached results and re-compute its output, overwriting any already stored data. GetOverwriteCache() bool - // GetEnvironmentVariables defines environment variables to be set for the execution. + // GetEnvs defines environment variables to be set for the execution. GetEnvs() *admin.Envs } diff --git a/pkg/manager/impl/testutils/mock_requests.go b/pkg/manager/impl/testutils/mock_requests.go index 2a8d47dd9..8b8473376 100644 --- a/pkg/manager/impl/testutils/mock_requests.go +++ b/pkg/manager/impl/testutils/mock_requests.go @@ -221,6 +221,7 @@ func GetExecutionRequest() admin.ExecutionCreateRequest { }, }, RawOutputDataConfig: &admin.RawOutputDataConfig{OutputLocationPrefix: "default_raw_output"}, + Envs: &admin.Envs{}, }, Inputs: &core.LiteralMap{ Literals: map[string]*core.Literal{ diff --git a/pkg/workflowengine/impl/prepare_execution.go b/pkg/workflowengine/impl/prepare_execution.go index f2a778e27..a7d57a28a 100644 --- a/pkg/workflowengine/impl/prepare_execution.go +++ b/pkg/workflowengine/impl/prepare_execution.go @@ -64,6 +64,7 @@ func addExecutionOverrides(taskPluginOverrides []*admin.PluginOverride, } executionConfig.OverwriteCache = workflowExecutionConfig.GetOverwriteCache() + executionConfig.EnvironmentVariables = workflowExecutionConfig.GetEnvs().Values } if taskResources != nil { var requests = v1alpha1.TaskResourceSpec{} diff --git a/pkg/workflowengine/impl/prepare_execution_test.go b/pkg/workflowengine/impl/prepare_execution_test.go index 38e155636..f3d6ab5a0 100644 --- a/pkg/workflowengine/impl/prepare_execution_test.go +++ b/pkg/workflowengine/impl/prepare_execution_test.go @@ -166,6 +166,14 @@ func TestAddExecutionOverrides(t *testing.T) { addExecutionOverrides(nil, workflowExecutionConfig, nil, nil, workflow) assert.True(t, workflow.ExecutionConfig.OverwriteCache) }) + t.Run("Override environment variables", func(t *testing.T) { + workflowExecutionConfig := &admin.WorkflowExecutionConfig{ + Envs: &admin.Envs{Values: map[string]string{"key": "value"}}, + } + workflow := &v1alpha1.FlyteWorkflow{} + addExecutionOverrides(nil, workflowExecutionConfig, nil, nil, workflow) + assert.Equal(t, workflow.ExecutionConfig.EnvironmentVariables, map[string]string{"key": "value"}) + }) } func TestPrepareFlyteWorkflow(t *testing.T) { From 8075da3b15614c06d728f65c16d1656f1a3b4e2e Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Fri, 5 May 2023 01:19:29 -0700 Subject: [PATCH 3/8] update Signed-off-by: Kevin Su --- pkg/manager/impl/execution_manager_test.go | 56 +++++++++++----------- 1 file changed, 28 insertions(+), 28 deletions(-) diff --git a/pkg/manager/impl/execution_manager_test.go b/pkg/manager/impl/execution_manager_test.go index 10962b9dc..a19b5083b 100644 --- a/pkg/manager/impl/execution_manager_test.go +++ b/pkg/manager/impl/execution_manager_test.go @@ -1083,36 +1083,36 @@ func TestCreateExecutionOverwriteCache(t *testing.T) { } } -func TestCreateExecutionOverwriteEnvs(t *testing.T) { +func TestCreateExecutionWithEnvs(t *testing.T) { tests := []struct { - name string - task bool - overwriteEnvs map[string]string - want map[string]string + name string + task bool + envs map[string]string + want map[string]string }{ { - name: "LaunchPlanDefault", - task: false, - overwriteEnvs: nil, - want: nil, + name: "LaunchPlanDefault", + task: false, + envs: nil, + want: nil, }, { - name: "LaunchPlanEnable", - task: false, - overwriteEnvs: map[string]string{"foo": "bar"}, - want: map[string]string{"foo": "bar"}, + name: "LaunchPlanEnable", + task: false, + envs: map[string]string{"foo": "bar"}, + want: map[string]string{"foo": "bar"}, }, { - name: "TaskDefault", - task: false, - overwriteEnvs: nil, - want: nil, + name: "TaskDefault", + task: false, + envs: nil, + want: nil, }, { - name: "TaskEnable", - task: true, - overwriteEnvs: map[string]string{"foo": "bar"}, - want: map[string]string{"foo": "bar"}, + name: "TaskEnable", + task: true, + envs: map[string]string{"foo": "bar"}, + want: map[string]string{"foo": "bar"}, }, } @@ -1125,7 +1125,7 @@ func TestCreateExecutionOverwriteEnvs(t *testing.T) { if tt.task { request.Spec.LaunchPlan.ResourceType = core.ResourceType_TASK } - request.Spec.Envs.Values = tt.overwriteEnvs + request.Spec.Envs.Values = tt.envs repository := getMockRepositoryForExecTest() setDefaultLpCallbackForExecTest(repository) @@ -1144,7 +1144,7 @@ func TestCreateExecutionOverwriteEnvs(t *testing.T) { assert.Equal(t, uint(0), input.TaskID) } - assert.Equal(t, tt.overwriteEnvs, spec.GetEnvs().Values) + assert.Equal(t, tt.envs, spec.GetEnvs().Values) return nil } @@ -1285,15 +1285,15 @@ func makeExecutionOverwriteCacheGetFunc( } } -func makeExecutionOverwriteEnvs( - t *testing.T, closureBytes []byte, startTime *time.Time, overwriteEnvs map[string]string) repositoryMocks.GetExecutionFunc { +func makeExecutionWithEnvs( + t *testing.T, closureBytes []byte, startTime *time.Time, envs map[string]string) repositoryMocks.GetExecutionFunc { return func(ctx context.Context, input interfaces.Identifier) (models.Execution, error) { assert.Equal(t, "project", input.Project) assert.Equal(t, "domain", input.Domain) assert.Equal(t, "name", input.Name) request := testutils.GetExecutionRequest() - request.Spec.Envs.Values = overwriteEnvs + request.Spec.Envs.Values = envs specBytes, err := proto.Marshal(request.Spec) assert.Nil(t, err) @@ -1649,7 +1649,7 @@ func TestRelaunchExecutionEnvsOverride(t *testing.T) { } existingClosureBytes, _ := proto.Marshal(&existingClosure) env := map[string]string{"foo": "bar"} - executionGetFunc := makeExecutionOverwriteEnvs(t, existingClosureBytes, &startTime, env) + executionGetFunc := makeExecutionWithEnvs(t, existingClosureBytes, &startTime, env) repository.ExecutionRepo().(*repositoryMocks.MockExecutionRepo).SetGetCallback(executionGetFunc) var createCalled bool @@ -2061,7 +2061,7 @@ func TestRecoverExecutionEnvsOverride(t *testing.T) { } existingClosureBytes, _ := proto.Marshal(&existingClosure) env := map[string]string{"foo": "bar"} - executionGetFunc := makeExecutionOverwriteEnvs(t, existingClosureBytes, &startTime, env) + executionGetFunc := makeExecutionWithEnvs(t, existingClosureBytes, &startTime, env) repository.ExecutionRepo().(*repositoryMocks.MockExecutionRepo).SetGetCallback(executionGetFunc) exCreateFunc := func(ctx context.Context, input models.Execution) error { From e319c65b5ce5b89cafb3a5694b2e89a53441d8cc Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Fri, 5 May 2023 01:47:43 -0700 Subject: [PATCH 4/8] fix tests Signed-off-by: Kevin Su --- pkg/workflowengine/impl/prepare_execution.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/pkg/workflowengine/impl/prepare_execution.go b/pkg/workflowengine/impl/prepare_execution.go index a7d57a28a..d97382afc 100644 --- a/pkg/workflowengine/impl/prepare_execution.go +++ b/pkg/workflowengine/impl/prepare_execution.go @@ -64,7 +64,10 @@ func addExecutionOverrides(taskPluginOverrides []*admin.PluginOverride, } executionConfig.OverwriteCache = workflowExecutionConfig.GetOverwriteCache() - executionConfig.EnvironmentVariables = workflowExecutionConfig.GetEnvs().Values + + if workflowExecutionConfig.GetEnvs() != nil { + executionConfig.EnvironmentVariables = workflowExecutionConfig.GetEnvs().Values + } } if taskResources != nil { var requests = v1alpha1.TaskResourceSpec{} From abaf51d4776137f1fcbf1c422d38f50bc208b1c6 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Tue, 9 May 2023 15:31:45 -0700 Subject: [PATCH 5/8] update idl Signed-off-by: Kevin Su --- go.mod | 4 +-- go.sum | 4 +-- pkg/manager/impl/execution_manager_test.go | 28 ++++++++++--------- .../interfaces/application_configuration.go | 9 +++++- pkg/workflowengine/impl/prepare_execution.go | 6 +++- 5 files changed, 31 insertions(+), 20 deletions(-) diff --git a/go.mod b/go.mod index b33cc30e4..294214cec 100644 --- a/go.mod +++ b/go.mod @@ -13,7 +13,7 @@ require ( github.com/cloudevents/sdk-go/v2 v2.8.0 github.com/coreos/go-oidc v2.2.1+incompatible github.com/evanphx/json-patch v4.12.0+incompatible - github.com/flyteorg/flyteidl v1.3.19 + github.com/flyteorg/flyteidl v1.5.3 github.com/flyteorg/flyteplugins v1.0.56 github.com/flyteorg/flytepropeller v1.1.87 github.com/flyteorg/flytestdlib v1.0.15 @@ -209,5 +209,3 @@ require ( ) replace github.com/robfig/cron/v3 => github.com/unionai/cron/v3 v3.0.2-0.20210825070134-bfc34418fe84 - -replace github.com/flyteorg/flyteidl => github.com/flyteorg/flyteidl v1.5.2-0.20230504203702-1f2dc1fbedb0 diff --git a/go.sum b/go.sum index 090791b7d..a5f96a0e7 100644 --- a/go.sum +++ b/go.sum @@ -312,8 +312,8 @@ github.com/fatih/structs v1.0.0/go.mod h1:9NiDSp5zOcgEDl+j00MP/WkGVPOlPRLejGD8Ga github.com/fatih/structs v1.1.0/go.mod h1:9NiDSp5zOcgEDl+j00MP/WkGVPOlPRLejGD8Ga6PJ7M= github.com/felixge/httpsnoop v1.0.1 h1:lvB5Jl89CsZtGIWuTcDM1E/vkVs49/Ml7JJe07l8SPQ= github.com/felixge/httpsnoop v1.0.1/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= -github.com/flyteorg/flyteidl v1.5.2-0.20230504203702-1f2dc1fbedb0 h1:0jWt1RKWaNWh8DHyRLex+WaZaotn6zLjMCAjU8xkcn4= -github.com/flyteorg/flyteidl v1.5.2-0.20230504203702-1f2dc1fbedb0/go.mod h1:ckLjB51moX4L0oQml+WTCrPK50zrJf6IZJ6LPC0RB4I= +github.com/flyteorg/flyteidl v1.5.3 h1:qHyU9kvcxGIkXoloi768ayx9FHrs961dZC3WYziGGZA= +github.com/flyteorg/flyteidl v1.5.3/go.mod h1:ckLjB51moX4L0oQml+WTCrPK50zrJf6IZJ6LPC0RB4I= github.com/flyteorg/flyteplugins v1.0.56 h1:kBTDgTpdSi7wcptk4cMwz5vfh1MU82VaUMMboe1InXw= github.com/flyteorg/flyteplugins v1.0.56/go.mod h1:aFCKSn8TPzxSAILIiogHtUnHlUCN9+y6Vf+r9R4KZDU= github.com/flyteorg/flytepropeller v1.1.87 h1:Px7ASDjrWyeVrUb15qXmhw9QK7xPcFjL5Yetr2P6iGM= diff --git a/pkg/manager/impl/execution_manager_test.go b/pkg/manager/impl/execution_manager_test.go index a19b5083b..5de11000d 100644 --- a/pkg/manager/impl/execution_manager_test.go +++ b/pkg/manager/impl/execution_manager_test.go @@ -1087,8 +1087,8 @@ func TestCreateExecutionWithEnvs(t *testing.T) { tests := []struct { name string task bool - envs map[string]string - want map[string]string + envs []*core.KeyValuePair + want []*core.KeyValuePair }{ { name: "LaunchPlanDefault", @@ -1099,8 +1099,8 @@ func TestCreateExecutionWithEnvs(t *testing.T) { { name: "LaunchPlanEnable", task: false, - envs: map[string]string{"foo": "bar"}, - want: map[string]string{"foo": "bar"}, + envs: []*core.KeyValuePair{{Key: "foo", Value: "bar"}}, + want: []*core.KeyValuePair{{Key: "foo", Value: "bar"}}, }, { name: "TaskDefault", @@ -1111,8 +1111,8 @@ func TestCreateExecutionWithEnvs(t *testing.T) { { name: "TaskEnable", task: true, - envs: map[string]string{"foo": "bar"}, - want: map[string]string{"foo": "bar"}, + envs: []*core.KeyValuePair{{Key: "foo", Value: "bar"}}, + want: []*core.KeyValuePair{{Key: "foo", Value: "bar"}}, }, } @@ -1286,7 +1286,7 @@ func makeExecutionOverwriteCacheGetFunc( } func makeExecutionWithEnvs( - t *testing.T, closureBytes []byte, startTime *time.Time, envs map[string]string) repositoryMocks.GetExecutionFunc { + t *testing.T, closureBytes []byte, startTime *time.Time, envs []*core.KeyValuePair) repositoryMocks.GetExecutionFunc { return func(ctx context.Context, input interfaces.Identifier) (models.Execution, error) { assert.Equal(t, "project", input.Project) assert.Equal(t, "domain", input.Domain) @@ -1648,7 +1648,7 @@ func TestRelaunchExecutionEnvsOverride(t *testing.T) { StartedAt: startTimeProto, } existingClosureBytes, _ := proto.Marshal(&existingClosure) - env := map[string]string{"foo": "bar"} + env := []*core.KeyValuePair{{Key: "foo", Value: "bar"}} executionGetFunc := makeExecutionWithEnvs(t, existingClosureBytes, &startTime, env) repository.ExecutionRepo().(*repositoryMocks.MockExecutionRepo).SetGetCallback(executionGetFunc) @@ -2060,7 +2060,7 @@ func TestRecoverExecutionEnvsOverride(t *testing.T) { StartedAt: startTimeProto, } existingClosureBytes, _ := proto.Marshal(&existingClosure) - env := map[string]string{"foo": "bar"} + env := []*core.KeyValuePair{{Key: "foo", Value: "bar"}} executionGetFunc := makeExecutionWithEnvs(t, existingClosureBytes, &startTime, env) repository.ExecutionRepo().(*repositoryMocks.MockExecutionRepo).SetGetCallback(executionGetFunc) @@ -4432,7 +4432,7 @@ func TestGetExecutionConfigOverrides(t *testing.T) { requestMaxParallelism := int32(10) requestInterruptible := false requestOverwriteCache := false - requestEnvironmentVariables := map[string]string{"hello": "world"} + requestEnvironmentVariables := []*core.KeyValuePair{{Key: "hello", Value: "world"}} launchPlanLabels := map[string]string{"launchPlanLabelKey": "launchPlanLabelValue"} launchPlanAnnotations := map[string]string{"launchPlanAnnotationKey": "launchPlanAnnotationValue"} @@ -4442,7 +4442,7 @@ func TestGetExecutionConfigOverrides(t *testing.T) { launchPlanMaxParallelism := int32(50) launchPlanInterruptible := true launchPlanOverwriteCache := true - launchPlanEnvironmentVariables := map[string]string{"foo": "bar"} + launchPlanEnvironmentVariables := []*core.KeyValuePair{{Key: "foo", Value: "bar"}} applicationConfig := runtime.NewConfigurationProvider() @@ -5030,7 +5030,7 @@ func TestGetExecutionConfigOverrides(t *testing.T) { launchPlan := &admin.LaunchPlan{ Spec: &admin.LaunchPlanSpec{ Interruptible: &wrappers.BoolValue{Value: true}, - Envs: &admin.Envs{Values: map[string]string{"foo": "bar"}}, + Envs: &admin.Envs{Values: []*core.KeyValuePair{{Key: "foo", Value: "bar"}}}, }, } @@ -5042,7 +5042,9 @@ func TestGetExecutionConfigOverrides(t *testing.T) { assert.Nil(t, execConfig.GetRawOutputDataConfig()) assert.Nil(t, execConfig.GetLabels()) assert.Nil(t, execConfig.GetAnnotations()) - assert.Equal(t, "bar", execConfig.Envs.Values["foo"]) + assert.Equal(t, 1, len(execConfig.Envs.Values)) + assert.Equal(t, "foo", execConfig.Envs.Values[0].Key) + assert.Equal(t, "bar", execConfig.Envs.Values[0].Value) }) t.Run("launch plan with no interruptible override specified", func(t *testing.T) { request := &admin.ExecutionCreateRequest{ diff --git a/pkg/runtime/interfaces/application_configuration.go b/pkg/runtime/interfaces/application_configuration.go index 7b25e7143..cf9bf2e9e 100644 --- a/pkg/runtime/interfaces/application_configuration.go +++ b/pkg/runtime/interfaces/application_configuration.go @@ -170,8 +170,15 @@ func (a *ApplicationConfig) GetOverwriteCache() bool { } func (a *ApplicationConfig) GetEnvs() *admin.Envs { + var envs []*core.KeyValuePair + for k, v := range a.Envs { + envs = append(envs, &core.KeyValuePair{ + Key: k, + Value: v, + }) + } return &admin.Envs{ - Values: a.Envs, + Values: envs, } } diff --git a/pkg/workflowengine/impl/prepare_execution.go b/pkg/workflowengine/impl/prepare_execution.go index d97382afc..30b76fd9b 100644 --- a/pkg/workflowengine/impl/prepare_execution.go +++ b/pkg/workflowengine/impl/prepare_execution.go @@ -65,8 +65,12 @@ func addExecutionOverrides(taskPluginOverrides []*admin.PluginOverride, executionConfig.OverwriteCache = workflowExecutionConfig.GetOverwriteCache() + envs := make(map[string]string) if workflowExecutionConfig.GetEnvs() != nil { - executionConfig.EnvironmentVariables = workflowExecutionConfig.GetEnvs().Values + for _, v := range workflowExecutionConfig.GetEnvs().Values { + envs[v.Key] = v.Value + } + executionConfig.EnvironmentVariables = envs } } if taskResources != nil { From ab8101794c0b5c5a47f7a15afe21e8a672176e45 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Tue, 9 May 2023 15:42:32 -0700 Subject: [PATCH 6/8] lint Signed-off-by: Kevin Su --- pkg/workflowengine/impl/prepare_execution_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/workflowengine/impl/prepare_execution_test.go b/pkg/workflowengine/impl/prepare_execution_test.go index f3d6ab5a0..60925c08b 100644 --- a/pkg/workflowengine/impl/prepare_execution_test.go +++ b/pkg/workflowengine/impl/prepare_execution_test.go @@ -168,7 +168,7 @@ func TestAddExecutionOverrides(t *testing.T) { }) t.Run("Override environment variables", func(t *testing.T) { workflowExecutionConfig := &admin.WorkflowExecutionConfig{ - Envs: &admin.Envs{Values: map[string]string{"key": "value"}}, + Envs: &admin.Envs{Values: []*core.KeyValuePair{{Key: "key", Value: "value"}}}, } workflow := &v1alpha1.FlyteWorkflow{} addExecutionOverrides(nil, workflowExecutionConfig, nil, nil, workflow) From ed392b54d7215afaf1f2774cd2c1a998c734c20c Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Tue, 9 May 2023 16:08:08 -0700 Subject: [PATCH 7/8] fix tests Signed-off-by: Kevin Su --- pkg/manager/impl/execution_manager_test.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pkg/manager/impl/execution_manager_test.go b/pkg/manager/impl/execution_manager_test.go index 5de11000d..578f3134a 100644 --- a/pkg/manager/impl/execution_manager_test.go +++ b/pkg/manager/impl/execution_manager_test.go @@ -2075,7 +2075,8 @@ func TestRecoverExecutionEnvsOverride(t *testing.T) { assert.Equal(t, admin.ExecutionMetadata_RECOVERED, spec.Metadata.Mode) assert.Equal(t, int32(admin.ExecutionMetadata_RECOVERED), input.Mode) assert.NotNil(t, spec.GetEnvs()) - assert.Equal(t, spec.GetEnvs().GetValues(), env) + assert.Equal(t, spec.GetEnvs().GetValues()[0].Key, env[0].Key) + assert.Equal(t, spec.GetEnvs().GetValues()[0].Value, env[0].Value) return nil } repository.ExecutionRepo().(*repositoryMocks.MockExecutionRepo).SetCreateCallback(exCreateFunc) From 4b738f8cc7c9ba7dbb3627d1dd1cdeafc16d609c Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Tue, 9 May 2023 16:19:30 -0700 Subject: [PATCH 8/8] fix tests Signed-off-by: Kevin Su --- pkg/manager/impl/execution_manager_test.go | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/pkg/manager/impl/execution_manager_test.go b/pkg/manager/impl/execution_manager_test.go index 578f3134a..7549e03f5 100644 --- a/pkg/manager/impl/execution_manager_test.go +++ b/pkg/manager/impl/execution_manager_test.go @@ -1143,8 +1143,12 @@ func TestCreateExecutionWithEnvs(t *testing.T) { assert.NotEqual(t, uint(0), input.LaunchPlanID) assert.Equal(t, uint(0), input.TaskID) } - - assert.Equal(t, tt.envs, spec.GetEnvs().Values) + if len(tt.envs) != 0 { + assert.Equal(t, tt.envs[0].Key, spec.GetEnvs().Values[0].Key) + assert.Equal(t, tt.envs[0].Value, spec.GetEnvs().Values[0].Value) + } else { + assert.Nil(t, spec.GetEnvs().GetValues()) + } return nil } @@ -1665,7 +1669,8 @@ func TestRelaunchExecutionEnvsOverride(t *testing.T) { assert.Equal(t, admin.ExecutionMetadata_RELAUNCH, spec.Metadata.Mode) assert.Equal(t, int32(admin.ExecutionMetadata_RELAUNCH), input.Mode) assert.NotNil(t, spec.GetEnvs()) - assert.Equal(t, spec.GetEnvs().Values, env) + assert.Equal(t, spec.GetEnvs().Values[0].Key, env[0].Key) + assert.Equal(t, spec.GetEnvs().Values[0].Value, env[0].Value) return nil } repository.ExecutionRepo().(*repositoryMocks.MockExecutionRepo).SetCreateCallback(exCreateFunc)