Skip to content

Commit

Permalink
feat(api,ui): add workflow retention policy (#5474)
Browse files Browse the repository at this point in the history
  • Loading branch information
sguiheux authored Oct 8, 2020
1 parent bbbdcd3 commit 1830951
Show file tree
Hide file tree
Showing 47 changed files with 1,145 additions and 199 deletions.
1 change: 1 addition & 0 deletions cli/cdsctl/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ func adminCommands() []*cobra.Command {
adminErrors(),
adminCurl(),
adminFeatures(),
adminWorkflows(),
}
}

Expand Down
42 changes: 42 additions & 0 deletions cli/cdsctl/admin_workflow.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package main

import (
"github.com/ovh/cds/cli"
"github.com/spf13/cobra"
)

var adminWorkflowsCmd = cli.Command{
Name: "workflows",
Aliases: []string{"workflow"},
Short: "Manage CDS workflows",
}

func adminWorkflows() *cobra.Command {
return cli.NewCommand(adminWorkflowsCmd, nil, []*cobra.Command{
cli.NewCommand(adminWorkflowUpdateMaxRunCmd, adminWorkflowUpdateMaxRun, nil),
})
}

var adminWorkflowUpdateMaxRunCmd = cli.Command{
Name: "maxrun",
Short: "Update the maximum number of workflow executions",
Args: []cli.Arg{
{
Name: "projectKey",
},
{
Name: "workflowName",
},
{
Name: "maxRuns",
},
},
}

func adminWorkflowUpdateMaxRun(v cli.Values) error {
maxRuns, err := v.GetInt64("maxRuns")
if err != nil {
return err
}
return client.AdminWorkflowUpdateMaxRuns(v.GetString("projectKey"), v.GetString("workflowName"), maxRuns)
}
5 changes: 5 additions & 0 deletions docs/content/docs/concepts/files/workflow-syntax.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ notifications:
on_success: never
recipients:
- [email protected]
retention_policy: return run_days_before < 7
```
There are two major things to understand: `workflow` and `hooks`. A workflow is a kind of graph starting from a root pipeline, and other pipelines with dependencies. In this example, the `deploy` pipeline will be triggered after the `build` pipeline.
Expand Down Expand Up @@ -143,3 +144,7 @@ workflow:
# ...
one_at_a_time: true # No concurent deployments
```

## Retention Policy

[Retention documentation]({{<relref "/docs/concepts/workflow/retention.md">}})
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
25 changes: 25 additions & 0 deletions docs/content/docs/concepts/workflow/retention.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
---
title: "Retention"
weight: 10
---

You can configure the workflow run retention in the workflow advanced section on the CDS UI.

![retention.png](../images/workflow_retention.png)


* The first line defines the number maximum of runs that CDS can keep for this workflow. Only a CDS administrator can update this value.

* On the second line, you will be able to define your retention policy through a lua condition.
You will be able to use these variables:
* run_days_before: to identify runs older than x days
* git_branch_exist: to identify if the git branch used for this run still exists on the git repository
* run_status: to identidy run status
* and all variables defined in your workflow payload

For example, the rule defined above means:

Keep workflow run for 365 days, but if branch does not exist on repository, only keep the run for 2 days.


* The dry run button allows you to test your lua expression. The result is a table filled with all runs that would be kept
31 changes: 31 additions & 0 deletions engine/api/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@ import (
"github.com/gorilla/mux"

"github.com/ovh/cds/engine/api/database/gorpmapping"
"github.com/ovh/cds/engine/api/project"
"github.com/ovh/cds/engine/api/services"
"github.com/ovh/cds/engine/api/workflow"
"github.com/ovh/cds/engine/featureflipping"
"github.com/ovh/cds/engine/service"
"github.com/ovh/cds/sdk"
Expand Down Expand Up @@ -351,3 +353,32 @@ func (api *API) deleteAdminFeatureFlipping() service.Handler {
return nil
}
}

func (api *API) postWorkflowMaxRunHandler() service.Handler {
return func(ctx context.Context, w http.ResponseWriter, r *http.Request) error {
vars := mux.Vars(r)
key := vars["key"]
name := vars["permWorkflowName"]

var request sdk.UpdateMaxRunRequest
if err := service.UnmarshalBody(r, &request); err != nil {
return err
}

proj, err := project.Load(ctx, api.mustDBWithCtx(ctx), key)
if err != nil {
return err
}

wf, err := workflow.Load(ctx, api.mustDBWithCtx(ctx), api.Cache, *proj, name, workflow.LoadOptions{})
if err != nil {
return err
}

if err := workflow.UpdateMaxRunsByID(api.mustDBWithCtx(ctx), wf.ID, request.MaxRuns); err != nil {
return err
}

return nil
}
}
38 changes: 38 additions & 0 deletions engine/api/admin_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package api

import (
"context"
"encoding/json"
"net/http/httptest"
"testing"
Expand All @@ -10,6 +11,7 @@ import (

"github.com/ovh/cds/engine/api/database/gorpmapping"
"github.com/ovh/cds/engine/api/test/assets"
"github.com/ovh/cds/engine/api/workflow"
"github.com/ovh/cds/engine/gorpmapper"
"github.com/ovh/cds/sdk"
)
Expand Down Expand Up @@ -189,3 +191,39 @@ func Test_postAdminDatabaseRollEncryptedEntityByPrimaryKey(t *testing.T) {
assert.Equal(t, 204, w.Code)
}
}

func Test_postWorkflowMaxRunHandler(t *testing.T) {
api, db, _ := newTestAPI(t)
api.Config.Workflow.MaxRuns = 10
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)

go workflow.Initialize(ctx, api.DBConnectionFactory.GetDBMap(gorpmapping.Mapper), api.Cache, "", "", "", 15, api.Config.Workflow.MaxRuns)
_, jwt := assets.InsertAdminUser(t, db)

p := assets.InsertTestProject(t, db, api.Cache, sdk.RandomString(10), sdk.RandomString(10))
w := assets.InsertTestWorkflow(t, db, api.Cache, p, sdk.RandomString(10))

require.Equal(t, w.MaxRuns, api.Config.Workflow.MaxRuns)

uri := api.Router.GetRoute("POST", api.postWorkflowMaxRunHandler, map[string]string{"key": p.Key, "permWorkflowName": w.Name})
req := assets.NewJWTAuthentifiedRequest(t, jwt, "POST", uri, sdk.UpdateMaxRunRequest{MaxRuns: 5})

// Do the request
rec := httptest.NewRecorder()
api.Router.Mux.ServeHTTP(rec, req)
assert.Equal(t, 204, rec.Code)

wfDb, err := workflow.Load(context.TODO(), db, api.Cache, *p, w.Name, workflow.LoadOptions{})
require.NoError(t, err)
require.Equal(t, int64(5), wfDb.MaxRuns)

wfDb.MaxRuns = 20
require.NoError(t, workflow.Update(context.TODO(), db, api.Cache, *p, wfDb, workflow.UpdateOptions{}))

// Max runs must not be updated
wfDb2, err := workflow.Load(context.TODO(), db, api.Cache, *p, w.Name, workflow.LoadOptions{})
require.NoError(t, err)
require.Equal(t, int64(5), wfDb2.MaxRuns)

}
5 changes: 4 additions & 1 deletion engine/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,9 @@ type Configuration struct {
Content string `toml:"content" comment:"Help Content. Warning: this message could be view by anonymous user. Markdown accepted." json:"content" default:""`
Error string `toml:"error" comment:"Help displayed to user on each error. Warning: this message could be view by anonymous user. Markdown accepted." json:"error" default:""`
} `toml:"help" comment:"######################\n 'Help' informations \n######################" json:"help"`
Workflow struct {
MaxRuns int64 `toml:"maxRuns" comment:"Maximum of runs by workflow" json:"maxRuns" default:"255"`
} `toml:"workflow" comment:"######################\n 'Workflow' global configuration \n######################" json:"workflow"`
}

// DefaultValues is the struc for API Default configuration default values
Expand Down Expand Up @@ -775,7 +778,7 @@ func (a *API) Serve(ctx context.Context) error {
}, a.PanicDump())
a.GoRoutines.Run(ctx, "workflow.Initialize",
func(ctx context.Context) {
workflow.Initialize(ctx, a.DBConnectionFactory.GetDBMap(gorpmapping.Mapper), a.Cache, a.Config.URL.UI, a.Config.DefaultOS, a.Config.DefaultArch, a.Config.Log.StepMaxSize)
workflow.Initialize(ctx, a.DBConnectionFactory.GetDBMap(gorpmapping.Mapper), a.Cache, a.Config.URL.UI, a.Config.DefaultOS, a.Config.DefaultArch, a.Config.Log.StepMaxSize, a.Config.Workflow.MaxRuns)
}, a.PanicDump())
a.GoRoutines.Run(ctx, "PushInElasticSearch",
func(ctx context.Context) {
Expand Down
3 changes: 3 additions & 0 deletions engine/api/api_routes.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,9 @@ func (api *API) InitRouter() {
r.Handle("/project/{permProjectKey}/workflows", Scope(sdk.AuthConsumerScopeProject), r.POST(api.postWorkflowHandler), r.GET(api.getWorkflowsHandler))
r.Handle("/project/{permProjectKey}/workflows/runs/nodes/ids", Scope(sdk.AuthConsumerScopeProject), r.GET(api.getWorkflowsRunsAndNodesIDshandler))
r.Handle("/project/{key}/workflows/{permWorkflowName}", Scope(sdk.AuthConsumerScopeProject), r.GET(api.getWorkflowHandler), r.PUT(api.putWorkflowHandler), r.DELETE(api.deleteWorkflowHandler))
r.Handle("/project/{key}/workflows/{permWorkflowName}/retention/maxruns", Scope(sdk.AuthConsumerScopeProject), r.POST(api.postWorkflowMaxRunHandler, service.OverrideAuth(api.authAdminMiddleware)))
r.Handle("/project/{key}/workflows/{permWorkflowName}/retention/dryrun", Scope(sdk.AuthConsumerScopeProject), r.POST(api.postWorkflowRetentionPolicyDryRun))
r.Handle("/project/{key}/workflows/{permWorkflowName}/retention/suggest", Scope(sdk.AuthConsumerScopeProject), r.GET(api.getRetentionPolicySuggestionHandler))
r.Handle("/project/{key}/workflows/{permWorkflowName}/eventsintegration/{integrationID}", Scope(sdk.AuthConsumerScopeProject), r.DELETE(api.deleteWorkflowEventsIntegrationHandler))
r.Handle("/project/{key}/workflows/{permWorkflowName}/icon", Scope(sdk.AuthConsumerScopeProject), r.PUT(api.putWorkflowIconHandler), r.DELETE(api.deleteWorkflowIconHandler))
r.Handle("/project/{key}/workflows/{permWorkflowName}/ascode", Scope(sdk.AuthConsumerScopeProject), r.POST(api.postWorkflowAsCodeHandler))
Expand Down
63 changes: 56 additions & 7 deletions engine/api/purge/purge.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,23 +3,32 @@ package purge
import (
"context"
"database/sql"
"fmt"
"net/http"
"time"

"github.com/go-gorp/gorp"
"github.com/sirupsen/logrus"
"go.opencensus.io/stats"

"github.com/ovh/cds/engine/api/database/gorpmapping"
"github.com/ovh/cds/engine/api/integration"
"github.com/ovh/cds/engine/api/objectstore"
"github.com/ovh/cds/engine/api/project"
"github.com/ovh/cds/engine/api/services"
"github.com/ovh/cds/engine/api/workflow"
"github.com/ovh/cds/engine/cache"
"github.com/ovh/cds/engine/featureflipping"
"github.com/ovh/cds/sdk"
"github.com/ovh/cds/sdk/log"
"github.com/ovh/cds/sdk/telemetry"
)

const (
FeaturePurgeName = "workflow-retention-policy"
FeatureMaxRuns = "workflow-retention-maxruns"
)

//Initialize starts goroutines for workflows
func Initialize(ctx context.Context, store cache.Store, DBFunc func() *gorp.DbMap, sharedStorage objectstore.Driver, workflowRunsMarkToDelete, workflowRunsDeleted *stats.Int64Measure) {
tickPurge := time.NewTicker(15 * time.Minute)
Expand All @@ -33,13 +42,18 @@ func Initialize(ctx context.Context, store cache.Store, DBFunc func() *gorp.DbMa
return
}
case <-tickPurge.C:
// Mark workflow run to delete
if err := markWorkflowRunsToDelete(ctx, store, DBFunc(), workflowRunsMarkToDelete); err != nil {
log.ErrorWithFields(ctx, logrus.Fields{"stack_trace": fmt.Sprintf("%+v", err)}, "%s", err)
}

// Check all workflows to mark runs that should be deleted
if err := workflow.PurgeWorkflowRuns(ctx, DBFunc(), workflowRunsMarkToDelete); err != nil {
if err := MarkWorkflowRuns(ctx, DBFunc(), workflowRunsMarkToDelete); err != nil {
log.Warning(ctx, "purge> Error: %v", err)
}

log.Debug("purge> Deleting all workflow run marked to delete...")
if err := deleteWorkflowRunsHistory(ctx, DBFunc(), store, sharedStorage, workflowRunsDeleted); err != nil {
if err := deleteWorkflowRunsHistory(ctx, DBFunc(), sharedStorage, workflowRunsDeleted); err != nil {
log.Warning(ctx, "purge> Error on deleteWorkflowRunsHistory : %v", err)
}

Expand All @@ -51,13 +65,48 @@ func Initialize(ctx context.Context, store cache.Store, DBFunc func() *gorp.DbMa
}
}

// Deprecated: old method to mark runs to delete
func MarkWorkflowRuns(ctx context.Context, db *gorp.DbMap, workflowRunsMarkToDelete *stats.Int64Measure) error {
dao := new(workflow.WorkflowDAO)
dao.Filters.DisableFilterDeletedWorkflow = false
wfs, err := dao.LoadAll(ctx, db)
if err != nil {
return err
}
for _, wf := range wfs {
enabled := featureflipping.IsEnabled(ctx, gorpmapping.Mapper, db, FeaturePurgeName, map[string]string{"project_key": wf.ProjectKey})
if enabled {
continue
}
tx, err := db.Begin()
if err != nil {
log.Error(ctx, "workflow.PurgeWorkflowRuns> error %v", err)
tx.Rollback() // nolint
continue
}
if err := workflow.PurgeWorkflowRun(ctx, tx, wf); err != nil {
log.Error(ctx, "workflow.PurgeWorkflowRuns> error %v", err)
tx.Rollback() // nolint
continue
}
if err := tx.Commit(); err != nil {
log.Error(ctx, "workflow.PurgeWorkflowRuns> unable to commit transaction: %v", err)
_ = tx.Rollback()
continue
}
}

workflow.CountWorkflowRunsMarkToDelete(ctx, db, workflowRunsMarkToDelete)
return nil
}

// workflows purges all marked workflows
func workflows(ctx context.Context, db *gorp.DbMap, store cache.Store, workflowRunsMarkToDelete *stats.Int64Measure) error {
query := "SELECT id, project_id FROM workflow WHERE to_delete = true ORDER BY id ASC"
res := []struct {
var res []struct {
ID int64 `db:"id"`
ProjectID int64 `db:"project_id"`
}{}
}

if _, err := db.Select(&res, query); err != nil {
if err == sql.ErrNoRows {
Expand Down Expand Up @@ -147,7 +196,7 @@ func workflows(ctx context.Context, db *gorp.DbMap, store cache.Store, workflowR
}

// deleteWorkflowRunsHistory is useful to delete all the workflow run marked with to delete flag in db
func deleteWorkflowRunsHistory(ctx context.Context, db *gorp.DbMap, store cache.Store, sharedStorage objectstore.Driver, workflowRunsDeleted *stats.Int64Measure) error {
func deleteWorkflowRunsHistory(ctx context.Context, db *gorp.DbMap, sharedStorage objectstore.Driver, workflowRunsDeleted *stats.Int64Measure) error {
var workflowRunIDs []int64
if _, err := db.Select(&workflowRunIDs, "SELECT id FROM workflow_run WHERE to_delete = true ORDER BY id ASC LIMIT 2000"); err != nil {
return err
Expand All @@ -166,7 +215,7 @@ func deleteWorkflowRunsHistory(ctx context.Context, db *gorp.DbMap, store cache.
log.Error(ctx, "deleteWorkflowRunsHistory> error while opening transaction : %v", err)
continue
}
if err := DeleteArtifacts(ctx, tx, store, sharedStorage, workflowRunID); err != nil {
if err := DeleteArtifacts(ctx, tx, sharedStorage, workflowRunID); err != nil {
log.Error(ctx, "deleteWorkflowRunsHistory> error while deleting artifacts: %v", err)
_ = tx.Rollback()
continue
Expand Down Expand Up @@ -202,7 +251,7 @@ func deleteWorkflowRunsHistory(ctx context.Context, db *gorp.DbMap, store cache.
}

// DeleteArtifacts removes artifacts from storage
func DeleteArtifacts(ctx context.Context, db gorp.SqlExecutor, store cache.Store, sharedStorage objectstore.Driver, workflowRunID int64) error {
func DeleteArtifacts(ctx context.Context, db gorp.SqlExecutor, sharedStorage objectstore.Driver, workflowRunID int64) error {
wr, err := workflow.LoadRunByID(db, workflowRunID, workflow.LoadRunOptions{WithArtifacts: true, DisableDetailledNodeRun: false, WithDeleted: true})
if err != nil {
return sdk.WrapError(err, "error on load LoadRunByID:%d", workflowRunID)
Expand Down
Loading

0 comments on commit 1830951

Please sign in to comment.