From 1c68fd078e54a26d66f883d2ff4aa51b280eb33a Mon Sep 17 00:00:00 2001 From: Thomas Newton Date: Wed, 27 Mar 2024 14:54:42 +0000 Subject: [PATCH 1/4] Remove shard key in admin-launcher Signed-off-by: Thomas Newton --- .../pkg/controller/nodes/subworkflow/launchplan/admin.go | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/flytepropeller/pkg/controller/nodes/subworkflow/launchplan/admin.go b/flytepropeller/pkg/controller/nodes/subworkflow/launchplan/admin.go index 29de745acf..8166b95f3a 100644 --- a/flytepropeller/pkg/controller/nodes/subworkflow/launchplan/admin.go +++ b/flytepropeller/pkg/controller/nodes/subworkflow/launchplan/admin.go @@ -15,6 +15,7 @@ import ( "github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core" "github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/service" evtErr "github.com/flyteorg/flyte/flytepropeller/events/errors" + "github.com/flyteorg/flyte/flytepropeller/pkg/compiler/transformers/k8s" "github.com/flyteorg/flyte/flytestdlib/cache" stdErr "github.com/flyteorg/flyte/flytestdlib/errors" "github.com/flyteorg/flyte/flytestdlib/logger" @@ -114,6 +115,12 @@ func (a *adminLaunchPlanExecutor) Launch(ctx context.Context, launchCtx LaunchCo }) } + // Remove the ShardKeyLabel entry from labels so that the child launch plan can potentially run on a different + // flytepropeller shard. 
+ logger.Warnf(ctx, "launch labels: %v", launchCtx.Labels) + delete(launchCtx.Labels, k8s.ShardKeyLabel) + logger.Warnf(ctx, "launch labels: %v", launchCtx.Labels) + req := &admin.ExecutionCreateRequest{ Project: executionID.Project, Domain: executionID.Domain, From 0e1f2de9c4e9bef420b260c579388ed682048bc5 Mon Sep 17 00:00:00 2001 From: Thomas Newton Date: Wed, 27 Mar 2024 15:54:30 +0000 Subject: [PATCH 2/4] Don't mutate existing state Signed-off-by: Thomas Newton --- .../controller/nodes/subworkflow/launchplan/admin.go | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/flytepropeller/pkg/controller/nodes/subworkflow/launchplan/admin.go b/flytepropeller/pkg/controller/nodes/subworkflow/launchplan/admin.go index 8166b95f3a..050cae8c10 100644 --- a/flytepropeller/pkg/controller/nodes/subworkflow/launchplan/admin.go +++ b/flytepropeller/pkg/controller/nodes/subworkflow/launchplan/admin.go @@ -115,11 +115,10 @@ func (a *adminLaunchPlanExecutor) Launch(ctx context.Context, launchCtx LaunchCo }) } - // Remove the ShardKeyLabel entry from labels so that the child launch plan can potentially run on a different - // flytepropeller shard. - logger.Warnf(ctx, "launch labels: %v", launchCtx.Labels) - delete(launchCtx.Labels, k8s.ShardKeyLabel) - logger.Warnf(ctx, "launch labels: %v", launchCtx.Labels) + // Make a copy of the labels with shard-key removed. This ensures that the shard-key is re-computed for each + // child execution instead of being copied from the parent. 
+ labels := launchCtx.Labels + delete(labels, k8s.ShardKeyLabel) req := &admin.ExecutionCreateRequest{ Project: executionID.Project, @@ -134,7 +133,7 @@ func (a *adminLaunchPlanExecutor) Launch(ctx context.Context, launchCtx LaunchCo Principal: launchCtx.Principal, ParentNodeExecution: launchCtx.ParentNodeExecution, }, - Labels: &admin.Labels{Values: launchCtx.Labels}, + Labels: &admin.Labels{Values: labels}, Annotations: &admin.Annotations{Values: launchCtx.Annotations}, SecurityContext: &launchCtx.SecurityContext, MaxParallelism: int32(launchCtx.MaxParallelism), From 73f15b0d446c3c27893cd87e675c89045ddb85b2 Mon Sep 17 00:00:00 2001 From: Thomas Newton Date: Wed, 27 Mar 2024 16:23:02 +0000 Subject: [PATCH 3/4] Don't mutate state Signed-off-by: Thomas Newton --- .../pkg/controller/nodes/subworkflow/launchplan/admin.go | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/flytepropeller/pkg/controller/nodes/subworkflow/launchplan/admin.go b/flytepropeller/pkg/controller/nodes/subworkflow/launchplan/admin.go index 050cae8c10..fbe0a8c1a6 100644 --- a/flytepropeller/pkg/controller/nodes/subworkflow/launchplan/admin.go +++ b/flytepropeller/pkg/controller/nodes/subworkflow/launchplan/admin.go @@ -117,8 +117,12 @@ func (a *adminLaunchPlanExecutor) Launch(ctx context.Context, launchCtx LaunchCo // Make a copy of the labels with shard-key removed. This ensures that the shard-key is re-computed for each // child execution instead of being copied from the parent. 
- labels := launchCtx.Labels - delete(labels, k8s.ShardKeyLabel) + labels := make(map[string]string) + for key, value := range launchCtx.Labels { + if key != k8s.ShardKeyLabel { + labels[key] = value + } + } req := &admin.ExecutionCreateRequest{ Project: executionID.Project, From 5d44e4233c4e4efb4c806b2bf57451ef73b4fdc8 Mon Sep 17 00:00:00 2001 From: Thomas Newton Date: Wed, 27 Mar 2024 16:23:16 +0000 Subject: [PATCH 4/4] Add a test Signed-off-by: Thomas Newton --- .../nodes/subworkflow/launchplan/admin_test.go | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/flytepropeller/pkg/controller/nodes/subworkflow/launchplan/admin_test.go b/flytepropeller/pkg/controller/nodes/subworkflow/launchplan/admin_test.go index 89bb0e2477..2a442e3262 100644 --- a/flytepropeller/pkg/controller/nodes/subworkflow/launchplan/admin_test.go +++ b/flytepropeller/pkg/controller/nodes/subworkflow/launchplan/admin_test.go @@ -2,6 +2,7 @@ package launchplan import ( "context" + "reflect" "testing" "time" @@ -162,10 +163,14 @@ func TestAdminLaunchPlanExecutor_Launch(t *testing.T) { ctx, mock.MatchedBy(func(o *admin.ExecutionCreateRequest) bool { return o.Project == "p" && o.Domain == "d" && o.Name == "n" && o.Spec.Inputs == nil && - o.Spec.Metadata.Mode == admin.ExecutionMetadata_CHILD_WORKFLOW + o.Spec.Metadata.Mode == admin.ExecutionMetadata_CHILD_WORKFLOW && + reflect.DeepEqual(o.Spec.Labels.Values, map[string]string{"foo": "bar"}) // Ensure shard-key was removed. }), ).Return(nil, nil) assert.NoError(t, err) + + var labels = map[string]string{"foo": "bar", "shard-key": "1"} + err = exec.Launch(ctx, LaunchContext{ ParentNodeExecution: &core.NodeExecutionIdentifier{ @@ -176,12 +181,15 @@ func TestAdminLaunchPlanExecutor_Launch(t *testing.T) { Name: "w", }, }, + Labels: labels, }, id, &core.Identifier{}, nil, ) assert.NoError(t, err) + // Ensure we haven't mutated the state of the parent workflow. 
+ assert.True(t, reflect.DeepEqual(labels, map[string]string{"foo": "bar", "shard-key": "1"})) }) t.Run("happy recover", func(t *testing.T) {