Skip to content

Commit

Permalink
fix(api): remove deacklock on database (#5945)
Browse files Browse the repository at this point in the history
Signed-off-by: francois  samin <[email protected]>
  • Loading branch information
fsamin authored Sep 20, 2021
1 parent a752427 commit ef9d729
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 9 deletions.
10 changes: 8 additions & 2 deletions engine/api/workflow/dao_node_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -562,6 +562,9 @@ func UpdateNodeRun(db gorp.SqlExecutor, n *sdk.WorkflowNodeRun) error {

// GetNodeRunBuildCommits gets commits for given node run and return current vcs info
func GetNodeRunBuildCommits(ctx context.Context, db gorpmapper.SqlExecutorWithTx, store cache.Store, proj sdk.Project, wf sdk.Workflow, wNodeName string, number int64, nodeRun *sdk.WorkflowNodeRun, app *sdk.Application, env *sdk.Environment) ([]sdk.VCSCommit, sdk.BuildNumberAndHash, error) {
ctx, end := telemetry.Span(ctx, "workflow.GetNodeRunBuildCommits")
defer end()

var cur sdk.BuildNumberAndHash
if app == nil {
log.Debug(ctx, "GetNodeRunBuildCommits> No app linked")
Expand Down Expand Up @@ -791,8 +794,11 @@ func PreviousNodeRunVCSInfos(ctx context.Context, db gorp.SqlExecutor, projectKe
return previous, nil
}

func updateNodeRunCommits(db gorp.SqlExecutor, id int64, commits []sdk.VCSCommit) error {
log.Debug(context.TODO(), "updateNodeRunCommits> Updating %d commits for workflow_node_run #%d", len(commits), id)
func updateNodeRunCommits(ctx context.Context, db gorp.SqlExecutor, id int64, commits []sdk.VCSCommit) error {
ctx, end := telemetry.Span(ctx, "workflow.updateNodeRunCommits")
defer end()

log.Debug(ctx, "updateNodeRunCommits> Updating %d commits for workflow_node_run #%d", len(commits), id)
commitsBtes, errMarshal := json.Marshal(commits)
if errMarshal != nil {
return sdk.WrapError(errMarshal, "updateNodeRunCommits> Unable to marshal commits")
Expand Down
28 changes: 21 additions & 7 deletions engine/api/workflow/resync_workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,17 +97,26 @@ func ResyncNodeRunsWithCommits(ctx context.Context, db *gorp.DbMap, store cache.
continue
}

go func(nr sdk.WorkflowNodeRun) {
go func(nrID int64) {
tx, err := db.Begin()
if err != nil {
ctx := log.ContextWithStackTrace(ctx, err)
log.Error(ctx, "ResyncNodeRuns> Cannot begin db tx: %v", sdk.WithStack(err))
return
}
defer tx.Rollback() // nolint

wr, errL := LoadAndLockRunByID(tx, nr.WorkflowRunID, LoadRunOptions{})
if errL != nil {
log.Error(ctx, "ResyncNodeRuns> Unable to load workflowRun by id %d: %v", nr.WorkflowRunID, errL)
nr, err := LoadAndLockNodeRunByID(ctx, tx, nrID)
if err != nil {
ctx := log.ContextWithStackTrace(ctx, err)
log.Error(ctx, "ResyncNodeRuns> Unable to load workflowNodeRun by id %d: %v", nrID, err)
return
}

wr, err := LoadRunByID(tx, nr.WorkflowRunID, LoadRunOptions{})
if err != nil {
ctx := log.ContextWithStackTrace(ctx, err)
log.Error(ctx, "ResyncNodeRuns> Unable to load workflowRun by id %d: %v", nr.WorkflowRunID, err)
return
}

Expand All @@ -117,6 +126,7 @@ func ResyncNodeRunsWithCommits(ctx context.Context, db *gorp.DbMap, store cache.

n := wr.Workflow.WorkflowData.NodeByID(nr.WorkflowNodeID)
if n == nil {
ctx := log.ContextWithStackTrace(ctx, err)
log.Error(ctx, "ResyncNodeRuns> Unable to find node data by id %d in a workflow run id %d", nr.WorkflowNodeID, nr.WorkflowRunID)
return
}
Expand All @@ -131,15 +141,17 @@ func ResyncNodeRunsWithCommits(ctx context.Context, db *gorp.DbMap, store cache.
}

//New context because we are in goroutine
commits, curVCSInfos, err := GetNodeRunBuildCommits(context.TODO(), tx, store, proj, wr.Workflow, nodeName, wr.Number, &nr, &app, env)
commits, curVCSInfos, err := GetNodeRunBuildCommits(ctx, tx, store, proj, wr.Workflow, nodeName, wr.Number, nr, &app, env)
if err != nil {
ctx := log.ContextWithStackTrace(ctx, err)
log.Error(ctx, "ResyncNodeRuns> cannot get build commits on a node run %v", err)
} else if commits != nil {
nr.Commits = commits
}

if len(commits) > 0 {
if err := updateNodeRunCommits(tx, nr.ID, commits); err != nil {
if err := updateNodeRunCommits(ctx, tx, nr.ID, commits); err != nil {
ctx := log.ContextWithStackTrace(ctx, err)
log.Error(ctx, "ResyncNodeRuns> Unable to update node run commits %v", err)
}
}
Expand All @@ -160,13 +172,15 @@ func ResyncNodeRunsWithCommits(ctx context.Context, db *gorp.DbMap, store cache.

if tagsUpdated {
if err := UpdateWorkflowRunTags(tx, wr); err != nil {
ctx := log.ContextWithStackTrace(ctx, err)
log.Error(ctx, "ResyncNodeRuns> Unable to update workflow run tags %v", err)
}
}

if err := tx.Commit(); err != nil {
ctx := log.ContextWithStackTrace(ctx, err)
log.Error(ctx, "ResyncNodeRuns> Cannot commit db tx: %v", sdk.WithStack(err))
}
}(nodeRun)
}(nodeRun.ID)
}
}

0 comments on commit ef9d729

Please sign in to comment.