From 444e2b4012fdd841a6026db7eb2bc3df579dcd4f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fran=C3=A7ois=20Samin?= Date: Fri, 22 Feb 2019 12:11:09 +0100 Subject: [PATCH] fix(api): purge workflow audits --- engine/api/workflow/audit.go | 42 +++++++++++++++++++++++++++++++ engine/api/workflow/audit_test.go | 16 ++++++++++++ 2 files changed, 58 insertions(+) create mode 100644 engine/api/workflow/audit_test.go diff --git a/engine/api/workflow/audit.go b/engine/api/workflow/audit.go index 021ed9842f..02fda5bb3f 100644 --- a/engine/api/workflow/audit.go +++ b/engine/api/workflow/audit.go @@ -6,6 +6,7 @@ import ( "encoding/json" "fmt" "strings" + "time" "github.com/go-gorp/gorp" "github.com/mitchellh/mapstructure" @@ -31,6 +32,8 @@ var ( func ComputeAudit(c context.Context, DBFunc func() *gorp.DbMap) { chanEvent := make(chan sdk.Event) event.Subscribe(chanEvent) + deleteTicker := time.NewTicker(15 * time.Minute) + defer deleteTicker.Stop() db := DBFunc() for { @@ -40,6 +43,10 @@ func ComputeAudit(c context.Context, DBFunc func() *gorp.DbMap) { log.Error("ComputeWorkflowAudit> Exiting: %v", c.Err()) return } + case <-deleteTicker.C: + if err := purgeAudits(DBFunc()); err != nil { + log.Error("ComputeWorkflowAudit> Purge error: %v", c) + } case e := <-chanEvent: if !strings.HasPrefix(e.EventType, "sdk.EventWorkflow") { continue @@ -221,3 +228,38 @@ func (a deleteWorkflowPermissionAudit) Compute(db gorp.SqlExecutor, e sdk.Event) WorkflowID: wEvent.WorkflowID, }) } + +const keepAudits = 50 + +func purgeAudits(db gorp.SqlExecutor) error { + var nbAuditsPerWorkflowID = []struct { + WorkflowID int64 `db:"workflow_id"` + NbAudits int64 `db:"nb_audits"` + }{} + + query := `select workflow_id, count(id) "nb_audits" from workflow_audit group by workflow_id having count(id) > $1` + if _, err := db.Select(&nbAuditsPerWorkflowID, query, keepAudits); err != nil { + return sdk.WithStack(err) + } + + for _, r := range nbAuditsPerWorkflowID { + log.Debug("purgeAudits> deleting audits for workflow %d (%d audits)", r.WorkflowID, r.NbAudits) + var ids []int64 + query = `select id from workflow_audit where workflow_id = $1 order by created asc offset $2` + if _, err := db.Select(&ids, query, r.WorkflowID, keepAudits); err != nil { + return sdk.WithStack(err) + } + for _, id := range ids { + if err := deleteAudit(db, id); err != nil { + log.Error("purgeAudits> unable to delete audit %d: %v", id, err) + } + } + } + + return nil +} + +func deleteAudit(db gorp.SqlExecutor, id int64) error { + _, err := db.Exec(`delete from workflow_audit where id = $1`, id) + return sdk.WithStack(err) +} diff --git a/engine/api/workflow/audit_test.go b/engine/api/workflow/audit_test.go new file mode 100644 index 0000000000..e5f577c8ae --- /dev/null +++ b/engine/api/workflow/audit_test.go @@ -0,0 +1,16 @@ +package workflow + +import ( + "testing" + + "github.com/ovh/cds/engine/api/bootstrap" + "github.com/ovh/cds/engine/api/test" +) + +func Test_purgeAudits(t *testing.T) { + db, _, end := test.SetupPG(t, bootstrap.InitiliazeDB) + defer end() + + err := purgeAudits(db) + test.NoError(t, err) +}