Skip to content

Commit

Permalink
fix(api): recursively stop parent run using workflow hook (#5906)
Browse files Browse the repository at this point in the history
  • Loading branch information
richardlt authored Aug 24, 2021
1 parent 08b7414 commit 9fee75d
Show file tree
Hide file tree
Showing 5 changed files with 35 additions and 42 deletions.
3 changes: 3 additions & 0 deletions engine/api/workflow/dao.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@ func LoadAllNamesByProjectIDs(_ context.Context, db gorp.SqlExecutor, projectIDs

// LoadAllByIDs returns all workflows by ids.
func LoadAllByIDs(ctx context.Context, db gorp.SqlExecutor, ids []int64) (sdk.Workflows, error) {
if len(ids) == 0 {
return []sdk.Workflow{}, nil
}
var dao WorkflowDAO
dao.Filters.WorkflowIDs = ids
return dao.LoadAll(ctx, db)
Expand Down
14 changes: 1 addition & 13 deletions engine/api/workflow/execute_outgoing_hook_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"fmt"
"time"

"github.com/go-gorp/gorp"
"github.com/rockbears/log"

"github.com/ovh/cds/engine/cache"
Expand Down Expand Up @@ -81,7 +80,7 @@ loop:
}

// UpdateParentWorkflowRun updates the workflow which triggered the current workflow
func UpdateParentWorkflowRun(ctx context.Context, dbFunc func() *gorp.DbMap, store cache.Store, wr *sdk.WorkflowRun, parentProj sdk.Project, parentWR *sdk.WorkflowRun) (*ProcessorReport, error) {
func UpdateParentWorkflowRun(ctx context.Context, tx gorpmapper.SqlExecutorWithTx, store cache.Store, wr *sdk.WorkflowRun, parentProj sdk.Project, parentWR *sdk.WorkflowRun) (*ProcessorReport, error) {
_, end := telemetry.Span(ctx, "workflow.UpdateParentWorkflowRun")
defer end()

Expand All @@ -96,13 +95,6 @@ func UpdateParentWorkflowRun(ctx context.Context, dbFunc func() *gorp.DbMap, sto
return nil, nil
}

tx, err := dbFunc().Begin()
if err != nil {
return nil, sdk.WrapError(err, "Unable to start transaction")
}

defer tx.Rollback() //nolint

hookrun := parentWR.GetOutgoingHookRun(wr.RootRun().HookEvent.ParentWorkflow.HookRunID)
if hookrun == nil {
return nil, sdk.WrapError(sdk.ErrNotFound, "unable to find hookrun")
Expand All @@ -128,9 +120,5 @@ func UpdateParentWorkflowRun(ctx context.Context, dbFunc func() *gorp.DbMap, sto
return nil, sdk.WrapError(err, "Unable to update outgoing hook run status")
}

if err := tx.Commit(); err != nil {
return nil, sdk.WrapError(err, "Unable to commit transaction")
}

return report, nil
}
5 changes: 1 addition & 4 deletions engine/api/workflow_hook.go
Original file line number Diff line number Diff line change
Expand Up @@ -303,13 +303,10 @@ func (api *API) postWorkflowJobHookCallbackHandler() service.Handler {

go api.WorkflowSendEvent(context.Background(), *proj, report)

report, err = api.updateParentWorkflowRun(ctx, wr)
if err != nil {
if err := api.updateParentWorkflowRun(ctx, wr); err != nil {
return sdk.WithStack(err)
}

go api.WorkflowSendEvent(context.Background(), *proj, report)

return nil
}
}
Expand Down
17 changes: 7 additions & 10 deletions engine/api/workflow_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -478,6 +478,13 @@ func (api *API) postWorkflowJobResultHandler() service.Handler {

go api.WorkflowSendEvent(context.Background(), *proj, report)

for i := range report.WorkflowRuns() {
run := &report.WorkflowRuns()[i]
if err := api.updateParentWorkflowRun(ctx, run); err != nil {
return sdk.WithStack(err)
}
}

return nil
}
}
Expand Down Expand Up @@ -622,16 +629,6 @@ func (api *API) postJobResult(ctx context.Context, tx gorpmapper.SqlExecutorWith
return nil, sdk.WrapError(err, "cannot update NodeJobRun %d status", job.ID)
}

for i := range report.WorkflowRuns() {
run := &report.WorkflowRuns()[i]
reportParent, err := api.updateParentWorkflowRun(ctx, run)
if err != nil {
return nil, sdk.WithStack(err)
}

go api.WorkflowSendEvent(context.Background(), *proj, reportParent)
}

return report, nil
}

Expand Down
38 changes: 23 additions & 15 deletions engine/api/workflow_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -493,51 +493,61 @@ func (api *API) stopWorkflowRun(ctx context.Context, p *sdk.Project, run *sdk.Wo
}

if parentWorkflowRunID == 0 {
report, err := api.updateParentWorkflowRun(ctx, run)
if err != nil {
if err := api.updateParentWorkflowRun(ctx, run); err != nil {
return nil, sdk.WithStack(err)
}
go api.WorkflowSendEvent(context.Background(), *p, report)
}

return report, nil
}

func (api *API) updateParentWorkflowRun(ctx context.Context, run *sdk.WorkflowRun) (*workflow.ProcessorReport, error) {
func (api *API) updateParentWorkflowRun(ctx context.Context, run *sdk.WorkflowRun) error {
if !run.HasParentWorkflow() {
return nil, nil
return nil
}

tx, err := api.mustDB().Begin()
if err != nil {
return sdk.WrapError(err, "unable to start transaction")
}
defer tx.Rollback() //nolint

parentProj, err := project.Load(context.Background(),
api.mustDB(), run.RootRun().HookEvent.ParentWorkflow.Key,
tx, run.RootRun().HookEvent.ParentWorkflow.Key,
project.LoadOptions.WithVariables,
project.LoadOptions.WithIntegrations,
project.LoadOptions.WithApplicationVariables,
project.LoadOptions.WithApplicationWithDeploymentStrategies,
)
if err != nil {
return nil, sdk.WrapError(err, "cannot load project")
return sdk.WrapError(err, "cannot load project")
}

parentWR, err := workflow.LoadRun(ctx,
api.mustDB(),
tx,
run.RootRun().HookEvent.ParentWorkflow.Key,
run.RootRun().HookEvent.ParentWorkflow.Name,
run.RootRun().HookEvent.ParentWorkflow.Run,
workflow.LoadRunOptions{
DisableDetailledNodeRun: false,
})
if err != nil {
return nil, sdk.WrapError(err, "unable to load parent run: %v", run.RootRun().HookEvent)
return sdk.WrapError(err, "unable to load parent run: %v", run.RootRun().HookEvent)
}

report, err := workflow.UpdateParentWorkflowRun(ctx, api.mustDB, api.Cache, run, *parentProj, parentWR)
report, err := workflow.UpdateParentWorkflowRun(ctx, tx, api.Cache, run, *parentProj, parentWR)
if err != nil {
return nil, sdk.WithStack(err)
return sdk.WithStack(err)
}

if err := tx.Commit(); err != nil {
return sdk.WithStack(err)
}

go api.WorkflowSendEvent(context.Background(), *parentProj, report)

return report, nil
// Recursively update the parent run
return api.updateParentWorkflowRun(ctx, parentWR)
}

func (api *API) getWorkflowNodeRunHistoryHandler() service.Handler {
Expand Down Expand Up @@ -1111,11 +1121,9 @@ func (api *API) initWorkflowRun(ctx context.Context, projKey string, wf *sdk.Wor
// Update parent
for i := range report.WorkflowRuns() {
run := &report.WorkflowRuns()[i]
reportParent, err := api.updateParentWorkflowRun(ctx, run)
if err != nil {
if err := api.updateParentWorkflowRun(ctx, run); err != nil {
log.Error(ctx, "unable to update parent workflow run: %v", err)
}
go api.WorkflowSendEvent(context.Background(), *p, reportParent)
}
}

Expand Down

0 comments on commit 9fee75d

Please sign in to comment.