Skip to content

Commit

Permalink
feat(api,hooks): add manual workflow trigger (#6911)
Browse files Browse the repository at this point in the history
  • Loading branch information
sguiheux authored Mar 26, 2024
1 parent fe2808d commit 3d3d7e1
Show file tree
Hide file tree
Showing 49 changed files with 1,076 additions and 507 deletions.
81 changes: 64 additions & 17 deletions cli/cdsctl/experimental_workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@ package main

import (
"context"
"encoding/json"
"fmt"
"os"
"strings"
"time"

"github.com/rockbears/yaml"
"github.com/spf13/cobra"
Expand All @@ -22,7 +22,7 @@ var experimentalWorkflowCmd = cli.Command{

func experimentalWorkflow() *cobra.Command {
return cli.NewCommand(experimentalWorkflowCmd, nil, []*cobra.Command{
cli.NewCommand(workflowRunCmd, workflowRunFunc, nil, withAllCommandModifiers()...),
cli.NewGetCommand(workflowRunCmd, workflowRunFunc, nil, withAllCommandModifiers()...),
cli.NewCommand(workflowRestartCmd, workflowRestartFunc, nil, withAllCommandModifiers()...),
cli.NewListCommand(workflowRunHistoryCmd, workflowRunHistoryFunc, nil, withAllCommandModifiers()...),
cli.NewListCommand(workflowRunInfosListCmd, workflowRunInfosListFunc, nil, withAllCommandModifiers()...),
Expand Down Expand Up @@ -199,36 +199,83 @@ var workflowRunCmd = cli.Command{
Name: "tag",
},
{
Name: "data",
Default: "{}",
Name: "commit",
},
{
Name: "workflow-branch",
},
{
Name: "workflow-tag",
},
},
}

func workflowRunFunc(v cli.Values) error {
func workflowRunFunc(v cli.Values) (interface{}, error) {
projKey := v.GetString("proj_key")
vcsId := v.GetString("vcs_identifier")
repoId := v.GetString("repo_identifier")
wkfName := v.GetString("workflow_name")
branch := v.GetString("branch")
tag := v.GetString("tag")
data := v.GetString("data")
destBranch := v.GetString("branch")
destTag := v.GetString("tag")
destCommit := v.GetString("commit")
workflowBranch := v.GetString("workflow-branch")
workflowTag := v.GetString("workflow-tag")

if tag != "" && branch != "" {
return fmt.Errorf("you cannot use branch and tag together")
if destBranch != "" && destTag != "" {
return nil, fmt.Errorf("you cannot use branch and tag together")
}

var payload map[string]interface{}
if err := json.Unmarshal([]byte(data), &payload); err != nil {
return fmt.Errorf("unable to read json data")
if workflowBranch != "" && workflowTag != "" {
return nil, fmt.Errorf("you cannot use workflow-branch and workflow-tag together")
}

run, err := client.WorkflowV2Run(context.Background(), projKey, vcsId, repoId, wkfName, payload, cdsclient.WithQueryParameter("branch", branch), cdsclient.WithQueryParameter("tag", tag))
payload := sdk.V2WorkflowRunManualRequest{
Branch: destBranch,
Tag: destTag,
Sha: destCommit,
WorkflowBranch: workflowBranch,
WorkflowTag: workflowTag,
}

hookRunEvent, err := client.WorkflowV2Run(context.Background(), projKey, vcsId, repoId, wkfName, payload)
if err != nil {
return err
return nil, err
}

type run struct {
Workflow string `json:"workflow" cli:"workflow"`
RunNumber int64 `json:"run_number" cli:"run_number"`
RunID string `json:"run_id" cli:"run_id"`
Error string `json:"error" cli:"error"`
}

retry := 0
for {
event, err := client.ProjectRepositoryEvent(context.Background(), projKey, vcsId, repoId, hookRunEvent.UUID)
if err != nil {
return nil, err
}
if event.Status == sdk.HookEventStatusDone {
if len(event.WorkflowHooks) == 1 {
return run{
Workflow: wkfName,
RunNumber: event.WorkflowHooks[0].RunNumber,
RunID: event.WorkflowHooks[0].RunID,
}, nil
}
return nil, fmt.Errorf("workflow did not start")
}
if event.Status == sdk.HookEventStatusError || event.Status == sdk.HookEventStatusSkipped {
return run{
Error: event.LastError,
}, nil
}
retry++
if retry > 90 {
return nil, fmt.Errorf("workflow take too much time to start")
}
time.Sleep(1 * time.Second)
}
fmt.Printf("Workflow %s #%d started", run.WorkflowName, run.RunNumber)
return nil
}

var workflowRestartCmd = cli.Command{
Expand Down
14 changes: 12 additions & 2 deletions engine/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,9 @@ type Configuration struct {
WorkflowV2 struct {
JobSchedulingTimeout int64 `toml:"jobSchedulingTimeout" comment:"Timeout delay for job scheduling (in seconds)" json:"jobSchedulingTimeout" default:"600"`
} `toml:"workflowv2" comment:"######################\n 'Workflow V2' global configuration \n######################" json:"workflowv2"`
Entity struct {
Retention string `toml:"retention" comment:"Retention (in hours) of ascode entity for on non head commit" json:"retention" default:"24h"`
} `toml:"entity" comment:"######################\n 'Entity' global configuration \n######################" json:"entity"`
Project struct {
CreationDisabled bool `toml:"creationDisabled" comment:"Disable project creation for CDS non admin users." json:"creationDisabled" default:"false" commented:"true"`
InfoCreationDisabled string `toml:"infoCreationDisabled" comment:"Optional message to display if project creation is disabled." json:"infoCreationDisabled" default:"" commented:"true"`
Expand Down Expand Up @@ -496,6 +499,14 @@ func (a *API) Serve(ctx context.Context) error {

a.StartupTime = time.Now()

if a.Config.Entity.Retention == "" {
a.Config.Entity.Retention = "24h"
}
entityRetention, err := time.ParseDuration(a.Config.Entity.Retention)
if err != nil {
return sdk.WrapError(err, "wrong entity retention %s, bad format.", a.Config.Entity.Retention)
}

// Checking downloadable binaries
if err := download.Init(ctx, a.getDownloadConf()); err != nil {
return sdk.WrapError(err, "unable to initialize downloadable binaries")
Expand Down Expand Up @@ -586,7 +597,6 @@ func (a *API) Serve(ctx context.Context) error {
}

// API Storage will be a public integration
var err error
a.SharedStorage, err = objectstore.Init(ctx, cfg)
if err != nil {
return fmt.Errorf("cannot initialize storage: %v", err)
Expand Down Expand Up @@ -926,7 +936,7 @@ func (a *API) Serve(ctx context.Context) error {
workflow.ResyncWorkflowRunResultsRoutine(ctx, a.mustDB, a.Cache, 5*time.Second)
})
a.GoRoutines.RunWithRestart(ctx, "project.CleanAsCodeEntities", func(ctx context.Context) {
a.cleanProjectEntities(ctx, 1*time.Hour)
a.cleanProjectEntities(ctx, 1*time.Minute, entityRetention)
})

a.GoRoutines.RunWithRestart(ctx, "worker.DeleteDisabledWorkers", func(ctx context.Context) {
Expand Down
1 change: 1 addition & 0 deletions engine/api/api_routes.go
Original file line number Diff line number Diff line change
Expand Up @@ -483,6 +483,7 @@ func (api *API) InitRouter() {
r.Handle("/v2/project/{projectKey}/vcs/{vcsIdentifier}/repository/{repositoryIdentifier}/entities", nil, r.GETv2(api.getProjectEntitiesHandler))
r.Handle("/v2/project/{projectKey}/vcs/{vcsIdentifier}/repository/{repositoryIdentifier}/entities/{entityType}/{entityName}", nil, r.GETv2(api.getProjectEntityHandler))
r.Handle("/v2/project/{projectKey}/vcs/{vcsIdentifier}/repository/{repositoryIdentifier}/events", nil, r.GETv2(api.getProjectRepositoryEventsHandler))
r.Handle("/v2/project/{projectKey}/vcs/{vcsIdentifier}/repository/{repositoryIdentifier}/events/{eventID}", nil, r.GETv2(api.getProjectRepositoryEventHandler))
r.Handle("/v2/project/{projectKey}/vcs/{vcsIdentifier}/repository/{repositoryIdentifier}/workermodel", nil, r.GETv2(api.getWorkerModelsV2Handler))
r.Handle("/v2/project/{projectKey}/vcs/{vcsIdentifier}/repository/{repositoryIdentifier}/workermodel/{workerModelName}", nil, r.GETv2(api.getWorkerModelV2Handler))
r.Handle("/v2/project/{projectKey}/vcs/{vcsIdentifier}/repository/{repositoryIdentifier}/workflow/{workflow}/run", nil, r.POSTv2(api.postWorkflowRunV2Handler))
Expand Down
9 changes: 5 additions & 4 deletions engine/api/v2_hooks.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ func (api *API) postHookEventRetrieveSignKeyHandler() ([]service.RbacChecker, se
HookEventUUID: hookRetrieveSignKey.HookEventUUID,
SigningKeyCallback: &sdk.HookSigninKeyCallback{},
}
callback.SigningKeyCallback.Status = ope.Status
if ope.Status == sdk.OperationStatusDone {
callback.SigningKeyCallback.SemverCurrent = ope.Setup.Checkout.Result.Semver.Current
callback.SigningKeyCallback.SemverNext = ope.Setup.Checkout.Result.Semver.Next
Expand All @@ -141,7 +142,7 @@ func (api *API) postHookEventRetrieveSignKeyHandler() ([]service.RbacChecker, se
callback.SigningKeyCallback.Error = ope.Setup.Checkout.Result.Msg + fmt.Sprintf("(Operation ID: %s)", ope.UUID)
}
} else {
callback.SigningKeyCallback.Error = ope.Error.Message + fmt.Sprintf("(Operation ID: %s)", ope.UUID)
callback.SigningKeyCallback.Error = fmt.Sprintf("%v (Operation ID: %s)", ope.Error.From, ope.UUID)
}

if _, code, err := services.NewClient(srvs).DoJSONRequest(ctx, http.MethodPost, "/v2/repository/event/callback", callback, nil); err != nil {
Expand Down Expand Up @@ -242,7 +243,7 @@ func LoadWorkflowHooksWithModelUpdate(ctx context.Context, db gorp.SqlExecutor,
return nil, err
}
for _, h := range entitiesHooks {
if h.Ref == hookRequest.Ref {
if h.Ref == hookRequest.Ref && hookRequest.Sha == h.Commit {
filteredWorkflowHooks = append(filteredWorkflowHooks, h)
}
}
Expand All @@ -263,7 +264,7 @@ func LoadWorkflowHooksWithWorkflowUpdate(ctx context.Context, db gorp.SqlExecuto
return nil, err
}
// check of event come from the right branch
if hookRequest.Ref == h.Ref {
if hookRequest.Ref == h.Ref && hookRequest.Sha == h.Commit {
filteredWorkflowHooks = append(filteredWorkflowHooks, *h)
}
}
Expand All @@ -286,7 +287,7 @@ func LoadWorkflowHooksWithRepositoryWebHooks(ctx context.Context, db gorp.SqlExe
// If event && workflow declaration are on the same repo
if w.VCSName == hookRequest.VCSName && w.RepositoryName == hookRequest.RepositoryName {
// Only get workflow configuration from current branch
if w.Ref != hookRequest.Ref {
if w.Ref != hookRequest.Ref || w.Commit != hookRequest.Sha {
continue
}
}
Expand Down
1 change: 1 addition & 0 deletions engine/api/v2_hooks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -341,6 +341,7 @@ func TestPostRetrieveWorkflowToTriggerHandler_WorkerModels(t *testing.T) {
RepositoryName: repo.Name,
VCSName: vcs.Name,
Ref: "refs/heads/master",
Sha: "123456",
RepositoryEventName: sdk.WorkflowHookEventPush,
AnayzedProjectKeys: []string{p.Key},
Models: []sdk.EntityFullName{
Expand Down
84 changes: 60 additions & 24 deletions engine/api/v2_project_clean_ascode.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,14 @@ import (
)

type EntitiesCleaner struct {
projKey string
vcsName string
repoName string
refs map[string]struct{}
projKey string
vcsName string
repoName string
refs map[string]string
retention time.Duration
}

func (a *API) cleanProjectEntities(ctx context.Context, delay time.Duration) {
func (a *API) cleanProjectEntities(ctx context.Context, delay time.Duration, entityRetention time.Duration) {
ticker := time.NewTicker(delay)
defer ticker.Stop()

Expand All @@ -48,7 +49,7 @@ func (a *API) cleanProjectEntities(ctx context.Context, delay time.Duration) {
for w := 0; w < 10; w++ {
a.GoRoutines.Exec(ctx, "cleanProjectEntities-"+strconv.Itoa(w), func(ctx context.Context) {
for pKey := range inputChan {
if err := workerCleanProject(ctx, a.mustDB(), a.Cache, pKey); err != nil {
if err := workerCleanProject(ctx, a.mustDB(), a.Cache, pKey, entityRetention); err != nil {
log.ErrorWithStackTrace(ctx, err)
}
resultChan <- true
Expand All @@ -66,7 +67,7 @@ func (a *API) cleanProjectEntities(ctx context.Context, delay time.Duration) {
}
}

func workerCleanProject(ctx context.Context, db *gorp.DbMap, store cache.Store, pKey string) error {
func workerCleanProject(ctx context.Context, db *gorp.DbMap, store cache.Store, pKey string, entityRetention time.Duration) error {
ctx = context.WithValue(ctx, cdslog.Action, "workerCleanProject")
ctx = context.WithValue(ctx, "action_metadata_project_key", pKey)
log.Info(ctx, "Clean ascode entities on project %s", pKey)
Expand All @@ -79,13 +80,13 @@ func workerCleanProject(ctx context.Context, db *gorp.DbMap, store cache.Store,
return nil
}
defer store.Unlock(lockKey)
if err := cleanAscodeProject(ctx, db, store, pKey); err != nil {
if err := cleanAscodeProject(ctx, db, store, pKey, entityRetention); err != nil {
return err
}
return nil
}

func cleanAscodeProject(ctx context.Context, db *gorp.DbMap, store cache.Store, pKey string) error {
func cleanAscodeProject(ctx context.Context, db *gorp.DbMap, store cache.Store, pKey string, entityRetention time.Duration) error {
vcsRepos, err := vcs.LoadAllVCSByProject(ctx, db, pKey)
if err != nil {
return err
Expand Down Expand Up @@ -116,22 +117,30 @@ func cleanAscodeProject(ctx context.Context, db *gorp.DbMap, store cache.Store,
}

cleaner := &EntitiesCleaner{
projKey: pKey,
vcsName: vcsServer.Name,
repoName: r.Name,
refs: make(map[string]struct{}),
projKey: pKey,
vcsName: vcsServer.Name,
repoName: r.Name,
refs: make(map[string]string),
retention: entityRetention,
}
if err := cleaner.getBranches(ctx, db, store); err != nil {
return err
}

for branchName, branchEntities := range entitiesByRef {
if err := cleaner.cleanEntitiesByRef(ctx, db, store, branchName, branchEntities); err != nil {
return err
// Clean entities that exists on deleted branches
if currentHEAD, has := cleaner.refs[branchName]; has {
// Clean non head commits on existing branch
if err := cleaner.cleanNonHeadEntities(ctx, db, store, branchName, currentHEAD, branchEntities); err != nil {
return err
}
} else {
if err := cleaner.cleanEntitiesByDeletedRef(ctx, db, store, branchName, branchEntities); err != nil {
return err
}
}
}

// TODO manage tags
}
}
}
return nil
Expand All @@ -148,22 +157,22 @@ func (c *EntitiesCleaner) getBranches(ctx context.Context, db *gorp.DbMap, store
return err
}

c.refs = make(map[string]struct{})
c.refs = make(map[string]string)
for _, b := range branches {
c.refs[b.ID] = struct{}{}
c.refs[b.ID] = b.LatestCommit
}

tags, err := vcsClient.Tags(ctx, c.repoName)
if err != nil {
return err
}
for _, t := range tags {
c.refs[sdk.GitRefTagPrefix+t.Tag] = struct{}{}
c.refs[sdk.GitRefTagPrefix+t.Tag] = t.Hash
}
return nil
}

func (c *EntitiesCleaner) cleanEntitiesByRef(ctx context.Context, db *gorp.DbMap, store cache.Store, ref string, entitiesByBranch []sdk.Entity) error {
func (c *EntitiesCleaner) cleanNonHeadEntities(ctx context.Context, db *gorp.DbMap, store cache.Store, ref string, refHeadCommit string, entitiesByBranch []sdk.Entity) error {
deletedEntities := make([]sdk.Entity, 0)

tx, err := db.Begin()
Expand All @@ -172,9 +181,9 @@ func (c *EntitiesCleaner) cleanEntitiesByRef(ctx context.Context, db *gorp.DbMap
}
defer tx.Rollback()

if _, has := c.refs[ref]; !has {
log.Info(ctx, "Deleting entities on %s / %s / %s @%s", c.projKey, c.vcsName, c.repoName, ref)
for _, e := range entitiesByBranch {
log.Info(ctx, "Deleting entities on %s / %s / %s @%s", c.projKey, c.vcsName, c.repoName, ref)
for _, e := range entitiesByBranch {
if e.Commit != "HEAD" && e.Commit != refHeadCommit && time.Since(e.LastUpdate) > c.retention {
if err := entity.Delete(ctx, tx, &e); err != nil {
return err
}
Expand All @@ -191,3 +200,30 @@ func (c *EntitiesCleaner) cleanEntitiesByRef(ctx context.Context, db *gorp.DbMap
}
return nil
}

func (c *EntitiesCleaner) cleanEntitiesByDeletedRef(ctx context.Context, db *gorp.DbMap, store cache.Store, ref string, entitiesByBranch []sdk.Entity) error {
deletedEntities := make([]sdk.Entity, 0)

tx, err := db.Begin()
if err != nil {
return sdk.WithStack(err)
}
defer tx.Rollback()

log.Info(ctx, "Deleting entities on %s / %s / %s @%s", c.projKey, c.vcsName, c.repoName, ref)
for _, e := range entitiesByBranch {
if err := entity.Delete(ctx, tx, &e); err != nil {
return err
}
deletedEntities = append(deletedEntities, e)
}

if err := tx.Commit(); err != nil {
return sdk.WithStack(tx.Commit())
}

for _, e := range deletedEntities {
event_v2.PublishEntityEvent(ctx, store, sdk.EventEntityDeleted, c.vcsName, c.repoName, e, nil)
}
return nil
}
2 changes: 1 addition & 1 deletion engine/api/v2_project_clean_ascode_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ spec:
servicesClients.EXPECT().
DoJSONRequest(gomock.Any(), "GET", "/vcs/the-name/repos/myrepo/tags", gomock.Any(), gomock.Any(), gomock.Any()).MaxTimes(1)

err = workerCleanProject(context.TODO(), db.DbMap, api.Cache, p.Key)
err = workerCleanProject(context.TODO(), db.DbMap, api.Cache, p.Key, time.Minute)
require.NoError(t, err)

_, err = entity.LoadByRefTypeNameCommit(context.TODO(), db, repo.ID, "refs/heads/temp", sdk.EntityTypeWorkerModel, "model1", "123456")
Expand Down
Loading

0 comments on commit 3d3d7e1

Please sign in to comment.