From 1bf7e20dfba70cc9b0c850549c2a099d4d1f53f6 Mon Sep 17 00:00:00 2001 From: Daniel Rammer Date: Tue, 18 Apr 2023 09:25:24 -0500 Subject: [PATCH 1/2] maintaining Interruptible and OverwriteCache for reference launchplans Signed-off-by: Daniel Rammer --- .../nodes/subworkflow/launchplan.go | 2 ++ .../nodes/subworkflow/launchplan/admin.go | 30 +++++++++++++------ .../subworkflow/launchplan/launchplan.go | 2 ++ 3 files changed, 25 insertions(+), 9 deletions(-) diff --git a/pkg/controller/nodes/subworkflow/launchplan.go b/pkg/controller/nodes/subworkflow/launchplan.go index ec972d9fd..5b764059d 100644 --- a/pkg/controller/nodes/subworkflow/launchplan.go +++ b/pkg/controller/nodes/subworkflow/launchplan.go @@ -73,6 +73,8 @@ func (l *launchPlanHandler) StartLaunchPlan(ctx context.Context, nCtx handler.No RawOutputDataConfig: nCtx.ExecutionContext().GetRawOutputDataConfig().RawOutputDataConfig, Labels: nCtx.ExecutionContext().GetLabels(), Annotations: nCtx.ExecutionContext().GetAnnotations(), + Interruptible: nCtx.ExecutionContext().GetExecutionConfig().Interruptible, + OverwriteCache: nCtx.ExecutionContext().GetExecutionConfig().OverwriteCache, } if nCtx.ExecutionContext().GetExecutionConfig().RecoveryExecution.WorkflowExecutionIdentifier != nil { diff --git a/pkg/controller/nodes/subworkflow/launchplan/admin.go b/pkg/controller/nodes/subworkflow/launchplan/admin.go index 0329f3aef..2931b343e 100644 --- a/pkg/controller/nodes/subworkflow/launchplan/admin.go +++ b/pkg/controller/nodes/subworkflow/launchplan/admin.go @@ -6,23 +6,25 @@ import ( "fmt" "time" - evtErr "github.com/flyteorg/flytepropeller/events/errors" + "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/admin" + "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core" + "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/service" "github.com/flyteorg/flytestdlib/cache" - "golang.org/x/time/rate" - "k8s.io/client-go/util/workqueue" - stdErr "github.com/flyteorg/flytestdlib/errors" - "github.com/flyteorg/flytestdlib/logger" - "github.com/flyteorg/flytestdlib/promutils" - "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/admin" - "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core" - "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/service" + evtErr "github.com/flyteorg/flytepropeller/events/errors" + + "github.com/golang/protobuf/ptypes/wrappers" + + "golang.org/x/time/rate" + "google.golang.org/grpc/codes" "google.golang.org/grpc/status" + + "k8s.io/client-go/util/workqueue" ) var isRecovery = true @@ -93,6 +95,13 @@ func (a *adminLaunchPlanExecutor) Launch(ctx context.Context, launchCtx LaunchCo } } + var interruptible *wrappers.BoolValue + if launchCtx.Interruptible != nil{ + interruptible = &wrappers.BoolValue{ + Value: *launchCtx.Interruptible, + } + } + req := &admin.ExecutionCreateRequest{ Project: executionID.Project, Domain: executionID.Domain, @@ -111,6 +120,9 @@ func (a *adminLaunchPlanExecutor) Launch(ctx context.Context, launchCtx LaunchCo SecurityContext: &launchCtx.SecurityContext, MaxParallelism: int32(launchCtx.MaxParallelism), RawOutputDataConfig: launchCtx.RawOutputDataConfig, + //ClusterAssignment: launchCtx.ClusterAssignment, TODO @hamersaw - do we need this? + Interruptible: interruptible, + OverwriteCache: launchCtx.OverwriteCache, }, } diff --git a/pkg/controller/nodes/subworkflow/launchplan/launchplan.go b/pkg/controller/nodes/subworkflow/launchplan/launchplan.go index 7d31b6f92..7d6588ca3 100644 --- a/pkg/controller/nodes/subworkflow/launchplan/launchplan.go +++ b/pkg/controller/nodes/subworkflow/launchplan/launchplan.go @@ -28,6 +28,8 @@ type LaunchContext struct { RawOutputDataConfig *admin.RawOutputDataConfig Annotations map[string]string Labels map[string]string + Interruptible *bool + OverwriteCache bool } // Executor interface to be implemented by the remote system that can allow workflow launching capabilities From 53690c8da064caf4955c36bc951f6df4c515f180 Mon Sep 17 00:00:00 2001 From: Daniel Rammer Date: Tue, 18 Apr 2023 09:53:55 -0500 Subject: [PATCH 2/2] fixed lint issues Signed-off-by: Daniel Rammer --- pkg/controller/nodes/subworkflow/launchplan/admin.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/pkg/controller/nodes/subworkflow/launchplan/admin.go b/pkg/controller/nodes/subworkflow/launchplan/admin.go index 2931b343e..60499f230 100644 --- a/pkg/controller/nodes/subworkflow/launchplan/admin.go +++ b/pkg/controller/nodes/subworkflow/launchplan/admin.go @@ -96,7 +96,7 @@ func (a *adminLaunchPlanExecutor) Launch(ctx context.Context, launchCtx LaunchCo } var interruptible *wrappers.BoolValue - if launchCtx.Interruptible != nil{ + if launchCtx.Interruptible != nil { interruptible = &wrappers.BoolValue{ Value: *launchCtx.Interruptible, } @@ -120,7 +120,6 @@ func (a *adminLaunchPlanExecutor) Launch(ctx context.Context, launchCtx LaunchCo SecurityContext: &launchCtx.SecurityContext, MaxParallelism: int32(launchCtx.MaxParallelism), RawOutputDataConfig: launchCtx.RawOutputDataConfig, - //ClusterAssignment: launchCtx.ClusterAssignment, TODO @hamersaw - do we need this? Interruptible: interruptible, OverwriteCache: launchCtx.OverwriteCache, },