Skip to content

Commit

Permalink
Pass Trigger launch plans to Artifact service (#11)
Browse files Browse the repository at this point in the history
Signed-off-by: Yee Hing Tong <[email protected]>
  • Loading branch information
wild-endeavor authored Sep 16, 2023
1 parent cff0609 commit 35b5c81
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 6 deletions.
18 changes: 18 additions & 0 deletions flyteadmin/pkg/artifacts/registry.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package artifacts

import (
"fmt"
"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/admin"
"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/artifact"
"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core"
"github.com/flyteorg/flytestdlib/logger"
Expand Down Expand Up @@ -50,6 +52,22 @@ func (a *ArtifactRegistry) RegisterArtifactConsumer(ctx context.Context, id *cor
logger.Debugf(ctx, "Registered artifact consumer [%+v]", id)
}

func (a *ArtifactRegistry) RegisterTrigger(ctx context.Context, plan *admin.LaunchPlan) error {
if a.client == nil {
logger.Debugf(ctx, "Artifact client not configured, skipping trigger [%+v]", plan)
return fmt.Errorf("artifact client not configured")
}
_, err := a.client.CreateTrigger(ctx, &artifact.CreateTriggerRequest{
TriggerLaunchPlan: plan,
})
if err != nil {
logger.Errorf(ctx, "Failed to register trigger for [%+v] with err: %v", plan.Id, err)
return err
}
logger.Debugf(ctx, "Registered trigger for [%+v]", plan.Id)
return nil
}

func (a *ArtifactRegistry) GetClient() artifact.ArtifactRegistryClient {
return a.client
}
Expand Down
12 changes: 6 additions & 6 deletions flyteadmin/pkg/manager/impl/execution_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -845,17 +845,17 @@ func (m *ExecutionManager) fillInTemplateArgs(ctx context.Context, query core.Ar
domain = ak.GetDomain()
}

var partitions map[string]string
var partitions map[string]*core.PartitionValue

if artifactID.GetPartitions() != nil {
partitions = make(map[string]string, len(artifactID.GetPartitions().Value))
for k, v := range artifactID.GetPartitions().Value {
newValue, err := m.templateInputString(ctx, v, inputs, metadata)
if artifactID.GetPartitions() != nil && artifactID.GetPartitions().GetValue() != nil {
partitions = make(map[string]*core.PartitionValue, len(artifactID.GetPartitions().Value))
for k, v := range artifactID.GetPartitions().GetValue() {
newValue, err := m.templateInputString(ctx, v.StaticValue, inputs, metadata)
if err != nil {
logger.Errorf(ctx, "Failed to template input string [%s] [%v]", v, err)
return query, err
}
partitions[k] = newValue
partitions[k] = &core.PartitionValue{StaticValue: newValue}
}
}
return core.ArtifactQuery{
Expand Down
10 changes: 10 additions & 0 deletions flyteadmin/pkg/manager/impl/launch_plan_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,16 @@ func (m *LaunchPlanManager) CreateLaunchPlan(
return nil, err
}

// The presence of this field indicates that this is a trigger launch plan
// Return true and send this request over to the artifact registry instead
if launchPlan.Spec.GetEntityMetadata() != nil && launchPlan.Spec.GetEntityMetadata().GetLaunchConditions() != nil {
err := m.artifactRegistry.RegisterTrigger(ctx, &launchPlan)
if err != nil {
return nil, err
}
return &admin.LaunchPlanCreateResponse{}, nil
}

existingLaunchPlanModel, err := util.GetLaunchPlanModel(ctx, m.db, *request.Id)
if err == nil {
if bytes.Equal(existingLaunchPlanModel.Digest, launchPlanDigest) {
Expand Down

0 comments on commit 35b5c81

Please sign in to comment.