Skip to content

Commit

Permalink
feat(api): stop all workflows which are running more than 24hours (#3115
Browse files Browse the repository at this point in the history
)

Signed-off-by: Benjamin Coenen <[email protected]>
  • Loading branch information
bnjjj authored Aug 1, 2018
1 parent d74c73c commit 904ef0a
Show file tree
Hide file tree
Showing 2 changed files with 72 additions and 9 deletions.
79 changes: 71 additions & 8 deletions engine/api/purge/purge.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package purge
import (
"context"
"database/sql"
"fmt"
"strings"
"time"

"github.com/go-gorp/gorp"
Expand All @@ -28,20 +30,24 @@ func Initialize(c context.Context, store cache.Store, DBFunc func() *gorp.DbMap)
}
case <-tickPurge.C:
log.Debug("purge> Deleting all workflow run marked to delete...")
if err := RunsHistory(DBFunc()); err != nil {
log.Warning("purge> Error : %v", err)
if err := deleteWorkflowRunsHistory(DBFunc()); err != nil {
log.Warning("purge> Error on deleteWorkflowRunsHistory : %v", err)
}

log.Debug("purge> Deleting all workflow marked to delete....")
if err := Workflows(DBFunc(), store); err != nil {
log.Warning("purge> Error : %v", err)
if err := workflows(DBFunc(), store); err != nil {
log.Warning("purge> Error on workflows : %v", err)
}

if err := stopRunsBlocked(DBFunc()); err != nil {
log.Warning("purge> Error on stopRunsBlocked : %v", err)
}
}
}
}

// Workflows purges all marked workflows
func Workflows(db *gorp.DbMap, store cache.Store) error {
// workflows purges all marked workflows
func workflows(db *gorp.DbMap, store cache.Store) error {
query := "SELECT id, project_id FROM workflow WHERE to_delete = true ORDER BY id ASC"
res := []struct {
ID int64 `db:"id"`
Expand Down Expand Up @@ -104,8 +110,8 @@ func Workflows(db *gorp.DbMap, store cache.Store) error {
return nil
}

// RunsHistory is useful to delete all the workflow run marked with to delete flag in db
func RunsHistory(db gorp.SqlExecutor) error {
// deleteWorkflowRunsHistory is useful to delete all the workflow run marked with to delete flag in db
func deleteWorkflowRunsHistory(db gorp.SqlExecutor) error {
query := `DELETE FROM workflow_run WHERE workflow_run.id IN (SELECT id FROM workflow_run WHERE to_delete = true LIMIT 30)`

if _, err := db.Exec(query); err != nil {
Expand All @@ -114,3 +120,60 @@ func RunsHistory(db gorp.SqlExecutor) error {
}
return nil
}

// stopRunsBlocked is useful to force stop all workflow that is running more than 24hrs
func stopRunsBlocked(db *gorp.DbMap) error {
query := `SELECT workflow_run.id
FROM workflow_run
WHERE (workflow_run.status = $1 or workflow_run.status = $2 or workflow_run.status = $3)
AND now() - workflow_run.start > interval '1 day'
LIMIT 30`
ids := []struct {
ID int64 `db:"id"`
}{}

if _, err := db.Select(&ids, query, sdk.StatusWaiting.String(), sdk.StatusChecking.String(), sdk.StatusBuilding.String()); err != nil {
if err == sql.ErrNoRows {
return nil
}
return sdk.WrapError(err, "stopRunsBlocked>")
}

tx, errTx := db.Begin()
if errTx != nil {
return sdk.WrapError(errTx, "stopRunsBlocked>")
}
defer tx.Rollback() // nolint

wfIds := make([]string, len(ids))
for i := range wfIds {
wfIds[i] = fmt.Sprintf("%d", ids[i].ID)
}
wfIdsJoined := strings.Join(wfIds, ",")
queryUpdateWf := `UPDATE workflow_run SET status = $1 WHERE id = ANY(string_to_array($2, ',')::bigint[])`
if _, err := tx.Exec(queryUpdateWf, sdk.StatusStopped.String(), wfIdsJoined); err != nil {
return sdk.WrapError(err, "stopRunsBlocked> Unable to stop workflow run history")
}
args := []interface{}{sdk.StatusStopped.String(), wfIdsJoined, sdk.StatusBuilding.String(), sdk.StatusChecking.String(), sdk.StatusWaiting.String()}
queryUpdateNodeRun := `UPDATE workflow_node_run SET status = $1, done = now()
WHERE workflow_run_id = ANY(string_to_array($2, ',')::bigint[])
AND (status = $3 OR status = $4 OR status = $5)`
if _, err := tx.Exec(queryUpdateNodeRun, args...); err != nil {
return sdk.WrapError(err, "stopRunsBlocked> Unable to stop workflow node run history")
}
queryUpdateNodeJobRun := `UPDATE workflow_node_run_job SET status = $1, done = now()
WHERE workflow_node_run_job.workflow_node_run_id IN (
SELECT workflow_node_run.id
FROM workflow_node_run
WHERE workflow_node_run.workflow_run_id = ANY(string_to_array($2, ',')::bigint[])
AND (status = $3 OR status = $4 OR status = $5)
)`
if _, err := tx.Exec(queryUpdateNodeJobRun, args...); err != nil {
return sdk.WrapError(err, "stopRunsBlocked> Unable to stop workflow node job run history")
}

if err := tx.Commit(); err != nil {
return sdk.WrapError(err, "stopRunsBlocked> Unable to commit transaction")
}
return nil
}
2 changes: 1 addition & 1 deletion ui/src/app/shared/workflow/node/workflow.node.component.ts
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ export class WorkflowNodeComponent implements OnInit {
this.workflowRun = wr;
if (wr.nodes && wr.nodes[this.node.id] && wr.nodes[this.node.id].length > 0) {
if (!this.currentNodeRun ||
( (new Date(wr.nodes[this.node.id][0].last_modified)) > (new Date(this.currentNodeRun.last_modified)) )) {
((new Date(wr.nodes[this.node.id][0].last_modified)) > (new Date(this.currentNodeRun.last_modified)))) {
this.currentNodeRun = wr.nodes[this.node.id][0];
}
}
Expand Down

0 comments on commit 904ef0a

Please sign in to comment.