diff --git a/engine/api/api.go b/engine/api/api.go index 17c2848ad0..5bff139661 100644 --- a/engine/api/api.go +++ b/engine/api/api.go @@ -791,7 +791,7 @@ func (a *API) Serve(ctx context.Context) error { a.cleanRepositoryAnalysis(ctx, 1*time.Hour) }) a.GoRoutines.RunWithRestart(ctx, "workflow.ResyncWorkflowRunResultsRoutine", func(ctx context.Context) { - workflow.ResyncWorkflowRunResultsRoutine(ctx, a.mustDB, 5*time.Second) + workflow.ResyncWorkflowRunResultsRoutine(ctx, a.mustDB, a.Cache, 5*time.Second) }) if a.Config.Secrets.SnapshotRetentionDelay > 0 { a.GoRoutines.RunWithRestart(ctx, "workflow.CleanSecretsSnapshot", func(ctx context.Context) { diff --git a/engine/api/workflow/workflow_run_results.go b/engine/api/workflow/workflow_run_results.go index ed36c94cab..3bc6ca4d6a 100644 --- a/engine/api/workflow/workflow_run_results.go +++ b/engine/api/workflow/workflow_run_results.go @@ -413,7 +413,7 @@ func LoadRunResultsByRunIDAndType(ctx context.Context, db gorp.SqlExecutor, runI return getAll(ctx, db, query) } -func ResyncWorkflowRunResultsRoutine(ctx context.Context, DBFunc func() *gorp.DbMap, delay time.Duration) { +func ResyncWorkflowRunResultsRoutine(ctx context.Context, DBFunc func() *gorp.DbMap, store cache.Store, delay time.Duration) { tick := time.NewTicker(delay) defer tick.Stop() @@ -433,21 +433,35 @@ func ResyncWorkflowRunResultsRoutine(ctx context.Context, DBFunc func() *gorp.Db continue } for _, id := range ids { + lockKey := cache.Key("api:resyncWorkflowRunResults", fmt.Sprintf("%d", id)) + b, err := store.Lock(lockKey, 5*time.Minute, 0, 1) + if err != nil { + log.ErrorWithStackTrace(ctx, err) + continue + } + if !b { + log.Debug(ctx, "api.resyncWorkflowRunResults> workflow run %d is locked in cache", id) + continue + } tx, err := DBFunc().Begin() if err != nil { log.ErrorWithStackTrace(ctx, sdk.WithStack(err)) + _ = store.Unlock(lockKey) continue } if err := SyncRunResultArtifactManagerByRunID(ctx, tx, id); err != nil { log.ErrorWithStackTrace(ctx, err) tx.Rollback() + _ = store.Unlock(lockKey) continue } if err := tx.Commit(); err != nil { log.ErrorWithStackTrace(ctx, sdk.WithStack(err)) tx.Rollback() + _ = store.Unlock(lockKey) continue } + _ = store.Unlock(lockKey) } } }