Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(api,hooks): add manual workflow trigger #6911

Merged
merged 25 commits into from
Mar 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 64 additions & 17 deletions cli/cdsctl/experimental_workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@ package main

import (
"context"
"encoding/json"
"fmt"
"os"
"strings"
"time"

"github.com/rockbears/yaml"
"github.com/spf13/cobra"
Expand All @@ -22,7 +22,7 @@ var experimentalWorkflowCmd = cli.Command{

func experimentalWorkflow() *cobra.Command {
return cli.NewCommand(experimentalWorkflowCmd, nil, []*cobra.Command{
cli.NewCommand(workflowRunCmd, workflowRunFunc, nil, withAllCommandModifiers()...),
cli.NewGetCommand(workflowRunCmd, workflowRunFunc, nil, withAllCommandModifiers()...),
cli.NewCommand(workflowRestartCmd, workflowRestartFunc, nil, withAllCommandModifiers()...),
cli.NewListCommand(workflowRunHistoryCmd, workflowRunHistoryFunc, nil, withAllCommandModifiers()...),
cli.NewListCommand(workflowRunInfosListCmd, workflowRunInfosListFunc, nil, withAllCommandModifiers()...),
Expand Down Expand Up @@ -199,36 +199,83 @@ var workflowRunCmd = cli.Command{
Name: "tag",
},
{
Name: "data",
Default: "{}",
Name: "commit",
},
{
Name: "workflow-branch",
},
{
Name: "workflow-tag",
},
},
}

func workflowRunFunc(v cli.Values) error {
func workflowRunFunc(v cli.Values) (interface{}, error) {
projKey := v.GetString("proj_key")
vcsId := v.GetString("vcs_identifier")
repoId := v.GetString("repo_identifier")
wkfName := v.GetString("workflow_name")
branch := v.GetString("branch")
tag := v.GetString("tag")
data := v.GetString("data")
destBranch := v.GetString("branch")
destTag := v.GetString("tag")
destCommit := v.GetString("commit")
workflowBranch := v.GetString("workflow-branch")
workflowTag := v.GetString("workflow-tag")

if tag != "" && branch != "" {
return fmt.Errorf("you cannot use branch and tag together")
if destBranch != "" && destTag != "" {
return nil, fmt.Errorf("you cannot use branch and tag together")
}

var payload map[string]interface{}
if err := json.Unmarshal([]byte(data), &payload); err != nil {
return fmt.Errorf("unable to read json data")
if workflowBranch != "" && workflowTag != "" {
return nil, fmt.Errorf("you cannot use workflow-branch and workflow-tag together")
}

run, err := client.WorkflowV2Run(context.Background(), projKey, vcsId, repoId, wkfName, payload, cdsclient.WithQueryParameter("branch", branch), cdsclient.WithQueryParameter("tag", tag))
payload := sdk.V2WorkflowRunManualRequest{
Branch: destBranch,
Tag: destTag,
Sha: destCommit,
WorkflowBranch: workflowBranch,
WorkflowTag: workflowTag,
}

hookRunEvent, err := client.WorkflowV2Run(context.Background(), projKey, vcsId, repoId, wkfName, payload)
if err != nil {
return err
return nil, err
}

type run struct {
Workflow string `json:"workflow" cli:"workflow"`
RunNumber int64 `json:"run_number" cli:"run_number"`
RunID string `json:"run_id" cli:"run_id"`
Error string `json:"error" cli:"error"`
}

retry := 0
for {
event, err := client.ProjectRepositoryEvent(context.Background(), projKey, vcsId, repoId, hookRunEvent.UUID)
if err != nil {
return nil, err
}
if event.Status == sdk.HookEventStatusDone {
if len(event.WorkflowHooks) == 1 {
return run{
Workflow: wkfName,
RunNumber: event.WorkflowHooks[0].RunNumber,
RunID: event.WorkflowHooks[0].RunID,
}, nil
}
return nil, fmt.Errorf("workflow did not start")
}
if event.Status == sdk.HookEventStatusError || event.Status == sdk.HookEventStatusSkipped {
return run{
Error: event.LastError,
}, nil
}
retry++
if retry > 90 {
return nil, fmt.Errorf("workflow take too much time to start")
}
time.Sleep(1 * time.Second)
}
fmt.Printf("Workflow %s #%d started", run.WorkflowName, run.RunNumber)
return nil
}

var workflowRestartCmd = cli.Command{
Expand Down
14 changes: 12 additions & 2 deletions engine/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,9 @@ type Configuration struct {
WorkflowV2 struct {
JobSchedulingTimeout int64 `toml:"jobSchedulingTimeout" comment:"Timeout delay for job scheduling (in seconds)" json:"jobSchedulingTimeout" default:"600"`
} `toml:"workflowv2" comment:"######################\n 'Workflow V2' global configuration \n######################" json:"workflowv2"`
Entity struct {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

missing toml annotation

Retention string `toml:"retention" comment:"Retention (in hours) of ascode entity for on non head commit" json:"retention" default:"24h"`
} `toml:"entity" comment:"######################\n 'Entity' global configuration \n######################" json:"entity"`
Project struct {
CreationDisabled bool `toml:"creationDisabled" comment:"Disable project creation for CDS non admin users." json:"creationDisabled" default:"false" commented:"true"`
InfoCreationDisabled string `toml:"infoCreationDisabled" comment:"Optional message to display if project creation is disabled." json:"infoCreationDisabled" default:"" commented:"true"`
Expand Down Expand Up @@ -496,6 +499,14 @@ func (a *API) Serve(ctx context.Context) error {

a.StartupTime = time.Now()

if a.Config.Entity.Retention == "" {
a.Config.Entity.Retention = "24h"
}
entityRetention, err := time.ParseDuration(a.Config.Entity.Retention)
if err != nil {
return sdk.WrapError(err, "wrong entity retention %s, bad format.", a.Config.Entity.Retention)
}

// Checking downloadable binaries
if err := download.Init(ctx, a.getDownloadConf()); err != nil {
return sdk.WrapError(err, "unable to initialize downloadable binaries")
Expand Down Expand Up @@ -586,7 +597,6 @@ func (a *API) Serve(ctx context.Context) error {
}

// API Storage will be a public integration
var err error
a.SharedStorage, err = objectstore.Init(ctx, cfg)
if err != nil {
return fmt.Errorf("cannot initialize storage: %v", err)
Expand Down Expand Up @@ -926,7 +936,7 @@ func (a *API) Serve(ctx context.Context) error {
workflow.ResyncWorkflowRunResultsRoutine(ctx, a.mustDB, a.Cache, 5*time.Second)
})
a.GoRoutines.RunWithRestart(ctx, "project.CleanAsCodeEntities", func(ctx context.Context) {
a.cleanProjectEntities(ctx, 1*time.Hour)
a.cleanProjectEntities(ctx, 1*time.Minute, entityRetention)
})

a.GoRoutines.RunWithRestart(ctx, "worker.DeleteDisabledWorkers", func(ctx context.Context) {
Expand Down
1 change: 1 addition & 0 deletions engine/api/api_routes.go
Original file line number Diff line number Diff line change
Expand Up @@ -483,6 +483,7 @@ func (api *API) InitRouter() {
r.Handle("/v2/project/{projectKey}/vcs/{vcsIdentifier}/repository/{repositoryIdentifier}/entities", nil, r.GETv2(api.getProjectEntitiesHandler))
r.Handle("/v2/project/{projectKey}/vcs/{vcsIdentifier}/repository/{repositoryIdentifier}/entities/{entityType}/{entityName}", nil, r.GETv2(api.getProjectEntityHandler))
r.Handle("/v2/project/{projectKey}/vcs/{vcsIdentifier}/repository/{repositoryIdentifier}/events", nil, r.GETv2(api.getProjectRepositoryEventsHandler))
r.Handle("/v2/project/{projectKey}/vcs/{vcsIdentifier}/repository/{repositoryIdentifier}/events/{eventID}", nil, r.GETv2(api.getProjectRepositoryEventHandler))
r.Handle("/v2/project/{projectKey}/vcs/{vcsIdentifier}/repository/{repositoryIdentifier}/workermodel", nil, r.GETv2(api.getWorkerModelsV2Handler))
r.Handle("/v2/project/{projectKey}/vcs/{vcsIdentifier}/repository/{repositoryIdentifier}/workermodel/{workerModelName}", nil, r.GETv2(api.getWorkerModelV2Handler))
r.Handle("/v2/project/{projectKey}/vcs/{vcsIdentifier}/repository/{repositoryIdentifier}/workflow/{workflow}/run", nil, r.POSTv2(api.postWorkflowRunV2Handler))
Expand Down
9 changes: 5 additions & 4 deletions engine/api/v2_hooks.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ func (api *API) postHookEventRetrieveSignKeyHandler() ([]service.RbacChecker, se
HookEventUUID: hookRetrieveSignKey.HookEventUUID,
SigningKeyCallback: &sdk.HookSigninKeyCallback{},
}
callback.SigningKeyCallback.Status = ope.Status
if ope.Status == sdk.OperationStatusDone {
callback.SigningKeyCallback.SemverCurrent = ope.Setup.Checkout.Result.Semver.Current
callback.SigningKeyCallback.SemverNext = ope.Setup.Checkout.Result.Semver.Next
Expand All @@ -141,7 +142,7 @@ func (api *API) postHookEventRetrieveSignKeyHandler() ([]service.RbacChecker, se
callback.SigningKeyCallback.Error = ope.Setup.Checkout.Result.Msg + fmt.Sprintf("(Operation ID: %s)", ope.UUID)
}
} else {
callback.SigningKeyCallback.Error = ope.Error.Message + fmt.Sprintf("(Operation ID: %s)", ope.UUID)
callback.SigningKeyCallback.Error = fmt.Sprintf("%v (Operation ID: %s)", ope.Error.From, ope.UUID)
}

if _, code, err := services.NewClient(srvs).DoJSONRequest(ctx, http.MethodPost, "/v2/repository/event/callback", callback, nil); err != nil {
Expand Down Expand Up @@ -242,7 +243,7 @@ func LoadWorkflowHooksWithModelUpdate(ctx context.Context, db gorp.SqlExecutor,
return nil, err
}
for _, h := range entitiesHooks {
if h.Ref == hookRequest.Ref {
if h.Ref == hookRequest.Ref && hookRequest.Sha == h.Commit {
filteredWorkflowHooks = append(filteredWorkflowHooks, h)
}
}
Expand All @@ -263,7 +264,7 @@ func LoadWorkflowHooksWithWorkflowUpdate(ctx context.Context, db gorp.SqlExecuto
return nil, err
}
// check of event come from the right branch
if hookRequest.Ref == h.Ref {
if hookRequest.Ref == h.Ref && hookRequest.Sha == h.Commit {
filteredWorkflowHooks = append(filteredWorkflowHooks, *h)
}
}
Expand All @@ -286,7 +287,7 @@ func LoadWorkflowHooksWithRepositoryWebHooks(ctx context.Context, db gorp.SqlExe
// If event && workflow declaration are on the same repo
if w.VCSName == hookRequest.VCSName && w.RepositoryName == hookRequest.RepositoryName {
// Only get workflow configuration from current branch
if w.Ref != hookRequest.Ref {
if w.Ref != hookRequest.Ref || w.Commit != hookRequest.Sha {
continue
}
}
Expand Down
1 change: 1 addition & 0 deletions engine/api/v2_hooks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -341,6 +341,7 @@ func TestPostRetrieveWorkflowToTriggerHandler_WorkerModels(t *testing.T) {
RepositoryName: repo.Name,
VCSName: vcs.Name,
Ref: "refs/heads/master",
Sha: "123456",
RepositoryEventName: sdk.WorkflowHookEventPush,
AnayzedProjectKeys: []string{p.Key},
Models: []sdk.EntityFullName{
Expand Down
84 changes: 60 additions & 24 deletions engine/api/v2_project_clean_ascode.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,14 @@ import (
)

type EntitiesCleaner struct {
projKey string
vcsName string
repoName string
refs map[string]struct{}
projKey string
vcsName string
repoName string
refs map[string]string
retention time.Duration
}

func (a *API) cleanProjectEntities(ctx context.Context, delay time.Duration) {
func (a *API) cleanProjectEntities(ctx context.Context, delay time.Duration, entityRetention time.Duration) {
ticker := time.NewTicker(delay)
defer ticker.Stop()

Expand All @@ -48,7 +49,7 @@ func (a *API) cleanProjectEntities(ctx context.Context, delay time.Duration) {
for w := 0; w < 10; w++ {
a.GoRoutines.Exec(ctx, "cleanProjectEntities-"+strconv.Itoa(w), func(ctx context.Context) {
for pKey := range inputChan {
if err := workerCleanProject(ctx, a.mustDB(), a.Cache, pKey); err != nil {
if err := workerCleanProject(ctx, a.mustDB(), a.Cache, pKey, entityRetention); err != nil {
log.ErrorWithStackTrace(ctx, err)
}
resultChan <- true
Expand All @@ -66,7 +67,7 @@ func (a *API) cleanProjectEntities(ctx context.Context, delay time.Duration) {
}
}

func workerCleanProject(ctx context.Context, db *gorp.DbMap, store cache.Store, pKey string) error {
func workerCleanProject(ctx context.Context, db *gorp.DbMap, store cache.Store, pKey string, entityRetention time.Duration) error {
ctx = context.WithValue(ctx, cdslog.Action, "workerCleanProject")
ctx = context.WithValue(ctx, "action_metadata_project_key", pKey)
log.Info(ctx, "Clean ascode entities on project %s", pKey)
Expand All @@ -79,13 +80,13 @@ func workerCleanProject(ctx context.Context, db *gorp.DbMap, store cache.Store,
return nil
}
defer store.Unlock(lockKey)
if err := cleanAscodeProject(ctx, db, store, pKey); err != nil {
if err := cleanAscodeProject(ctx, db, store, pKey, entityRetention); err != nil {
return err
}
return nil
}

func cleanAscodeProject(ctx context.Context, db *gorp.DbMap, store cache.Store, pKey string) error {
func cleanAscodeProject(ctx context.Context, db *gorp.DbMap, store cache.Store, pKey string, entityRetention time.Duration) error {
vcsRepos, err := vcs.LoadAllVCSByProject(ctx, db, pKey)
if err != nil {
return err
Expand Down Expand Up @@ -116,22 +117,30 @@ func cleanAscodeProject(ctx context.Context, db *gorp.DbMap, store cache.Store,
}

cleaner := &EntitiesCleaner{
projKey: pKey,
vcsName: vcsServer.Name,
repoName: r.Name,
refs: make(map[string]struct{}),
projKey: pKey,
vcsName: vcsServer.Name,
repoName: r.Name,
refs: make(map[string]string),
retention: entityRetention,
}
if err := cleaner.getBranches(ctx, db, store); err != nil {
return err
}

for branchName, branchEntities := range entitiesByRef {
if err := cleaner.cleanEntitiesByRef(ctx, db, store, branchName, branchEntities); err != nil {
return err
// Clean entities that exists on deleted branches
if currentHEAD, has := cleaner.refs[branchName]; has {
// Clean non head commits on existing branch
if err := cleaner.cleanNonHeadEntities(ctx, db, store, branchName, currentHEAD, branchEntities); err != nil {
return err
}
} else {
if err := cleaner.cleanEntitiesByDeletedRef(ctx, db, store, branchName, branchEntities); err != nil {
return err
}
}
}

// TODO manage tags
}
}
}
return nil
Expand All @@ -148,22 +157,22 @@ func (c *EntitiesCleaner) getBranches(ctx context.Context, db *gorp.DbMap, store
return err
}

c.refs = make(map[string]struct{})
c.refs = make(map[string]string)
for _, b := range branches {
c.refs[b.ID] = struct{}{}
c.refs[b.ID] = b.LatestCommit
}

tags, err := vcsClient.Tags(ctx, c.repoName)
if err != nil {
return err
}
for _, t := range tags {
c.refs[sdk.GitRefTagPrefix+t.Tag] = struct{}{}
c.refs[sdk.GitRefTagPrefix+t.Tag] = t.Hash
}
return nil
}

func (c *EntitiesCleaner) cleanEntitiesByRef(ctx context.Context, db *gorp.DbMap, store cache.Store, ref string, entitiesByBranch []sdk.Entity) error {
func (c *EntitiesCleaner) cleanNonHeadEntities(ctx context.Context, db *gorp.DbMap, store cache.Store, ref string, refHeadCommit string, entitiesByBranch []sdk.Entity) error {
deletedEntities := make([]sdk.Entity, 0)

tx, err := db.Begin()
Expand All @@ -172,9 +181,9 @@ func (c *EntitiesCleaner) cleanEntitiesByRef(ctx context.Context, db *gorp.DbMap
}
defer tx.Rollback()

if _, has := c.refs[ref]; !has {
log.Info(ctx, "Deleting entities on %s / %s / %s @%s", c.projKey, c.vcsName, c.repoName, ref)
for _, e := range entitiesByBranch {
log.Info(ctx, "Deleting entities on %s / %s / %s @%s", c.projKey, c.vcsName, c.repoName, ref)
for _, e := range entitiesByBranch {
if e.Commit != "HEAD" && e.Commit != refHeadCommit && time.Since(e.LastUpdate) > c.retention {
if err := entity.Delete(ctx, tx, &e); err != nil {
return err
}
Expand All @@ -191,3 +200,30 @@ func (c *EntitiesCleaner) cleanEntitiesByRef(ctx context.Context, db *gorp.DbMap
}
return nil
}

func (c *EntitiesCleaner) cleanEntitiesByDeletedRef(ctx context.Context, db *gorp.DbMap, store cache.Store, ref string, entitiesByBranch []sdk.Entity) error {
deletedEntities := make([]sdk.Entity, 0)

tx, err := db.Begin()
if err != nil {
return sdk.WithStack(err)
}
defer tx.Rollback()

log.Info(ctx, "Deleting entities on %s / %s / %s @%s", c.projKey, c.vcsName, c.repoName, ref)
for _, e := range entitiesByBranch {
if err := entity.Delete(ctx, tx, &e); err != nil {
return err
}
deletedEntities = append(deletedEntities, e)
}

if err := tx.Commit(); err != nil {
return sdk.WithStack(tx.Commit())
}

for _, e := range deletedEntities {
event_v2.PublishEntityEvent(ctx, store, sdk.EventEntityDeleted, c.vcsName, c.repoName, e, nil)
}
return nil
}
2 changes: 1 addition & 1 deletion engine/api/v2_project_clean_ascode_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ spec:
servicesClients.EXPECT().
DoJSONRequest(gomock.Any(), "GET", "/vcs/the-name/repos/myrepo/tags", gomock.Any(), gomock.Any(), gomock.Any()).MaxTimes(1)

err = workerCleanProject(context.TODO(), db.DbMap, api.Cache, p.Key)
err = workerCleanProject(context.TODO(), db.DbMap, api.Cache, p.Key, time.Minute)
require.NoError(t, err)

_, err = entity.LoadByRefTypeNameCommit(context.TODO(), db, repo.ID, "refs/heads/temp", sdk.EntityTypeWorkerModel, "model1", "123456")
Expand Down
Loading