Skip to content

Commit

Permalink
feat(sdk): add version 2 of workflow export entities (#5021)
Browse files Browse the repository at this point in the history
  • Loading branch information
sguiheux authored Mar 9, 2020
1 parent b1b927a commit eb3c114
Show file tree
Hide file tree
Showing 46 changed files with 3,031 additions and 1,493 deletions.
8 changes: 1 addition & 7 deletions cli/cdsctl/workflow_init.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,13 +243,7 @@ func interactiveChoosePipeline(pkey, defaultPipeline string) (string, *sdk.Pipel

func craftWorkflowFile(workflowName, appName, pipName, destinationDir string) (string, error) {
// Crafting the workflow
wkflw := exportentities.Workflow{
Version: exportentities.WorkflowVersion1,
Name: workflowName,
ApplicationName: appName,
PipelineName: pipName,
}

wkflw := exportentities.InitWorkflow(workflowName, appName, pipName)
b, err := exportentities.Marshal(wkflw, exportentities.FormatYAML)
if err != nil {
return "", fmt.Errorf("Unable to write workflow file format: %v", err)
Expand Down
15 changes: 9 additions & 6 deletions engine/api/templates.go
Original file line number Diff line number Diff line change
Expand Up @@ -353,21 +353,24 @@ func (api *API) applyTemplate(ctx context.Context, u sdk.Identifiable, p *sdk.Pr
// parse the generated workflow to find its name an update it in instance if not detached
// also set the template path in generated workflow if not detached
if !req.Detached {
var wor exportentities.Workflow
if err := yaml.Unmarshal([]byte(result.Workflow), &wor); err != nil {
wor, err := exportentities.UnmarshalWorkflow([]byte(result.Workflow))
if err != nil {
return result, sdk.NewError(sdk.Error{
ID: sdk.ErrWrongRequest.ID,
Message: "Cannot parse generated workflow",
}, err)
}

wti.WorkflowName = wor.Name
wti.WorkflowName = wor.GetName()
if err := workflowtemplate.UpdateInstance(tx, wti); err != nil {
return result, err
}

templatePath := fmt.Sprintf("%s/%s", wt.Group.Name, wt.Slug)
wor.Template = &templatePath
wor, err = exportentities.SetTemplate(wor, templatePath)
if err != nil {
return result, err
}
b, err := yaml.Marshal(wor)
if err != nil {
return result, sdk.NewError(sdk.Error{
Expand Down Expand Up @@ -455,7 +458,7 @@ func (api *API) postTemplateApplyHandler() service.Handler {
log.Debug("postTemplateApplyHandler> template %s applied (withImport=%v)", wt.Slug, withImport)

buf := new(bytes.Buffer)
if err := workflowtemplate.Tar(ctx, wt, res, buf); err != nil {
if err := workflowtemplate.Tar(ctx, res, buf); err != nil {
return err
}

Expand Down Expand Up @@ -607,7 +610,7 @@ func (api *API) postTemplateBulkHandler() service.Handler {
}

buf := new(bytes.Buffer)
if err := workflowtemplate.Tar(ctx, wt, res, buf); err != nil {
if err := workflowtemplate.Tar(ctx, res, buf); err != nil {
if errD := errorDefer(err); errD != nil {
log.Error(ctx, "%v", errD)
return
Expand Down
4 changes: 2 additions & 2 deletions engine/api/templates_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func generateTemplate(groupID int64, pipelineName string) *sdk.WorkflowTemplate
Slug: slug.Convert(name),
Workflow: base64.StdEncoding.EncodeToString([]byte(
`name: [[.name]]
version: v1.0
version: v2.0
workflow:
Node-1:
pipeline: ` + pipelineName,
Expand Down Expand Up @@ -150,7 +150,7 @@ func Test_postTemplateBulkHandler(t *testing.T) {
Slug: slug.Convert(name),
Workflow: base64.StdEncoding.EncodeToString([]byte(
`name: [[.name]]
version: v1.0
version: v2.0
workflow:
Node-1:
pipeline: ` + pipelineName,
Expand Down
3 changes: 2 additions & 1 deletion engine/api/user_schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
"fmt"
v2 "github.com/ovh/cds/sdk/exportentities/v2"
"net/http"
"reflect"

Expand All @@ -30,7 +31,7 @@ func (api *API) getUserJSONSchema() service.Handler {

var sch *jsonschema.Schema
if filter == "" || filter == "workflow" {
sch = ref.ReflectFromType(reflect.TypeOf(exportentities.Workflow{}))
sch = ref.ReflectFromType(reflect.TypeOf(v2.Workflow{}))
buf, _ := json.Marshal(sch)
res.Workflow = string(buf)
}
Expand Down
12 changes: 5 additions & 7 deletions engine/api/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@ import (

"github.com/go-gorp/gorp"
"github.com/gorilla/mux"
yaml "gopkg.in/yaml.v2"

"github.com/ovh/cds/engine/api/application"
"github.com/ovh/cds/engine/api/environment"
"github.com/ovh/cds/engine/api/event"
Expand Down Expand Up @@ -210,8 +208,8 @@ func (api *API) postWorkflowRollbackHandler() service.Handler {
return sdk.WrapError(err, "cannot load workflow audit %s/%s", key, workflowName)
}

var exportWf exportentities.Workflow
if err := yaml.Unmarshal([]byte(audit.DataBefore), &exportWf); err != nil {
exportWf, err := exportentities.UnmarshalWorkflow([]byte(audit.DataBefore))
if err != nil {
return sdk.WrapError(err, "cannot unmarshal data before")
}

Expand All @@ -223,9 +221,9 @@ func (api *API) postWorkflowRollbackHandler() service.Handler {
_ = tx.Rollback()
}()

newWf, _, err := workflow.ParseAndImport(ctx, tx, api.Cache, *proj, wf, &exportWf, u, workflow.ImportOptions{Force: true, WorkflowName: workflowName})
if err != nil {
return sdk.WrapError(err, "cannot parse and import previous workflow")
newWf, _, errP := workflow.ParseAndImport(ctx, tx, api.Cache, *proj, wf, exportWf, u, workflow.ImportOptions{Force: true, WorkflowName: workflowName})
if errP != nil {
return sdk.WrapError(errP, "cannot parse and import previous workflow")
}

if err := tx.Commit(); err != nil {
Expand Down
5 changes: 3 additions & 2 deletions engine/api/workflow/as_code.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"context"
"encoding/base64"
"fmt"
v2 "github.com/ovh/cds/sdk/exportentities/v2"
"time"

"github.com/go-gorp/gorp"
Expand All @@ -26,7 +27,7 @@ func UpdateWorkflowAsCode(ctx context.Context, store cache.Store, db gorp.SqlExe

var wp exportentities.WorkflowPulled
buffw := new(bytes.Buffer)
if _, err := exportWorkflow(ctx, wf, exportentities.FormatYAML, buffw, exportentities.WorkflowSkipIfOnlyOneRepoWebhook); err != nil {
if _, err := exportWorkflow(ctx, wf, exportentities.FormatYAML, buffw, v2.WorkflowSkipIfOnlyOneRepoWebhook); err != nil {
return nil, sdk.WrapError(err, "unable to export workflow")
}
wp.Workflow.Name = wf.Name
Expand All @@ -47,7 +48,7 @@ func MigrateAsCode(ctx context.Context, db *gorp.DbMap, store cache.Store, proj
}

// Export workflow
pull, err := Pull(ctx, db, store, proj, wf.Name, exportentities.FormatYAML, encryptFunc, exportentities.WorkflowSkipIfOnlyOneRepoWebhook)
pull, err := Pull(ctx, db, store, proj, wf.Name, exportentities.FormatYAML, encryptFunc, v2.WorkflowSkipIfOnlyOneRepoWebhook)
if err != nil {
return nil, sdk.WrapError(err, "cannot pull workflow")
}
Expand Down
11 changes: 6 additions & 5 deletions engine/api/workflow/dao.go
Original file line number Diff line number Diff line change
Expand Up @@ -1329,7 +1329,7 @@ func checkApplication(store cache.Store, db gorp.SqlExecutor, proj sdk.Project,
if n.Context.ApplicationName != "" {
appDB, err := application.LoadByName(db, store, proj.Key, n.Context.ApplicationName, application.LoadOptions.WithDeploymentStrategies, application.LoadOptions.WithVariables)
if err != nil {
if sdk.ErrorIs(err, sdk.ErrPipelineNotFound) {
if sdk.ErrorIs(err, sdk.ErrApplicationNotFound) {
return sdk.WithStack(sdk.ErrorWithData(sdk.ErrApplicationNotFound, n.Context.ApplicationName))
}
return sdk.WrapError(err, "unable to load application %s", n.Context.ApplicationName)
Expand Down Expand Up @@ -1390,12 +1390,13 @@ func Push(ctx context.Context, db *gorp.DbMap, store cache.Store, proj *sdk.Proj
oldWf = opts.OldWorkflow
} else {
// load the workflow from database if exists
workflowExists, err = Exists(db, proj.Key, data.wrkflw.Name)
workflowExists, err = Exists(db, proj.Key, data.wrkflw.GetName())
if err != nil {
return nil, nil, nil, sdk.WrapError(err, "Cannot check if workflow exists")
}
if workflowExists {
oldWf, err = Load(ctx, db, store, *proj, data.wrkflw.Name, LoadOptions{WithIcon: true})
oldWf, err = Load(ctx, db, store, *proj, data.wrkflw.GetName(), LoadOptions{WithIcon: true})

if err != nil {
return nil, nil, nil, sdk.WrapError(err, "Unable to load existing workflow")
}
Expand Down Expand Up @@ -1477,9 +1478,9 @@ func Push(ctx context.Context, db *gorp.DbMap, store cache.Store, proj *sdk.Proj
importOptions.HookUUID = opts.HookUUID
}

wf, msgList, err := ParseAndImport(ctx, tx, store, *proj, oldWf, &data.wrkflw, u, importOptions)
wf, msgList, err := ParseAndImport(ctx, tx, store, *proj, oldWf, data.wrkflw, u, importOptions)
if err != nil {
return msgList, nil, nil, sdk.WrapError(err, "unable to import workflow %s", data.wrkflw.Name)
return msgList, nil, nil, sdk.WrapError(err, "unable to import workflow %s", data.wrkflw.GetName())
}

// If the workflow is "as-code", it should always be linked to a git repository
Expand Down
4 changes: 2 additions & 2 deletions engine/api/workflow/dao_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,8 @@ func UpdateWorkflowRun(ctx context.Context, db gorp.SqlExecutor, wr *sdk.Workflo

wr.LastModified = time.Now()
for _, info := range wr.Infos {
if info.IsError && info.SubNumber == wr.LastSubNumber {
wr.Status = string(sdk.StatusFail)
if info.Type == sdk.RunInfoTypeError && info.SubNumber == wr.LastSubNumber {
wr.Status = sdk.StatusFail
}
}

Expand Down
3 changes: 2 additions & 1 deletion engine/api/workflow/execute_node_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -347,9 +347,10 @@ func executeNodeRun(ctx context.Context, db gorp.SqlExecutor, store cache.Store,
log.Error(ctx, "workflow.execute> Unable to load mutex-locked workflow rnode un: %v", errWRun)
return report, nil
}
AddWorkflowRunInfo(workflowRun, false, sdk.SpawnMsg{
AddWorkflowRunInfo(workflowRun, sdk.SpawnMsg{
ID: sdk.MsgWorkflowNodeMutexRelease.ID,
Args: []interface{}{waitingRun.WorkflowNodeName},
Type: sdk.MsgWorkflowNodeMutexRelease.Type,
})

if err := UpdateWorkflowRun(ctx, db, workflowRun); err != nil {
Expand Down
10 changes: 6 additions & 4 deletions engine/api/workflow/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,10 @@ func checkCondition(ctx context.Context, wr *sdk.WorkflowRun, conditions sdk.Wor
luacheck, err := luascript.NewCheck()
if err != nil {
log.Warning(ctx, "processWorkflowNodeRun> WorkflowCheckConditions error: %s", err)
AddWorkflowRunInfo(wr, true, sdk.SpawnMsg{
AddWorkflowRunInfo(wr, sdk.SpawnMsg{
ID: sdk.MsgWorkflowError.ID,
Args: []interface{}{fmt.Sprintf("Error init LUA System: %v", err)},
Type: sdk.MsgWorkflowError.Type,
})
}
luacheck.SetVariables(sdk.ParametersToMap(params))
Expand All @@ -62,22 +63,23 @@ func checkCondition(ctx context.Context, wr *sdk.WorkflowRun, conditions sdk.Wor
}
if errc != nil {
log.Warning(ctx, "processWorkflowNodeRun> WorkflowCheckConditions error: %s", errc)
AddWorkflowRunInfo(wr, true, sdk.SpawnMsg{
AddWorkflowRunInfo(wr, sdk.SpawnMsg{
ID: sdk.MsgWorkflowError.ID,
Args: []interface{}{fmt.Sprintf("Error on LUA Condition: %v", errc)},
Type: sdk.MsgWorkflowError.Type,
})
return false
}
return conditionsOK
}

// AddWorkflowRunInfo add WorkflowRunInfo on a WorkflowRun
func AddWorkflowRunInfo(run *sdk.WorkflowRun, isError bool, infos ...sdk.SpawnMsg) {
func AddWorkflowRunInfo(run *sdk.WorkflowRun, infos ...sdk.SpawnMsg) {
for _, i := range infos {
run.Infos = append(run.Infos, sdk.WorkflowRunInfo{
APITime: time.Now(),
Message: i,
IsError: isError,
Type: i.Type,
SubNumber: run.LastSubNumber,
})
}
Expand Down
16 changes: 10 additions & 6 deletions engine/api/workflow/process_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,10 @@ func processNodeTriggers(ctx context.Context, db gorp.SqlExecutor, store cache.S
r1, _, errPwnr := processNodeRun(ctx, db, store, proj, wr, mapNodes, &t.ChildNode, int(parentSubNumber), parentNodeRun, nil, nil)
if errPwnr != nil {
log.Error(ctx, "processWorkflowRun> Unable to process node ID=%d: %s", t.ChildNode.ID, errPwnr)
AddWorkflowRunInfo(wr, true, sdk.SpawnMsg{
AddWorkflowRunInfo(wr, sdk.SpawnMsg{
ID: sdk.MsgWorkflowError.ID,
Args: []interface{}{errPwnr.Error()},
Type: sdk.MsgWorkflowError.Type,
})
}
_, _ = report.Merge(ctx, r1, nil)
Expand Down Expand Up @@ -240,9 +241,10 @@ func processNode(ctx context.Context, db gorp.SqlExecutor, store cache.Store, pr
vcsServer := repositoriesmanager.GetProjectVCSServer(proj, app.VCSServer)
vcsInf, errVcs = getVCSInfos(ctx, db, store, proj.Key, vcsServer, currentJobGitValues, app.Name, app.VCSServer, app.RepositoryFullname)
if errVcs != nil {
AddWorkflowRunInfo(wr, true, sdk.SpawnMsg{
AddWorkflowRunInfo(wr, sdk.SpawnMsg{
ID: sdk.MsgWorkflowError.ID,
Args: []interface{}{errVcs.Error()},
Type: sdk.MsgWorkflowError.Type,
})
return nil, false, sdk.WrapError(errVcs, "unable to get git informations")
}
Expand Down Expand Up @@ -298,8 +300,8 @@ func processNode(ctx context.Context, db gorp.SqlExecutor, store cache.Store, pr
}

for _, info := range wr.Infos {
if info.IsError && info.SubNumber == wr.LastSubNumber {
nr.Status = string(sdk.StatusFail)
if info.Type == sdk.RunInfoTypeError && info.SubNumber == wr.LastSubNumber {
nr.Status = sdk.StatusFail
nr.Done = time.Now()
break
}
Expand Down Expand Up @@ -365,9 +367,10 @@ func processNode(ctx context.Context, db gorp.SqlExecutor, store cache.Store, pr
}
if nbMutex > 0 {
log.Debug("Noderun %s processed but not executed because of mutex", n.Name)
AddWorkflowRunInfo(wr, false, sdk.SpawnMsg{
AddWorkflowRunInfo(wr, sdk.SpawnMsg{
ID: sdk.MsgWorkflowNodeMutex.ID,
Args: []interface{}{n.Name},
Type: sdk.MsgWorkflowNodeMutex.Type,
})
if err := UpdateWorkflowRun(ctx, db, wr); err != nil {
return nil, false, sdk.WrapError(err, "unable to update workflow run")
Expand Down Expand Up @@ -479,9 +482,10 @@ func computePayload(n *sdk.Node, hookEvent *sdk.WorkflowNodeRunHookEvent, manual
func computeNodeContextBuildParameters(ctx context.Context, proj sdk.Project, wr *sdk.WorkflowRun, run *sdk.WorkflowNodeRun, n *sdk.Node, runContext nodeRunContext) {
nodeRunParams, errParam := getNodeRunBuildParameters(ctx, proj, wr, run, runContext)
if errParam != nil {
AddWorkflowRunInfo(wr, true, sdk.SpawnMsg{
AddWorkflowRunInfo(wr, sdk.SpawnMsg{
ID: sdk.MsgWorkflowError.ID,
Args: []interface{}{errParam.Error()},
Type: sdk.MsgWorkflowError.Type,
})
// if there an error -> display it in workflowRunInfo and not stop the launch
log.Error(ctx, "processNode> getNodeRunBuildParameters failed. Project:%s [#%d.%d]%s.%d with payload %v err:%v", proj.Name, wr.Number, run.SubNumber, wr.Workflow.Name, n.ID, run.Payload, errParam)
Expand Down
3 changes: 2 additions & 1 deletion engine/api/workflow/process_start.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,12 +55,13 @@ func processStartFromRootNode(ctx context.Context, db gorp.SqlExecutor, store ca
log.Debug("processWorkflowRun> starting from the root: %d (pipeline %s)", wr.Workflow.WorkflowData.Node.ID, wr.Workflow.Pipelines[wr.Workflow.WorkflowData.Node.Context.PipelineID].Name)
report := new(ProcessorReport)
//Run the root: manual or from an event
AddWorkflowRunInfo(wr, false, sdk.SpawnMsg{
AddWorkflowRunInfo(wr, sdk.SpawnMsg{
ID: sdk.MsgWorkflowStarting.ID,
Args: []interface{}{
wr.Workflow.Name,
fmt.Sprintf("%d.%d", wr.Number, 0),
},
Type: sdk.MsgWorkflowStarting.Type,
})

r1, conditionOK, errP := processNodeRun(ctx, db, store, proj, wr, mapNodes, &wr.Workflow.WorkflowData.Node, 0, nil, hookEvent, manual)
Expand Down
4 changes: 3 additions & 1 deletion engine/api/workflow/repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,9 @@ func extractFromCDSFiles(ctx context.Context, tr *tar.Reader) (*exportedEntities
mError.Append(fmt.Errorf("two workflows files found: %s and %s", workflowFileName, hdr.Name))
break
}
if err := yaml.Unmarshal(b, &res.wrkflw); err != nil {
var err error
res.wrkflw, err = exportentities.UnmarshalWorkflow(b)
if err != nil {
log.Error(ctx, "Push> Unable to unmarshal workflow %s: %v", hdr.Name, err)
mError.Append(fmt.Errorf("Unable to unmarshal workflow %s: %v", hdr.Name, err))
continue
Expand Down
2 changes: 1 addition & 1 deletion engine/api/workflow/resync_workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func ResyncWorkflowRunStatus(ctx context.Context, db gorp.SqlExecutor, wr *sdk.W
var isInError bool
var newStatus string
for _, info := range wr.Infos {
if info.IsError && info.SubNumber == wr.LastSubNumber {
if info.Type == sdk.RunInfoTypeError && info.SubNumber == wr.LastSubNumber {
isInError = true
break
}
Expand Down
4 changes: 2 additions & 2 deletions engine/api/workflow/run_workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func runFromHook(ctx context.Context, db gorp.SqlExecutor, store cache.Store, pr

// Add add code spawn info
for _, msg := range asCodeMsg {
AddWorkflowRunInfo(wr, false, sdk.SpawnMsg{ID: msg.ID, Args: msg.Args})
AddWorkflowRunInfo(wr, sdk.SpawnMsg{ID: msg.ID, Args: msg.Args, Type: msg.Type})
}

//Process it
Expand Down Expand Up @@ -107,7 +107,7 @@ func StartWorkflowRun(ctx context.Context, db *gorp.DbMap, store cache.Store, pr
defer tx.Rollback() // nolint

for _, msg := range asCodeInfos {
AddWorkflowRunInfo(wr, false, sdk.SpawnMsg{ID: msg.ID, Args: msg.Args})
AddWorkflowRunInfo(wr, sdk.SpawnMsg{ID: msg.ID, Args: msg.Args, Type: msg.Type})
}

wr.Status = sdk.StatusWaiting
Expand Down
Loading

0 comments on commit eb3c114

Please sign in to comment.