Skip to content

Commit

Permalink
feat(api,ui): run workflow become async (#3999)
Browse files Browse the repository at this point in the history
  • Loading branch information
sguiheux authored and yesnault committed Mar 7, 2019
1 parent 059b369 commit acd7a91
Show file tree
Hide file tree
Showing 25 changed files with 836 additions and 398 deletions.
4 changes: 1 addition & 3 deletions cli/cdsctl/monitoring.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,12 +284,10 @@ const (
)

func (ui *Termui) staticRender() {
checking, checkingColor := statusShort(sdk.StatusChecking.String())
waiting, waitingColor := statusShort(sdk.StatusWaiting.String())
building, buildingColor := statusShort(sdk.StatusBuilding.String())
disabled, disabledColor := statusShort(sdk.StatusDisabled.String())
ui.header.Text = fmt.Sprintf("[CDS | (h)elp | (q)uit | Legend: ](fg-cyan)[Checking:%s](%s) [Waiting:%s](%s) [Building:%s](%s) [Disabled:%s](%s)",
checking, checkingColor,
ui.header.Text = fmt.Sprintf("[CDS | (h)elp | (q)uit | Legend: ](fg-cyan) [Waiting:%s](%s) [Building:%s](%s) [Disabled:%s](%s)",
waiting, waitingColor,
building, buildingColor,
disabled, disabledColor)
Expand Down
82 changes: 68 additions & 14 deletions engine/api/workflow/dao_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"strings"
"time"

"github.com/fsamin/go-dump"
"github.com/go-gorp/gorp"
"go.opencensus.io/stats"

Expand Down Expand Up @@ -58,7 +59,6 @@ func UpdateWorkflowRun(ctx context.Context, db gorp.SqlExecutor, wr *sdk.Workflo
defer end()

wr.LastModified = time.Now()

for _, info := range wr.Infos {
if info.IsError && info.SubNumber == wr.LastSubNumber {
wr.Status = string(sdk.StatusFail)
Expand Down Expand Up @@ -165,15 +165,6 @@ func (r *Run) PostGet(db gorp.SqlExecutor) error {
for i := range w.Joins {
w.Joins[i].Ref = fmt.Sprintf("%d", w.Joins[i].ID)
}
// This is usefull for oldserialized workflows...
//TODO: delete this after a while
if len(w.Pipelines) == 0 {
w.Pipelines = map[int64]sdk.Pipeline{}
w.Visit(func(n *sdk.WorkflowNode) {
w.Pipelines[n.PipelineID] = n.DeprecatedPipeline
n.PipelineName = n.DeprecatedPipeline.Name
})
}
r.Workflow = w

i := []sdk.WorkflowRunInfo{}
Expand Down Expand Up @@ -233,7 +224,6 @@ func updateTags(db gorp.SqlExecutor, r *Run) error {
if _, err := db.Exec("delete from workflow_run_tag where workflow_run_id = $1", r.ID); err != nil {
return sdk.WrapError(err, "Unable to store tags")
}

return InsertWorkflowRunTags(db, r.ID, r.Tags)
}

Expand Down Expand Up @@ -579,10 +569,74 @@ func InsertRunNum(db gorp.SqlExecutor, w *sdk.Workflow, num int64) error {
return nil
}

// CreateRun creates a new workflow run and insert it
func CreateRun(db *gorp.DbMap, wf *sdk.Workflow, opts *sdk.WorkflowRunPostHandlerOption, u *sdk.User) (*sdk.WorkflowRun, error) {
number, err := NextRunNumber(db, wf.ID)
if err != nil {
return nil, sdk.WrapError(err, "unable to get next run number")
}

wr := &sdk.WorkflowRun{
Number: number,
WorkflowID: wf.ID,
Start: time.Now(),
LastModified: time.Now(),
ProjectID: wf.ProjectID,
Status: sdk.StatusPending.String(),
LastExecution: time.Now(),
Tags: make([]sdk.WorkflowRunTag, 0),
}

if opts != nil && opts.Hook != nil {
if trigg, ok := opts.Hook.Payload["cds.triggered_by.username"]; ok {
wr.Tag(tagTriggeredBy, trigg)
} else {
wr.Tag(tagTriggeredBy, "cds.hook")
}
} else {
wr.Tag(tagTriggeredBy, u.Username)
}

tags := wf.Metadata["default_tags"]
var payload map[string]string
if opts != nil && opts.Hook != nil {
payload = opts.Hook.Payload
}
if opts != nil && opts.Manual != nil {
e := dump.NewDefaultEncoder()
e.Formatters = []dump.KeyFormatterFunc{dump.WithDefaultLowerCaseFormatter()}
e.ExtraFields.DetailedMap = false
e.ExtraFields.DetailedStruct = false
e.ExtraFields.Len = false
e.ExtraFields.Type = false
m1, errm1 := e.ToStringMap(opts.Manual)
if errm1 != nil {
return nil, sdk.WrapError(errm1, "unable to compute manual payload")
}
payload = m1
}
if tags != "" {
tagsSplited := strings.Split(tags, ",")
for _, t := range tagsSplited {
if pTag, hash := payload[t]; hash {
wr.Tags = append(wr.Tags, sdk.WorkflowRunTag{
Tag: t,
Value: pTag,
})
}
}
}

if err := insertWorkflowRun(db, wr); err != nil {
return nil, sdk.WrapError(err, "unable to create workflow run")
}
return wr, nil
}

// UpdateRunNum Update run number for the given workflow
func UpdateRunNum(db gorp.SqlExecutor, w *sdk.Workflow, num int64) error {
if num == 1 {
if _, err := nextRunNumber(db, w); err != nil {
if _, err := NextRunNumber(db, w.ID); err != nil {
return sdk.WrapError(err, "Cannot create run number")
}
return nil
Expand All @@ -597,8 +651,8 @@ func UpdateRunNum(db gorp.SqlExecutor, w *sdk.Workflow, num int64) error {
return nil
}

func nextRunNumber(db gorp.SqlExecutor, w *sdk.Workflow) (int64, error) {
i, err := db.SelectInt("select workflow_sequences_nextval($1)", w.ID)
func NextRunNumber(db gorp.SqlExecutor, workflowID int64) (int64, error) {
i, err := db.SelectInt("select workflow_sequences_nextval($1)", workflowID)
if err != nil {
return 0, sdk.WrapError(err, "nextRunNumber")
}
Expand Down
121 changes: 78 additions & 43 deletions engine/api/workflow/dao_run_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package workflow_test
import (
"context"
"fmt"
"github.com/stretchr/testify/assert"
"testing"

"github.com/ovh/cds/engine/api/bootstrap"
Expand Down Expand Up @@ -131,13 +132,18 @@ func TestPurgeWorkflowRun(t *testing.T) {
test.NoError(t, err)

for i := 0; i < 5; i++ {
_, _, errWr := workflow.ManualRun(context.TODO(), db, cache, proj, w1, &sdk.WorkflowNodeRunManual{
User: *u,
Payload: map[string]string{
"git.branch": "master",
"git.author": "test",
wr, errWR := workflow.CreateRun(db, w1, nil, u)
assert.NoError(t, errWR)
wr.Workflow = *w1
_, errWr := workflow.StartWorkflowRun(context.TODO(), db, cache, proj, wr, &sdk.WorkflowRunPostHandlerOption{
Manual: &sdk.WorkflowNodeRunManual{
User: *u,
Payload: map[string]string{
"git.branch": "master",
"git.author": "test",
},
},
}, nil)
}, u, nil)
test.NoError(t, errWr)
}

Expand Down Expand Up @@ -222,15 +228,19 @@ func TestPurgeWorkflowRunWithRunningStatus(t *testing.T) {
test.NoError(t, err)

for i := 0; i < 5; i++ {
wfr, _, errWr := workflow.ManualRun(context.TODO(), db, cache, proj, w1, &sdk.WorkflowNodeRunManual{
User: *u,
Payload: map[string]string{
"git.branch": "master",
"git.author": "test",
wfr, errWR := workflow.CreateRun(db, w1, nil, u)
assert.NoError(t, errWR)
wfr.Workflow = *w1
_, errWr := workflow.StartWorkflowRun(context.TODO(), db, cache, proj, wfr, &sdk.WorkflowRunPostHandlerOption{
Manual: &sdk.WorkflowNodeRunManual{
User: *u,
Payload: map[string]string{
"git.branch": "master",
"git.author": "test",
},
},
}, nil)
}, u, nil)
test.NoError(t, errWr)

wfr.Status = sdk.StatusBuilding.String()
test.NoError(t, workflow.UpdateWorkflowRunStatus(db, wfr))
}
Expand Down Expand Up @@ -315,23 +325,33 @@ func TestPurgeWorkflowRunWithOneSuccessWorkflowRun(t *testing.T) {
})
test.NoError(t, err)

_, _, errWr := workflow.ManualRun(context.TODO(), db, cache, proj, w1, &sdk.WorkflowNodeRunManual{
User: *u,
Payload: map[string]string{
"git.branch": "master",
"git.author": "test",
},
}, nil)
test.NoError(t, errWr)

for i := 0; i < 5; i++ {
wfr, _, errWr := workflow.ManualRun(context.TODO(), db, cache, proj, w1, &sdk.WorkflowNodeRunManual{
wr, errWR := workflow.CreateRun(db, w1, nil, u)
assert.NoError(t, errWR)
wr.Workflow = *w1
_, errWr := workflow.StartWorkflowRun(context.TODO(), db, cache, proj, wr, &sdk.WorkflowRunPostHandlerOption{
Manual: &sdk.WorkflowNodeRunManual{
User: *u,
Payload: map[string]string{
"git.branch": "master",
"git.author": "test",
},
}, nil)
},
}, u, nil)
test.NoError(t, errWr)

for i := 0; i < 5; i++ {
wfr, errWR := workflow.CreateRun(db, w1, nil, u)
assert.NoError(t, errWR)
wfr.Workflow = *w1
_, errWr := workflow.StartWorkflowRun(context.TODO(), db, cache, proj, wfr, &sdk.WorkflowRunPostHandlerOption{
Manual: &sdk.WorkflowNodeRunManual{
User: *u,
Payload: map[string]string{
"git.branch": "master",
"git.author": "test",
},
},
}, u, nil)
test.NoError(t, errWr)

wfr.Status = sdk.StatusFail.String()
Expand Down Expand Up @@ -423,13 +443,18 @@ func TestPurgeWorkflowRunWithNoSuccessWorkflowRun(t *testing.T) {
test.NoError(t, err)

for i := 0; i < 5; i++ {
wfr, _, errWr := workflow.ManualRun(context.TODO(), db, cache, proj, w1, &sdk.WorkflowNodeRunManual{
User: *u,
Payload: map[string]string{
"git.branch": "master",
"git.author": "test",
wfr, errWR := workflow.CreateRun(db, w1, nil, u)
assert.NoError(t, errWR)
wfr.Workflow = *w1
_, errWr := workflow.StartWorkflowRun(context.TODO(), db, cache, proj, wfr, &sdk.WorkflowRunPostHandlerOption{
Manual: &sdk.WorkflowNodeRunManual{
User: *u,
Payload: map[string]string{
"git.branch": "master",
"git.author": "test",
},
},
}, nil)
}, u, nil)
test.NoError(t, errWr)

wfr.Status = sdk.StatusFail.String()
Expand Down Expand Up @@ -515,13 +540,18 @@ func TestPurgeWorkflowRunWithoutTags(t *testing.T) {

branches := []string{"master", "master", "master", "develop", "develop", "testBr", "testBr", "testBr", "testBr", "test4"}
for i := 0; i < 10; i++ {
_, _, errWr := workflow.ManualRun(context.TODO(), db, cache, proj, w1, &sdk.WorkflowNodeRunManual{
User: *u,
Payload: map[string]string{
"git.branch": branches[i],
"git.author": "test",
wr, errWR := workflow.CreateRun(db, w1, nil, u)
assert.NoError(t, errWR)
wr.Workflow = *w1
_, errWr := workflow.StartWorkflowRun(context.TODO(), db, cache, proj, wr, &sdk.WorkflowRunPostHandlerOption{
Manual: &sdk.WorkflowNodeRunManual{
User: *u,
Payload: map[string]string{
"git.branch": branches[i],
"git.author": "test",
},
},
}, nil)
}, u, nil)
test.NoError(t, errWr)
}

Expand Down Expand Up @@ -604,13 +634,18 @@ func TestPurgeWorkflowRunWithoutTagsBiggerHistoryLength(t *testing.T) {

branches := []string{"master", "master", "master", "develop", "develop", "testBr", "testBr", "testBr", "testBr", "test4"}
for i := 0; i < 10; i++ {
_, _, errWr := workflow.ManualRun(context.TODO(), db, cache, proj, w1, &sdk.WorkflowNodeRunManual{
User: *u,
Payload: map[string]string{
"git.branch": branches[i],
"git.author": "test",
wr, errWR := workflow.CreateRun(db, w1, nil, u)
assert.NoError(t, errWR)
wr.Workflow = *w1
_, errWr := workflow.StartWorkflowRun(context.TODO(), db, cache, proj, wr, &sdk.WorkflowRunPostHandlerOption{
Manual: &sdk.WorkflowNodeRunManual{
User: *u,
Payload: map[string]string{
"git.branch": branches[i],
"git.author": "test",
},
},
}, nil)
}, u, nil)
test.NoError(t, errWr)
}

Expand Down
7 changes: 6 additions & 1 deletion engine/api/workflow/dao_staticfiles_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,12 @@ func TestInsertStaticFiles(t *testing.T) {
})
test.NoError(t, err)

wfr, _, errWr := workflow.ManualRun(context.TODO(), db, cache, proj, w1, &sdk.WorkflowNodeRunManual{User: *u}, nil)
wfr, errWR := workflow.CreateRun(db, w1, nil, u)
assert.NoError(t, errWR)
wfr.Workflow = *w1
_, errWr := workflow.StartWorkflowRun(context.TODO(), db, cache, proj, wfr, &sdk.WorkflowRunPostHandlerOption{
Manual: &sdk.WorkflowNodeRunManual{User: *u},
}, u, nil)
test.NoError(t, errWr)

var stFile sdk.StaticFiles
Expand Down
5 changes: 3 additions & 2 deletions engine/api/workflow/process_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,15 +45,16 @@ func processWorkflowDataRun(ctx context.Context, db gorp.SqlExecutor, store cach
//////

//// Process Report
oldStatus := wr.Status
report := new(ProcessorReport)
defer func(oldStatus string, wr *sdk.WorkflowRun) {
if oldStatus != wr.Status {
report.Add(*wr)
}
}(wr.Status, wr)
}(oldStatus, wr)
////

wr.Status = string(sdk.StatusBuilding)
wr.Status = sdk.StatusBuilding.String()
maxsn := MaxSubNumber(wr.WorkflowNodeRuns)
wr.LastSubNumber = maxsn

Expand Down
Loading

0 comments on commit acd7a91

Please sign in to comment.