Skip to content

Commit

Permalink
feat(api): execute ascode action (#6523)
Browse files Browse the repository at this point in the history
  • Loading branch information
sguiheux authored Apr 4, 2023
1 parent 94663f5 commit f43efd0
Show file tree
Hide file tree
Showing 21 changed files with 290 additions and 60 deletions.
13 changes: 13 additions & 0 deletions engine/api/entity/dao.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/go-gorp/gorp"
"github.com/lib/pq"
"github.com/rockbears/log"
"github.com/rockbears/yaml"

"github.com/ovh/cds/engine/api/database/gorpmapping"
"github.com/ovh/cds/engine/gorpmapper"
Expand Down Expand Up @@ -121,6 +122,18 @@ func LoadByBranchTypeName(ctx context.Context, db gorp.SqlExecutor, projectRepos
return getEntity(ctx, db, query, opts...)
}

// LoadAndUnmarshalByBranchTypeName loads an entity by his repository, branch, type, name and unmarshal it
func LoadAndUnmarshalByBranchTypeName(ctx context.Context, db gorp.SqlExecutor, projectRepositoryID string, branch string, t string, name string, out interface{}, opts ...gorpmapping.GetOptionFunc) error {
ent, err := LoadByBranchTypeName(ctx, db, projectRepositoryID, branch, t, name, opts...)
if err != nil {
return err
}
if err := yaml.Unmarshal([]byte(ent.Data), out); err != nil {
return sdk.WrapError(err, "unable to read %s / %s @ %s", projectRepositoryID, name, branch)
}
return nil
}

func UnsafeLoadAllByType(_ context.Context, db gorp.SqlExecutor, t string) ([]sdk.EntityFullName, error) {
query := `
SELECT entity.name as name,
Expand Down
54 changes: 33 additions & 21 deletions engine/api/pipeline/pipeline_action.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,31 +115,43 @@ func CheckJob(ctx context.Context, db gorp.SqlExecutor, job *sdk.Job) error {
step := &job.Action.Actions[i]
log.Debug(ctx, "CheckJob> Checking step %s", step.Name)

a, err := action.RetrieveForGroupAndName(ctx, db, step.Group, step.Name)
if err != nil {
if sdk.ErrorIs(err, sdk.ErrNoAction) {
errs = append(errs, sdk.NewMessage(sdk.MsgJobNotValidActionNotFound, job.Action.Name, step.Name, i+1))
continue
if step.Type != sdk.AsCodeAction {
a, err := action.RetrieveForGroupAndName(ctx, db, step.Group, step.Name)
if err != nil {
if sdk.ErrorIs(err, sdk.ErrNoAction) {
errs = append(errs, sdk.NewMessage(sdk.MsgJobNotValidActionNotFound, job.Action.Name, step.Name, i+1))
continue
}
return err
}
return err
}
job.Action.Actions[i].ID = a.ID

// FIXME better check for params
for x := range step.Parameters {
sp := &step.Parameters[x]
log.Debug(ctx, "CheckJob> Checking step parameter %s = %s", sp.Name, sp.Value)
var found bool
for y := range a.Parameters {
ap := a.Parameters[y]
if strings.ToLower(sp.Name) == strings.ToLower(ap.Name) {
found = true
break
job.Action.Actions[i].ID = a.ID

// FIXME better check for params
for x := range step.Parameters {
sp := &step.Parameters[x]
log.Debug(ctx, "CheckJob> Checking step parameter %s = %s", sp.Name, sp.Value)
var found bool
for y := range a.Parameters {
ap := a.Parameters[y]
if strings.ToLower(sp.Name) == strings.ToLower(ap.Name) {
found = true
break
}
}
if !found {
errs = append(errs, sdk.NewMessage(sdk.MsgJobNotValidInvalidActionParameter, job.Action.Name, sp.Name, i+1, step.Name))
}
}
if !found {
errs = append(errs, sdk.NewMessage(sdk.MsgJobNotValidInvalidActionParameter, job.Action.Name, sp.Name, i+1, step.Name))

} else {
ascodeAction, err := action.LoadAllByTypes(ctx, db, []string{sdk.AsCodeAction})
if err != nil {
return err
}
if len(ascodeAction) != 1 {
return sdk.NewErrorFrom(sdk.ErrInvalidData, "unable to find ascode action")
}
job.Action.Actions[i].ID = ascodeAction[0].ID
}

if len(errs) > 0 {
Expand Down
10 changes: 2 additions & 8 deletions engine/api/v2_action.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@ import (
"net/url"

"github.com/gorilla/mux"
"github.com/rockbears/yaml"

"github.com/ovh/cds/engine/api/entity"
"github.com/ovh/cds/engine/api/project"
"github.com/ovh/cds/engine/api/repositoriesmanager"
Expand Down Expand Up @@ -68,13 +66,9 @@ func (api *API) getActionV2Handler() ([]service.RbacChecker, service.Handler) {
branch = defaultBranch.DisplayID
}

ent, err := entity.LoadByBranchTypeName(ctx, api.mustDB(), repo.ID, branch, sdk.EntityTypeAction, actionName)
if err != nil {
return err
}
var act sdk.V2Action
if err := yaml.Unmarshal([]byte(ent.Data), &act); err != nil {
return sdk.NewErrorFrom(sdk.ErrInvalidData, "unable to read action: %v", err)
if err := entity.LoadAndUnmarshalByBranchTypeName(ctx, api.mustDB(), repo.ID, branch, sdk.EntityTypeAction, actionName, &act); err != nil {
return err
}
return service.WriteJSON(w, act, http.StatusOK)
}
Expand Down
3 changes: 3 additions & 0 deletions engine/api/v2_repository_analyze.go
Original file line number Diff line number Diff line change
Expand Up @@ -425,6 +425,9 @@ func (api *API) analyzeRepository(ctx context.Context, projectRepoID string, ana
analysis.Data.Error = strings.Join(skippedFiles, "\n")
if len(skippedFiles) == len(analysis.Data.Entities) {
analysis.Status = sdk.RepositoryAnalysisStatusSkipped
if len(analysis.Data.Entities) == 0 {
analysis.Data.Error = "no file found"
}
} else {
analysis.Status = sdk.RepositoryAnalysisStatusSucceed
}
Expand Down
8 changes: 2 additions & 6 deletions engine/api/v2_worker_model.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,13 +74,9 @@ func (api *API) getWorkerModelV2Handler() ([]service.RbacChecker, service.Handle
branch = defaultBranch.DisplayID
}

ent, err := entity.LoadByBranchTypeName(ctx, api.mustDB(), repo.ID, branch, sdk.EntityTypeWorkerModel, workerModelName)
if err != nil {
return err
}
var workerModel sdk.V2WorkerModel
if err := yaml.Unmarshal([]byte(ent.Data), &workerModel); err != nil {
return sdk.NewErrorFrom(sdk.ErrInvalidData, "unable to read worker model data: %v", err)
if err := entity.LoadAndUnmarshalByBranchTypeName(ctx, api.mustDB(), repo.ID, branch, sdk.EntityTypeWorkerModel, workerModelName, &workerModel); err != nil {
return err
}
if withCreds {
if err := entity.WorkerModelDecryptSecrets(ctx, api.mustDB(), proj.ID, &workerModel, project.DecryptWithBuiltinKey); err != nil {
Expand Down
8 changes: 4 additions & 4 deletions engine/api/workflow/execute_node_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -473,7 +473,7 @@ func addJobsToQueue(ctx context.Context, store cache.Store, db gorpmapper.SqlExe
//Browse the jobs
jobLoop:
for j := range stage.Jobs {
job := &stage.Jobs[j]
job := stage.Jobs[j]

if previousStage != nil {
for _, rj := range previousStage.RunJobs {
Expand All @@ -489,14 +489,14 @@ jobLoop:

//Process variables for the jobs
_, next = telemetry.Span(ctx, "workflow..getNodeJobRunParameters")
jobParams, err := getNodeJobRunParameters(*job, nr, stage)
jobParams, err := getNodeJobRunParameters(job, nr, stage)
next()
if err != nil {
spawnErrs.Join(*err)
}

_, next = telemetry.Span(ctx, "workflow.processNodeJobRunRequirements")
jobRequirements, containsService, modelType, err := processNodeJobRunRequirements(ctx, store, db, proj.Key, *wr, *job, nr, sdk.Groups(groups).ToIDs(), integrationPlugins, integrationConfigs)
jobRequirements, containsService, modelType, err := processNodeJobRunRequirements(ctx, store, db, proj.Key, *wr, job, nr, sdk.Groups(groups).ToIDs(), integrationPlugins, integrationConfigs)
next()
if err != nil {
spawnErrs.Join(*err)
Expand Down Expand Up @@ -529,7 +529,7 @@ jobLoop:
ExecGroups: groups,
IntegrationPlugins: integrationPlugins,
Job: sdk.ExecutedJob{
Job: *job,
Job: job,
},
Header: nr.Header,
ContainsService: containsService,
Expand Down
111 changes: 109 additions & 2 deletions engine/api/workflow/factory_dao.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,13 @@ import (
"github.com/ovh/cds/engine/api/application"
"github.com/ovh/cds/engine/api/ascode"
"github.com/ovh/cds/engine/api/database/gorpmapping"
"github.com/ovh/cds/engine/api/entity"
"github.com/ovh/cds/engine/api/environment"
"github.com/ovh/cds/engine/api/group"
"github.com/ovh/cds/engine/api/integration"
"github.com/ovh/cds/engine/api/pipeline"
"github.com/ovh/cds/engine/api/repository"
"github.com/ovh/cds/engine/api/vcs"
"github.com/ovh/cds/engine/api/workflowtemplate"
"github.com/ovh/cds/engine/gorpmapper"
"github.com/ovh/cds/sdk"
Expand All @@ -35,6 +38,13 @@ type LoadOptions struct {
WithFavoritesForUserID string
}

type asCodeLoader struct {
db gorp.SqlExecutor
vcs map[string]*sdk.VCSProject
repo map[string]*sdk.ProjectRepository
action map[string]sdk.V2Action
}

func (loadOpts LoadOptions) GetWorkflowDAO() WorkflowDAO {
var dao WorkflowDAO

Expand Down Expand Up @@ -217,7 +227,7 @@ func (dao WorkflowDAO) Query() gorpmapping.Query {

func (dao WorkflowDAO) GetLoaders() []gorpmapping.GetOptionFunc {

var loaders = []gorpmapping.GetOptionFunc{}
var loaders = make([]gorpmapping.GetOptionFunc, 0)

if dao.Loaders.WithApplications {
loaders = append(loaders, func(ctx context.Context, m *gorpmapper.Mapper, db gorp.SqlExecutor, i interface{}) error {
Expand Down Expand Up @@ -446,12 +456,109 @@ func (dao WorkflowDAO) withPipelines(ctx context.Context, db gorp.SqlExecutor, w
if pip == nil {
return sdk.WrapError(sdk.ErrNotFound, "unable to find pipeline %d", n.Context.PipelineID)
}

// Load ascode actions
if deep {
ascodeLoader := &asCodeLoader{
db: db,
vcs: make(map[string]*sdk.VCSProject),
repo: make(map[string]*sdk.ProjectRepository),
action: make(map[string]sdk.V2Action),
}
for stageIndex := range pip.Stages {
s := &pip.Stages[stageIndex]
for jobIndex := range s.Jobs {
j := &s.Jobs[jobIndex]
if err := ascodeLoader.browseNonAscodeActionSteps(ctx, j.Action.Actions); err != nil {
return err
}
}
}
w.AscodeActions = ascodeLoader.action
}
w.Pipelines[n.Context.PipelineID] = *pip
}
}
}
}
return nil
}

func (loader *asCodeLoader) browseNonAscodeActionSteps(ctx context.Context, actions []sdk.Action) error {
for _, a := range actions {
if a.Type != sdk.AsCodeAction && len(a.Actions) > 0 {
if err := loader.browseNonAscodeActionSteps(ctx, a.Actions); err != nil {
return err
}
continue
}
if a.Type == sdk.AsCodeAction {
if err := loader.loadAsCodeActionStep(ctx, a.StepName); err != nil {
return err
}
}
}
return nil
}

func (loader *asCodeLoader) loadAsCodeActionStep(ctx context.Context, stepName string) error {
// If already loaded, skip it
if _, has := loader.action[stepName]; has {
return nil
}

branchSplit := strings.Split(stepName, "@")
var projKey, vcsName, repoName, actionName, branch string
if len(branchSplit) != 2 {
return nil
}
branch = branchSplit[1]
actionSplit := strings.Split(branchSplit[0], "/")
if len(actionSplit) != 5 {
return nil
}
projKey = actionSplit[0]
vcsName = actionSplit[1]
repoName = actionSplit[2] + "/" + actionSplit[3]
actionName = actionSplit[4]
vcsServer, has := loader.vcs[projKey+"/"+vcsName]
if !has {
var err error
vcsServer, err = vcs.LoadVCSByName(ctx, loader.db, projKey, vcsName)
if err != nil {
return err
}
loader.vcs[projKey+"/"+vcsName] = vcsServer
}
repo, has := loader.repo[projKey+"/"+vcsName+"/"+repoName]
if !has {
var err error
repo, err = repository.LoadRepositoryByName(ctx, loader.db, vcsServer.ID, repoName)
if err != nil {
return err
}
loader.repo[projKey+"/"+vcsName+"/"+repoName] = repo
}
var currentAction sdk.V2Action
if err := entity.LoadAndUnmarshalByBranchTypeName(ctx, loader.db, repo.ID, branch, sdk.EntityTypeAction, actionName, &currentAction); err != nil {
return err
}
loader.action[stepName] = currentAction
if err := loader.browseAsCodeActionSteps(ctx, currentAction.Runs.Steps); err != nil {
return err
}
return nil
}

func (loader *asCodeLoader) browseAsCodeActionSteps(ctx context.Context, steps []sdk.ActionStep) error {
for _, step := range steps {
if step.Uses == "" {
continue
}
if err := loader.loadAsCodeActionStep(ctx, step.Uses); err != nil {
return err
}
}
return nil
}

Expand Down Expand Up @@ -551,7 +658,7 @@ func (dao WorkflowDAO) withIntegrations(ctx context.Context, db gorp.SqlExecutor
return nil
}

func (dao WorkflowDAO) withAsCodeUpdateEvents(ctx context.Context, db gorp.SqlExecutor, ws *[]Workflow) error {
func (dao WorkflowDAO) withAsCodeUpdateEvents(_ context.Context, db gorp.SqlExecutor, ws *[]Workflow) error {
var ids = make([]int64, 0, len(*ws))
for _, w := range *ws {
ids = append(ids, w.ID)
Expand Down
1 change: 1 addition & 0 deletions engine/api/workflow_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,7 @@ func (api *API) takeJob(ctx context.Context, p *sdk.Project, id int64, workerMod
wnjri.WorkflowID = workflowRun.WorkflowID
wnjri.WorkflowName = workflowRun.Workflow.Name
wnjri.NodeRunName = noderun.WorkflowNodeName
wnjri.AscodeActions = workflowRun.Workflow.AscodeActions

secretsReqs := job.Job.Action.Requirements.FilterByType(sdk.SecretRequirement).Values()
secretsReqsRegs := make([]*regexp.Regexp, 0, len(secretsReqs))
Expand Down
3 changes: 3 additions & 0 deletions engine/worker/internal/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,9 @@ func (w *CurrentWorker) runAction(ctx context.Context, a sdk.Action, jobID int64
case sdk.PluginAction:
res := w.runGRPCPlugin(ctx, a)
return res
case sdk.AsCodeAction:
res := w.runAscodeAction(ctx, a.StepName)
return res
}

// There is is no children actions (action is empty) to do, success !
Expand Down
Loading

0 comments on commit f43efd0

Please sign in to comment.