Skip to content

Commit

Permalink
fix(api): fix stop workflow run for purge (#3149)
Browse files Browse the repository at this point in the history
  • Loading branch information
bnjjj authored and fsamin committed Aug 7, 2018
1 parent 7ddcd90 commit 7aa9b83
Show file tree
Hide file tree
Showing 7 changed files with 137 additions and 75 deletions.
4 changes: 3 additions & 1 deletion engine/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -665,7 +665,9 @@ func (a *API) Serve(ctx context.Context) error {
log.Warning("⚠ Cron Scheduler is disabled")
}

workflow.Initialize(a.Config.URL.UI, a.Config.DefaultOS, a.Config.DefaultArch)
sdk.GoRoutine("workflow.Initialize", func() {
workflow.Initialize(ctx, a.DBConnectionFactory.GetDBMap, a.Config.URL.UI, a.Config.DefaultOS, a.Config.DefaultArch)
})
sdk.GoRoutine("PushInElasticSearch", func() { event.PushInElasticSearch(ctx, a.mustDB(), a.Cache) })
sdk.GoRoutine("Purge", func() { purge.Initialize(ctx, a.Cache, a.DBConnectionFactory.GetDBMap) })

Expand Down
63 changes: 0 additions & 63 deletions engine/api/purge/purge.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@ package purge
import (
"context"
"database/sql"
"fmt"
"strings"
"time"

"github.com/go-gorp/gorp"
Expand Down Expand Up @@ -38,10 +36,6 @@ func Initialize(c context.Context, store cache.Store, DBFunc func() *gorp.DbMap)
if err := workflows(DBFunc(), store); err != nil {
log.Warning("purge> Error on workflows : %v", err)
}

if err := stopRunsBlocked(DBFunc()); err != nil {
log.Warning("purge> Error on stopRunsBlocked : %v", err)
}
}
}
}
Expand Down Expand Up @@ -120,60 +114,3 @@ func deleteWorkflowRunsHistory(db gorp.SqlExecutor) error {
}
return nil
}

// stopRunsBlocked is useful to force stop all workflow that is running more than 24hrs
func stopRunsBlocked(db *gorp.DbMap) error {
query := `SELECT workflow_run.id
FROM workflow_run
WHERE (workflow_run.status = $1 or workflow_run.status = $2 or workflow_run.status = $3)
AND now() - workflow_run.last_execution > interval '1 day'
LIMIT 30`
ids := []struct {
ID int64 `db:"id"`
}{}

if _, err := db.Select(&ids, query, sdk.StatusWaiting.String(), sdk.StatusChecking.String(), sdk.StatusBuilding.String()); err != nil {
if err == sql.ErrNoRows {
return nil
}
return sdk.WrapError(err, "stopRunsBlocked>")
}

tx, errTx := db.Begin()
if errTx != nil {
return sdk.WrapError(errTx, "stopRunsBlocked>")
}
defer tx.Rollback() // nolint

wfIds := make([]string, len(ids))
for i := range wfIds {
wfIds[i] = fmt.Sprintf("%d", ids[i].ID)
}
wfIdsJoined := strings.Join(wfIds, ",")
queryUpdateWf := `UPDATE workflow_run SET status = $1 WHERE id = ANY(string_to_array($2, ',')::bigint[])`
if _, err := tx.Exec(queryUpdateWf, sdk.StatusStopped.String(), wfIdsJoined); err != nil {
return sdk.WrapError(err, "stopRunsBlocked> Unable to stop workflow run history")
}
args := []interface{}{sdk.StatusStopped.String(), wfIdsJoined, sdk.StatusBuilding.String(), sdk.StatusChecking.String(), sdk.StatusWaiting.String()}
queryUpdateNodeRun := `UPDATE workflow_node_run SET status = $1, done = now()
WHERE workflow_run_id = ANY(string_to_array($2, ',')::bigint[])
AND (status = $3 OR status = $4 OR status = $5)`
if _, err := tx.Exec(queryUpdateNodeRun, args...); err != nil {
return sdk.WrapError(err, "stopRunsBlocked> Unable to stop workflow node run history")
}
queryUpdateNodeJobRun := `UPDATE workflow_node_run_job SET status = $1, done = now()
WHERE workflow_node_run_job.workflow_node_run_id IN (
SELECT workflow_node_run.id
FROM workflow_node_run
WHERE workflow_node_run.workflow_run_id = ANY(string_to_array($2, ',')::bigint[])
AND (status = $3 OR status = $4 OR status = $5)
)`
if _, err := tx.Exec(queryUpdateNodeJobRun, args...); err != nil {
return sdk.WrapError(err, "stopRunsBlocked> Unable to stop workflow node job run history")
}

if err := tx.Commit(); err != nil {
return sdk.WrapError(err, "stopRunsBlocked> Unable to commit transaction")
}
return nil
}
1 change: 1 addition & 0 deletions engine/api/workflow/dao_node_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -658,6 +658,7 @@ func updateNodeRunCommits(db gorp.SqlExecutor, id int64, commits []sdk.VCSCommit
return nil
}

// updateNodeRunStatusAndStage update just noderun status and stage
func updateNodeRunStatusAndStage(db gorp.SqlExecutor, nodeRun *sdk.WorkflowNodeRun) error {
stagesBts, errMarshal := json.Marshal(nodeRun.Stages)
if errMarshal != nil {
Expand Down
91 changes: 91 additions & 0 deletions engine/api/workflow/dao_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -738,3 +738,94 @@ func syncNodeRuns(db gorp.SqlExecutor, wr *sdk.WorkflowRun, loadOpts LoadRunOpti

return nil
}

// stopRunsBlocked is useful to force stop all workflow that is running more than 24hrs
func stopRunsBlocked(db *gorp.DbMap) error {
query := `SELECT workflow_run.id
FROM workflow_run
WHERE (workflow_run.status = $1 or workflow_run.status = $2 or workflow_run.status = $3)
AND now() - workflow_run.last_execution > interval '1 day'
LIMIT 30`
ids := []struct {
ID int64 `db:"id"`
}{}

if _, err := db.Select(&ids, query, sdk.StatusWaiting.String(), sdk.StatusChecking.String(), sdk.StatusBuilding.String()); err != nil {
if err == sql.ErrNoRows {
return nil
}
return sdk.WrapError(err, "stopRunsBlocked>")
}

tx, errTx := db.Begin()
if errTx != nil {
return sdk.WrapError(errTx, "stopRunsBlocked>")
}
defer tx.Rollback() // nolint

wfIds := make([]string, len(ids))
for i := range wfIds {
wfIds[i] = fmt.Sprintf("%d", ids[i].ID)
}
wfIdsJoined := strings.Join(wfIds, ",")
queryUpdateWf := `UPDATE workflow_run SET status = $1 WHERE id = ANY(string_to_array($2, ',')::bigint[])`
if _, err := tx.Exec(queryUpdateWf, sdk.StatusStopped.String(), wfIdsJoined); err != nil {
return sdk.WrapError(err, "stopRunsBlocked> Unable to stop workflow run history")
}
args := []interface{}{sdk.StatusStopped.String(), wfIdsJoined, sdk.StatusBuilding.String(), sdk.StatusChecking.String(), sdk.StatusWaiting.String()}
queryUpdateNodeRun := `UPDATE workflow_node_run SET status = $1, done = now()
WHERE workflow_run_id = ANY(string_to_array($2, ',')::bigint[])
AND (status = $3 OR status = $4 OR status = $5)`
if _, err := tx.Exec(queryUpdateNodeRun, args...); err != nil {
return sdk.WrapError(err, "stopRunsBlocked> Unable to stop workflow node run history")
}
queryUpdateNodeJobRun := `UPDATE workflow_node_run_job SET status = $1, done = now()
WHERE workflow_node_run_job.workflow_node_run_id IN (
SELECT workflow_node_run.id
FROM workflow_node_run
WHERE workflow_node_run.workflow_run_id = ANY(string_to_array($2, ',')::bigint[])
AND (status = $3 OR status = $4 OR status = $5)
)`
if _, err := tx.Exec(queryUpdateNodeJobRun, args...); err != nil {
return sdk.WrapError(err, "stopRunsBlocked> Unable to stop workflow node job run history")
}

resp := []struct {
ID int64 `db:"id"`
Status string `db:"status"`
Stages string `db:"stages"`
}{}

querySelectNodeRuns := `
SELECT workflow_node_run.id, workflow_node_run.status, workflow_node_run.stages
FROM workflow_node_run
WHERE workflow_node_run.workflow_run_id = ANY(string_to_array($1, ',')::bigint[])
`
if _, err := tx.Select(&resp, querySelectNodeRuns, wfIdsJoined); err != nil {
return sdk.WrapError(err, "stopRunsBlocked> cannot get workflow node run infos")
}

now := time.Now()
for i := range resp {
nr := sdk.WorkflowNodeRun{
ID: resp[i].ID,
Status: resp[i].Status,
}
if err := json.Unmarshal([]byte(resp[i].Stages), &nr.Stages); err != nil {
return sdk.WrapError(err, "stopRunsBlocked> cannot unmarshal stages")
}

stopWorkflowNodeRunStages(&nr)
nr.Status = sdk.StatusStopped.String()
nr.Done = now

if err := updateNodeRunStatusAndStage(tx, &nr); err != nil {
return sdk.WrapError(err, "stopRunsBlocked> cannot update node runs stages")
}
}

if err := tx.Commit(); err != nil {
return sdk.WrapError(err, "stopRunsBlocked> Unable to commit transaction")
}
return nil
}
24 changes: 15 additions & 9 deletions engine/api/workflow/execute_node_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -647,6 +647,21 @@ func StopWorkflowNodeRun(ctx context.Context, dbFunc func() *gorp.DbMap, store c
}
wg.Wait()

// Update stages from node run
stopWorkflowNodeRunStages(&nodeRun)

nodeRun.Status = sdk.StatusStopped.String()
nodeRun.Done = time.Now()
if errU := UpdateNodeRun(dbFunc(), &nodeRun); errU != nil {
return report, sdk.WrapError(errU, "StopWorkflowNodeRun> Cannot update node run")
}
report.Add(nodeRun)

return report, nil
}

// stopWorkflowNodeRunStages mark to stop all stages and step status in struct
func stopWorkflowNodeRunStages(nodeRun *sdk.WorkflowNodeRun) {
// Update stages from node run
for iS := range nodeRun.Stages {
stag := &nodeRun.Stages[iS]
Expand All @@ -668,15 +683,6 @@ func StopWorkflowNodeRun(ctx context.Context, dbFunc func() *gorp.DbMap, store c
stag.Status = sdk.StatusStopped
}
}

nodeRun.Status = sdk.StatusStopped.String()
nodeRun.Done = time.Now()
if errU := UpdateNodeRun(dbFunc(), &nodeRun); errU != nil {
return report, sdk.WrapError(errU, "StopWorkflowNodeRun> Cannot update node run")
}
report.Add(nodeRun)

return report, nil
}

func stopWorkflowNodeJobRun(ctx context.Context, dbFunc func() *gorp.DbMap, store cache.Store, proj *sdk.Project, nodeRun *sdk.WorkflowNodeRun, stopInfos sdk.SpawnInfo, chanNjrID <-chan int64, chanErr chan<- error, chanDone chan<- bool, wg *sync.WaitGroup) *ProcessorReport {
Expand Down
27 changes: 26 additions & 1 deletion engine/api/workflow/init.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,35 @@
package workflow

import (
"context"
"time"

"github.com/go-gorp/gorp"

"github.com/ovh/cds/sdk/log"
)

var baseUIURL, defaultOS, defaultArch string

//Initialize starts goroutines for workflows
func Initialize(uiURL, confDefaultOS, confDefaultArch string) {
func Initialize(c context.Context, DBFunc func() *gorp.DbMap, uiURL, confDefaultOS, confDefaultArch string) {
baseUIURL = uiURL
defaultOS = confDefaultOS
defaultArch = confDefaultArch
tickStop := time.NewTicker(30 * time.Minute)
defer tickStop.Stop()

for {
select {
case <-c.Done():
if c.Err() != nil {
log.Error("Exiting workflow ticker: %v", c.Err())
return
}
case <-tickStop.C:
if err := stopRunsBlocked(DBFunc()); err != nil {
log.Warning("workflow.stopRunsBlocked> Error on stopRunsBlocked : %v", err)
}
}
}
}
2 changes: 1 addition & 1 deletion engine/api/workflow_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -1030,7 +1030,7 @@ func (api *API) getWorkflowNodeRunJobStepHandler() Handler {
}

// Check nodeRunID is link to workflow
nodeRun, errNR := workflow.LoadNodeRun(api.mustDB(), projectKey, workflowName, number, nodeRunID, workflow.LoadRunOptions{})
nodeRun, errNR := workflow.LoadNodeRun(api.mustDB(), projectKey, workflowName, number, nodeRunID, workflow.LoadRunOptions{DisableDetailledNodeRun: true})
if errNR != nil {
return sdk.WrapError(errNR, "getWorkflowNodeRunJobBuildLogsHandler> Cannot find nodeRun %d/%d for workflow %s in project %s", nodeRunID, number, workflowName, projectKey)
}
Expand Down

0 comments on commit 7aa9b83

Please sign in to comment.