From 35b5c81830ac0b1f781c62b236766a69cd8edbdf Mon Sep 17 00:00:00 2001 From: Yee Hing Tong Date: Sat, 16 Sep 2023 13:01:36 -0700 Subject: [PATCH] Pass Trigger launch plans to Artifact service (#11) Signed-off-by: Yee Hing Tong --- flyteadmin/pkg/artifacts/registry.go | 18 ++++++++++++++++++ .../pkg/manager/impl/execution_manager.go | 12 ++++++------ .../pkg/manager/impl/launch_plan_manager.go | 10 ++++++++++ 3 files changed, 34 insertions(+), 6 deletions(-) diff --git a/flyteadmin/pkg/artifacts/registry.go b/flyteadmin/pkg/artifacts/registry.go index d727debf8d..efee8db2e6 100644 --- a/flyteadmin/pkg/artifacts/registry.go +++ b/flyteadmin/pkg/artifacts/registry.go @@ -1,6 +1,8 @@ package artifacts import ( + "fmt" + "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/admin" "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/artifact" "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core" "github.com/flyteorg/flytestdlib/logger" @@ -50,6 +52,22 @@ func (a *ArtifactRegistry) RegisterArtifactConsumer(ctx context.Context, id *cor logger.Debugf(ctx, "Registered artifact consumer [%+v]", id) } +func (a *ArtifactRegistry) RegisterTrigger(ctx context.Context, plan *admin.LaunchPlan) error { + if a.client == nil { + logger.Debugf(ctx, "Artifact client not configured, skipping trigger [%+v]", plan) + return fmt.Errorf("artifact client not configured") + } + _, err := a.client.CreateTrigger(ctx, &artifact.CreateTriggerRequest{ + TriggerLaunchPlan: plan, + }) + if err != nil { + logger.Errorf(ctx, "Failed to register trigger for [%+v] with err: %v", plan.Id, err) + return err + } + logger.Debugf(ctx, "Registered trigger for [%+v]", plan.Id) + return nil +} + func (a *ArtifactRegistry) GetClient() artifact.ArtifactRegistryClient { return a.client } diff --git a/flyteadmin/pkg/manager/impl/execution_manager.go b/flyteadmin/pkg/manager/impl/execution_manager.go index 8dafbace87..39ffafa93b 100644 --- a/flyteadmin/pkg/manager/impl/execution_manager.go +++ b/flyteadmin/pkg/manager/impl/execution_manager.go @@ -845,17 +845,17 @@ func (m *ExecutionManager) fillInTemplateArgs(ctx context.Context, query core.Ar domain = ak.GetDomain() } - var partitions map[string]string + var partitions map[string]*core.PartitionValue - if artifactID.GetPartitions() != nil { - partitions = make(map[string]string, len(artifactID.GetPartitions().Value)) - for k, v := range artifactID.GetPartitions().Value { - newValue, err := m.templateInputString(ctx, v, inputs, metadata) + if artifactID.GetPartitions() != nil && artifactID.GetPartitions().GetValue() != nil { + partitions = make(map[string]*core.PartitionValue, len(artifactID.GetPartitions().Value)) + for k, v := range artifactID.GetPartitions().GetValue() { + newValue, err := m.templateInputString(ctx, v.StaticValue, inputs, metadata) if err != nil { logger.Errorf(ctx, "Failed to template input string [%s] [%v]", v, err) return query, err } - partitions[k] = newValue + partitions[k] = &core.PartitionValue{StaticValue: newValue} } } return core.ArtifactQuery{ diff --git a/flyteadmin/pkg/manager/impl/launch_plan_manager.go b/flyteadmin/pkg/manager/impl/launch_plan_manager.go index bf7912e539..4a4e46240d 100644 --- a/flyteadmin/pkg/manager/impl/launch_plan_manager.go +++ b/flyteadmin/pkg/manager/impl/launch_plan_manager.go @@ -90,6 +90,16 @@ func (m *LaunchPlanManager) CreateLaunchPlan( return nil, err } + // The presence of this field indicates that this is a trigger launch plan + // Return true and send this request over to the artifact registry instead + if launchPlan.Spec.GetEntityMetadata() != nil && launchPlan.Spec.GetEntityMetadata().GetLaunchConditions() != nil { + err := m.artifactRegistry.RegisterTrigger(ctx, &launchPlan) + if err != nil { + return nil, err + } + return &admin.LaunchPlanCreateResponse{}, nil + } + existingLaunchPlanModel, err := util.GetLaunchPlanModel(ctx, m.db, *request.Id) if err == nil { if bytes.Equal(existingLaunchPlanModel.Digest, launchPlanDigest) {