Skip to content

Commit

Permalink
feat(api): add lock to resync workflow run results (#6364)
Browse files Browse the repository at this point in the history
Signed-off-by: richardlt <[email protected]>

Signed-off-by: richardlt <[email protected]>
  • Loading branch information
richardlt authored Nov 15, 2022
1 parent 05db815 commit c04cf69
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 2 deletions.
2 changes: 1 addition & 1 deletion engine/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -791,7 +791,7 @@ func (a *API) Serve(ctx context.Context) error {
a.cleanRepositoryAnalysis(ctx, 1*time.Hour)
})
a.GoRoutines.RunWithRestart(ctx, "workflow.ResyncWorkflowRunResultsRoutine", func(ctx context.Context) {
workflow.ResyncWorkflowRunResultsRoutine(ctx, a.mustDB, 5*time.Second)
workflow.ResyncWorkflowRunResultsRoutine(ctx, a.mustDB, a.Cache, 5*time.Second)
})
if a.Config.Secrets.SnapshotRetentionDelay > 0 {
a.GoRoutines.RunWithRestart(ctx, "workflow.CleanSecretsSnapshot", func(ctx context.Context) {
Expand Down
16 changes: 15 additions & 1 deletion engine/api/workflow/workflow_run_results.go
Original file line number Diff line number Diff line change
Expand Up @@ -413,7 +413,7 @@ func LoadRunResultsByRunIDAndType(ctx context.Context, db gorp.SqlExecutor, runI
return getAll(ctx, db, query)
}

func ResyncWorkflowRunResultsRoutine(ctx context.Context, DBFunc func() *gorp.DbMap, delay time.Duration) {
func ResyncWorkflowRunResultsRoutine(ctx context.Context, DBFunc func() *gorp.DbMap, store cache.Store, delay time.Duration) {
tick := time.NewTicker(delay)
defer tick.Stop()

Expand All @@ -433,21 +433,35 @@ func ResyncWorkflowRunResultsRoutine(ctx context.Context, DBFunc func() *gorp.Db
continue
}
for _, id := range ids {
lockKey := cache.Key("api:resyncWorkflowRunResults", fmt.Sprintf("%d", id))
b, err := store.Lock(lockKey, 5*time.Minute, 0, 1)
if err != nil {
log.ErrorWithStackTrace(ctx, err)
continue
}
if !b {
log.Debug(ctx, "api.resyncWorkflowRunResults> workflow run %d is locked in cache", id)
continue
}
tx, err := DBFunc().Begin()
if err != nil {
log.ErrorWithStackTrace(ctx, sdk.WithStack(err))
_ = store.Unlock(lockKey)
continue
}
if err := SyncRunResultArtifactManagerByRunID(ctx, tx, id); err != nil {
log.ErrorWithStackTrace(ctx, err)
tx.Rollback()
_ = store.Unlock(lockKey)
continue
}
if err := tx.Commit(); err != nil {
log.ErrorWithStackTrace(ctx, sdk.WithStack(err))
tx.Rollback()
_ = store.Unlock(lockKey)
continue
}
_ = store.Unlock(lockKey)
}
}
}
Expand Down

0 comments on commit c04cf69

Please sign in to comment.