Skip to content

Commit

Permalink
feat(api): metrics about workflow runs purge
Browse files Browse the repository at this point in the history
  • Loading branch information
fsamin committed Feb 21, 2019
1 parent 64a1cab commit 9baf85b
Show file tree
Hide file tree
Showing 6 changed files with 80 additions and 37 deletions.
47 changes: 29 additions & 18 deletions engine/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,21 +233,23 @@ type API struct {
warnChan chan sdk.Event
Cache cache.Store
Metrics struct {
WorkflowRunFailed *stats.Int64Measure
WorkflowRunStarted *stats.Int64Measure
Sessions *stats.Int64Measure
nbUsers *stats.Int64Measure
nbApplications *stats.Int64Measure
nbProjects *stats.Int64Measure
nbGroups *stats.Int64Measure
nbPipelines *stats.Int64Measure
nbWorkflows *stats.Int64Measure
nbArtifacts *stats.Int64Measure
nbWorkerModels *stats.Int64Measure
nbWorkflowRuns *stats.Int64Measure
nbWorkflowNodeRuns *stats.Int64Measure
nbMaxWorkersBuilding *stats.Int64Measure
queue *stats.Int64Measure
WorkflowRunFailed *stats.Int64Measure
WorkflowRunStarted *stats.Int64Measure
Sessions *stats.Int64Measure
nbUsers *stats.Int64Measure
nbApplications *stats.Int64Measure
nbProjects *stats.Int64Measure
nbGroups *stats.Int64Measure
nbPipelines *stats.Int64Measure
nbWorkflows *stats.Int64Measure
nbArtifacts *stats.Int64Measure
nbWorkerModels *stats.Int64Measure
nbWorkflowRuns *stats.Int64Measure
nbWorkflowNodeRuns *stats.Int64Measure
nbMaxWorkersBuilding *stats.Int64Measure
queue *stats.Int64Measure
WorkflowRunsMarkToDelete *stats.Int64Measure
WorkflowRunsDeleted *stats.Int64Measure
}
}

Expand Down Expand Up @@ -783,9 +785,18 @@ func (a *API) Serve(ctx context.Context) error {
func(ctx context.Context) {
workflow.Initialize(ctx, a.DBConnectionFactory.GetDBMap, a.Cache, a.Config.URL.UI, a.Config.DefaultOS, a.Config.DefaultArch)
}, a.PanicDump())
sdk.GoRoutine(ctx, "PushInElasticSearch", func(ctx context.Context) { event.PushInElasticSearch(ctx, a.mustDB(), a.Cache) }, a.PanicDump())
sdk.GoRoutine(ctx, "Metrics.pushInElasticSearch", func(ctx context.Context) { metrics.Init(ctx, a.DBConnectionFactory.GetDBMap) }, a.PanicDump())
sdk.GoRoutine(ctx, "Purge", func(ctx context.Context) { purge.Initialize(ctx, a.Cache, a.DBConnectionFactory.GetDBMap) }, a.PanicDump())
sdk.GoRoutine(ctx, "PushInElasticSearch",
func(ctx context.Context) {
event.PushInElasticSearch(ctx, a.mustDB(), a.Cache)
}, a.PanicDump())
sdk.GoRoutine(ctx, "Metrics.pushInElasticSearch",
func(ctx context.Context) {
metrics.Init(ctx, a.DBConnectionFactory.GetDBMap)
}, a.PanicDump())
sdk.GoRoutine(ctx, "Purge",
func(ctx context.Context) {
purge.Initialize(ctx, a.Cache, a.DBConnectionFactory.GetDBMap, a.Metrics.WorkflowRunsMarkToDelete, a.Metrics.WorkflowRunsDeleted)
}, a.PanicDump())

s := &http.Server{
Addr: fmt.Sprintf("%s:%d", a.Config.HTTP.Addr, a.Config.HTTP.Port),
Expand Down
30 changes: 21 additions & 9 deletions engine/api/purge/purge.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,42 +6,44 @@ import (
"time"

"github.com/go-gorp/gorp"
"go.opencensus.io/stats"

"github.com/ovh/cds/engine/api/cache"
"github.com/ovh/cds/engine/api/observability"
"github.com/ovh/cds/engine/api/project"
"github.com/ovh/cds/engine/api/workflow"
"github.com/ovh/cds/sdk"
"github.com/ovh/cds/sdk/log"
)

//Initialize starts goroutines for workflows
func Initialize(c context.Context, store cache.Store, DBFunc func() *gorp.DbMap) {
func Initialize(ctx context.Context, store cache.Store, DBFunc func() *gorp.DbMap, workflowRunsMarkToDelete, workflowRunsDeleted *stats.Int64Measure) {
tickPurge := time.NewTicker(30 * time.Minute)
defer tickPurge.Stop()

for {
select {
case <-c.Done():
if c.Err() != nil {
log.Error("Exiting purge: %v", c.Err())
case <-ctx.Done():
if ctx.Err() != nil {
log.Error("Exiting purge: %v", ctx.Err())
return
}
case <-tickPurge.C:
log.Debug("purge> Deleting all workflow run marked to delete...")
if err := deleteWorkflowRunsHistory(DBFunc()); err != nil {
if err := deleteWorkflowRunsHistory(ctx, DBFunc(), workflowRunsDeleted); err != nil {
log.Warning("purge> Error on deleteWorkflowRunsHistory : %v", err)
}

log.Debug("purge> Deleting all workflow marked to delete....")
if err := workflows(c, DBFunc(), store); err != nil {
if err := workflows(ctx, DBFunc(), store, workflowRunsMarkToDelete); err != nil {
log.Warning("purge> Error on workflows : %v", err)
}
}
}
}

// workflows purges all marked workflows
func workflows(ctx context.Context, db *gorp.DbMap, store cache.Store) error {
func workflows(ctx context.Context, db *gorp.DbMap, store cache.Store, workflowRunsMarkToDelete *stats.Int64Measure) error {
query := "SELECT id, project_id FROM workflow WHERE to_delete = true ORDER BY id ASC"
res := []struct {
ID int64 `db:"id"`
Expand Down Expand Up @@ -80,6 +82,9 @@ func workflows(ctx context.Context, db *gorp.DbMap, store cache.Store) error {
}
if n > 0 {
// If there is workflow runs to delete, wait for it...
if workflowRunsMarkToDelete != nil {
observability.Record(ctx, workflowRunsMarkToDelete, int64(n))
}
continue
}

Expand Down Expand Up @@ -126,12 +131,19 @@ func workflows(ctx context.Context, db *gorp.DbMap, store cache.Store) error {
}

// deleteWorkflowRunsHistory is useful to delete all the workflow run marked with to delete flag in db
func deleteWorkflowRunsHistory(db gorp.SqlExecutor) error {
func deleteWorkflowRunsHistory(ctx context.Context, db gorp.SqlExecutor, workflowRunsDeleted *stats.Int64Measure) error {
query := `DELETE FROM workflow_run WHERE workflow_run.id IN (SELECT id FROM workflow_run WHERE to_delete = true LIMIT 30)`

if _, err := db.Exec(query); err != nil {
res, err := db.Exec(query)
if err != nil {
log.Warning("deleteWorkflowRunsHistory> Unable to delete workflow history %s", err)
return err
}

n, _ := res.RowsAffected()
if workflowRunsDeleted != nil {
observability.Record(ctx, workflowRunsDeleted, n)
}

return nil
}
7 changes: 7 additions & 0 deletions engine/api/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,11 @@ func (api *API) initMetrics(ctx context.Context) error {

api.Metrics.queue = stats.Int64("cds/cds-api/queue", "queue", stats.UnitDimensionless)

label = fmt.Sprintf("cds/cds-api/%s/workflow_runs_mark_to_delete", api.Name)
api.Metrics.WorkflowRunsMarkToDelete = stats.Int64(label, "number of workflow runs mark to delete", stats.UnitDimensionless)
label = fmt.Sprintf("cds/cds-api/%s/workflow_runs_deleted", api.Name)
api.Metrics.WorkflowRunsDeleted = stats.Int64(label, "number of workflow runs deleted", stats.UnitDimensionless)

tagRange, _ = tag.NewKey("range")
tagStatus, _ = tag.NewKey("status")
tagServiceName, _ = tag.NewKey("name")
Expand All @@ -280,6 +285,8 @@ func (api *API) initMetrics(ctx context.Context) error {
observability.NewViewLast("queue", api.Metrics.queue, tagsRange),
observability.NewViewCount("workflow_runs_started", api.Metrics.WorkflowRunStarted, tags),
observability.NewViewCount("workflow_runs_failed", api.Metrics.WorkflowRunFailed, tags),
observability.NewViewCount("workflow_runs_mark_to_delete", api.Metrics.WorkflowRunsMarkToDelete, tags),
observability.NewViewCount("workflow_runs_deleted", api.Metrics.WorkflowRunsDeleted, tags),
)

return err
Expand Down
19 changes: 16 additions & 3 deletions engine/api/workflow/dao_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"time"

"github.com/go-gorp/gorp"
"go.opencensus.io/stats"

"github.com/ovh/cds/engine/api/database/gorpmapping"
"github.com/ovh/cds/engine/api/observability"
Expand Down Expand Up @@ -617,7 +618,7 @@ func PurgeAllWorkflowRunsByWorkflowID(db gorp.SqlExecutor, id int64) (int, error
}

// PurgeWorkflowRun mark all workflow run to delete
func PurgeWorkflowRun(db gorp.SqlExecutor, wf sdk.Workflow) error {
func PurgeWorkflowRun(ctx context.Context, db gorp.SqlExecutor, wf sdk.Workflow, workflowRunsMarkToDelete *stats.Int64Measure) error {
ids := []struct {
Ids string `json:"ids" db:"ids"`
}{}
Expand Down Expand Up @@ -666,11 +667,17 @@ func PurgeWorkflowRun(db gorp.SqlExecutor, wf sdk.Workflow) error {
LIMIT 100
)
`
if _, err := db.Exec(qDelete, wf.ID, lastWfrID, sdk.StatusBuilding.String(), sdk.StatusChecking.String(), sdk.StatusWaiting.String()); err != nil {
res, err := db.Exec(qDelete, wf.ID, lastWfrID, sdk.StatusBuilding.String(), sdk.StatusChecking.String(), sdk.StatusWaiting.String())
if err != nil {
log.Warning("PurgeWorkflowRun> Unable to update workflow run for purge without tags for workflow id %d and history length %d : %s", wf.ID, wf.HistoryLength, err)
return err
}

n, _ := res.RowsAffected()
if workflowRunsMarkToDelete != nil {
observability.Record(ctx, workflowRunsMarkToDelete, n)
}

return nil
}

Expand Down Expand Up @@ -763,10 +770,16 @@ func PurgeWorkflowRun(db gorp.SqlExecutor, wf sdk.Workflow) error {
}

queryUpdate := `UPDATE workflow_run SET to_delete = true WHERE workflow_run.id = ANY(string_to_array($1, ',')::bigint[])`
if _, err := db.Exec(queryUpdate, strings.Join(idsToUpdate, ",")); err != nil {
res, err := db.Exec(queryUpdate, strings.Join(idsToUpdate, ","))
if err != nil {
log.Warning("PurgeWorkflowRun> Unable to update workflow run for purge for workflow id %d and history length %d : %s", wf.ID, wf.HistoryLength, err)
return err
}

n, _ := res.RowsAffected()
if workflowRunsMarkToDelete != nil {
observability.Record(ctx, workflowRunsMarkToDelete, n)
}
return nil
}

Expand Down
12 changes: 6 additions & 6 deletions engine/api/workflow/dao_run_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ func TestPurgeWorkflowRun(t *testing.T) {
test.NoError(t, errWr)
}

errP := workflow.PurgeWorkflowRun(db, *w1)
errP := workflow.PurgeWorkflowRun(context.Background(), db, *w1, nil)
test.NoError(t, errP)

wruns, _, _, count, errRuns := workflow.LoadRuns(db, proj.Key, w1.Name, 0, 10, nil)
Expand Down Expand Up @@ -237,7 +237,7 @@ func TestPurgeWorkflowRunWithRunningStatus(t *testing.T) {
test.NoError(t, workflow.UpdateWorkflowRunStatus(db, wfr))
}

errP := workflow.PurgeWorkflowRun(db, *w1)
errP := workflow.PurgeWorkflowRun(context.Background(), db, *w1, nil)
test.NoError(t, errP)

wruns, _, _, count, errRuns := workflow.LoadRuns(db, proj.Key, w1.Name, 0, 10, nil)
Expand Down Expand Up @@ -341,7 +341,7 @@ func TestPurgeWorkflowRunWithOneSuccessWorkflowRun(t *testing.T) {
test.NoError(t, workflow.UpdateWorkflowRunStatus(db, wfr))
}

errP := workflow.PurgeWorkflowRun(db, *w1)
errP := workflow.PurgeWorkflowRun(context.Background(), db, *w1, nil)
test.NoError(t, errP)

wruns, _, _, count, errRuns := workflow.LoadRuns(db, proj.Key, w1.Name, 0, 10, nil)
Expand Down Expand Up @@ -440,7 +440,7 @@ func TestPurgeWorkflowRunWithNoSuccessWorkflowRun(t *testing.T) {
test.NoError(t, workflow.UpdateWorkflowRunStatus(db, wfr))
}

errP := workflow.PurgeWorkflowRun(db, *w1)
errP := workflow.PurgeWorkflowRun(context.Background(), db, *w1, nil)
test.NoError(t, errP)

wruns, _, _, count, errRuns := workflow.LoadRuns(db, proj.Key, w1.Name, 0, 10, nil)
Expand Down Expand Up @@ -530,7 +530,7 @@ func TestPurgeWorkflowRunWithoutTags(t *testing.T) {
test.NoError(t, errWr)
}

errP := workflow.PurgeWorkflowRun(db, *w1)
errP := workflow.PurgeWorkflowRun(context.Background(), db, *w1, nil)
test.NoError(t, errP)

wruns, _, _, count, errRuns := workflow.LoadRuns(db, proj.Key, w1.Name, 0, 10, nil)
Expand Down Expand Up @@ -620,7 +620,7 @@ func TestPurgeWorkflowRunWithoutTagsBiggerHistoryLength(t *testing.T) {
test.NoError(t, errWr)
}

errP := workflow.PurgeWorkflowRun(db, *w1)
errP := workflow.PurgeWorkflowRun(context.Background(), db, *w1, nil)
test.NoError(t, errP)

wruns, _, _, count, errRuns := workflow.LoadRuns(db, proj.Key, w1.Name, 0, 10, nil)
Expand Down
2 changes: 1 addition & 1 deletion engine/api/workflow_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -882,7 +882,7 @@ func (api *API) postWorkflowRunHandler() service.Handler {

// Purge workflow run
sdk.GoRoutine(ctx, "workflow.PurgeWorkflowRun", func(ctx context.Context) {
if err := workflow.PurgeWorkflowRun(api.mustDB(), *wf); err != nil {
if err := workflow.PurgeWorkflowRun(ctx, api.mustDB(), *wf, api.Metrics.WorkflowRunsMarkToDelete); err != nil {
log.Error("workflow.PurgeWorkflowRun> error %v", err)
}
}, api.PanicDump())
Expand Down

0 comments on commit 9baf85b

Please sign in to comment.