From 9b4dea854da8af50655c45b07509f0d1e60f9c9d Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Fri, 26 May 2023 17:10:55 -0700 Subject: [PATCH 01/26] test Signed-off-by: Kevin Su --- go.mod | 2 + go.sum | 2 - pkg/manager/impl/execution_manager.go | 112 +++++++++++------- pkg/repositories/config/migrations.go | 10 ++ pkg/repositories/gormimpl/execution_repo.go | 26 +++- pkg/repositories/interfaces/execution_repo.go | 2 +- pkg/repositories/models/execution.go | 5 + 7 files changed, 104 insertions(+), 55 deletions(-) diff --git a/go.mod b/go.mod index 46ef4b0de..a5e192231 100644 --- a/go.mod +++ b/go.mod @@ -213,3 +213,5 @@ replace github.com/robfig/cron/v3 => github.com/unionai/cron/v3 v3.0.2-0.2022091 // Retracted versions // This was published in error when attempting to create 1.5.1 Flyte release. retract v1.1.94 + +replace github.com/flyteorg/flyteidl => /Users/kevin/git/flyteidl diff --git a/go.sum b/go.sum index 255fa8893..f19b0395d 100644 --- a/go.sum +++ b/go.sum @@ -312,8 +312,6 @@ github.com/fatih/structs v1.0.0/go.mod h1:9NiDSp5zOcgEDl+j00MP/WkGVPOlPRLejGD8Ga github.com/fatih/structs v1.1.0/go.mod h1:9NiDSp5zOcgEDl+j00MP/WkGVPOlPRLejGD8Ga6PJ7M= github.com/felixge/httpsnoop v1.0.1 h1:lvB5Jl89CsZtGIWuTcDM1E/vkVs49/Ml7JJe07l8SPQ= github.com/felixge/httpsnoop v1.0.1/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= -github.com/flyteorg/flyteidl v1.5.7 h1:voAxMMFsKOseNFSlCyRGlpegqtQXtJjyxgsQzZg4tts= -github.com/flyteorg/flyteidl v1.5.7/go.mod h1:EtE/muM2lHHgBabjYcxqe9TWeJSL0kXwbI0RgVwI4Og= github.com/flyteorg/flyteplugins v1.0.56 h1:kBTDgTpdSi7wcptk4cMwz5vfh1MU82VaUMMboe1InXw= github.com/flyteorg/flyteplugins v1.0.56/go.mod h1:aFCKSn8TPzxSAILIiogHtUnHlUCN9+y6Vf+r9R4KZDU= github.com/flyteorg/flytepropeller v1.1.87 h1:Px7ASDjrWyeVrUb15qXmhw9QK7xPcFjL5Yetr2P6iGM= diff --git a/pkg/manager/impl/execution_manager.go b/pkg/manager/impl/execution_manager.go index 8a6b7287b..c95766c33 100644 --- a/pkg/manager/impl/execution_manager.go +++ b/pkg/manager/impl/execution_manager.go @@ -450,7 +450,7 @@ func (m *ExecutionManager) getClusterAssignment(ctx context.Context, request *ad func (m *ExecutionManager) launchSingleTaskExecution( ctx context.Context, request admin.ExecutionCreateRequest, requestedAt time.Time) ( - context.Context, *models.Execution, error) { + context.Context, *models.Execution, []models.ExecutionTags, error) { taskModel, err := m.db.TaskRepo().Get(ctx, repositoryInterfaces.Identifier{ Project: request.Spec.LaunchPlan.Project, @@ -459,11 +459,11 @@ func (m *ExecutionManager) launchSingleTaskExecution( Version: request.Spec.LaunchPlan.Version, }) if err != nil { - return nil, nil, err + return nil, nil, nil, err } task, err := transformers.FromTaskModel(taskModel) if err != nil { - return nil, nil, err + return nil, nil, nil, err } // Prepare a skeleton workflow @@ -472,15 +472,15 @@ func (m *ExecutionManager) launchSingleTaskExecution( util.CreateOrGetWorkflowModel(ctx, request, m.db, m.workflowManager, m.namedEntityManager, taskIdentifier, &task) if err != nil { logger.Debugf(ctx, "Failed to created skeleton workflow for [%+v] with err: %v", taskIdentifier, err) - return nil, nil, err + return nil, nil, nil, err } workflow, err := transformers.FromWorkflowModel(*workflowModel) if err != nil { - return nil, nil, err + return nil, nil, nil, err } closure, err := util.FetchAndGetWorkflowClosure(ctx, m.storageClient, workflowModel.RemoteClosureIdentifier) if err != nil { - return nil, nil, err + return nil, nil, nil, err } closure.CreatedAt = workflow.Closure.CreatedAt workflow.Closure = closure @@ -488,7 +488,7 @@ func (m *ExecutionManager) launchSingleTaskExecution( launchPlan, err := util.CreateOrGetLaunchPlan(ctx, m.db, m.config, taskIdentifier, workflow.Closure.CompiledWorkflow.Primary.Template.Interface, workflowModel.ID, request.Spec) if err != nil { - return nil, nil, err + return nil, nil, nil, err } executionInputs, err := validation.CheckAndFetchInputsForExecution( @@ -500,7 +500,7 @@ func (m *ExecutionManager) launchSingleTaskExecution( logger.Debugf(ctx, "Failed to CheckAndFetchInputsForExecution with request.Inputs: %+v"+ "fixed inputs: %+v and expected inputs: %+v with err %v", request.Inputs, launchPlan.Spec.FixedInputs, launchPlan.Closure.ExpectedInputs, err) - return nil, nil, err + return nil, nil, nil, err } name := util.GetExecutionName(request) @@ -524,7 +524,7 @@ func (m *ExecutionManager) launchSingleTaskExecution( var sourceExecutionID uint parentNodeExecutionID, sourceExecutionID, err = m.getInheritedExecMetadata(ctx, requestSpec, &workflowExecutionID) if err != nil { - return nil, nil, err + return nil, nil, nil, err } // Dynamically assign task resource defaults. @@ -538,15 +538,15 @@ func (m *ExecutionManager) launchSingleTaskExecution( inputsURI, err := common.OffloadLiteralMap(ctx, m.storageClient, request.Inputs, workflowExecutionID.Project, workflowExecutionID.Domain, workflowExecutionID.Name, shared.Inputs) if err != nil { - return nil, nil, err + return nil, nil, nil, err } userInputsURI, err := common.OffloadLiteralMap(ctx, m.storageClient, request.Inputs, workflowExecutionID.Project, workflowExecutionID.Domain, workflowExecutionID.Name, shared.UserInputs) if err != nil { - return nil, nil, err + return nil, nil, nil, err } executionConfig, err := m.getExecutionConfig(ctx, &request, nil) if err != nil { - return nil, nil, err + return nil, nil, nil, err } var labels map[string]string @@ -556,7 +556,7 @@ func (m *ExecutionManager) launchSingleTaskExecution( labels, err = m.addProjectLabels(ctx, request.Project, labels) if err != nil { - return nil, nil, err + return nil, nil, nil, err } var annotations map[string]string @@ -571,7 +571,7 @@ func (m *ExecutionManager) launchSingleTaskExecution( clusterAssignment, err := m.getClusterAssignment(ctx, &request) if err != nil { - return nil, nil, err + return nil, nil, nil, err } executionParameters := workflowengineInterfaces.ExecutionParameters{ @@ -589,7 +589,7 @@ func (m *ExecutionManager) launchSingleTaskExecution( overrides, err := m.addPluginOverrides(ctx, &workflowExecutionID, workflowExecutionID.Name, "") if err != nil { - return nil, nil, err + return nil, nil, nil, err } if overrides != nil { executionParameters.TaskPluginOverrides = overrides @@ -614,7 +614,7 @@ func (m *ExecutionManager) launchSingleTaskExecution( m.systemMetrics.PropellerFailures.Inc() logger.Infof(ctx, "Failed to execute workflow %+v with execution id %+v and inputs %+v with err %v", request, workflowExecutionID, request.Inputs, err) - return nil, nil, err + return nil, nil, nil, err } executionCreatedAt := time.Now() acceptanceDelay := executionCreatedAt.Sub(requestedAt) @@ -655,10 +655,10 @@ func (m *ExecutionManager) launchSingleTaskExecution( if err != nil { logger.Infof(ctx, "Failed to create execution model in transformer for id: [%+v] with err: %v", workflowExecutionID, err) - return nil, nil, err + return nil, nil, nil, err } m.userMetrics.WorkflowExecutionInputBytes.Observe(float64(proto.Size(request.Inputs))) - return ctx, executionModel, nil + return ctx, executionModel, nil, nil } func resolveAuthRole(request *admin.ExecutionCreateRequest, launchPlan *admin.LaunchPlan) *admin.AuthRole { @@ -706,13 +706,28 @@ func resolveSecurityCtx(ctx context.Context, executionConfigSecurityCtx *core.Se } } +func createTagsModel(project, domain, name string, tags []string) []models.ExecutionTags { + tagsModel := make([]models.ExecutionTags, len(tags)) + for _, tag := range tags { + tagsModel = append(tagsModel, models.ExecutionTags{ + ExecutionKey: models.ExecutionKey{ + Project: project, + Domain: domain, + Name: name, + }, + Tag: tag, + }) + } + return tagsModel +} + func (m *ExecutionManager) launchExecutionAndPrepareModel( ctx context.Context, request admin.ExecutionCreateRequest, requestedAt time.Time) ( - context.Context, *models.Execution, error) { + context.Context, *models.Execution, []models.ExecutionTags, error) { err := validation.ValidateExecutionRequest(ctx, request, m.db, m.config.ApplicationConfiguration()) if err != nil { logger.Debugf(ctx, "Failed to validate ExecutionCreateRequest %+v with err %v", request, err) - return nil, nil, err + return nil, nil, nil, err } if request.Spec.LaunchPlan.ResourceType == core.ResourceType_TASK { logger.Debugf(ctx, "Launching single task execution with [%+v]", request.Spec.LaunchPlan) @@ -722,12 +737,12 @@ func (m *ExecutionManager) launchExecutionAndPrepareModel( launchPlanModel, err := util.GetLaunchPlanModel(ctx, m.db, *request.Spec.LaunchPlan) if err != nil { logger.Debugf(ctx, "Failed to get launch plan model for ExecutionCreateRequest %+v with err %v", request, err) - return nil, nil, err + return nil, nil, nil, err } launchPlan, err := transformers.FromLaunchPlanModel(launchPlanModel) if err != nil { logger.Debugf(ctx, "Failed to transform launch plan model %+v with err %v", launchPlanModel, err) - return nil, nil, err + return nil, nil, nil, err } executionInputs, err := validation.CheckAndFetchInputsForExecution( request.Inputs, @@ -739,23 +754,23 @@ func (m *ExecutionManager) launchExecutionAndPrepareModel( logger.Debugf(ctx, "Failed to CheckAndFetchInputsForExecution with request.Inputs: %+v"+ "fixed inputs: %+v and expected inputs: %+v with err %v", request.Inputs, launchPlan.Spec.FixedInputs, launchPlan.Closure.ExpectedInputs, err) - return nil, nil, err + return nil, nil, nil, err } workflowModel, err := util.GetWorkflowModel(ctx, m.db, *launchPlan.Spec.WorkflowId) if err != nil { logger.Debugf(ctx, "Failed to get workflow with id %+v with err %v", launchPlan.Spec.WorkflowId, err) - return nil, nil, err + return nil, nil, nil, err } workflow, err := transformers.FromWorkflowModel(workflowModel) if err != nil { logger.Debugf(ctx, "Failed to get workflow with id %+v with err %v", launchPlan.Spec.WorkflowId, err) - return nil, nil, err + return nil, nil, nil, err } closure, err := util.FetchAndGetWorkflowClosure(ctx, m.storageClient, workflowModel.RemoteClosureIdentifier) if err != nil { logger.Debugf(ctx, "Failed to get workflow with id %+v with err %v", launchPlan.Spec.WorkflowId, err) - return nil, nil, err + return nil, nil, nil, err } closure.CreatedAt = workflow.Closure.CreatedAt workflow.Closure = closure @@ -778,7 +793,7 @@ func (m *ExecutionManager) launchExecutionAndPrepareModel( var sourceExecutionID uint parentNodeExecutionID, sourceExecutionID, err = m.getInheritedExecMetadata(ctx, requestSpec, &workflowExecutionID) if err != nil { - return nil, nil, err + return nil, nil, nil, err } // Dynamically assign task resource defaults. @@ -792,16 +807,16 @@ func (m *ExecutionManager) launchExecutionAndPrepareModel( inputsURI, err := common.OffloadLiteralMap(ctx, m.storageClient, executionInputs, workflowExecutionID.Project, workflowExecutionID.Domain, workflowExecutionID.Name, shared.Inputs) if err != nil { - return nil, nil, err + return nil, nil, nil, err } userInputsURI, err := common.OffloadLiteralMap(ctx, m.storageClient, request.Inputs, workflowExecutionID.Project, workflowExecutionID.Domain, workflowExecutionID.Name, shared.UserInputs) if err != nil { - return nil, nil, err + return nil, nil, nil, err } executionConfig, err := m.getExecutionConfig(ctx, &request, launchPlan) if err != nil { - return nil, nil, err + return nil, nil, nil, err } namespace := common.GetNamespaceName( @@ -809,15 +824,15 @@ func (m *ExecutionManager) launchExecutionAndPrepareModel( labels, err := resolveStringMap(executionConfig.GetLabels(), launchPlan.Spec.Labels, "labels", m.config.RegistrationValidationConfiguration().GetMaxLabelEntries()) if err != nil { - return nil, nil, err + return nil, nil, nil, err } labels, err = m.addProjectLabels(ctx, request.Project, labels) if err != nil { - return nil, nil, err + return nil, nil, nil, err } annotations, err := resolveStringMap(executionConfig.GetAnnotations(), launchPlan.Spec.Annotations, "annotations", m.config.RegistrationValidationConfiguration().GetMaxAnnotationEntries()) if err != nil { - return nil, nil, err + return nil, nil, nil, err } var rawOutputDataConfig *admin.RawOutputDataConfig if executionConfig.RawOutputDataConfig != nil { @@ -826,7 +841,7 @@ func (m *ExecutionManager) launchExecutionAndPrepareModel( clusterAssignment, err := m.getClusterAssignment(ctx, &request) if err != nil { - return nil, nil, err + return nil, nil, nil, err } executionParameters := workflowengineInterfaces.ExecutionParameters{ @@ -844,7 +859,7 @@ func (m *ExecutionManager) launchExecutionAndPrepareModel( overrides, err := m.addPluginOverrides(ctx, &workflowExecutionID, launchPlan.GetSpec().WorkflowId.Name, launchPlan.Id.Name) if err != nil { - return nil, nil, err + return nil, nil, nil, err } if overrides != nil { executionParameters.TaskPluginOverrides = overrides @@ -870,7 +885,7 @@ func (m *ExecutionManager) launchExecutionAndPrepareModel( m.systemMetrics.PropellerFailures.Inc() logger.Infof(ctx, "Failed to execute workflow %+v with execution id %+v and inputs %+v with err %v", request, workflowExecutionID, executionInputs, err) - return nil, nil, err + return nil, nil, nil, err } executionCreatedAt := time.Now() acceptanceDelay := executionCreatedAt.Sub(requestedAt) @@ -911,20 +926,22 @@ func (m *ExecutionManager) launchExecutionAndPrepareModel( if err != nil { logger.Infof(ctx, "Failed to create execution model in transformer for id: [%+v] with err: %v", workflowExecutionID, err) - return nil, nil, err + return nil, nil, nil, err } - return ctx, executionModel, nil + tagsModel := createTagsModel(request.Project, request.Domain, request.Name, request.Spec.Tags) + + return ctx, executionModel, tagsModel, nil } // Inserts an execution model into the database store and emits platform metrics. func (m *ExecutionManager) createExecutionModel( - ctx context.Context, executionModel *models.Execution) (*core.WorkflowExecutionIdentifier, error) { + ctx context.Context, executionModel *models.Execution, tagsModel []models.ExecutionTags) (*core.WorkflowExecutionIdentifier, error) { workflowExecutionIdentifier := core.WorkflowExecutionIdentifier{ Project: executionModel.ExecutionKey.Project, Domain: executionModel.ExecutionKey.Domain, Name: executionModel.ExecutionKey.Name, } - err := m.db.ExecutionRepo().Create(ctx, *executionModel) + err := m.db.ExecutionRepo().Create(ctx, *executionModel, tagsModel) if err != nil { logger.Debugf(ctx, "failed to save newly created execution [%+v] with id %+v to db with err %v", workflowExecutionIdentifier, workflowExecutionIdentifier, err) @@ -945,12 +962,13 @@ func (m *ExecutionManager) CreateExecution( request.Inputs = request.GetSpec().GetInputs() } var executionModel *models.Execution + var tagsModel []models.ExecutionTags var err error - ctx, executionModel, err = m.launchExecutionAndPrepareModel(ctx, request, requestedAt) + ctx, executionModel, tagsModel, err = m.launchExecutionAndPrepareModel(ctx, request, requestedAt) if err != nil { return nil, err } - workflowExecutionIdentifier, err := m.createExecutionModel(ctx, executionModel) + workflowExecutionIdentifier, err := m.createExecutionModel(ctx, executionModel, tagsModel) if err != nil { return nil, err } @@ -995,7 +1013,8 @@ func (m *ExecutionManager) RelaunchExecution( executionSpec.Metadata.ReferenceExecution = existingExecution.Id executionSpec.OverwriteCache = request.GetOverwriteCache() var executionModel *models.Execution - ctx, executionModel, err = m.launchExecutionAndPrepareModel(ctx, admin.ExecutionCreateRequest{ + var tagsModel []models.ExecutionTags + ctx, executionModel, tagsModel, err = m.launchExecutionAndPrepareModel(ctx, admin.ExecutionCreateRequest{ Project: request.Id.Project, Domain: request.Id.Domain, Name: request.Name, @@ -1006,7 +1025,7 @@ func (m *ExecutionManager) RelaunchExecution( return nil, err } executionModel.SourceExecutionID = existingExecutionModel.ID - workflowExecutionIdentifier, err := m.createExecutionModel(ctx, executionModel) + workflowExecutionIdentifier, err := m.createExecutionModel(ctx, executionModel, tagsModel) if err != nil { return nil, err } @@ -1046,7 +1065,8 @@ func (m *ExecutionManager) RecoverExecution( executionSpec.Metadata.Mode = admin.ExecutionMetadata_RECOVERED executionSpec.Metadata.ReferenceExecution = existingExecution.Id var executionModel *models.Execution - ctx, executionModel, err = m.launchExecutionAndPrepareModel(ctx, admin.ExecutionCreateRequest{ + var tagsModel []models.ExecutionTags + ctx, executionModel, tagsModel, err = m.launchExecutionAndPrepareModel(ctx, admin.ExecutionCreateRequest{ Project: request.Id.Project, Domain: request.Id.Domain, Name: request.Name, @@ -1057,7 +1077,7 @@ func (m *ExecutionManager) RecoverExecution( return nil, err } executionModel.SourceExecutionID = existingExecutionModel.ID - workflowExecutionIdentifier, err := m.createExecutionModel(ctx, executionModel) + workflowExecutionIdentifier, err := m.createExecutionModel(ctx, executionModel, tagsModel) if err != nil { return nil, err } diff --git a/pkg/repositories/config/migrations.go b/pkg/repositories/config/migrations.go index c24f0a2bd..d6af88be0 100644 --- a/pkg/repositories/config/migrations.go +++ b/pkg/repositories/config/migrations.go @@ -445,6 +445,16 @@ var LegacyMigrations = []*gormigrate.Migration{ return tx.Model(&models.Execution{}).Migrator().DropColumn(&models.Execution{}, "launch_entity") }, }, + // Create execution tags table + { + ID: "2023-05-26-execution-tags", + Migrate: func(tx *gorm.DB) error { + return tx.AutoMigrate(&models.ExecutionTags{}) + }, + Rollback: func(tx *gorm.DB) error { + return tx.Migrator().DropTable("execution_tags") + }, + }, } var NoopMigrations = []*gormigrate.Migration{ diff --git a/pkg/repositories/gormimpl/execution_repo.go b/pkg/repositories/gormimpl/execution_repo.go index e300dcd33..cc9d6d603 100644 --- a/pkg/repositories/gormimpl/execution_repo.go +++ b/pkg/repositories/gormimpl/execution_repo.go @@ -22,14 +22,28 @@ type ExecutionRepo struct { metrics gormMetrics } -func (r *ExecutionRepo) Create(ctx context.Context, input models.Execution) error { +func (r *ExecutionRepo) Create(ctx context.Context, input models.Execution, tags []models.ExecutionTags) error { timer := r.metrics.CreateDuration.Start() - tx := r.db.Omit("id").Create(&input) + // tx := r.db.Omit("id").Create(&input) + // if tx.Error != nil { + // return r.errorTransformer.ToFlyteAdminError(tx.Error) + // } + err := r.db.Transaction(func(_ *gorm.DB) error { + if len(tags) != 0 { + tx := r.db.Create(tags) + if tx.Error != nil { + return r.errorTransformer.ToFlyteAdminError(tx.Error) + } + } + tx := r.db.Omit("id").Create(&input) + if tx.Error != nil { + return r.errorTransformer.ToFlyteAdminError(tx.Error) + } + + return nil + }) timer.Stop() - if tx.Error != nil { - return r.errorTransformer.ToFlyteAdminError(tx.Error) - } - return nil + return err } func (r *ExecutionRepo) Get(ctx context.Context, input interfaces.Identifier) (models.Execution, error) { diff --git a/pkg/repositories/interfaces/execution_repo.go b/pkg/repositories/interfaces/execution_repo.go index 6a65d3d3f..a8b9bcc70 100644 --- a/pkg/repositories/interfaces/execution_repo.go +++ b/pkg/repositories/interfaces/execution_repo.go @@ -9,7 +9,7 @@ import ( // Defines the interface for interacting with workflow execution models. type ExecutionRepoInterface interface { // Inserts a workflow execution model into the database store. - Create(ctx context.Context, input models.Execution) error + Create(ctx context.Context, input models.Execution, tags []models.ExecutionTags) error // This updates only an existing execution model with all non-empty fields in the input. Update(ctx context.Context, execution models.Execution) error // Returns a matching execution if it exists. diff --git a/pkg/repositories/models/execution.go b/pkg/repositories/models/execution.go index 1c5e1300d..c74b8f0f0 100644 --- a/pkg/repositories/models/execution.go +++ b/pkg/repositories/models/execution.go @@ -61,3 +61,8 @@ type Execution struct { // The resource type of the entity used to launch the execution, one of 'launch_plan' or 'task' LaunchEntity string } + +type ExecutionTags struct { + ExecutionKey + Tag string `gorm:"primary_key;column:execution_tag" valid:"length(0|255)"` +} From 83c785754fed7f60f8fbf897547f7625894210c0 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Sat, 27 May 2023 23:25:44 -0700 Subject: [PATCH 02/26] Execution tags Signed-off-by: Kevin Su --- pkg/manager/impl/execution_manager.go | 113 ++++++++---------- pkg/repositories/config/migrations.go | 16 ++- pkg/repositories/gormimpl/common.go | 2 + pkg/repositories/gormimpl/execution_repo.go | 35 ++---- pkg/repositories/interfaces/execution_repo.go | 2 +- pkg/repositories/models/execution.go | 8 +- pkg/repositories/transformers/execution.go | 7 ++ 7 files changed, 89 insertions(+), 94 deletions(-) diff --git a/pkg/manager/impl/execution_manager.go b/pkg/manager/impl/execution_manager.go index c95766c33..a286dbb3e 100644 --- a/pkg/manager/impl/execution_manager.go +++ b/pkg/manager/impl/execution_manager.go @@ -450,7 +450,7 @@ func (m *ExecutionManager) getClusterAssignment(ctx context.Context, request *ad func (m *ExecutionManager) launchSingleTaskExecution( ctx context.Context, request admin.ExecutionCreateRequest, requestedAt time.Time) ( - context.Context, *models.Execution, []models.ExecutionTags, error) { + context.Context, *models.Execution, error) { taskModel, err := m.db.TaskRepo().Get(ctx, repositoryInterfaces.Identifier{ Project: request.Spec.LaunchPlan.Project, @@ -459,11 +459,11 @@ func (m *ExecutionManager) launchSingleTaskExecution( Version: request.Spec.LaunchPlan.Version, }) if err != nil { - return nil, nil, nil, err + return nil, nil, err } task, err := transformers.FromTaskModel(taskModel) if err != nil { - return nil, nil, nil, err + return nil, nil, err } // Prepare a skeleton workflow @@ -472,15 +472,15 @@ func (m *ExecutionManager) launchSingleTaskExecution( util.CreateOrGetWorkflowModel(ctx, request, m.db, m.workflowManager, m.namedEntityManager, taskIdentifier, &task) if err != nil { logger.Debugf(ctx, "Failed to created skeleton workflow for [%+v] with err: %v", taskIdentifier, err) - return nil, nil, nil, err + return nil, nil, err } workflow, err := transformers.FromWorkflowModel(*workflowModel) if err != nil { - return nil, nil, nil, err + return nil, nil, err } closure, err := util.FetchAndGetWorkflowClosure(ctx, m.storageClient, workflowModel.RemoteClosureIdentifier) if err != nil { - return nil, nil, nil, err + return nil, nil, err } closure.CreatedAt = workflow.Closure.CreatedAt workflow.Closure = closure @@ -488,7 +488,7 @@ func (m *ExecutionManager) launchSingleTaskExecution( launchPlan, err := util.CreateOrGetLaunchPlan(ctx, m.db, m.config, taskIdentifier, workflow.Closure.CompiledWorkflow.Primary.Template.Interface, workflowModel.ID, request.Spec) if err != nil { - return nil, nil, nil, err + return nil, nil, err } executionInputs, err := validation.CheckAndFetchInputsForExecution( @@ -500,7 +500,7 @@ func (m *ExecutionManager) launchSingleTaskExecution( logger.Debugf(ctx, "Failed to CheckAndFetchInputsForExecution with request.Inputs: %+v"+ "fixed inputs: %+v and expected inputs: %+v with err %v", request.Inputs, launchPlan.Spec.FixedInputs, launchPlan.Closure.ExpectedInputs, err) - return nil, nil, nil, err + return nil, nil, err } name := util.GetExecutionName(request) @@ -524,7 +524,7 @@ func (m *ExecutionManager) launchSingleTaskExecution( var sourceExecutionID uint parentNodeExecutionID, sourceExecutionID, err = m.getInheritedExecMetadata(ctx, requestSpec, &workflowExecutionID) if err != nil { - return nil, nil, nil, err + return nil, nil, err } // Dynamically assign task resource defaults. @@ -538,15 +538,15 @@ func (m *ExecutionManager) launchSingleTaskExecution( inputsURI, err := common.OffloadLiteralMap(ctx, m.storageClient, request.Inputs, workflowExecutionID.Project, workflowExecutionID.Domain, workflowExecutionID.Name, shared.Inputs) if err != nil { - return nil, nil, nil, err + return nil, nil, err } userInputsURI, err := common.OffloadLiteralMap(ctx, m.storageClient, request.Inputs, workflowExecutionID.Project, workflowExecutionID.Domain, workflowExecutionID.Name, shared.UserInputs) if err != nil { - return nil, nil, nil, err + return nil, nil, err } executionConfig, err := m.getExecutionConfig(ctx, &request, nil) if err != nil { - return nil, nil, nil, err + return nil, nil, err } var labels map[string]string @@ -556,7 +556,7 @@ func (m *ExecutionManager) launchSingleTaskExecution( labels, err = m.addProjectLabels(ctx, request.Project, labels) if err != nil { - return nil, nil, nil, err + return nil, nil, err } var annotations map[string]string @@ -571,7 +571,7 @@ func (m *ExecutionManager) launchSingleTaskExecution( clusterAssignment, err := m.getClusterAssignment(ctx, &request) if err != nil { - return nil, nil, nil, err + return nil, nil, err } executionParameters := workflowengineInterfaces.ExecutionParameters{ @@ -589,7 +589,7 @@ func (m *ExecutionManager) launchSingleTaskExecution( overrides, err := m.addPluginOverrides(ctx, &workflowExecutionID, workflowExecutionID.Name, "") if err != nil { - return nil, nil, nil, err + return nil, nil, err } if overrides != nil { executionParameters.TaskPluginOverrides = overrides @@ -614,7 +614,7 @@ func (m *ExecutionManager) launchSingleTaskExecution( m.systemMetrics.PropellerFailures.Inc() logger.Infof(ctx, "Failed to execute workflow %+v with execution id %+v and inputs %+v with err %v", request, workflowExecutionID, request.Inputs, err) - return nil, nil, nil, err + return nil, nil, err } executionCreatedAt := time.Now() acceptanceDelay := executionCreatedAt.Sub(requestedAt) @@ -655,10 +655,10 @@ func (m *ExecutionManager) launchSingleTaskExecution( if err != nil { logger.Infof(ctx, "Failed to create execution model in transformer for id: [%+v] with err: %v", workflowExecutionID, err) - return nil, nil, nil, err + return nil, nil, err } m.userMetrics.WorkflowExecutionInputBytes.Observe(float64(proto.Size(request.Inputs))) - return ctx, executionModel, nil, nil + return ctx, executionModel, nil } func resolveAuthRole(request *admin.ExecutionCreateRequest, launchPlan *admin.LaunchPlan) *admin.AuthRole { @@ -706,28 +706,13 @@ func resolveSecurityCtx(ctx context.Context, executionConfigSecurityCtx *core.Se } } -func createTagsModel(project, domain, name string, tags []string) []models.ExecutionTags { - tagsModel := make([]models.ExecutionTags, len(tags)) - for _, tag := range tags { - tagsModel = append(tagsModel, models.ExecutionTags{ - ExecutionKey: models.ExecutionKey{ - Project: project, - Domain: domain, - Name: name, - }, - Tag: tag, - }) - } - return tagsModel -} - func (m *ExecutionManager) launchExecutionAndPrepareModel( ctx context.Context, request admin.ExecutionCreateRequest, requestedAt time.Time) ( - context.Context, *models.Execution, []models.ExecutionTags, error) { + context.Context, *models.Execution, error) { err := validation.ValidateExecutionRequest(ctx, request, m.db, m.config.ApplicationConfiguration()) if err != nil { logger.Debugf(ctx, "Failed to validate ExecutionCreateRequest %+v with err %v", request, err) - return nil, nil, nil, err + return nil, nil, err } if request.Spec.LaunchPlan.ResourceType == core.ResourceType_TASK { logger.Debugf(ctx, "Launching single task execution with [%+v]", request.Spec.LaunchPlan) @@ -737,12 +722,12 @@ func (m *ExecutionManager) launchExecutionAndPrepareModel( launchPlanModel, err := util.GetLaunchPlanModel(ctx, m.db, *request.Spec.LaunchPlan) if err != nil { logger.Debugf(ctx, "Failed to get launch plan model for ExecutionCreateRequest %+v with err %v", request, err) - return nil, nil, nil, err + return nil, nil, err } launchPlan, err := transformers.FromLaunchPlanModel(launchPlanModel) if err != nil { logger.Debugf(ctx, "Failed to transform launch plan model %+v with err %v", launchPlanModel, err) - return nil, nil, nil, err + return nil, nil, err } executionInputs, err := validation.CheckAndFetchInputsForExecution( request.Inputs, @@ -754,23 +739,23 @@ func (m *ExecutionManager) launchExecutionAndPrepareModel( logger.Debugf(ctx, "Failed to CheckAndFetchInputsForExecution with request.Inputs: %+v"+ "fixed inputs: %+v and expected inputs: %+v with err %v", request.Inputs, launchPlan.Spec.FixedInputs, launchPlan.Closure.ExpectedInputs, err) - return nil, nil, nil, err + return nil, nil, err } workflowModel, err := util.GetWorkflowModel(ctx, m.db, *launchPlan.Spec.WorkflowId) if err != nil { logger.Debugf(ctx, "Failed to get workflow with id %+v with err %v", launchPlan.Spec.WorkflowId, err) - return nil, nil, nil, err + return nil, nil, err } workflow, err := transformers.FromWorkflowModel(workflowModel) if err != nil { logger.Debugf(ctx, "Failed to get workflow with id %+v with err %v", launchPlan.Spec.WorkflowId, err) - return nil, nil, nil, err + return nil, nil, err } closure, err := util.FetchAndGetWorkflowClosure(ctx, m.storageClient, workflowModel.RemoteClosureIdentifier) if err != nil { logger.Debugf(ctx, "Failed to get workflow with id %+v with err %v", launchPlan.Spec.WorkflowId, err) - return nil, nil, nil, err + return nil, nil, err } closure.CreatedAt = workflow.Closure.CreatedAt workflow.Closure = closure @@ -793,7 +778,7 @@ func (m *ExecutionManager) launchExecutionAndPrepareModel( var sourceExecutionID uint parentNodeExecutionID, sourceExecutionID, err = m.getInheritedExecMetadata(ctx, requestSpec, &workflowExecutionID) if err != nil { - return nil, nil, nil, err + return nil, nil, err } // Dynamically assign task resource defaults. @@ -807,16 +792,16 @@ func (m *ExecutionManager) launchExecutionAndPrepareModel( inputsURI, err := common.OffloadLiteralMap(ctx, m.storageClient, executionInputs, workflowExecutionID.Project, workflowExecutionID.Domain, workflowExecutionID.Name, shared.Inputs) if err != nil { - return nil, nil, nil, err + return nil, nil, err } userInputsURI, err := common.OffloadLiteralMap(ctx, m.storageClient, request.Inputs, workflowExecutionID.Project, workflowExecutionID.Domain, workflowExecutionID.Name, shared.UserInputs) if err != nil { - return nil, nil, nil, err + return nil, nil, err } executionConfig, err := m.getExecutionConfig(ctx, &request, launchPlan) if err != nil { - return nil, nil, nil, err + return nil, nil, err } namespace := common.GetNamespaceName( @@ -824,15 +809,15 @@ func (m *ExecutionManager) launchExecutionAndPrepareModel( labels, err := resolveStringMap(executionConfig.GetLabels(), launchPlan.Spec.Labels, "labels", m.config.RegistrationValidationConfiguration().GetMaxLabelEntries()) if err != nil { - return nil, nil, nil, err + return nil, nil, err } labels, err = m.addProjectLabels(ctx, request.Project, labels) if err != nil { - return nil, nil, nil, err + return nil, nil, err } annotations, err := resolveStringMap(executionConfig.GetAnnotations(), launchPlan.Spec.Annotations, "annotations", m.config.RegistrationValidationConfiguration().GetMaxAnnotationEntries()) if err != nil { - return nil, nil, nil, err + return nil, nil, err } var rawOutputDataConfig *admin.RawOutputDataConfig if executionConfig.RawOutputDataConfig != nil { @@ -841,7 +826,7 @@ func (m *ExecutionManager) launchExecutionAndPrepareModel( clusterAssignment, err := m.getClusterAssignment(ctx, &request) if err != nil { - return nil, nil, nil, err + return nil, nil, err } executionParameters := workflowengineInterfaces.ExecutionParameters{ @@ -859,7 +844,7 @@ func (m *ExecutionManager) launchExecutionAndPrepareModel( overrides, err := m.addPluginOverrides(ctx, &workflowExecutionID, launchPlan.GetSpec().WorkflowId.Name, launchPlan.Id.Name) if err != nil { - return nil, nil, nil, err + return nil, nil, err } if overrides != nil { executionParameters.TaskPluginOverrides = overrides @@ -885,7 +870,7 @@ func (m *ExecutionManager) launchExecutionAndPrepareModel( m.systemMetrics.PropellerFailures.Inc() logger.Infof(ctx, "Failed to execute workflow %+v with execution id %+v and inputs %+v with err %v", request, workflowExecutionID, executionInputs, err) - return nil, nil, nil, err + return nil, nil, err } executionCreatedAt := time.Now() acceptanceDelay := executionCreatedAt.Sub(requestedAt) @@ -922,26 +907,26 @@ func (m *ExecutionManager) launchExecutionAndPrepareModel( SecurityContext: executionConfig.SecurityContext, LaunchEntity: launchPlan.Id.ResourceType, Namespace: namespace, + Tags: request.Spec.Tags, }) if err != nil { logger.Infof(ctx, "Failed to create execution model in transformer for id: [%+v] with err: %v", workflowExecutionID, err) - return nil, nil, nil, err + return nil, nil, err } - tagsModel := createTagsModel(request.Project, request.Domain, request.Name, request.Spec.Tags) - return ctx, executionModel, tagsModel, nil + return ctx, executionModel, nil } // Inserts an execution model into the database store and emits platform metrics. func (m *ExecutionManager) createExecutionModel( - ctx context.Context, executionModel *models.Execution, tagsModel []models.ExecutionTags) (*core.WorkflowExecutionIdentifier, error) { + ctx context.Context, executionModel *models.Execution) (*core.WorkflowExecutionIdentifier, error) { workflowExecutionIdentifier := core.WorkflowExecutionIdentifier{ Project: executionModel.ExecutionKey.Project, Domain: executionModel.ExecutionKey.Domain, Name: executionModel.ExecutionKey.Name, } - err := m.db.ExecutionRepo().Create(ctx, *executionModel, tagsModel) + err := m.db.ExecutionRepo().Create(ctx, *executionModel) if err != nil { logger.Debugf(ctx, "failed to save newly created execution [%+v] with id %+v to db with err %v", workflowExecutionIdentifier, workflowExecutionIdentifier, err) @@ -962,13 +947,12 @@ func (m *ExecutionManager) CreateExecution( request.Inputs = request.GetSpec().GetInputs() } var executionModel *models.Execution - var tagsModel []models.ExecutionTags var err error - ctx, executionModel, tagsModel, err = m.launchExecutionAndPrepareModel(ctx, request, requestedAt) + ctx, executionModel, err = m.launchExecutionAndPrepareModel(ctx, request, requestedAt) if err != nil { return nil, err } - workflowExecutionIdentifier, err := m.createExecutionModel(ctx, executionModel, tagsModel) + workflowExecutionIdentifier, err := m.createExecutionModel(ctx, executionModel) if err != nil { return nil, err } @@ -1013,8 +997,7 @@ func (m *ExecutionManager) RelaunchExecution( executionSpec.Metadata.ReferenceExecution = existingExecution.Id executionSpec.OverwriteCache = request.GetOverwriteCache() var executionModel *models.Execution - var tagsModel []models.ExecutionTags - ctx, executionModel, tagsModel, err = m.launchExecutionAndPrepareModel(ctx, admin.ExecutionCreateRequest{ + ctx, executionModel, err = m.launchExecutionAndPrepareModel(ctx, admin.ExecutionCreateRequest{ Project: request.Id.Project, Domain: request.Id.Domain, Name: request.Name, @@ -1025,7 +1008,7 @@ func (m *ExecutionManager) RelaunchExecution( return nil, err } executionModel.SourceExecutionID = existingExecutionModel.ID - workflowExecutionIdentifier, err := m.createExecutionModel(ctx, executionModel, tagsModel) + workflowExecutionIdentifier, err := m.createExecutionModel(ctx, executionModel) if err != nil { return nil, err } @@ -1065,8 +1048,7 @@ func (m *ExecutionManager) RecoverExecution( executionSpec.Metadata.Mode = admin.ExecutionMetadata_RECOVERED executionSpec.Metadata.ReferenceExecution = existingExecution.Id var executionModel *models.Execution - var tagsModel []models.ExecutionTags - ctx, executionModel, tagsModel, err = m.launchExecutionAndPrepareModel(ctx, admin.ExecutionCreateRequest{ + ctx, executionModel, err = m.launchExecutionAndPrepareModel(ctx, admin.ExecutionCreateRequest{ Project: request.Id.Project, Domain: request.Id.Domain, Name: request.Name, @@ -1077,7 +1059,7 @@ func (m *ExecutionManager) RecoverExecution( return nil, err } executionModel.SourceExecutionID = existingExecutionModel.ID - workflowExecutionIdentifier, err := m.createExecutionModel(ctx, executionModel, tagsModel) + workflowExecutionIdentifier, err := m.createExecutionModel(ctx, executionModel) if err != nil { return nil, err } @@ -1498,6 +1480,7 @@ func (m *ExecutionManager) ListExecutions( execution.Spec.Inputs = nil execution.Closure.ComputedInputs = nil } + // END TO BE DELETED var token string if len(executionList) == int(request.Limit) { diff --git a/pkg/repositories/config/migrations.go b/pkg/repositories/config/migrations.go index d6af88be0..df529b422 100644 --- a/pkg/repositories/config/migrations.go +++ b/pkg/repositories/config/migrations.go @@ -445,16 +445,26 @@ var LegacyMigrations = []*gormigrate.Migration{ return tx.Model(&models.Execution{}).Migrator().DropColumn(&models.Execution{}, "launch_entity") }, }, - // Create execution tags table + // Create execution tags table. { - ID: "2023-05-26-execution-tags", + ID: "2023-05-27-execution_tags", Migrate: func(tx *gorm.DB) error { - return tx.AutoMigrate(&models.ExecutionTags{}) + return tx.AutoMigrate(&models.ExecutionTag{}) }, Rollback: func(tx *gorm.DB) error { return tx.Migrator().DropTable("execution_tags") }, }, + // Add execution <-> execution-tags join table. + { + ID: "2023-05-28-execution_execution_tags", + Migrate: func(tx *gorm.DB) error { + return tx.AutoMigrate(&models.Execution{}) + }, + Rollback: func(tx *gorm.DB) error { + return tx.Migrator().DropTable("execution_execution_tags") + }, + }, } var NoopMigrations = []*gormigrate.Migration{ diff --git a/pkg/repositories/gormimpl/common.go b/pkg/repositories/gormimpl/common.go index 69dada2f0..79a2886e9 100644 --- a/pkg/repositories/gormimpl/common.go +++ b/pkg/repositories/gormimpl/common.go @@ -29,6 +29,8 @@ const taskTableName = "tasks" const workflowTableName = "workflows" const descriptionEntityTableName = "description_entities" +const executionTagsTableName = "execution_tags" + const limit = "limit" const filters = "filters" diff --git a/pkg/repositories/gormimpl/execution_repo.go b/pkg/repositories/gormimpl/execution_repo.go index cc9d6d603..e49950b36 100644 --- a/pkg/repositories/gormimpl/execution_repo.go +++ b/pkg/repositories/gormimpl/execution_repo.go @@ -22,31 +22,22 @@ type ExecutionRepo struct { metrics gormMetrics } -func (r *ExecutionRepo) Create(ctx context.Context, input models.Execution, tags []models.ExecutionTags) error { +var leftJoinExecutionToTags = fmt.Sprintf( + "LEFT JOIN %s ON %s.project = %s.execution_project AND %s.domain = %s.execution_domain AND %s.name = %s.execution_name", executionTagsTableName, executionTagsTableName, executionTableName, + executionTagsTableName, executionTableName, + executionTagsTableName, executionTableName) + +func (r *ExecutionRepo) Create(ctx context.Context, input models.Execution) error { timer := r.metrics.CreateDuration.Start() - // tx := r.db.Omit("id").Create(&input) - // if tx.Error != nil { - // return r.errorTransformer.ToFlyteAdminError(tx.Error) - // } - err := r.db.Transaction(func(_ *gorm.DB) error { - if len(tags) != 0 { - tx := r.db.Create(tags) - if tx.Error != nil { - return r.errorTransformer.ToFlyteAdminError(tx.Error) - } - } - tx := r.db.Omit("id").Create(&input) - if tx.Error != nil { - return r.errorTransformer.ToFlyteAdminError(tx.Error) - } - - return nil - }) + tx := r.db.Omit("id").Create(&input) timer.Stop() - return err + if tx.Error != nil { + return r.errorTransformer.ToFlyteAdminError(tx.Error) + } + return nil } -func (r *ExecutionRepo) Get(ctx context.Context, input interfaces.Identifier) (models.Execution, error) { +func (r *ExecutionRepo) Get(_ context.Context, input interfaces.Identifier) (models.Execution, error) { var execution models.Execution timer := r.metrics.GetDuration.Start() tx := r.db.Where(&models.Execution{ @@ -80,7 +71,7 @@ func (r *ExecutionRepo) Update(ctx context.Context, execution models.Execution) return nil } -func (r *ExecutionRepo) List(ctx context.Context, input interfaces.ListResourceInput) ( +func (r *ExecutionRepo) List(_ context.Context, input interfaces.ListResourceInput) ( interfaces.ExecutionCollectionOutput, error) { var err error // First validate input. diff --git a/pkg/repositories/interfaces/execution_repo.go b/pkg/repositories/interfaces/execution_repo.go index a8b9bcc70..6a65d3d3f 100644 --- a/pkg/repositories/interfaces/execution_repo.go +++ b/pkg/repositories/interfaces/execution_repo.go @@ -9,7 +9,7 @@ import ( // Defines the interface for interacting with workflow execution models. type ExecutionRepoInterface interface { // Inserts a workflow execution model into the database store. - Create(ctx context.Context, input models.Execution, tags []models.ExecutionTags) error + Create(ctx context.Context, input models.Execution) error // This updates only an existing execution model with all non-empty fields in the input. Update(ctx context.Context, execution models.Execution) error // Returns a matching execution if it exists. diff --git a/pkg/repositories/models/execution.go b/pkg/repositories/models/execution.go index c74b8f0f0..fd3f94fd6 100644 --- a/pkg/repositories/models/execution.go +++ b/pkg/repositories/models/execution.go @@ -60,9 +60,11 @@ type Execution struct { State *int32 `gorm:"index;default:0"` // The resource type of the entity used to launch the execution, one of 'launch_plan' or 'task' LaunchEntity string + + Tags []ExecutionTag `gorm:"many2many:execution_execution_tags;"` } -type ExecutionTags struct { - ExecutionKey - Tag string `gorm:"primary_key;column:execution_tag" valid:"length(0|255)"` +type ExecutionTag struct { + BaseModel + Name string `gorm:"primary_key;column:name" valid:"length(0|255)"` } diff --git a/pkg/repositories/transformers/execution.go b/pkg/repositories/transformers/execution.go index f9e626ebf..b35312480 100644 --- a/pkg/repositories/transformers/execution.go +++ b/pkg/repositories/transformers/execution.go @@ -45,6 +45,7 @@ type CreateExecutionModelInput struct { SecurityContext *core.SecurityContext LaunchEntity core.ResourceType Namespace string + Tags []string } type ExecutionTransformerOptions struct { @@ -99,6 +100,11 @@ func CreateExecutionModel(input CreateExecutionModelInput) (*models.Execution, e } activeExecution := int32(admin.ExecutionState_EXECUTION_ACTIVE) + tags := make([]models.ExecutionTag, len(input.Tags)) + for i, tag := range input.Tags { + tags[i] = models.ExecutionTag{Name: tag} + } + executionModel := &models.Execution{ ExecutionKey: models.ExecutionKey{ Project: input.WorkflowExecutionID.Project, @@ -119,6 +125,7 @@ func CreateExecutionModel(input CreateExecutionModelInput) (*models.Execution, e User: requestSpec.Metadata.Principal, State: &activeExecution, LaunchEntity: strings.ToLower(input.LaunchEntity.String()), + Tags: tags, } // A reference launch entity can be one of either or a task OR launch plan. Traditionally, workflows are executed // with a reference launch plan which is why this behavior is the default below. From 88b77bbdcc00dff0d5f1731d41d3e75d083d4bfc Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Mon, 29 May 2023 04:45:30 -0700 Subject: [PATCH 03/26] Add tags filter Signed-off-by: Kevin Su --- pkg/common/entity.go | 24 +++++++------ pkg/manager/impl/util/filters.go | 22 ++++++------ pkg/repositories/gormimpl/common.go | 23 +++++++------ pkg/repositories/gormimpl/execution_repo.go | 19 +++++++++-- .../gormimpl/execution_repo_test.go | 34 +++++++++++++++++++ 5 files changed, 87 insertions(+), 35 deletions(-) diff --git a/pkg/common/entity.go b/pkg/common/entity.go index 65d3161cf..9aaaf5169 100644 --- a/pkg/common/entity.go +++ b/pkg/common/entity.go @@ -7,17 +7,19 @@ import ( type Entity = string const ( - Execution = "e" - LaunchPlan = "l" - NodeExecution = "ne" - NodeExecutionEvent = "nee" - Task = "t" - TaskExecution = "te" - Workflow = "w" - NamedEntity = "nen" - NamedEntityMetadata = "nem" - Project = "p" - Signal = "s" + Execution = "e" + LaunchPlan = "l" + NodeExecution = "ne" + NodeExecutionEvent = "nee" + Task = "t" + TaskExecution = "te" + Workflow = "w" + NamedEntity = "nen" + NamedEntityMetadata = "nem" + Project = "p" + Signal = "s" + ExecutionTag = "et" + ExecutionExecutionTag = "eet" ) // ResourceTypeToEntity maps a resource type to an entity suitable for use with Database filters diff --git a/pkg/manager/impl/util/filters.go b/pkg/manager/impl/util/filters.go index e52bfb8b1..abd85d4c7 100644 --- a/pkg/manager/impl/util/filters.go +++ b/pkg/manager/impl/util/filters.go @@ -52,16 +52,18 @@ const filterFieldEntityPrefixFmt = "%s." const secondsFormat = "%vs" var filterFieldEntityPrefix = map[string]common.Entity{ - "task": common.Task, - "workflow": common.Workflow, - "launch_plan": common.LaunchPlan, - "execution": common.Execution, - "node_execution": common.NodeExecution, - "task_execution": common.TaskExecution, - "entities": common.NamedEntity, - "named_entity_metadata": common.NamedEntityMetadata, - "project": common.Project, - "signal": common.Signal, + "task": common.Task, + "workflow": common.Workflow, + "launch_plan": common.LaunchPlan, + "execution": common.Execution, + "node_execution": common.NodeExecution, + "task_execution": common.TaskExecution, + "entities": common.NamedEntity, + "named_entity_metadata": common.NamedEntityMetadata, + "project": common.Project, + "signal": common.Signal, + "execution_tag": common.ExecutionTag, + "execution_execution_tag": common.ExecutionExecutionTag, } func parseField(field string, primaryEntity common.Entity) (common.Entity, string) { diff --git a/pkg/repositories/gormimpl/common.go b/pkg/repositories/gormimpl/common.go index 79a2886e9..1378b8f73 100644 --- a/pkg/repositories/gormimpl/common.go +++ b/pkg/repositories/gormimpl/common.go @@ -29,7 +29,7 @@ const taskTableName = "tasks" const workflowTableName = "workflows" const descriptionEntityTableName = "description_entities" -const executionTagsTableName = "execution_tags" +const executionTagsTableName = "execution_execution_tags" const limit = "limit" const filters = "filters" @@ -37,16 +37,17 @@ const filters = "filters" var identifierGroupBy = fmt.Sprintf("%s, %s, %s", Project, Domain, Name) var entityToTableName = map[common.Entity]string{ - common.Execution: "executions", - common.LaunchPlan: "launch_plans", - common.NodeExecution: "node_executions", - common.NodeExecutionEvent: "node_execution_events", - common.Task: "tasks", - common.TaskExecution: "task_executions", - common.Workflow: "workflows", - common.NamedEntity: "entities", - common.NamedEntityMetadata: "named_entity_metadata", - common.Signal: "signals", + common.Execution: "executions", + common.LaunchPlan: "launch_plans", + common.NodeExecution: "node_executions", + common.NodeExecutionEvent: "node_execution_events", + common.Task: "tasks", + common.TaskExecution: "task_executions", + common.Workflow: "workflows", + common.NamedEntity: "entities", + common.NamedEntityMetadata: "named_entity_metadata", + common.Signal: "signals", + common.ExecutionExecutionTag: "execution_execution_tags", } var innerJoinExecToNodeExec = fmt.Sprintf( diff --git a/pkg/repositories/gormimpl/execution_repo.go b/pkg/repositories/gormimpl/execution_repo.go index e49950b36..73f7deb22 100644 --- a/pkg/repositories/gormimpl/execution_repo.go +++ b/pkg/repositories/gormimpl/execution_repo.go @@ -22,8 +22,8 @@ type ExecutionRepo struct { metrics gormMetrics } -var leftJoinExecutionToTags = fmt.Sprintf( - "LEFT JOIN %s ON %s.project = %s.execution_project AND %s.domain = %s.execution_domain AND %s.name = %s.execution_name", executionTagsTableName, executionTagsTableName, executionTableName, +var joinExecutionToTags = fmt.Sprintf( + "JOIN %s ON %s.execution_project = %s.execution_project AND %s.execution_domain = %s.execution_domain AND %s.execution_name = %s.execution_name", executionTagsTableName, executionTagsTableName, executionTableName, executionTagsTableName, executionTableName, executionTagsTableName, executionTableName) @@ -79,7 +79,7 @@ func (r *ExecutionRepo) List(_ context.Context, input interfaces.ListResourceInp return interfaces.ExecutionCollectionOutput{}, err } var executions []models.Execution - tx := r.db.Limit(input.Limit).Offset(input.Offset) + tx := r.db.Limit(input.Limit).Offset(input.Offset).Preload("Tags") // And add join condition as required by user-specified filters (which can potentially include join table attrs). if ok := input.JoinTableEntities[common.LaunchPlan]; ok { tx = tx.Joins(fmt.Sprintf("INNER JOIN %s ON %s.launch_plan_id = %s.id", @@ -94,6 +94,19 @@ func (r *ExecutionRepo) List(_ context.Context, input interfaces.ListResourceInp taskTableName, executionTableName, taskTableName)) } + if ok := input.JoinTableEntities[common.ExecutionExecutionTag]; ok { + tx = tx.Joins(fmt.Sprintf("INNER JOIN %s ON %s.execution_name = %s.execution_name", + executionTagsTableName, executionTableName, executionTagsTableName)) + } + + //tx = tx.Where("execution_name IN (?)", r.db.Table("execution_execution_tags"). + // Select("execution_name"). + // Where("execution_tag_name IN ?", []string{"mlfloow", "name2"})) + + // tx = tx.Joins(fmt.Sprintf("INNER JOIN %s ON %s.execution_name = %s.execution_name", + // executionTagsTableName, executionTableName, executionTagsTableName)) + // tx = tx.Where(fmt.Sprintf("%s.execution_tag_name IN ?", executionTagsTableName), []string{"mlflow", "name2"}) + // Apply filters tx, err = applyScopedFilters(tx, input.InlineFilters, input.MapFilters) if err != nil { diff --git a/pkg/repositories/gormimpl/execution_repo_test.go b/pkg/repositories/gormimpl/execution_repo_test.go index daf73f6e5..c62af4957 100644 --- a/pkg/repositories/gormimpl/execution_repo_test.go +++ b/pkg/repositories/gormimpl/execution_repo_test.go @@ -268,6 +268,37 @@ func TestListExecutions_Order(t *testing.T) { }) assert.NoError(t, err) assert.True(t, mockQuery.Triggered) + // vals := []string{"SuperAwesomeProject", "AnotherAwesomeProject"} + // filter, err := NewRepeatedValueFilter(Workflow, ValueIn, "project", vals) +} + +func TestListExecutions_WithTags(t *testing.T) { + executionRepo := NewExecutionRepo(GetDbForTest(t), errors.NewTestErrorTransformer(), mockScope.NewTestScope()) + + executions := make([]map[string]interface{}, 0) + GlobalMock := mocket.Catcher.Reset() + // Only match on queries that include ordering by name + mockQuery := GlobalMock.NewMock().WithQuery(`name asc`) + mockQuery.WithReply(executions) + + sortParameter, _ := common.NewSortParameter(admin.Sort{ + Direction: admin.Sort_ASCENDING, + Key: "name", + }) + vals := []string{"tag1", "tag2"} + tagFilter, err := common.NewRepeatedValueFilter(common.ExecutionTag, common.ValueIn, "execution_tag_name", vals) + _, err = executionRepo.List(context.Background(), interfaces.ListResourceInput{ + SortParameter: sortParameter, + InlineFilters: []common.InlineFilter{ + getEqualityFilter(common.Task, "project", project), + getEqualityFilter(common.Task, "domain", domain), + getEqualityFilter(common.Task, "name", name), + tagFilter, + }, + Limit: 20, + }) + assert.NoError(t, err) + assert.True(t, mockQuery.Triggered) } func TestListExecutions_MissingParameters(t *testing.T) { @@ -315,6 +346,8 @@ func TestListExecutionsForWorkflow(t *testing.T) { // Only match on queries that append expected filters GlobalMock.NewMock().WithQuery(`SELECT "executions"."id","executions"."created_at","executions"."updated_at","executions"."deleted_at","executions"."execution_project","executions"."execution_domain","executions"."execution_name","executions"."launch_plan_id","executions"."workflow_id","executions"."task_id","executions"."phase","executions"."closure","executions"."spec","executions"."started_at","executions"."execution_created_at","executions"."execution_updated_at","executions"."duration","executions"."abort_cause","executions"."mode","executions"."source_execution_id","executions"."parent_node_execution_id","executions"."cluster","executions"."inputs_uri","executions"."user_inputs_uri","executions"."error_kind","executions"."error_code","executions"."user","executions"."state","executions"."launch_entity" FROM "executions" INNER JOIN workflows ON executions.workflow_id = workflows.id INNER JOIN tasks ON executions.task_id = tasks.id WHERE executions.execution_project = $1 AND executions.execution_domain = $2 AND executions.execution_name = $3 AND workflows.name = $4 AND tasks.name = $5 LIMIT 20`).WithReply(executions) + vals := []string{"tag1", "tag2"} + tagFilter, err := common.NewRepeatedValueFilter(common.ExecutionTag, common.ValueIn, "execution_tag_name", vals) collection, err := executionRepo.List(context.Background(), interfaces.ListResourceInput{ InlineFilters: []common.InlineFilter{ getEqualityFilter(common.Execution, "project", project), @@ -322,6 +355,7 @@ func TestListExecutionsForWorkflow(t *testing.T) { getEqualityFilter(common.Execution, "name", "1"), getEqualityFilter(common.Workflow, "name", "workflow_name"), getEqualityFilter(common.Task, "name", "task_name"), + tagFilter, }, Limit: 20, JoinTableEntities: map[common.Entity]bool{ From 54c9c41129a0214a62f3c71003a4ce8f14fcdf9b Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Wed, 31 May 2023 23:03:59 -0700 Subject: [PATCH 04/26] Add execution tags table Signed-off-by: Kevin Su --- pkg/repositories/config/migrations.go | 2 +- pkg/repositories/models/execution.go | 4 ++-- pkg/repositories/transformers/execution.go | 4 ++-- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/pkg/repositories/config/migrations.go b/pkg/repositories/config/migrations.go index df529b422..0490e1e80 100644 --- a/pkg/repositories/config/migrations.go +++ b/pkg/repositories/config/migrations.go @@ -449,7 +449,7 @@ var LegacyMigrations = []*gormigrate.Migration{ { ID: "2023-05-27-execution_tags", Migrate: func(tx *gorm.DB) error { - return tx.AutoMigrate(&models.ExecutionTag{}) + return tx.AutoMigrate(&models.AdminTag{}) }, Rollback: func(tx *gorm.DB) error { return tx.Migrator().DropTable("execution_tags") diff --git a/pkg/repositories/models/execution.go b/pkg/repositories/models/execution.go index fd3f94fd6..7f2f0dbcf 100644 --- a/pkg/repositories/models/execution.go +++ b/pkg/repositories/models/execution.go @@ -61,10 +61,10 @@ type Execution struct { // The resource type of the entity used to launch the execution, one of 'launch_plan' or 'task' LaunchEntity string - Tags []ExecutionTag `gorm:"many2many:execution_execution_tags;"` + Tags []AdminTag `gorm:"many2many:execution_admin_tags;"` } -type ExecutionTag struct { +type AdminTag struct { BaseModel Name string `gorm:"primary_key;column:name" valid:"length(0|255)"` } diff --git a/pkg/repositories/transformers/execution.go b/pkg/repositories/transformers/execution.go index b35312480..a4f7d690e 100644 --- a/pkg/repositories/transformers/execution.go +++ b/pkg/repositories/transformers/execution.go @@ -100,9 +100,9 @@ func CreateExecutionModel(input CreateExecutionModelInput) (*models.Execution, e } activeExecution := int32(admin.ExecutionState_EXECUTION_ACTIVE) - tags := make([]models.ExecutionTag, len(input.Tags)) + tags := make([]models.AdminTag, len(input.Tags)) for i, tag := range input.Tags { - tags[i] = models.ExecutionTag{Name: tag} + tags[i] = models.AdminTag{Name: tag} } executionModel := &models.Execution{ From d4a0f6a382c97331bb928404249ca2ac320e18ff Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Thu, 1 Jun 2023 13:26:02 -0700 Subject: [PATCH 05/26] update tests Signed-off-by: Kevin Su --- pkg/common/entity.go | 26 +++++++++---------- pkg/manager/impl/util/filters.go | 24 ++++++++--------- pkg/repositories/config/migrations.go | 12 ++++----- pkg/repositories/gormimpl/common.go | 25 +++++++++--------- pkg/repositories/gormimpl/execution_repo.go | 16 ++++++------ .../gormimpl/execution_repo_test.go | 12 ++++----- 6 files changed, 56 insertions(+), 59 deletions(-) diff --git a/pkg/common/entity.go b/pkg/common/entity.go index 9aaaf5169..cbb96e53d 100644 --- a/pkg/common/entity.go +++ b/pkg/common/entity.go @@ -7,19 +7,19 @@ import ( type Entity = string const ( - Execution = "e" - LaunchPlan = "l" - NodeExecution = "ne" - NodeExecutionEvent = "nee" - Task = "t" - TaskExecution = "te" - Workflow = "w" - NamedEntity = "nen" - NamedEntityMetadata = "nem" - Project = "p" - Signal = "s" - ExecutionTag = "et" - ExecutionExecutionTag = "eet" + Execution = "e" + LaunchPlan = "l" + NodeExecution = "ne" + NodeExecutionEvent = "nee" + Task = "t" + TaskExecution = "te" + Workflow = "w" + NamedEntity = "nen" + NamedEntityMetadata = "nem" + Project = "p" + Signal = "s" + AdminTag = "at" + ExecutionAdminTag = "eat" ) // ResourceTypeToEntity maps a resource type to an entity suitable for use with Database filters diff --git a/pkg/manager/impl/util/filters.go b/pkg/manager/impl/util/filters.go index abd85d4c7..70a65db70 100644 --- a/pkg/manager/impl/util/filters.go +++ b/pkg/manager/impl/util/filters.go @@ -52,18 +52,18 @@ const filterFieldEntityPrefixFmt = "%s." const secondsFormat = "%vs" var filterFieldEntityPrefix = map[string]common.Entity{ - "task": common.Task, - "workflow": common.Workflow, - "launch_plan": common.LaunchPlan, - "execution": common.Execution, - "node_execution": common.NodeExecution, - "task_execution": common.TaskExecution, - "entities": common.NamedEntity, - "named_entity_metadata": common.NamedEntityMetadata, - "project": common.Project, - "signal": common.Signal, - "execution_tag": common.ExecutionTag, - "execution_execution_tag": common.ExecutionExecutionTag, + "task": common.Task, + "workflow": common.Workflow, + "launch_plan": common.LaunchPlan, + "execution": common.Execution, + "node_execution": common.NodeExecution, + "task_execution": common.TaskExecution, + "entities": common.NamedEntity, + "named_entity_metadata": common.NamedEntityMetadata, + "project": common.Project, + "signal": common.Signal, + "admin_tag": common.AdminTag, + "execution_admin_tag": common.ExecutionAdminTag, } func parseField(field string, primaryEntity common.Entity) (common.Entity, string) { diff --git a/pkg/repositories/config/migrations.go b/pkg/repositories/config/migrations.go index 0490e1e80..925064612 100644 --- a/pkg/repositories/config/migrations.go +++ b/pkg/repositories/config/migrations.go @@ -445,24 +445,24 @@ var LegacyMigrations = []*gormigrate.Migration{ return tx.Model(&models.Execution{}).Migrator().DropColumn(&models.Execution{}, "launch_entity") }, }, - // Create execution tags table. + // Create admin tags table. { - ID: "2023-05-27-execution_tags", + ID: "2023-05-31-admin_tags", Migrate: func(tx *gorm.DB) error { return tx.AutoMigrate(&models.AdminTag{}) }, Rollback: func(tx *gorm.DB) error { - return tx.Migrator().DropTable("execution_tags") + return tx.Migrator().DropTable("admin_tags") }, }, - // Add execution <-> execution-tags join table. + // Add execution <-> admin_tags join table. { - ID: "2023-05-28-execution_execution_tags", + ID: "2023-05-31-execution_admin_tags", Migrate: func(tx *gorm.DB) error { return tx.AutoMigrate(&models.Execution{}) }, Rollback: func(tx *gorm.DB) error { - return tx.Migrator().DropTable("execution_execution_tags") + return tx.Migrator().DropTable("execution_admin_tags") }, }, } diff --git a/pkg/repositories/gormimpl/common.go b/pkg/repositories/gormimpl/common.go index 1378b8f73..2c7d9e121 100644 --- a/pkg/repositories/gormimpl/common.go +++ b/pkg/repositories/gormimpl/common.go @@ -28,8 +28,7 @@ const taskExecutionTableName = "task_executions" const taskTableName = "tasks" const workflowTableName = "workflows" const descriptionEntityTableName = "description_entities" - -const executionTagsTableName = "execution_execution_tags" +const executionAdminTagsTableName = "execution_admin_tags" const limit = "limit" const filters = "filters" @@ -37,17 +36,17 @@ const filters = "filters" var identifierGroupBy = fmt.Sprintf("%s, %s, %s", Project, Domain, Name) var entityToTableName = map[common.Entity]string{ - common.Execution: "executions", - common.LaunchPlan: "launch_plans", - common.NodeExecution: "node_executions", - common.NodeExecutionEvent: "node_execution_events", - common.Task: "tasks", - common.TaskExecution: "task_executions", - common.Workflow: "workflows", - common.NamedEntity: "entities", - common.NamedEntityMetadata: "named_entity_metadata", - common.Signal: "signals", - common.ExecutionExecutionTag: "execution_execution_tags", + common.Execution: "executions", + common.LaunchPlan: "launch_plans", + common.NodeExecution: "node_executions", + common.NodeExecutionEvent: "node_execution_events", + common.Task: "tasks", + common.TaskExecution: "task_executions", + common.Workflow: "workflows", + common.NamedEntity: "entities", + common.NamedEntityMetadata: "named_entity_metadata", + common.Signal: "signals", + common.ExecutionAdminTag: "execution_admin_tags", } var innerJoinExecToNodeExec = fmt.Sprintf( diff --git a/pkg/repositories/gormimpl/execution_repo.go b/pkg/repositories/gormimpl/execution_repo.go index 73f7deb22..ac6b1d13c 100644 --- a/pkg/repositories/gormimpl/execution_repo.go +++ b/pkg/repositories/gormimpl/execution_repo.go @@ -23,9 +23,9 @@ type ExecutionRepo struct { } var joinExecutionToTags = fmt.Sprintf( - "JOIN %s ON %s.execution_project = %s.execution_project AND %s.execution_domain = %s.execution_domain AND %s.execution_name = %s.execution_name", executionTagsTableName, executionTagsTableName, executionTableName, - executionTagsTableName, executionTableName, - executionTagsTableName, executionTableName) + "JOIN %s ON %s.execution_project = %s.execution_project AND %s.execution_domain = %s.execution_domain AND %s.execution_name = %s.execution_name", executionAdminTagsTableName, executionAdminTagsTableName, executionTableName, + executionAdminTagsTableName, executionTableName, + executionAdminTagsTableName, executionTableName) func (r *ExecutionRepo) Create(ctx context.Context, input models.Execution) error { timer := r.metrics.CreateDuration.Start() @@ -79,7 +79,7 @@ func (r *ExecutionRepo) List(_ context.Context, input interfaces.ListResourceInp return interfaces.ExecutionCollectionOutput{}, err } var executions []models.Execution - tx := r.db.Limit(input.Limit).Offset(input.Offset).Preload("Tags") + tx := r.db.Limit(input.Limit).Offset(input.Offset) // And add join condition as required by user-specified filters (which can potentially include join table attrs). if ok := input.JoinTableEntities[common.LaunchPlan]; ok { tx = tx.Joins(fmt.Sprintf("INNER JOIN %s ON %s.launch_plan_id = %s.id", @@ -94,9 +94,9 @@ func (r *ExecutionRepo) List(_ context.Context, input interfaces.ListResourceInp taskTableName, executionTableName, taskTableName)) } - if ok := input.JoinTableEntities[common.ExecutionExecutionTag]; ok { + if ok := input.JoinTableEntities[common.ExecutionAdminTag]; ok { tx = tx.Joins(fmt.Sprintf("INNER JOIN %s ON %s.execution_name = %s.execution_name", - executionTagsTableName, executionTableName, executionTagsTableName)) + executionAdminTagsTableName, executionTableName, executionAdminTagsTableName)) } //tx = tx.Where("execution_name IN (?)", r.db.Table("execution_execution_tags"). @@ -104,8 +104,8 @@ func (r *ExecutionRepo) List(_ context.Context, input interfaces.ListResourceInp // Where("execution_tag_name IN ?", []string{"mlfloow", "name2"})) // tx = tx.Joins(fmt.Sprintf("INNER JOIN %s ON %s.execution_name = %s.execution_name", - // executionTagsTableName, executionTableName, executionTagsTableName)) - // tx = tx.Where(fmt.Sprintf("%s.execution_tag_name IN ?", executionTagsTableName), []string{"mlflow", "name2"}) + // executionAdminTagsTableName, executionTableName, executionAdminTagsTableName)) + // tx = tx.Where(fmt.Sprintf("%s.execution_tag_name IN ?", executionAdminTagsTableName), []string{"mlflow", "name2"}) // Apply filters tx, err = applyScopedFilters(tx, input.InlineFilters, input.MapFilters) diff --git a/pkg/repositories/gormimpl/execution_repo_test.go b/pkg/repositories/gormimpl/execution_repo_test.go index c62af4957..ca08098a9 100644 --- a/pkg/repositories/gormimpl/execution_repo_test.go +++ b/pkg/repositories/gormimpl/execution_repo_test.go @@ -268,8 +268,6 @@ func TestListExecutions_Order(t *testing.T) { }) assert.NoError(t, err) assert.True(t, mockQuery.Triggered) - // vals := []string{"SuperAwesomeProject", "AnotherAwesomeProject"} - // filter, err := NewRepeatedValueFilter(Workflow, ValueIn, "project", vals) } func TestListExecutions_WithTags(t *testing.T) { @@ -286,7 +284,7 @@ func TestListExecutions_WithTags(t *testing.T) { Key: "name", }) vals := []string{"tag1", "tag2"} - tagFilter, err := common.NewRepeatedValueFilter(common.ExecutionTag, common.ValueIn, "execution_tag_name", vals) + tagFilter, err := common.NewRepeatedValueFilter(common.ExecutionAdminTag, common.ValueIn, "admin_tag_name", vals) _, err = executionRepo.List(context.Background(), interfaces.ListResourceInput{ SortParameter: sortParameter, InlineFilters: []common.InlineFilter{ @@ -337,17 +335,16 @@ func TestListExecutionsForWorkflow(t *testing.T) { StartedAt: &executionStartedAt, Duration: time.Hour, LaunchEntity: "launch_plan", + Tags: []models.AdminTag{{Name: "tag1"}, {Name: "tag2"}}, }) executions = append(executions, execution) GlobalMock := mocket.Catcher.Reset() GlobalMock.Logging = true - // Only match on queries that append expected filters - GlobalMock.NewMock().WithQuery(`SELECT "executions"."id","executions"."created_at","executions"."updated_at","executions"."deleted_at","executions"."execution_project","executions"."execution_domain","executions"."execution_name","executions"."launch_plan_id","executions"."workflow_id","executions"."task_id","executions"."phase","executions"."closure","executions"."spec","executions"."started_at","executions"."execution_created_at","executions"."execution_updated_at","executions"."duration","executions"."abort_cause","executions"."mode","executions"."source_execution_id","executions"."parent_node_execution_id","executions"."cluster","executions"."inputs_uri","executions"."user_inputs_uri","executions"."error_kind","executions"."error_code","executions"."user","executions"."state","executions"."launch_entity" FROM "executions" INNER JOIN workflows ON executions.workflow_id = workflows.id INNER JOIN tasks ON executions.task_id = tasks.id WHERE executions.execution_project = $1 AND executions.execution_domain = $2 AND executions.execution_name = $3 AND workflows.name = $4 AND tasks.name = $5 LIMIT 20`).WithReply(executions) - + GlobalMock.NewMock().WithQuery(`SELECT "executions"."id","executions"."created_at","executions"."updated_at","executions"."deleted_at","executions"."execution_project","executions"."execution_domain","executions"."execution_name","executions"."launch_plan_id","executions"."workflow_id","executions"."task_id","executions"."phase","executions"."closure","executions"."spec","executions"."started_at","executions"."execution_created_at","executions"."execution_updated_at","executions"."duration","executions"."abort_cause","executions"."mode","executions"."source_execution_id","executions"."parent_node_execution_id","executions"."cluster","executions"."inputs_uri","executions"."user_inputs_uri","executions"."error_kind","executions"."error_code","executions"."user","executions"."state","executions"."launch_entity" FROM "executions" INNER JOIN workflows ON executions.workflow_id = workflows.id INNER JOIN tasks ON executions.task_id = tasks.id WHERE executions.execution_project = $1 AND executions.execution_domain = $2 AND executions.execution_name = $3 AND workflows.name = $4 AND tasks.name = $5 AND execution_admin_tags.execution_tag_name in ($6,$7) LIMIT 20`).WithReply(executions) vals := []string{"tag1", "tag2"} - tagFilter, err := common.NewRepeatedValueFilter(common.ExecutionTag, common.ValueIn, "execution_tag_name", vals) + tagFilter, err := common.NewRepeatedValueFilter(common.ExecutionAdminTag, common.ValueIn, "execution_tag_name", vals) collection, err := executionRepo.List(context.Background(), interfaces.ListResourceInput{ InlineFilters: []common.InlineFilter{ getEqualityFilter(common.Execution, "project", project), @@ -363,6 +360,7 @@ func TestListExecutionsForWorkflow(t *testing.T) { common.Task: true, }, }) + assert.NoError(t, err) assert.NotEmpty(t, collection) assert.NotEmpty(t, collection.Executions) From 9a5649c8624337fee50845ca50b5986be371d442 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Thu, 1 Jun 2023 13:30:04 -0700 Subject: [PATCH 06/26] nit Signed-off-by: Kevin Su --- pkg/repositories/gormimpl/execution_repo.go | 13 ------------- 1 file changed, 13 deletions(-) diff --git a/pkg/repositories/gormimpl/execution_repo.go b/pkg/repositories/gormimpl/execution_repo.go index ac6b1d13c..efbac2dff 100644 --- a/pkg/repositories/gormimpl/execution_repo.go +++ b/pkg/repositories/gormimpl/execution_repo.go @@ -22,11 +22,6 @@ type ExecutionRepo struct { metrics gormMetrics } -var joinExecutionToTags = fmt.Sprintf( - "JOIN %s ON %s.execution_project = %s.execution_project AND %s.execution_domain = %s.execution_domain AND %s.execution_name = %s.execution_name", executionAdminTagsTableName, executionAdminTagsTableName, executionTableName, - executionAdminTagsTableName, executionTableName, - executionAdminTagsTableName, executionTableName) - func (r *ExecutionRepo) Create(ctx context.Context, input models.Execution) error { timer := r.metrics.CreateDuration.Start() tx := r.db.Omit("id").Create(&input) @@ -99,14 +94,6 @@ func (r *ExecutionRepo) List(_ context.Context, input interfaces.ListResourceInp executionAdminTagsTableName, executionTableName, executionAdminTagsTableName)) } - //tx = tx.Where("execution_name IN (?)", r.db.Table("execution_execution_tags"). - // Select("execution_name"). - // Where("execution_tag_name IN ?", []string{"mlfloow", "name2"})) - - // tx = tx.Joins(fmt.Sprintf("INNER JOIN %s ON %s.execution_name = %s.execution_name", - // executionAdminTagsTableName, executionTableName, executionAdminTagsTableName)) - // tx = tx.Where(fmt.Sprintf("%s.execution_tag_name IN ?", executionAdminTagsTableName), []string{"mlflow", "name2"}) - // Apply filters tx, err = applyScopedFilters(tx, input.InlineFilters, input.MapFilters) if err != nil { From 2c9dd1a40c7e03590804ab334e866de53bb7eb04 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Thu, 1 Jun 2023 14:12:04 -0700 Subject: [PATCH 07/26] update Signed-off-by: Kevin Su --- pkg/repositories/config/migrations.go | 4 ++-- pkg/repositories/gormimpl/common.go | 2 ++ pkg/repositories/gormimpl/execution_repo.go | 4 +++- pkg/repositories/models/execution.go | 5 +++-- 4 files changed, 10 insertions(+), 5 deletions(-) diff --git a/pkg/repositories/config/migrations.go b/pkg/repositories/config/migrations.go index 925064612..4830da5a7 100644 --- a/pkg/repositories/config/migrations.go +++ b/pkg/repositories/config/migrations.go @@ -447,7 +447,7 @@ var LegacyMigrations = []*gormigrate.Migration{ }, // Create admin tags table. { - ID: "2023-05-31-admin_tags", + ID: "2023-06-01-admin_tags", Migrate: func(tx *gorm.DB) error { return tx.AutoMigrate(&models.AdminTag{}) }, @@ -457,7 +457,7 @@ var LegacyMigrations = []*gormigrate.Migration{ }, // Add execution <-> admin_tags join table. { - ID: "2023-05-31-execution_admin_tags", + ID: "2023-06-01-execution_admin_tags", Migrate: func(tx *gorm.DB) error { return tx.AutoMigrate(&models.Execution{}) }, diff --git a/pkg/repositories/gormimpl/common.go b/pkg/repositories/gormimpl/common.go index 2c7d9e121..c022bd973 100644 --- a/pkg/repositories/gormimpl/common.go +++ b/pkg/repositories/gormimpl/common.go @@ -28,6 +28,7 @@ const taskExecutionTableName = "task_executions" const taskTableName = "tasks" const workflowTableName = "workflows" const descriptionEntityTableName = "description_entities" +const AdminTagsTableName = "admin_tags" const executionAdminTagsTableName = "execution_admin_tags" const limit = "limit" @@ -46,6 +47,7 @@ var entityToTableName = map[common.Entity]string{ common.NamedEntity: "entities", common.NamedEntityMetadata: "named_entity_metadata", common.Signal: "signals", + common.AdminTag: "admin_tags", common.ExecutionAdminTag: "execution_admin_tags", } diff --git a/pkg/repositories/gormimpl/execution_repo.go b/pkg/repositories/gormimpl/execution_repo.go index efbac2dff..b128a2805 100644 --- a/pkg/repositories/gormimpl/execution_repo.go +++ b/pkg/repositories/gormimpl/execution_repo.go @@ -89,9 +89,11 @@ func (r *ExecutionRepo) List(_ context.Context, input interfaces.ListResourceInp taskTableName, executionTableName, taskTableName)) } - if ok := input.JoinTableEntities[common.ExecutionAdminTag]; ok { + if ok := input.JoinTableEntities[common.AdminTag]; ok { tx = tx.Joins(fmt.Sprintf("INNER JOIN %s ON %s.execution_name = %s.execution_name", executionAdminTagsTableName, executionTableName, executionAdminTagsTableName)) + tx = tx.Joins(fmt.Sprintf("INNER JOIN %s ON %s.id = %s.admin_tag_id", + AdminTagsTableName, AdminTagsTableName, executionAdminTagsTableName)) } // Apply filters diff --git a/pkg/repositories/models/execution.go b/pkg/repositories/models/execution.go index 7f2f0dbcf..c159727eb 100644 --- a/pkg/repositories/models/execution.go +++ b/pkg/repositories/models/execution.go @@ -1,6 +1,7 @@ package models import ( + "gorm.io/gorm" "time" "github.com/flyteorg/flytestdlib/storage" @@ -65,6 +66,6 @@ type Execution struct { } type AdminTag struct { - BaseModel - Name string `gorm:"primary_key;column:name" valid:"length(0|255)"` + gorm.Model + Name string `valid:"length(0|255)"` } From c6311784f34f9af5d269d0c50f6ea8ac470f818e Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Thu, 1 Jun 2023 14:26:52 -0700 Subject: [PATCH 08/26] update Signed-off-by: Kevin Su --- go.mod | 2 +- go.sum | 2 ++ 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/go.mod b/go.mod index a5e192231..bb7e02526 100644 --- a/go.mod +++ b/go.mod @@ -214,4 +214,4 @@ replace github.com/robfig/cron/v3 => github.com/unionai/cron/v3 v3.0.2-0.2022091 // This was published in error when attempting to create 1.5.1 Flyte release. retract v1.1.94 -replace github.com/flyteorg/flyteidl => /Users/kevin/git/flyteidl +replace github.com/flyteorg/flyteidl => github.com/flyteorg/flyteidl v1.5.9-0.20230601212420-1d66e2afad4d diff --git a/go.sum b/go.sum index f19b0395d..aff3d3717 100644 --- a/go.sum +++ b/go.sum @@ -312,6 +312,8 @@ github.com/fatih/structs v1.0.0/go.mod h1:9NiDSp5zOcgEDl+j00MP/WkGVPOlPRLejGD8Ga github.com/fatih/structs v1.1.0/go.mod h1:9NiDSp5zOcgEDl+j00MP/WkGVPOlPRLejGD8Ga6PJ7M= github.com/felixge/httpsnoop v1.0.1 h1:lvB5Jl89CsZtGIWuTcDM1E/vkVs49/Ml7JJe07l8SPQ= github.com/felixge/httpsnoop v1.0.1/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= +github.com/flyteorg/flyteidl v1.5.9-0.20230601212420-1d66e2afad4d h1:3wgeLYVRYZTM5v2GO03Tg65HU6G1VIxPSerx9x3Xc2k= +github.com/flyteorg/flyteidl v1.5.9-0.20230601212420-1d66e2afad4d/go.mod h1:EtE/muM2lHHgBabjYcxqe9TWeJSL0kXwbI0RgVwI4Og= github.com/flyteorg/flyteplugins v1.0.56 h1:kBTDgTpdSi7wcptk4cMwz5vfh1MU82VaUMMboe1InXw= github.com/flyteorg/flyteplugins v1.0.56/go.mod h1:aFCKSn8TPzxSAILIiogHtUnHlUCN9+y6Vf+r9R4KZDU= github.com/flyteorg/flytepropeller v1.1.87 h1:Px7ASDjrWyeVrUb15qXmhw9QK7xPcFjL5Yetr2P6iGM= From 9071a4493b3657a1f3bc2b2b20941ebb7910efc8 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Wed, 7 Jun 2023 13:07:00 -0700 Subject: [PATCH 09/26] update migration Signed-off-by: Kevin Su --- pkg/repositories/config/migrations.go | 106 +++++++++++++++++++++----- pkg/repositories/models/execution.go | 2 +- 2 files changed, 87 insertions(+), 21 deletions(-) diff --git a/pkg/repositories/config/migrations.go b/pkg/repositories/config/migrations.go index 4830da5a7..2aed033f0 100644 --- a/pkg/repositories/config/migrations.go +++ b/pkg/repositories/config/migrations.go @@ -445,26 +445,6 @@ var LegacyMigrations = []*gormigrate.Migration{ return tx.Model(&models.Execution{}).Migrator().DropColumn(&models.Execution{}, "launch_entity") }, }, - // Create admin tags table. - { - ID: "2023-06-01-admin_tags", - Migrate: func(tx *gorm.DB) error { - return tx.AutoMigrate(&models.AdminTag{}) - }, - Rollback: func(tx *gorm.DB) error { - return tx.Migrator().DropTable("admin_tags") - }, - }, - // Add execution <-> admin_tags join table. - { - ID: "2023-06-01-execution_admin_tags", - Migrate: func(tx *gorm.DB) error { - return tx.AutoMigrate(&models.Execution{}) - }, - Rollback: func(tx *gorm.DB) error { - return tx.Migrator().DropTable("execution_admin_tags") - }, - }, } var NoopMigrations = []*gormigrate.Migration{ @@ -1103,6 +1083,92 @@ var NoopMigrations = []*gormigrate.Migration{ return nil }, }, + + { + ID: "pg-noop-2023-06-07-admin-tags", + Migrate: func(tx *gorm.DB) error { + type AdminTag struct { + gorm.Model + Name string `valid:"length(0|255)"` + } + + return tx.AutoMigrate(&AdminTag{}) + }, + Rollback: func(tx *gorm.DB) error { + return nil + }, + }, + + { + ID: "pg-noop-2023-06-07-noop-execution-admin-tags", + Migrate: func(tx *gorm.DB) error { + type AdminTag struct { + gorm.Model + Name string `valid:"length(0|255)"` + } + + type ExecutionKey struct { + Project string `gorm:"primary_key;column:execution_project" valid:"length(0|255)"` + Domain string `gorm:"primary_key;column:execution_domain" valid:"length(0|255)"` + Name string `gorm:"primary_key;column:execution_name" valid:"length(0|255)"` + } + + type Execution struct { + ID uint `gorm:"index;autoIncrement;not null"` + CreatedAt time.Time + UpdatedAt time.Time + DeletedAt *time.Time `gorm:"index"` + ExecutionKey + LaunchPlanID uint `gorm:"index"` + WorkflowID uint `gorm:"index"` + TaskID uint `gorm:"index"` + Phase string `valid:"length(0|255)"` + Closure []byte + Spec []byte `gorm:"not null"` + StartedAt *time.Time + // Corresponds to the CreatedAt field in the Execution closure. + // Prefixed with Execution to avoid clashes with gorm.Model CreatedAt + ExecutionCreatedAt *time.Time `gorm:"index:idx_executions_created_at"` + // Corresponds to the UpdatedAt field in the Execution closure + // Prefixed with Execution to avoid clashes with gorm.Model UpdatedAt + ExecutionUpdatedAt *time.Time + Duration time.Duration + // In the case of an aborted execution this string may be non-empty. + // It should be ignored for any other value of phase other than aborted. + AbortCause string `valid:"length(0|255)"` + // Corresponds to the execution mode used to trigger this execution + Mode int32 + // The "parent" execution (if there is one) that is related to this execution. + SourceExecutionID uint + // The parent node execution if this was launched by a node + ParentNodeExecutionID uint + // Cluster where execution was triggered + Cluster string `valid:"length(0|255)"` + // Offloaded location of inputs LiteralMap. These are the inputs evaluated and contain applied defaults. + InputsURI storage.DataReference + // User specified inputs. This map might be incomplete and not include defaults applied + UserInputsURI storage.DataReference + // Execution Error Kind. nullable + ErrorKind *string `gorm:"index"` + // Execution Error Code nullable + ErrorCode *string `valid:"length(0|255)"` + // The user responsible for launching this execution. + // This is also stored in the spec but promoted as a column for filtering. + User string `gorm:"index" valid:"length(0|255)"` + // GORM doesn't save the zero value for ints, so we use a pointer for the State field + State *int32 `gorm:"index;default:0"` + // The resource type of the entity used to launch the execution, one of 'launch_plan' or 'task' + LaunchEntity string + // Tags associated with the execution + Tags []AdminTag `gorm:"many2many:execution_admin_tags;"` + } + + return tx.AutoMigrate(&Execution{}) + }, + Rollback: func(tx *gorm.DB) error { + return nil + }, + }, } var Migrations = append(LegacyMigrations, NoopMigrations...) diff --git a/pkg/repositories/models/execution.go b/pkg/repositories/models/execution.go index c159727eb..564f77d8d 100644 --- a/pkg/repositories/models/execution.go +++ b/pkg/repositories/models/execution.go @@ -61,7 +61,7 @@ type Execution struct { State *int32 `gorm:"index;default:0"` // The resource type of the entity used to launch the execution, one of 'launch_plan' or 'task' LaunchEntity string - + // Tags associated with the execution Tags []AdminTag `gorm:"many2many:execution_admin_tags;"` } From f23f34228c02b3117ce30307f7ff89ab6d794daa Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Wed, 7 Jun 2023 13:20:19 -0700 Subject: [PATCH 10/26] lint Signed-off-by: Kevin Su --- pkg/repositories/gormimpl/execution_repo_test.go | 2 ++ pkg/repositories/models/execution.go | 3 ++- 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/pkg/repositories/gormimpl/execution_repo_test.go b/pkg/repositories/gormimpl/execution_repo_test.go index ca08098a9..17cb85777 100644 --- a/pkg/repositories/gormimpl/execution_repo_test.go +++ b/pkg/repositories/gormimpl/execution_repo_test.go @@ -285,6 +285,7 @@ func TestListExecutions_WithTags(t *testing.T) { }) vals := []string{"tag1", "tag2"} tagFilter, err := common.NewRepeatedValueFilter(common.ExecutionAdminTag, common.ValueIn, "admin_tag_name", vals) + assert.NoError(t, err) _, err = executionRepo.List(context.Background(), interfaces.ListResourceInput{ SortParameter: sortParameter, InlineFilters: []common.InlineFilter{ @@ -345,6 +346,7 @@ func TestListExecutionsForWorkflow(t *testing.T) { GlobalMock.NewMock().WithQuery(`SELECT "executions"."id","executions"."created_at","executions"."updated_at","executions"."deleted_at","executions"."execution_project","executions"."execution_domain","executions"."execution_name","executions"."launch_plan_id","executions"."workflow_id","executions"."task_id","executions"."phase","executions"."closure","executions"."spec","executions"."started_at","executions"."execution_created_at","executions"."execution_updated_at","executions"."duration","executions"."abort_cause","executions"."mode","executions"."source_execution_id","executions"."parent_node_execution_id","executions"."cluster","executions"."inputs_uri","executions"."user_inputs_uri","executions"."error_kind","executions"."error_code","executions"."user","executions"."state","executions"."launch_entity" FROM "executions" INNER JOIN workflows ON executions.workflow_id = workflows.id INNER JOIN tasks ON executions.task_id = tasks.id WHERE executions.execution_project = $1 AND executions.execution_domain = $2 AND executions.execution_name = $3 AND workflows.name = $4 AND tasks.name = $5 AND execution_admin_tags.execution_tag_name in ($6,$7) LIMIT 20`).WithReply(executions) vals := []string{"tag1", "tag2"} tagFilter, err := common.NewRepeatedValueFilter(common.ExecutionAdminTag, common.ValueIn, "execution_tag_name", vals) + assert.NoError(t, err) collection, err := executionRepo.List(context.Background(), interfaces.ListResourceInput{ InlineFilters: []common.InlineFilter{ getEqualityFilter(common.Execution, "project", project), diff --git a/pkg/repositories/models/execution.go b/pkg/repositories/models/execution.go index 564f77d8d..115690c52 100644 --- a/pkg/repositories/models/execution.go +++ b/pkg/repositories/models/execution.go @@ -1,9 +1,10 @@ package models import ( - "gorm.io/gorm" "time" + "gorm.io/gorm" + "github.com/flyteorg/flytestdlib/storage" ) From 199b22965d859d39a8b18c7a004ebc27ded76813 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Wed, 14 Jun 2023 13:24:05 -0700 Subject: [PATCH 11/26] use gorm size Signed-off-by: Kevin Su --- pkg/manager/impl/execution_manager.go | 1 - pkg/repositories/config/migrations.go | 4 ++-- pkg/repositories/transformers/execution.go | 5 ++--- 3 files changed, 4 insertions(+), 6 deletions(-) diff --git a/pkg/manager/impl/execution_manager.go b/pkg/manager/impl/execution_manager.go index a286dbb3e..7ad4fbbd5 100644 --- a/pkg/manager/impl/execution_manager.go +++ b/pkg/manager/impl/execution_manager.go @@ -907,7 +907,6 @@ func (m *ExecutionManager) launchExecutionAndPrepareModel( SecurityContext: executionConfig.SecurityContext, LaunchEntity: launchPlan.Id.ResourceType, Namespace: namespace, - Tags: request.Spec.Tags, }) if err != nil { logger.Infof(ctx, "Failed to create execution model in transformer for id: [%+v] with err: %v", diff --git a/pkg/repositories/config/migrations.go b/pkg/repositories/config/migrations.go index 2aed033f0..320e60f4d 100644 --- a/pkg/repositories/config/migrations.go +++ b/pkg/repositories/config/migrations.go @@ -1089,7 +1089,7 @@ var NoopMigrations = []*gormigrate.Migration{ Migrate: func(tx *gorm.DB) error { type AdminTag struct { gorm.Model - Name string `valid:"length(0|255)"` + Name string `gorm:"size:255"` } return tx.AutoMigrate(&AdminTag{}) @@ -1104,7 +1104,7 @@ var NoopMigrations = []*gormigrate.Migration{ Migrate: func(tx *gorm.DB) error { type AdminTag struct { gorm.Model - Name string `valid:"length(0|255)"` + Name string `gorm:"size:255"` } type ExecutionKey struct { diff --git a/pkg/repositories/transformers/execution.go b/pkg/repositories/transformers/execution.go index 3349674df..abd77413e 100644 --- a/pkg/repositories/transformers/execution.go +++ b/pkg/repositories/transformers/execution.go @@ -45,7 +45,6 @@ type CreateExecutionModelInput struct { SecurityContext *core.SecurityContext LaunchEntity core.ResourceType Namespace string - Tags []string } type ExecutionTransformerOptions struct { @@ -100,8 +99,8 @@ func CreateExecutionModel(input CreateExecutionModelInput) (*models.Execution, e } activeExecution := int32(admin.ExecutionState_EXECUTION_ACTIVE) - tags := make([]models.AdminTag, len(input.Tags)) - for i, tag := range input.Tags { + tags := make([]models.AdminTag, len(input.RequestSpec.Tags)) + for i, tag := range input.RequestSpec.Tags { tags[i] = models.AdminTag{Name: tag} } From a262ebc1b4d173db1dd9d78b7c629649a92b86e1 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Wed, 14 Jun 2023 14:04:27 -0700 Subject: [PATCH 12/26] Add tests Signed-off-by: Kevin Su --- pkg/manager/impl/testutils/mock_requests.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/manager/impl/testutils/mock_requests.go b/pkg/manager/impl/testutils/mock_requests.go index 8b8473376..fda9482e1 100644 --- a/pkg/manager/impl/testutils/mock_requests.go +++ b/pkg/manager/impl/testutils/mock_requests.go @@ -222,6 +222,7 @@ func GetExecutionRequest() admin.ExecutionCreateRequest { }, RawOutputDataConfig: &admin.RawOutputDataConfig{OutputLocationPrefix: "default_raw_output"}, Envs: &admin.Envs{}, + Tags: []string{"tag1", "tag2"}, }, Inputs: &core.LiteralMap{ Literals: map[string]*core.Literal{ From 381569d68aa710e933fab96e6e248eee1571ac3e Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Wed, 5 Jul 2023 06:21:07 -0700 Subject: [PATCH 13/26] bump idl Signed-off-by: Kevin Su --- go.mod | 2 +- go.sum | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/go.mod b/go.mod index 22a409f68..bf7debb86 100644 --- a/go.mod +++ b/go.mod @@ -214,4 +214,4 @@ replace github.com/robfig/cron/v3 => github.com/unionai/cron/v3 v3.0.2-0.2022091 // This was published in error when attempting to create 1.5.1 Flyte release. retract v1.1.94 -replace github.com/flyteorg/flyteidl => github.com/flyteorg/flyteidl v1.5.9-0.20230601212420-1d66e2afad4d +replace github.com/flyteorg/flyteidl => github.com/flyteorg/flyteidl v1.5.13-0.20230705131852-ba27680aa8b3 diff --git a/go.sum b/go.sum index 993ff09e1..862ee8961 100644 --- a/go.sum +++ b/go.sum @@ -312,8 +312,8 @@ github.com/fatih/structs v1.0.0/go.mod h1:9NiDSp5zOcgEDl+j00MP/WkGVPOlPRLejGD8Ga github.com/fatih/structs v1.1.0/go.mod h1:9NiDSp5zOcgEDl+j00MP/WkGVPOlPRLejGD8Ga6PJ7M= github.com/felixge/httpsnoop v1.0.1 h1:lvB5Jl89CsZtGIWuTcDM1E/vkVs49/Ml7JJe07l8SPQ= github.com/felixge/httpsnoop v1.0.1/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= -github.com/flyteorg/flyteidl v1.5.9-0.20230601212420-1d66e2afad4d h1:3wgeLYVRYZTM5v2GO03Tg65HU6G1VIxPSerx9x3Xc2k= -github.com/flyteorg/flyteidl v1.5.9-0.20230601212420-1d66e2afad4d/go.mod h1:EtE/muM2lHHgBabjYcxqe9TWeJSL0kXwbI0RgVwI4Og= +github.com/flyteorg/flyteidl v1.5.13-0.20230705131852-ba27680aa8b3 h1:UmrrH3OVeyiCuE+R6v+Ka8mDOuyfTlnajf2TYWS+9UM= +github.com/flyteorg/flyteidl v1.5.13-0.20230705131852-ba27680aa8b3/go.mod h1:EtE/muM2lHHgBabjYcxqe9TWeJSL0kXwbI0RgVwI4Og= github.com/flyteorg/flyteplugins v1.0.67 h1:d2FXpwxQwX/k4YdmhuusykOemHb/cUTPEob4WBmdpjE= github.com/flyteorg/flyteplugins v1.0.67/go.mod h1:HHt4nKDKVwrZPKDsj99dNtDSIJL378xNotYMA3a/TFA= github.com/flyteorg/flytepropeller v1.1.98 h1:Zk2ENYB9VZRT5tFUIFjm+aCkr0TU2EuyJ5gh52fpLoA= From 8624ea946eaafeeef2786949c07b18d6cf27277f Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Wed, 5 Jul 2023 06:29:06 -0700 Subject: [PATCH 14/26] bump idl Signed-off-by: Kevin Su --- go.mod | 2 -- go.sum | 4 ++-- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/go.mod b/go.mod index bf7debb86..2a469a5c3 100644 --- a/go.mod +++ b/go.mod @@ -213,5 +213,3 @@ replace github.com/robfig/cron/v3 => github.com/unionai/cron/v3 v3.0.2-0.2022091 // Retracted versions // This was published in error when attempting to create 1.5.1 Flyte release. retract v1.1.94 - -replace github.com/flyteorg/flyteidl => github.com/flyteorg/flyteidl v1.5.13-0.20230705131852-ba27680aa8b3 diff --git a/go.sum b/go.sum index 862ee8961..819fb160b 100644 --- a/go.sum +++ b/go.sum @@ -312,8 +312,8 @@ github.com/fatih/structs v1.0.0/go.mod h1:9NiDSp5zOcgEDl+j00MP/WkGVPOlPRLejGD8Ga github.com/fatih/structs v1.1.0/go.mod h1:9NiDSp5zOcgEDl+j00MP/WkGVPOlPRLejGD8Ga6PJ7M= github.com/felixge/httpsnoop v1.0.1 h1:lvB5Jl89CsZtGIWuTcDM1E/vkVs49/Ml7JJe07l8SPQ= github.com/felixge/httpsnoop v1.0.1/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= -github.com/flyteorg/flyteidl v1.5.13-0.20230705131852-ba27680aa8b3 h1:UmrrH3OVeyiCuE+R6v+Ka8mDOuyfTlnajf2TYWS+9UM= -github.com/flyteorg/flyteidl v1.5.13-0.20230705131852-ba27680aa8b3/go.mod h1:EtE/muM2lHHgBabjYcxqe9TWeJSL0kXwbI0RgVwI4Og= +github.com/flyteorg/flyteidl v1.5.11 h1:Xcb17YqNstl+dHQsK+o0Ac+1l1U154wXivg28O3C5l0= +github.com/flyteorg/flyteidl v1.5.11/go.mod h1:EtE/muM2lHHgBabjYcxqe9TWeJSL0kXwbI0RgVwI4Og= github.com/flyteorg/flyteplugins v1.0.67 h1:d2FXpwxQwX/k4YdmhuusykOemHb/cUTPEob4WBmdpjE= github.com/flyteorg/flyteplugins v1.0.67/go.mod h1:HHt4nKDKVwrZPKDsj99dNtDSIJL378xNotYMA3a/TFA= github.com/flyteorg/flytepropeller v1.1.98 h1:Zk2ENYB9VZRT5tFUIFjm+aCkr0TU2EuyJ5gh52fpLoA= From 772ddb0a8c72f5b0d633a64b4f47b6131b5df089 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Wed, 5 Jul 2023 23:42:08 -0700 Subject: [PATCH 15/26] bump idl Signed-off-by: Kevin Su --- go.mod | 2 ++ go.sum | 4 ++-- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/go.mod b/go.mod index 2a469a5c3..7c7417514 100644 --- a/go.mod +++ b/go.mod @@ -213,3 +213,5 @@ replace github.com/robfig/cron/v3 => github.com/unionai/cron/v3 v3.0.2-0.2022091 // Retracted versions // This was published in error when attempting to create 1.5.1 Flyte release. retract v1.1.94 + +replace github.com/flyteorg/flyteidl => github.com/flyteorg/flyteidl v1.5.13-0.20230706064018-2340cc31a60b diff --git a/go.sum b/go.sum index 819fb160b..57a3229f5 100644 --- a/go.sum +++ b/go.sum @@ -312,8 +312,8 @@ github.com/fatih/structs v1.0.0/go.mod h1:9NiDSp5zOcgEDl+j00MP/WkGVPOlPRLejGD8Ga github.com/fatih/structs v1.1.0/go.mod h1:9NiDSp5zOcgEDl+j00MP/WkGVPOlPRLejGD8Ga6PJ7M= github.com/felixge/httpsnoop v1.0.1 h1:lvB5Jl89CsZtGIWuTcDM1E/vkVs49/Ml7JJe07l8SPQ= github.com/felixge/httpsnoop v1.0.1/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= -github.com/flyteorg/flyteidl v1.5.11 h1:Xcb17YqNstl+dHQsK+o0Ac+1l1U154wXivg28O3C5l0= -github.com/flyteorg/flyteidl v1.5.11/go.mod h1:EtE/muM2lHHgBabjYcxqe9TWeJSL0kXwbI0RgVwI4Og= +github.com/flyteorg/flyteidl v1.5.13-0.20230706064018-2340cc31a60b h1:swzL6DNqsm2p98hNHSpqH2IDtdSFOv/DTpDCWth1lEA= +github.com/flyteorg/flyteidl v1.5.13-0.20230706064018-2340cc31a60b/go.mod h1:EtE/muM2lHHgBabjYcxqe9TWeJSL0kXwbI0RgVwI4Og= github.com/flyteorg/flyteplugins v1.0.67 h1:d2FXpwxQwX/k4YdmhuusykOemHb/cUTPEob4WBmdpjE= github.com/flyteorg/flyteplugins v1.0.67/go.mod h1:HHt4nKDKVwrZPKDsj99dNtDSIJL378xNotYMA3a/TFA= github.com/flyteorg/flytepropeller v1.1.98 h1:Zk2ENYB9VZRT5tFUIFjm+aCkr0TU2EuyJ5gh52fpLoA= From 2a304c613fd711ba9349c9631b056248cf566ee1 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Wed, 19 Jul 2023 16:57:05 -0700 Subject: [PATCH 16/26] address comment Signed-off-by: Kevin Su --- pkg/repositories/config/migrations.go | 31 ++++++++++++++------------- pkg/repositories/models/execution.go | 2 +- 2 files changed, 17 insertions(+), 16 deletions(-) diff --git a/pkg/repositories/config/migrations.go b/pkg/repositories/config/migrations.go index 577f4d3e7..b5f2c5027 100644 --- a/pkg/repositories/config/migrations.go +++ b/pkg/repositories/config/migrations.go @@ -1085,7 +1085,21 @@ var NoopMigrations = []*gormigrate.Migration{ }, { - ID: "pg-noop-2023-06-07-admin-tags", + ID: "2023-06-19-id-sequence-to-bigint", + Migrate: func(tx *gorm.DB) error { + db, err := tx.DB() + if err != nil { + return err + } + return alterIDSequenceType(db) + }, + Rollback: func(tx *gorm.DB) error { + return nil + }, + }, + + { + ID: "2023-07-19-admin-tags", Migrate: func(tx *gorm.DB) error { type AdminTag struct { gorm.Model @@ -1100,7 +1114,7 @@ var NoopMigrations = []*gormigrate.Migration{ }, { - ID: "pg-noop-2023-06-07-noop-execution-admin-tags", + ID: "2023-07-19-noop-execution-admin-tags", Migrate: func(tx *gorm.DB) error { type AdminTag struct { gorm.Model @@ -1166,19 +1180,6 @@ var NoopMigrations = []*gormigrate.Migration{ return tx.AutoMigrate(&Execution{}) }, }, - { - ID: "2023-06-19-id-sequence-to-bigint", - Migrate: func(tx *gorm.DB) error { - db, err := tx.DB() - if err != nil { - return err - } - return alterIDSequenceType(db) - }, - Rollback: func(tx *gorm.DB) error { - return nil - }, - }, } var Migrations = append(LegacyMigrations, NoopMigrations...) diff --git a/pkg/repositories/models/execution.go b/pkg/repositories/models/execution.go index 115690c52..4b3ea76af 100644 --- a/pkg/repositories/models/execution.go +++ b/pkg/repositories/models/execution.go @@ -68,5 +68,5 @@ type Execution struct { type AdminTag struct { gorm.Model - Name string `valid:"length(0|255)"` + Name string } From 8b6e2c4d7184c7a34ef6c4da92e280474d3d0f1a Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Thu, 20 Jul 2023 22:02:04 -0700 Subject: [PATCH 17/26] Set tag_name column as unique Signed-off-by: Kevin Su --- pkg/repositories/config/migrations.go | 8 ++++---- pkg/repositories/models/execution.go | 2 +- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/pkg/repositories/config/migrations.go b/pkg/repositories/config/migrations.go index b5f2c5027..4873cd89e 100644 --- a/pkg/repositories/config/migrations.go +++ b/pkg/repositories/config/migrations.go @@ -1099,11 +1099,11 @@ var NoopMigrations = []*gormigrate.Migration{ }, { - ID: "2023-07-19-admin-tags", + ID: "2023-07-20-admin-tags", Migrate: func(tx *gorm.DB) error { type AdminTag struct { gorm.Model - Name string `gorm:"size:255"` + Name string `gorm:"index:,unique;size:255"` } return tx.AutoMigrate(&AdminTag{}) @@ -1114,11 +1114,11 @@ var NoopMigrations = []*gormigrate.Migration{ }, { - ID: "2023-07-19-noop-execution-admin-tags", + ID: "2023-07-20-noop-execution-admin-tags", Migrate: func(tx *gorm.DB) error { type AdminTag struct { gorm.Model - Name string `gorm:"size:255"` + Name string `gorm:"index:,unique;size:255"` } type ExecutionKey struct { diff --git a/pkg/repositories/models/execution.go b/pkg/repositories/models/execution.go index 4b3ea76af..287486b1b 100644 --- a/pkg/repositories/models/execution.go +++ b/pkg/repositories/models/execution.go @@ -68,5 +68,5 @@ type Execution struct { type AdminTag struct { gorm.Model - Name string + Name string `gorm:"index:,unique;size:255"` } From 7624e120111ddd731c8a4a229e4ac9b2c59d9919 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Fri, 21 Jul 2023 23:39:43 -0700 Subject: [PATCH 18/26] Add integration tests Signed-off-by: Kevin Su --- tests/execution_test.go | 28 ++++++++++++++++++++++++++++ 1 file changed, 28 insertions(+) diff --git a/tests/execution_test.go b/tests/execution_test.go index f42133c7f..a8d5e9b82 100644 --- a/tests/execution_test.go +++ b/tests/execution_test.go @@ -185,6 +185,14 @@ func populateWorkflowExecutionsForTestingOnly() { db.Exec(`INSERT INTO workflows ("id", "project", "domain", "name", "version", "remote_closure_identifier") ` + `VALUES (4, 'project2', 'domain2', 'name2', 'version1', 's3://foo')`) + // Insert dummy tags + db.Exec(`INSERT INTO admin_tags ("id", "name") ` + `VALUES (1, 'hello')`) + db.Exec(`INSERT INTO admin_tags ("id", "name") ` + `VALUES (2, 'flyte')`) + db.Exec(`INSERT INTO execution_admin_tags ("execution_project", "execution_domain", "execution_name", "admin_tag_id") ` + `VALUES ('project1', 'domain1', 'name1', 1)`) + db.Exec(`INSERT INTO execution_admin_tags ("execution_project", "execution_domain", "execution_name", "admin_tag_id") ` + `VALUES ('project1', 'domain1', 'name1', 2)`) + db.Exec(`INSERT INTO execution_admin_tags ("execution_project", "execution_domain", "execution_name", "admin_tag_id") ` + `VALUES ('project1', 'domain1', 'name3', 2)`) + db.Exec(`INSERT INTO execution_admin_tags ("execution_project", "execution_domain", "execution_name", "admin_tag_id") ` + `VALUES ('project1', 'domain1', 'name4', 1)`) + for _, statement := range insertExecutionStatements { db.Exec(statement) } @@ -209,6 +217,26 @@ func TestListWorkflowExecutions(t *testing.T) { assert.Equal(t, len(resp.Executions), 4) } +func TestListWorkflowExecutionsWithTags(t *testing.T) { + truncateAllTablesForTestingOnly() + populateWorkflowExecutionsForTestingOnly() + + ctx := context.Background() + client, conn := GetTestAdminServiceClient() + defer conn.Close() + + resp, err := client.ListExecutions(ctx, &admin.ResourceListRequest{ + Id: &admin.NamedEntityIdentifier{ + Project: "project1", + Domain: "domain1", + }, + Limit: 5, + Filters: "value_in(admin_tag.name, hello)", + }) + assert.Nil(t, err) + assert.Equal(t, len(resp.Executions), 2) +} + func TestListWorkflowExecutions_Filters(t *testing.T) { truncateAllTablesForTestingOnly() populateWorkflowExecutionsForTestingOnly() From 6e67d456a7c2c043ed2d26768ceb946feeb59332 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Tue, 1 Aug 2023 13:11:23 -0700 Subject: [PATCH 19/26] Update the tests Signed-off-by: Kevin Su --- tests/bootstrap.go | 4 ++++ tests/execution_test.go | 24 ++++++++++++++++++++++++ 2 files changed, 28 insertions(+) diff --git a/tests/bootstrap.go b/tests/bootstrap.go index b5a3477b7..11a9b7aba 100644 --- a/tests/bootstrap.go +++ b/tests/bootstrap.go @@ -72,6 +72,8 @@ func truncateAllTablesForTestingOnly() { TruncateResources := fmt.Sprintf("TRUNCATE TABLE resources;") TruncateSchedulableEntities := fmt.Sprintf("TRUNCATE TABLE schedulable_entities;") TruncateSchedulableEntitiesSnapshots := fmt.Sprintf("TRUNCATE TABLE schedule_entities_snapshots;") + TruncateAdminTags := fmt.Sprintf("TRUNCATE TABLE admin_tags;") + TruncateExecutionAdminTags := fmt.Sprintf("TRUNCATE TABLE execution_admin_tags;") ctx := context.Background() db, err := repositories.GetDB(ctx, getDbConfig(), getLoggerConfig()) if err != nil { @@ -100,6 +102,8 @@ func truncateAllTablesForTestingOnly() { db.Exec(TruncateResources) db.Exec(TruncateSchedulableEntities) db.Exec(TruncateSchedulableEntitiesSnapshots) + db.Exec(TruncateAdminTags) + db.Exec(TruncateExecutionAdminTags) } func populateWorkflowExecutionForTestingOnly(project, domain, name string) { diff --git a/tests/execution_test.go b/tests/execution_test.go index a8d5e9b82..c8734213b 100644 --- a/tests/execution_test.go +++ b/tests/execution_test.go @@ -237,6 +237,30 @@ func TestListWorkflowExecutionsWithTags(t *testing.T) { assert.Equal(t, len(resp.Executions), 2) } +func TestCreateWorkflowExecutionsWithTags(t *testing.T) { + truncateAllTablesForTestingOnly() + populateWorkflowExecutionsForTestingOnly() + + ctx := context.Background() + client, conn := GetTestAdminServiceClient() + defer conn.Close() + + resp, err := client.CreateExecution(ctx, &admin.ExecutionCreateRequest{ + Project: "project1", + Domain: "domain1", + Name: "name5", + Spec: &admin.ExecutionSpec{Tags: []string{"hello", "flyte"}}, + }) + assert.Nil(t, err) + resp, err := client.CreateExecution(ctx, &admin.ExecutionCreateRequest{ + Project: "project1", + Domain: "domain1", + Name: "name6", + Spec: &admin.ExecutionSpec{Tags: []string{"hello", "flyte"}}, + }) + assert.Nil(t, err) +} + func TestListWorkflowExecutions_Filters(t *testing.T) { truncateAllTablesForTestingOnly() populateWorkflowExecutionsForTestingOnly() From 57df98d62c3b5797d064a56e3d386df15ae9410d Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Tue, 1 Aug 2023 13:38:24 -0700 Subject: [PATCH 20/26] nit Signed-off-by: Kevin Su --- tests/execution_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/execution_test.go b/tests/execution_test.go index c8734213b..82f061959 100644 --- a/tests/execution_test.go +++ b/tests/execution_test.go @@ -245,14 +245,14 @@ func TestCreateWorkflowExecutionsWithTags(t *testing.T) { client, conn := GetTestAdminServiceClient() defer conn.Close() - resp, err := client.CreateExecution(ctx, &admin.ExecutionCreateRequest{ + _, err := client.CreateExecution(ctx, &admin.ExecutionCreateRequest{ Project: "project1", Domain: "domain1", Name: "name5", Spec: &admin.ExecutionSpec{Tags: []string{"hello", "flyte"}}, }) assert.Nil(t, err) - resp, err := client.CreateExecution(ctx, &admin.ExecutionCreateRequest{ + _, err = client.CreateExecution(ctx, &admin.ExecutionCreateRequest{ Project: "project1", Domain: "domain1", Name: "name6", From 7be8f373798f170bba8a0746e7052e75bdf2c6f9 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Tue, 1 Aug 2023 14:36:36 -0700 Subject: [PATCH 21/26] nit Signed-off-by: Kevin Su --- tests/execution_test.go | 24 ------------------------ 1 file changed, 24 deletions(-) diff --git a/tests/execution_test.go b/tests/execution_test.go index 82f061959..a8d5e9b82 100644 --- a/tests/execution_test.go +++ b/tests/execution_test.go @@ -237,30 +237,6 @@ func TestListWorkflowExecutionsWithTags(t *testing.T) { assert.Equal(t, len(resp.Executions), 2) } -func TestCreateWorkflowExecutionsWithTags(t *testing.T) { - truncateAllTablesForTestingOnly() - populateWorkflowExecutionsForTestingOnly() - - ctx := context.Background() - client, conn := GetTestAdminServiceClient() - defer conn.Close() - - _, err := client.CreateExecution(ctx, &admin.ExecutionCreateRequest{ - Project: "project1", - Domain: "domain1", - Name: "name5", - Spec: &admin.ExecutionSpec{Tags: []string{"hello", "flyte"}}, - }) - assert.Nil(t, err) - _, err = client.CreateExecution(ctx, &admin.ExecutionCreateRequest{ - Project: "project1", - Domain: "domain1", - Name: "name6", - Spec: &admin.ExecutionSpec{Tags: []string{"hello", "flyte"}}, - }) - assert.Nil(t, err) -} - func TestListWorkflowExecutions_Filters(t *testing.T) { truncateAllTablesForTestingOnly() populateWorkflowExecutionsForTestingOnly() From 4768d1cf3098f2ac988ff34a486727af833a987e Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Fri, 4 Aug 2023 15:28:23 -0700 Subject: [PATCH 22/26] BeforeCreate Signed-off-by: Kevin Su --- pkg/repositories/models/execution.go | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/pkg/repositories/models/execution.go b/pkg/repositories/models/execution.go index 287486b1b..19b207546 100644 --- a/pkg/repositories/models/execution.go +++ b/pkg/repositories/models/execution.go @@ -1,6 +1,7 @@ package models import ( + "gorm.io/gorm/clause" "time" "gorm.io/gorm" @@ -70,3 +71,11 @@ type AdminTag struct { gorm.Model Name string `gorm:"index:,unique;size:255"` } + +func (b *AdminTag) BeforeCreate(tx *gorm.DB) (err error) { + tx.Statement.AddClause(clause.OnConflict{ + Columns: []clause.Column{{Name: "name"}}, // key column + DoUpdates: clause.AssignmentColumns([]string{"name"}), // column needed to be updated + }) + return nil +} From a1ed4abc033c997e0084138683e6b679d3dea3f8 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Fri, 4 Aug 2023 15:29:35 -0700 Subject: [PATCH 23/26] Update migration ID Signed-off-by: Kevin Su --- pkg/repositories/config/migrations.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/repositories/config/migrations.go b/pkg/repositories/config/migrations.go index 4873cd89e..2764b358f 100644 --- a/pkg/repositories/config/migrations.go +++ b/pkg/repositories/config/migrations.go @@ -1099,7 +1099,7 @@ var NoopMigrations = []*gormigrate.Migration{ }, { - ID: "2023-07-20-admin-tags", + ID: "2023-08-04-admin-tags", Migrate: func(tx *gorm.DB) error { type AdminTag struct { gorm.Model @@ -1114,7 +1114,7 @@ var NoopMigrations = []*gormigrate.Migration{ }, { - ID: "2023-07-20-noop-execution-admin-tags", + ID: "2023-08-04-noop-migration-execution-admin-tags", Migrate: func(tx *gorm.DB) error { type AdminTag struct { gorm.Model From 25b90f89f884cae62c03684773eec9552362a874 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Fri, 4 Aug 2023 15:39:02 -0700 Subject: [PATCH 24/26] lint Signed-off-by: Kevin Su --- pkg/repositories/models/execution.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pkg/repositories/models/execution.go b/pkg/repositories/models/execution.go index 19b207546..931a3b720 100644 --- a/pkg/repositories/models/execution.go +++ b/pkg/repositories/models/execution.go @@ -1,9 +1,10 @@ package models import ( - "gorm.io/gorm/clause" "time" + "gorm.io/gorm/clause" + "gorm.io/gorm" "github.com/flyteorg/flytestdlib/storage" From 4c280e5237f80fdd027994d0cc6207b33b6e9407 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Fri, 4 Aug 2023 16:46:34 -0700 Subject: [PATCH 25/26] bump idl Signed-off-by: Kevin Su --- go.mod | 4 +--- go.sum | 4 ++-- 2 files changed, 3 insertions(+), 5 deletions(-) diff --git a/go.mod b/go.mod index f76a08765..cf3c05f25 100644 --- a/go.mod +++ b/go.mod @@ -13,7 +13,7 @@ require ( github.com/cloudevents/sdk-go/v2 v2.8.0 github.com/coreos/go-oidc v2.2.1+incompatible github.com/evanphx/json-patch v4.12.0+incompatible - github.com/flyteorg/flyteidl v1.5.11 + github.com/flyteorg/flyteidl v1.5.14 github.com/flyteorg/flyteplugins v1.0.67 github.com/flyteorg/flytepropeller v1.1.98 github.com/flyteorg/flytestdlib v1.0.20 @@ -213,5 +213,3 @@ replace github.com/robfig/cron/v3 => github.com/unionai/cron/v3 v3.0.2-0.2022091 // Retracted versions // This was published in error when attempting to create 1.5.1 Flyte release. retract v1.1.94 - -replace github.com/flyteorg/flyteidl => github.com/flyteorg/flyteidl v1.5.13-0.20230706064018-2340cc31a60b diff --git a/go.sum b/go.sum index 6eff1d905..9006c5c51 100644 --- a/go.sum +++ b/go.sum @@ -293,8 +293,8 @@ github.com/fatih/structs v1.0.0/go.mod h1:9NiDSp5zOcgEDl+j00MP/WkGVPOlPRLejGD8Ga github.com/fatih/structs v1.1.0/go.mod h1:9NiDSp5zOcgEDl+j00MP/WkGVPOlPRLejGD8Ga6PJ7M= github.com/felixge/httpsnoop v1.0.1 h1:lvB5Jl89CsZtGIWuTcDM1E/vkVs49/Ml7JJe07l8SPQ= github.com/felixge/httpsnoop v1.0.1/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= -github.com/flyteorg/flyteidl v1.5.13-0.20230706064018-2340cc31a60b h1:swzL6DNqsm2p98hNHSpqH2IDtdSFOv/DTpDCWth1lEA= -github.com/flyteorg/flyteidl v1.5.13-0.20230706064018-2340cc31a60b/go.mod h1:EtE/muM2lHHgBabjYcxqe9TWeJSL0kXwbI0RgVwI4Og= +github.com/flyteorg/flyteidl v1.5.14 h1:+3ewipoOp82fPyIVgvvrMq1lorl5Kz3Lh6sh/a9+loI= +github.com/flyteorg/flyteidl v1.5.14/go.mod h1:EtE/muM2lHHgBabjYcxqe9TWeJSL0kXwbI0RgVwI4Og= github.com/flyteorg/flyteplugins v1.0.67 h1:d2FXpwxQwX/k4YdmhuusykOemHb/cUTPEob4WBmdpjE= github.com/flyteorg/flyteplugins v1.0.67/go.mod h1:HHt4nKDKVwrZPKDsj99dNtDSIJL378xNotYMA3a/TFA= github.com/flyteorg/flytepropeller v1.1.98 h1:Zk2ENYB9VZRT5tFUIFjm+aCkr0TU2EuyJ5gh52fpLoA= From 4af4f4fd686a2ff58148d7789ab38383af5f2d55 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Mon, 7 Aug 2023 12:12:23 -0700 Subject: [PATCH 26/26] update migration id Signed-off-by: Kevin Su --- pkg/repositories/config/migrations.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/repositories/config/migrations.go b/pkg/repositories/config/migrations.go index 2764b358f..dab390a04 100644 --- a/pkg/repositories/config/migrations.go +++ b/pkg/repositories/config/migrations.go @@ -1114,7 +1114,7 @@ var NoopMigrations = []*gormigrate.Migration{ }, { - ID: "2023-08-04-noop-migration-execution-admin-tags", + ID: "2023-08-04-execution-admin-tags", // A join table used to associate executions with tags Migrate: func(tx *gorm.DB) error { type AdminTag struct { gorm.Model