Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add version 2 of workflow export entities #5021

Merged
merged 22 commits into from
Mar 9, 2020
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 1 addition & 7 deletions cli/cdsctl/workflow_init.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,13 +243,7 @@ func interactiveChoosePipeline(pkey, defaultPipeline string) (string, *sdk.Pipel

func craftWorkflowFile(workflowName, appName, pipName, destinationDir string) (string, error) {
// Crafting the workflow
wkflw := exportentities.Workflow{
Version: exportentities.WorkflowVersion1,
Name: workflowName,
ApplicationName: appName,
PipelineName: pipName,
}

wkflw := exportentities.InitWorkflow(workflowName, appName, pipName)
b, err := exportentities.Marshal(wkflw, exportentities.FormatYAML)
if err != nil {
return "", fmt.Errorf("Unable to write workflow file format: %v", err)
Expand Down
15 changes: 9 additions & 6 deletions engine/api/templates.go
Original file line number Diff line number Diff line change
Expand Up @@ -353,21 +353,24 @@ func (api *API) applyTemplate(ctx context.Context, u sdk.Identifiable, p *sdk.Pr
// parse the generated workflow to find its name an update it in instance if not detached
// also set the template path in generated workflow if not detached
if !req.Detached {
var wor exportentities.Workflow
if err := yaml.Unmarshal([]byte(result.Workflow), &wor); err != nil {
wor, err := exportentities.UnmarshalWorkflow([]byte(result.Workflow))
if err != nil {
return result, sdk.NewError(sdk.Error{
ID: sdk.ErrWrongRequest.ID,
Message: "Cannot parse generated workflow",
}, err)
}

wti.WorkflowName = wor.Name
wti.WorkflowName = wor.GetName()
if err := workflowtemplate.UpdateInstance(tx, wti); err != nil {
return result, err
}

templatePath := fmt.Sprintf("%s/%s", wt.Group.Name, wt.Slug)
wor.Template = &templatePath
wor, err = exportentities.SetTemplate(wor, templatePath)
if err != nil {
return result, err
}
b, err := yaml.Marshal(wor)
if err != nil {
return result, sdk.NewError(sdk.Error{
Expand Down Expand Up @@ -455,7 +458,7 @@ func (api *API) postTemplateApplyHandler() service.Handler {
log.Debug("postTemplateApplyHandler> template %s applied (withImport=%v)", wt.Slug, withImport)

buf := new(bytes.Buffer)
if err := workflowtemplate.Tar(ctx, wt, res, buf); err != nil {
if err := workflowtemplate.Tar(ctx, res, buf); err != nil {
return err
}

Expand Down Expand Up @@ -607,7 +610,7 @@ func (api *API) postTemplateBulkHandler() service.Handler {
}

buf := new(bytes.Buffer)
if err := workflowtemplate.Tar(ctx, wt, res, buf); err != nil {
if err := workflowtemplate.Tar(ctx, res, buf); err != nil {
if errD := errorDefer(err); errD != nil {
log.Error(ctx, "%v", errD)
return
Expand Down
4 changes: 2 additions & 2 deletions engine/api/templates_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func generateTemplate(groupID int64, pipelineName string) *sdk.WorkflowTemplate
Slug: slug.Convert(name),
Workflow: base64.StdEncoding.EncodeToString([]byte(
`name: [[.name]]
version: v1.0
version: v2.0
workflow:
Node-1:
pipeline: ` + pipelineName,
Expand Down Expand Up @@ -150,7 +150,7 @@ func Test_postTemplateBulkHandler(t *testing.T) {
Slug: slug.Convert(name),
Workflow: base64.StdEncoding.EncodeToString([]byte(
`name: [[.name]]
version: v1.0
version: v2.0
workflow:
Node-1:
pipeline: ` + pipelineName,
Expand Down
3 changes: 2 additions & 1 deletion engine/api/user_schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
"fmt"
v1 "github.com/ovh/cds/sdk/exportentities/v1"
"net/http"
"reflect"

Expand All @@ -30,7 +31,7 @@ func (api *API) getUserJSONSchema() service.Handler {

var sch *jsonschema.Schema
if filter == "" || filter == "workflow" {
sch = ref.ReflectFromType(reflect.TypeOf(exportentities.Workflow{}))
sch = ref.ReflectFromType(reflect.TypeOf(v1.Workflow{}))
sguiheux marked this conversation as resolved.
Show resolved Hide resolved
buf, _ := json.Marshal(sch)
res.Workflow = string(buf)
}
Expand Down
14 changes: 6 additions & 8 deletions engine/api/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@ import (

"github.com/go-gorp/gorp"
"github.com/gorilla/mux"
yaml "gopkg.in/yaml.v2"

"github.com/ovh/cds/engine/api/application"
"github.com/ovh/cds/engine/api/environment"
"github.com/ovh/cds/engine/api/event"
Expand Down Expand Up @@ -210,9 +208,9 @@ func (api *API) postWorkflowRollbackHandler() service.Handler {
return sdk.WrapError(err, "cannot load workflow audit %s/%s", key, workflowName)
}

var exportWf exportentities.Workflow
if err := yaml.Unmarshal([]byte(audit.DataBefore), &exportWf); err != nil {
return sdk.WrapError(err, "cannot unmarshal data before")
exportWf, err := exportentities.UnmarshalWorkflow([]byte(audit.DataBefore))
if err != nil {
return sdk.WrapError(err, "Cannot unmarshal data before")
sguiheux marked this conversation as resolved.
Show resolved Hide resolved
}

tx, err := db.Begin()
Expand All @@ -223,9 +221,9 @@ func (api *API) postWorkflowRollbackHandler() service.Handler {
_ = tx.Rollback()
}()

newWf, _, err := workflow.ParseAndImport(ctx, tx, api.Cache, *proj, wf, &exportWf, u, workflow.ImportOptions{Force: true, WorkflowName: workflowName})
if err != nil {
return sdk.WrapError(err, "cannot parse and import previous workflow")
newWf, _, errP := workflow.ParseAndImport(ctx, tx, api.Cache, *proj, wf, exportWf, u, workflow.ImportOptions{Force: true, WorkflowName: workflowName})
if errP != nil {
return sdk.WrapError(errP, "Cannot parse and import previous workflow")
sguiheux marked this conversation as resolved.
Show resolved Hide resolved
}

if err := tx.Commit(); err != nil {
Expand Down
4 changes: 2 additions & 2 deletions engine/api/workflow/as_code.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func UpdateWorkflowAsCode(ctx context.Context, store cache.Store, db gorp.SqlExe

var wp exportentities.WorkflowPulled
buffw := new(bytes.Buffer)
if _, err := exportWorkflow(ctx, wf, exportentities.FormatYAML, buffw, exportentities.WorkflowSkipIfOnlyOneRepoWebhook); err != nil {
if _, err := exportWorkflow(ctx, wf, exportentities.FormatYAML, buffw, exportentities.Options{SkipIfOnlyOneRepoWebhook: true}); err != nil {
sguiheux marked this conversation as resolved.
Show resolved Hide resolved
return nil, sdk.WrapError(err, "unable to export workflow")
}
wp.Workflow.Name = wf.Name
Expand All @@ -47,7 +47,7 @@ func MigrateAsCode(ctx context.Context, db *gorp.DbMap, store cache.Store, proj
}

// Export workflow
pull, err := Pull(ctx, db, store, proj, wf.Name, exportentities.FormatYAML, encryptFunc, exportentities.WorkflowSkipIfOnlyOneRepoWebhook)
pull, err := Pull(ctx, db, store, proj, wf.Name, exportentities.FormatYAML, encryptFunc, exportentities.Options{SkipIfOnlyOneRepoWebhook: true})
if err != nil {
return nil, sdk.WrapError(err, "cannot pull workflow")
}
Expand Down
6 changes: 3 additions & 3 deletions engine/api/workflow/audit.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func (a addWorkflowAudit) Compute(ctx context.Context, db gorp.SqlExecutor, e sd
}

buffer := bytes.NewBufferString("")
if _, err := exportWorkflow(ctx, wEvent.Workflow, exportentities.FormatYAML, buffer); err != nil {
if _, err := exportWorkflow(ctx, wEvent.Workflow, exportentities.FormatYAML, buffer, exportentities.Options{}); err != nil {
return sdk.WrapError(err, "Unable to export workflow")
}

Expand All @@ -60,12 +60,12 @@ func (u updateWorkflowAudit) Compute(ctx context.Context, db gorp.SqlExecutor, e
}

oldWorkflowBuffer := bytes.NewBufferString("")
if _, err := exportWorkflow(ctx, wEvent.OldWorkflow, exportentities.FormatYAML, oldWorkflowBuffer); err != nil {
if _, err := exportWorkflow(ctx, wEvent.OldWorkflow, exportentities.FormatYAML, oldWorkflowBuffer, exportentities.Options{}); err != nil {
return sdk.WrapError(err, "Unable to export workflow")
}

newWorkflowBuffer := bytes.NewBufferString("")
if _, err := exportWorkflow(ctx, wEvent.NewWorkflow, exportentities.FormatYAML, newWorkflowBuffer); err != nil {
if _, err := exportWorkflow(ctx, wEvent.NewWorkflow, exportentities.FormatYAML, newWorkflowBuffer, exportentities.Options{}); err != nil {
return sdk.WrapError(err, "Unable to export workflow")
}

Expand Down
11 changes: 6 additions & 5 deletions engine/api/workflow/dao.go
Original file line number Diff line number Diff line change
Expand Up @@ -1329,7 +1329,7 @@ func checkApplication(store cache.Store, db gorp.SqlExecutor, proj sdk.Project,
if n.Context.ApplicationName != "" {
appDB, err := application.LoadByName(db, store, proj.Key, n.Context.ApplicationName, application.LoadOptions.WithDeploymentStrategies, application.LoadOptions.WithVariables)
if err != nil {
if sdk.ErrorIs(err, sdk.ErrPipelineNotFound) {
if sdk.ErrorIs(err, sdk.ErrApplicationNotFound) {
return sdk.WithStack(sdk.ErrorWithData(sdk.ErrApplicationNotFound, n.Context.ApplicationName))
}
return sdk.WrapError(err, "unable to load application %s", n.Context.ApplicationName)
Expand Down Expand Up @@ -1390,12 +1390,13 @@ func Push(ctx context.Context, db *gorp.DbMap, store cache.Store, proj *sdk.Proj
oldWf = opts.OldWorkflow
} else {
// load the workflow from database if exists
workflowExists, err = Exists(db, proj.Key, data.wrkflw.Name)
workflowExists, err = Exists(db, proj.Key, data.wrkflw.GetName())
if err != nil {
return nil, nil, nil, sdk.WrapError(err, "Cannot check if workflow exists")
}
if workflowExists {
oldWf, err = Load(ctx, db, store, *proj, data.wrkflw.Name, LoadOptions{WithIcon: true})
oldWf, err = Load(ctx, db, store, *proj, data.wrkflw.GetName(), LoadOptions{WithIcon: true})

if err != nil {
return nil, nil, nil, sdk.WrapError(err, "Unable to load existing workflow")
}
Expand Down Expand Up @@ -1477,9 +1478,9 @@ func Push(ctx context.Context, db *gorp.DbMap, store cache.Store, proj *sdk.Proj
importOptions.HookUUID = opts.HookUUID
}

wf, msgList, err := ParseAndImport(ctx, tx, store, *proj, oldWf, &data.wrkflw, u, importOptions)
wf, msgList, err := ParseAndImport(ctx, tx, store, *proj, oldWf, data.wrkflw, u, importOptions)
if err != nil {
return msgList, nil, nil, sdk.WrapError(err, "unable to import workflow %s", data.wrkflw.Name)
return msgList, nil, nil, sdk.WrapError(err, "unable to import workflow %s", data.wrkflw.GetName())
}

// If the workflow is "as-code", it should always be linked to a git repository
Expand Down
4 changes: 2 additions & 2 deletions engine/api/workflow/dao_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,8 @@ func UpdateWorkflowRun(ctx context.Context, db gorp.SqlExecutor, wr *sdk.Workflo

wr.LastModified = time.Now()
for _, info := range wr.Infos {
if info.IsError && info.SubNumber == wr.LastSubNumber {
wr.Status = string(sdk.StatusFail)
if info.Type == sdk.RunInfoTypeError && info.SubNumber == wr.LastSubNumber {
wr.Status = sdk.StatusFail
}
}

Expand Down
8 changes: 4 additions & 4 deletions engine/api/workflow/dao_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ func TestInsertSimpleWorkflowAndExport(t *testing.T) {
test.NoError(t, err)
assert.Equal(t, 1, len(ws))

exp, err := exportentities.NewWorkflow(context.TODO(), *w1)
exp, err := exportentities.NewWorkflow(context.TODO(), *w1, exportentities.Options{})
test.NoError(t, err)
btes, err := exportentities.Marshal(exp, exportentities.FormatYAML)
test.NoError(t, err)
Expand Down Expand Up @@ -340,7 +340,7 @@ func TestInsertComplexeWorkflowAndExport(t *testing.T) {

assertEqualNode(t, &w.WorkflowData.Node, &w1.WorkflowData.Node)

exp, err := exportentities.NewWorkflow(context.TODO(), w)
exp, err := exportentities.NewWorkflow(context.TODO(), w, exportentities.Options{})
test.NoError(t, err)
btes, err := exportentities.Marshal(exp, exportentities.FormatYAML)
test.NoError(t, err)
Expand Down Expand Up @@ -833,7 +833,7 @@ func TestInsertComplexeWorkflowWithJoinsAndExport(t *testing.T) {
}, []int64{w1.WorkflowData.Joins[0].JoinContext[0].ParentID, w1.WorkflowData.Joins[0].JoinContext[1].ParentID})
assert.Equal(t, pip5.ID, w.WorkflowData.Joins[0].Triggers[0].ChildNode.Context.PipelineID)

exp, err := exportentities.NewWorkflow(context.TODO(), *w1)
exp, err := exportentities.NewWorkflow(context.TODO(), *w1, exportentities.Options{})
test.NoError(t, err)
btes, err := exportentities.Marshal(exp, exportentities.FormatYAML)
test.NoError(t, err)
Expand Down Expand Up @@ -1486,7 +1486,7 @@ func TestInsertSimpleWorkflowWithHookAndExport(t *testing.T) {

assert.Len(t, w.WorkflowData.Node.Hooks, 1)

exp, err := exportentities.NewWorkflow(context.TODO(), *w1)
exp, err := exportentities.NewWorkflow(context.TODO(), *w1, exportentities.Options{})
test.NoError(t, err)
btes, err := exportentities.Marshal(exp, exportentities.FormatYAML)
test.NoError(t, err)
Expand Down
3 changes: 2 additions & 1 deletion engine/api/workflow/execute_node_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -347,9 +347,10 @@ func executeNodeRun(ctx context.Context, db gorp.SqlExecutor, store cache.Store,
log.Error(ctx, "workflow.execute> Unable to load mutex-locked workflow rnode un: %v", errWRun)
return report, nil
}
AddWorkflowRunInfo(workflowRun, false, sdk.SpawnMsg{
AddWorkflowRunInfo(workflowRun, sdk.SpawnMsg{
sguiheux marked this conversation as resolved.
Show resolved Hide resolved
ID: sdk.MsgWorkflowNodeMutexRelease.ID,
Args: []interface{}{waitingRun.WorkflowNodeName},
Type: sdk.MsgWorkflowNodeMutexRelease.Type,
})

if err := UpdateWorkflowRun(ctx, db, workflowRun); err != nil {
Expand Down
10 changes: 6 additions & 4 deletions engine/api/workflow/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,10 @@ func checkCondition(ctx context.Context, wr *sdk.WorkflowRun, conditions sdk.Wor
luacheck, err := luascript.NewCheck()
if err != nil {
log.Warning(ctx, "processWorkflowNodeRun> WorkflowCheckConditions error: %s", err)
AddWorkflowRunInfo(wr, true, sdk.SpawnMsg{
AddWorkflowRunInfo(wr, sdk.SpawnMsg{
ID: sdk.MsgWorkflowError.ID,
Args: []interface{}{fmt.Sprintf("Error init LUA System: %v", err)},
Type: sdk.MsgWorkflowError.Type,
})
}
luacheck.SetVariables(sdk.ParametersToMap(params))
Expand All @@ -62,22 +63,23 @@ func checkCondition(ctx context.Context, wr *sdk.WorkflowRun, conditions sdk.Wor
}
if errc != nil {
log.Warning(ctx, "processWorkflowNodeRun> WorkflowCheckConditions error: %s", errc)
AddWorkflowRunInfo(wr, true, sdk.SpawnMsg{
AddWorkflowRunInfo(wr, sdk.SpawnMsg{
ID: sdk.MsgWorkflowError.ID,
Args: []interface{}{fmt.Sprintf("Error on LUA Condition: %v", errc)},
Type: sdk.MsgWorkflowError.Type,
})
return false
}
return conditionsOK
}

// AddWorkflowRunInfo add WorkflowRunInfo on a WorkflowRun
func AddWorkflowRunInfo(run *sdk.WorkflowRun, isError bool, infos ...sdk.SpawnMsg) {
func AddWorkflowRunInfo(run *sdk.WorkflowRun, infos ...sdk.SpawnMsg) {
for _, i := range infos {
run.Infos = append(run.Infos, sdk.WorkflowRunInfo{
APITime: time.Now(),
Message: i,
IsError: isError,
Type: i.Type,
SubNumber: run.LastSubNumber,
})
}
Expand Down
16 changes: 10 additions & 6 deletions engine/api/workflow/process_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,10 @@ func processNodeTriggers(ctx context.Context, db gorp.SqlExecutor, store cache.S
r1, _, errPwnr := processNodeRun(ctx, db, store, proj, wr, mapNodes, &t.ChildNode, int(parentSubNumber), parentNodeRun, nil, nil)
if errPwnr != nil {
log.Error(ctx, "processWorkflowRun> Unable to process node ID=%d: %s", t.ChildNode.ID, errPwnr)
AddWorkflowRunInfo(wr, true, sdk.SpawnMsg{
AddWorkflowRunInfo(wr, sdk.SpawnMsg{
ID: sdk.MsgWorkflowError.ID,
Args: []interface{}{errPwnr.Error()},
Type: sdk.MsgWorkflowError.Type,
})
}
_, _ = report.Merge(ctx, r1, nil)
Expand Down Expand Up @@ -240,9 +241,10 @@ func processNode(ctx context.Context, db gorp.SqlExecutor, store cache.Store, pr
vcsServer := repositoriesmanager.GetProjectVCSServer(proj, app.VCSServer)
vcsInf, errVcs = getVCSInfos(ctx, db, store, proj.Key, vcsServer, currentJobGitValues, app.Name, app.VCSServer, app.RepositoryFullname)
if errVcs != nil {
AddWorkflowRunInfo(wr, true, sdk.SpawnMsg{
AddWorkflowRunInfo(wr, sdk.SpawnMsg{
ID: sdk.MsgWorkflowError.ID,
Args: []interface{}{errVcs.Error()},
Type: sdk.MsgWorkflowError.Type,
})
return nil, false, sdk.WrapError(errVcs, "unable to get git informations")
}
Expand Down Expand Up @@ -298,8 +300,8 @@ func processNode(ctx context.Context, db gorp.SqlExecutor, store cache.Store, pr
}

for _, info := range wr.Infos {
if info.IsError && info.SubNumber == wr.LastSubNumber {
nr.Status = string(sdk.StatusFail)
if info.Type == sdk.RunInfoTypeError && info.SubNumber == wr.LastSubNumber {
nr.Status = sdk.StatusFail
nr.Done = time.Now()
break
}
Expand Down Expand Up @@ -365,9 +367,10 @@ func processNode(ctx context.Context, db gorp.SqlExecutor, store cache.Store, pr
}
if nbMutex > 0 {
log.Debug("Noderun %s processed but not executed because of mutex", n.Name)
AddWorkflowRunInfo(wr, false, sdk.SpawnMsg{
AddWorkflowRunInfo(wr, sdk.SpawnMsg{
ID: sdk.MsgWorkflowNodeMutex.ID,
Args: []interface{}{n.Name},
Type: sdk.MsgWorkflowNodeMutex.Type,
})
if err := UpdateWorkflowRun(ctx, db, wr); err != nil {
return nil, false, sdk.WrapError(err, "unable to update workflow run")
Expand Down Expand Up @@ -479,9 +482,10 @@ func computePayload(n *sdk.Node, hookEvent *sdk.WorkflowNodeRunHookEvent, manual
func computeNodeContextBuildParameters(ctx context.Context, proj sdk.Project, wr *sdk.WorkflowRun, run *sdk.WorkflowNodeRun, n *sdk.Node, runContext nodeRunContext) {
nodeRunParams, errParam := getNodeRunBuildParameters(ctx, proj, wr, run, runContext)
if errParam != nil {
AddWorkflowRunInfo(wr, true, sdk.SpawnMsg{
AddWorkflowRunInfo(wr, sdk.SpawnMsg{
ID: sdk.MsgWorkflowError.ID,
Args: []interface{}{errParam.Error()},
Type: sdk.MsgWorkflowError.Type,
})
// if there an error -> display it in workflowRunInfo and not stop the launch
log.Error(ctx, "processNode> getNodeRunBuildParameters failed. Project:%s [#%d.%d]%s.%d with payload %v err:%v", proj.Name, wr.Number, run.SubNumber, wr.Workflow.Name, n.ID, run.Payload, errParam)
Expand Down
3 changes: 2 additions & 1 deletion engine/api/workflow/process_start.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,12 +55,13 @@ func processStartFromRootNode(ctx context.Context, db gorp.SqlExecutor, store ca
log.Debug("processWorkflowRun> starting from the root: %d (pipeline %s)", wr.Workflow.WorkflowData.Node.ID, wr.Workflow.Pipelines[wr.Workflow.WorkflowData.Node.Context.PipelineID].Name)
report := new(ProcessorReport)
//Run the root: manual or from an event
AddWorkflowRunInfo(wr, false, sdk.SpawnMsg{
AddWorkflowRunInfo(wr, sdk.SpawnMsg{
ID: sdk.MsgWorkflowStarting.ID,
Args: []interface{}{
wr.Workflow.Name,
fmt.Sprintf("%d.%d", wr.Number, 0),
},
Type: sdk.MsgWorkflowStarting.Type,
})

r1, conditionOK, errP := processNodeRun(ctx, db, store, proj, wr, mapNodes, &wr.Workflow.WorkflowData.Node, 0, nil, hookEvent, manual)
Expand Down
4 changes: 3 additions & 1 deletion engine/api/workflow/repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,9 @@ func extractFromCDSFiles(ctx context.Context, tr *tar.Reader) (*exportedEntities
mError.Append(fmt.Errorf("two workflows files found: %s and %s", workflowFileName, hdr.Name))
break
}
if err := yaml.Unmarshal(b, &res.wrkflw); err != nil {
var err error
res.wrkflw, err = exportentities.UnmarshalWorkflow(b)
if err != nil {
log.Error(ctx, "Push> Unable to unmarshal workflow %s: %v", hdr.Name, err)
mError.Append(fmt.Errorf("Unable to unmarshal workflow %s: %v", hdr.Name, err))
continue
Expand Down
Loading