From 9fee75d237678867dab55d6621a95c7a16ccd28a Mon Sep 17 00:00:00 2001 From: Richard LT Date: Tue, 24 Aug 2021 18:51:21 +0200 Subject: [PATCH] fix(api): recursively stop parent run using workflow hook (#5906) --- engine/api/workflow/dao.go | 3 ++ .../api/workflow/execute_outgoing_hook_run.go | 14 +------ engine/api/workflow_hook.go | 5 +-- engine/api/workflow_queue.go | 17 ++++----- engine/api/workflow_run.go | 38 +++++++++++-------- 5 files changed, 35 insertions(+), 42 deletions(-) diff --git a/engine/api/workflow/dao.go b/engine/api/workflow/dao.go index b08daa32fc..c3e577dc2f 100644 --- a/engine/api/workflow/dao.go +++ b/engine/api/workflow/dao.go @@ -49,6 +49,9 @@ func LoadAllNamesByProjectIDs(_ context.Context, db gorp.SqlExecutor, projectIDs // LoadAllByIDs returns all workflows by ids. func LoadAllByIDs(ctx context.Context, db gorp.SqlExecutor, ids []int64) (sdk.Workflows, error) { + if len(ids) == 0 { + return []sdk.Workflow{}, nil + } var dao WorkflowDAO dao.Filters.WorkflowIDs = ids return dao.LoadAll(ctx, db) diff --git a/engine/api/workflow/execute_outgoing_hook_run.go b/engine/api/workflow/execute_outgoing_hook_run.go index 214931c298..f978758944 100644 --- a/engine/api/workflow/execute_outgoing_hook_run.go +++ b/engine/api/workflow/execute_outgoing_hook_run.go @@ -5,7 +5,6 @@ import ( "fmt" "time" - "github.com/go-gorp/gorp" "github.com/rockbears/log" "github.com/ovh/cds/engine/cache" @@ -81,7 +80,7 @@ loop: } // UpdateParentWorkflowRun updates the workflow which triggered the current workflow -func UpdateParentWorkflowRun(ctx context.Context, dbFunc func() *gorp.DbMap, store cache.Store, wr *sdk.WorkflowRun, parentProj sdk.Project, parentWR *sdk.WorkflowRun) (*ProcessorReport, error) { +func UpdateParentWorkflowRun(ctx context.Context, tx gorpmapper.SqlExecutorWithTx, store cache.Store, wr *sdk.WorkflowRun, parentProj sdk.Project, parentWR *sdk.WorkflowRun) (*ProcessorReport, error) { _, end := telemetry.Span(ctx, "workflow.UpdateParentWorkflowRun") defer end() @@ -96,13 +95,6 @@ func UpdateParentWorkflowRun(ctx context.Context, dbFunc func() *gorp.DbMap, sto return nil, nil } - tx, err := dbFunc().Begin() - if err != nil { - return nil, sdk.WrapError(err, "Unable to start transaction") - } - - defer tx.Rollback() //nolint - hookrun := parentWR.GetOutgoingHookRun(wr.RootRun().HookEvent.ParentWorkflow.HookRunID) if hookrun == nil { return nil, sdk.WrapError(sdk.ErrNotFound, "unable to find hookrun") @@ -128,9 +120,5 @@ func UpdateParentWorkflowRun(ctx context.Context, dbFunc func() *gorp.DbMap, sto return nil, sdk.WrapError(err, "Unable to update outgoing hook run status") } - if err := tx.Commit(); err != nil { - return nil, sdk.WrapError(err, "Unable to commit transaction") - } - return report, nil } diff --git a/engine/api/workflow_hook.go b/engine/api/workflow_hook.go index 70a4945437..13bff30490 100644 --- a/engine/api/workflow_hook.go +++ b/engine/api/workflow_hook.go @@ -303,13 +303,10 @@ func (api *API) postWorkflowJobHookCallbackHandler() service.Handler { go api.WorkflowSendEvent(context.Background(), *proj, report) - report, err = api.updateParentWorkflowRun(ctx, wr) - if err != nil { + if err := api.updateParentWorkflowRun(ctx, wr); err != nil { return sdk.WithStack(err) } - go api.WorkflowSendEvent(context.Background(), *proj, report) - return nil } } diff --git a/engine/api/workflow_queue.go b/engine/api/workflow_queue.go index f0fcc1eae9..0d8ae9ca88 100644 --- a/engine/api/workflow_queue.go +++ b/engine/api/workflow_queue.go @@ -478,6 +478,13 @@ func (api *API) postWorkflowJobResultHandler() service.Handler { go api.WorkflowSendEvent(context.Background(), *proj, report) + for i := range report.WorkflowRuns() { + run := &report.WorkflowRuns()[i] + if err := api.updateParentWorkflowRun(ctx, run); err != nil { + return sdk.WithStack(err) + } + } + return nil } } @@ -622,16 +629,6 @@ func (api *API) postJobResult(ctx context.Context, tx gorpmapper.SqlExecutorWith return nil, sdk.WrapError(err, "cannot update NodeJobRun %d status", job.ID) } - for i := range report.WorkflowRuns() { - run := &report.WorkflowRuns()[i] - reportParent, err := api.updateParentWorkflowRun(ctx, run) - if err != nil { - return nil, sdk.WithStack(err) - } - - go api.WorkflowSendEvent(context.Background(), *proj, reportParent) - } - return report, nil } diff --git a/engine/api/workflow_run.go b/engine/api/workflow_run.go index 8f33cd729d..4075822e8c 100644 --- a/engine/api/workflow_run.go +++ b/engine/api/workflow_run.go @@ -493,34 +493,38 @@ func (api *API) stopWorkflowRun(ctx context.Context, p *sdk.Project, run *sdk.Wo } if parentWorkflowRunID == 0 { - report, err := api.updateParentWorkflowRun(ctx, run) - if err != nil { + if err := api.updateParentWorkflowRun(ctx, run); err != nil { return nil, sdk.WithStack(err) } - go api.WorkflowSendEvent(context.Background(), *p, report) } return report, nil } -func (api *API) updateParentWorkflowRun(ctx context.Context, run *sdk.WorkflowRun) (*workflow.ProcessorReport, error) { +func (api *API) updateParentWorkflowRun(ctx context.Context, run *sdk.WorkflowRun) error { if !run.HasParentWorkflow() { - return nil, nil + return nil } + tx, err := api.mustDB().Begin() + if err != nil { + return sdk.WrapError(err, "unable to start transaction") + } + defer tx.Rollback() //nolint + parentProj, err := project.Load(context.Background(), - api.mustDB(), run.RootRun().HookEvent.ParentWorkflow.Key, + tx, run.RootRun().HookEvent.ParentWorkflow.Key, project.LoadOptions.WithVariables, project.LoadOptions.WithIntegrations, project.LoadOptions.WithApplicationVariables, project.LoadOptions.WithApplicationWithDeploymentStrategies, ) if err != nil { - return nil, sdk.WrapError(err, "cannot load project") + return sdk.WrapError(err, "cannot load project") } parentWR, err := workflow.LoadRun(ctx, - api.mustDB(), + tx, run.RootRun().HookEvent.ParentWorkflow.Key, run.RootRun().HookEvent.ParentWorkflow.Name, run.RootRun().HookEvent.ParentWorkflow.Run, @@ -528,16 +532,22 @@ func (api *API) updateParentWorkflowRun(ctx context.Context, run *sdk.WorkflowRu DisableDetailledNodeRun: false, }) if err != nil { - return nil, sdk.WrapError(err, "unable to load parent run: %v", run.RootRun().HookEvent) + return sdk.WrapError(err, "unable to load parent run: %v", run.RootRun().HookEvent) } - report, err := workflow.UpdateParentWorkflowRun(ctx, api.mustDB, api.Cache, run, *parentProj, parentWR) + report, err := workflow.UpdateParentWorkflowRun(ctx, tx, api.Cache, run, *parentProj, parentWR) if err != nil { - return nil, sdk.WithStack(err) + return sdk.WithStack(err) + } + + if err := tx.Commit(); err != nil { + return sdk.WithStack(err) } + go api.WorkflowSendEvent(context.Background(), *parentProj, report) - return report, nil + // Recursively update the parent run + return api.updateParentWorkflowRun(ctx, parentWR) } func (api *API) getWorkflowNodeRunHistoryHandler() service.Handler { @@ -1111,11 +1121,9 @@ func (api *API) initWorkflowRun(ctx context.Context, projKey string, wf *sdk.Wor // Update parent for i := range report.WorkflowRuns() { run := &report.WorkflowRuns()[i] - reportParent, err := api.updateParentWorkflowRun(ctx, run) - if err != nil { + if err := api.updateParentWorkflowRun(ctx, run); err != nil { log.Error(ctx, "unable to update parent workflow run: %v", err) } - go api.WorkflowSendEvent(context.Background(), *p, reportParent) } }