Skip to content

Commit

Permalink
feat(api): template bulk update parallel (#6393)
Browse files Browse the repository at this point in the history
  • Loading branch information
richardlt authored Dec 21, 2022
1 parent 2c16282 commit 0da7754
Show file tree
Hide file tree
Showing 21 changed files with 574 additions and 216 deletions.
12 changes: 10 additions & 2 deletions cli/cdsctl/template_bulk.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,11 @@ var templateBulkCmd = cli.Command{
Name: "track",
Usage: "Wait the bulk to be over",
},
{
Type: cli.FlagBool,
Name: "parallel",
Usage: "Apply template on workflow in parallel",
},
},
}

Expand Down Expand Up @@ -514,7 +519,10 @@ func templateBulkRun(v cli.Values) error {
}

// send bulk request
b := sdk.WorkflowTemplateBulk{Operations: make([]sdk.WorkflowTemplateBulkOperation, len(moperations))}
b := sdk.WorkflowTemplateBulk{
Parallel: v.GetBool("parallel"),
Operations: make([]sdk.WorkflowTemplateBulkOperation, len(moperations)),
}
i := 0
for _, o := range moperations {
b.Operations[i] = o
Expand Down Expand Up @@ -570,7 +578,7 @@ func templateBulkRun(v cli.Values) error {
}

time.Sleep(500 * time.Millisecond)
if res.IsDone() {
if res.Status == sdk.OperationStatusDone || res.Status == sdk.OperationStatusError {
break
}
}
Expand Down
13 changes: 10 additions & 3 deletions engine/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,9 +215,10 @@ type Configuration struct {
Error string `toml:"error" comment:"Help displayed to user on each error. Warning: this message could be view by anonymous user. Markdown accepted." json:"error" default:""`
} `toml:"help" comment:"######################\n 'Help' informations \n######################" json:"help"`
Workflow struct {
MaxRuns int64 `toml:"maxRuns" comment:"Maximum of runs by workflow" json:"maxRuns" default:"255"`
DefaultRetentionPolicy string `toml:"defaultRetentionPolicy" comment:"Default rule for workflow run retention policy, this rule can be overridden on each workflow.\n Example: 'return run_days_before < 365' keeps runs for one year." json:"defaultRetentionPolicy" default:"return run_days_before < 365"`
DisablePurgeDeletion bool `toml:"disablePurgeDeletion" comment:"Allow you to disable the deletion part of the purge. Workflow run will only be marked as delete" json:"disablePurgeDeletion" default:"false"`
MaxRuns int64 `toml:"maxRuns" comment:"Maximum of runs by workflow" json:"maxRuns" default:"255"`
DefaultRetentionPolicy string `toml:"defaultRetentionPolicy" comment:"Default rule for workflow run retention policy, this rule can be overridden on each workflow.\n Example: 'return run_days_before < 365' keeps runs for one year." json:"defaultRetentionPolicy" default:"return run_days_before < 365"`
DisablePurgeDeletion bool `toml:"disablePurgeDeletion" comment:"Allow you to disable the deletion part of the purge. Workflow run will only be marked as delete" json:"disablePurgeDeletion" default:"false"`
TemplateBulkRunnerCount int64 `toml:"templateBulkRunnerCount" comment:"The count of runner that will execute the workflow template bulk operation." json:"templateBulkRunnerCount" default:"10"`
} `toml:"workflow" comment:"######################\n 'Workflow' global configuration \n######################" json:"workflow"`
EventBus event.Config `toml:"events" comment:"######################\n Event bus configuration \n######################" json:"events" mapstructure:"events"`
}
Expand Down Expand Up @@ -798,6 +799,12 @@ func (a *API) Serve(ctx context.Context) error {
a.cleanWorkflowRunSecrets(ctx)
})
}
chanWorkflowTemplateBulkOperation := make(chan WorkflowTemplateBulkOperation)
defer close(chanWorkflowTemplateBulkOperation)
a.GoRoutines.RunWithRestart(ctx, "api.WorkflowTemplateBulk", func(ctx context.Context) {
a.WorkflowTemplateBulk(ctx, 100*time.Millisecond, chanWorkflowTemplateBulkOperation)
})
a.WorkflowTemplateBulkOperation(ctx, a.Config.Workflow.MaxRuns, chanWorkflowTemplateBulkOperation)

log.Info(ctx, "Bootstrapping database...")
defaultValues := sdk.DefaultValues{
Expand Down
2 changes: 1 addition & 1 deletion engine/api/environment/environment.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ func LoadByWorkflowID(db gorp.SqlExecutor, workflowID int64) ([]sdk.Environment,
return envs, nil
}

//Exists checks if an environment already exists on the project
// Exists checks if an environment already exists on the project
func Exists(db gorp.SqlExecutor, projectKey, envName string) (bool, error) {
var n int
query := `SELECT count(1)
Expand Down
8 changes: 4 additions & 4 deletions engine/api/environment/environment_importer.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,14 @@ import (
"github.com/ovh/cds/sdk"
)

//Import import or reuser the provided environment
// Import import or reuser the provided environment
func Import(db gorpmapper.SqlExecutorWithTx, proj sdk.Project, env *sdk.Environment, msgChan chan<- sdk.Message, u sdk.Identifiable) error {
exists, err := Exists(db, proj.Key, env.Name)
if err != nil {
return err
}

//If environment exists, reload it
// If environment exists, reload it
if exists {
if msgChan != nil {
msgChan <- sdk.NewMessage(sdk.MsgEnvironmentExists, env.Name)
Expand All @@ -37,7 +37,7 @@ func Import(db gorpmapper.SqlExecutorWithTx, proj sdk.Project, env *sdk.Environm
env.ProjectID = proj.ID
env.ProjectKey = proj.Key
if err := InsertEnvironment(db, env); err != nil {
return sdk.WrapError(err, "Unable to create env %s on project %s(%d) ", env.Name, env.ProjectKey, env.ProjectID)
return sdk.WrapError(err, "unable to create env %q on project %q", env.Name, env.ProjectKey)
}

//Insert all variables
Expand Down Expand Up @@ -65,7 +65,7 @@ func Import(db gorpmapper.SqlExecutorWithTx, proj sdk.Project, env *sdk.Environm
return nil
}

//ImportInto import variables and groups on an existing environment
// ImportInto import variables and groups on an existing environment
func ImportInto(ctx context.Context, db gorpmapper.SqlExecutorWithTx, env *sdk.Environment, into *sdk.Environment, msgChan chan<- sdk.Message, u sdk.Identifiable) error {
var updateVar = func(v *sdk.EnvironmentVariable) {
log.Debug(ctx, "ImportInto> Updating var %q with value %q", v.Name, v.Value)
Expand Down
192 changes: 6 additions & 186 deletions engine/api/templates.go
Original file line number Diff line number Diff line change
Expand Up @@ -476,9 +476,6 @@ func (api *API) postTemplateBulkHandler() service.Handler {
return err
}

branch := FormString(r, "branch")
message := FormString(r, "message")

// check all requests
var req sdk.WorkflowTemplateBulk
if err := service.UnmarshalBody(r, &req); err != nil {
Expand Down Expand Up @@ -512,199 +509,22 @@ func (api *API) postTemplateBulkHandler() service.Handler {

// store the bulk request
bulk := sdk.WorkflowTemplateBulk{
Parallel: req.Parallel,
UserID: consumer.AuthConsumerUser.AuthentifiedUser.ID,
AuthConsumerID: consumer.ID,
WorkflowTemplateID: wt.ID,
Operations: make([]sdk.WorkflowTemplateBulkOperation, len(req.Operations)),
}
for i := range req.Operations {
bulk.Operations[i].Status = sdk.OperationStatusPending
bulk.Operations[i].Request = req.Operations[i].Request
bulk.Operations[i].Request.Branch = FormString(r, "branch")
bulk.Operations[i].Request.Message = FormString(r, "message")
}
if err := workflowtemplate.InsertBulk(api.mustDB(), &bulk); err != nil {
return err
}

// start async bulk tasks
api.GoRoutines.Exec(context.Background(), "api.templateBulkApply", func(ctx context.Context) {
for i := range bulk.Operations {
if bulk.Operations[i].Status == sdk.OperationStatusPending {
bulk.Operations[i].Status = sdk.OperationStatusProcessing
if err := workflowtemplate.UpdateBulk(api.mustDB(), &bulk); err != nil {
log.Error(ctx, "%v", err)
return
}

errorDefer := func(err error) error {
if err != nil {
err = sdk.WrapError(err, "error occurred in template bulk with id %d", bulk.ID)
ctx = sdk.ContextWithStacktrace(ctx, err)
log.Error(ctx, "%v", err)
bulk.Operations[i].Status = sdk.OperationStatusError
bulk.Operations[i].Error = fmt.Sprintf("%s", sdk.Cause(err))
if err := workflowtemplate.UpdateBulk(api.mustDB(), &bulk); err != nil {
return err
}
}

return nil
}

// load project with key
p, err := project.Load(ctx, api.mustDB(), bulk.Operations[i].Request.ProjectKey,
project.LoadOptions.WithGroups,
project.LoadOptions.WithApplications,
project.LoadOptions.WithEnvironments,
project.LoadOptions.WithPipelines,
project.LoadOptions.WithApplicationWithDeploymentStrategies,
project.LoadOptions.WithIntegrations,
project.LoadOptions.WithClearKeys,
)
if err != nil {
if errD := errorDefer(err); errD != nil {
log.Error(ctx, "%v", errD)
return
}
continue
}

// apply and import workflow
data := exportentities.WorkflowComponents{
Template: exportentities.TemplateInstance{
Name: bulk.Operations[i].Request.WorkflowName,
From: wt.PathWithVersion(),
Parameters: bulk.Operations[i].Request.Parameters,
},
}

// In case we want to update a workflow that is ascode, we want to create a PR instead of pushing directly the new workflow.
wti, err := workflowtemplate.LoadInstanceByTemplateIDAndProjectIDAndRequestWorkflowName(ctx, api.mustDB(), wt.ID, p.ID, data.Template.Name)
if err != nil && !sdk.ErrorIs(err, sdk.ErrNotFound) {
if errD := errorDefer(err); errD != nil {
log.Error(ctx, "%v", errD)
return
}
continue
}
if wti != nil && wti.WorkflowID != nil {
existingWorkflow, err := workflow.LoadByID(ctx, api.mustDB(), api.Cache, *p, *wti.WorkflowID, workflow.LoadOptions{})
if err != nil {
if errD := errorDefer(err); errD != nil {
log.Error(ctx, "%v", errD)
return
}
continue
}
if existingWorkflow.FromRepository != "" {
var rootApp *sdk.Application
if existingWorkflow.WorkflowData.Node.Context != nil && existingWorkflow.WorkflowData.Node.Context.ApplicationID != 0 {
rootApp, err = application.LoadByIDWithClearVCSStrategyPassword(ctx, api.mustDB(), existingWorkflow.WorkflowData.Node.Context.ApplicationID)
if err != nil {
if errD := errorDefer(err); errD != nil {
log.Error(ctx, "%v", errD)
return
}
continue
}
}
if rootApp == nil {
if errD := errorDefer(sdk.NewErrorFrom(sdk.ErrWrongRequest, "cannot find the root application of the workflow")); errD != nil {
log.Error(ctx, "%v", errD)
return
}
continue
}

if branch == "" || message == "" {
if errD := errorDefer(sdk.NewErrorFrom(sdk.ErrWrongRequest, "missing branch or message data")); errD != nil {
log.Error(ctx, "%v", errD)
return
}
continue
}

tx, err := api.mustDB().Begin()
if err != nil {
if errD := errorDefer(err); errD != nil {
log.Error(ctx, "%v", errD)
return
}
continue
}
ope, err := operation.PushOperationUpdate(ctx, tx, api.Cache, *p, data, rootApp.VCSServer, rootApp.RepositoryFullname, branch, message, rootApp.RepositoryStrategy, consumer)
if err != nil {
tx.Rollback() // nolint
if errD := errorDefer(err); errD != nil {
log.Error(ctx, "%v", errD)
return
}
continue
}
if err := tx.Commit(); err != nil {
tx.Rollback() // nolint
if errD := errorDefer(err); errD != nil {
log.Error(ctx, "%v", errD)
return
}
continue
}

ed := ascode.EntityData{
Name: existingWorkflow.Name,
ID: existingWorkflow.ID,
Type: ascode.WorkflowEvent,
FromRepo: existingWorkflow.FromRepository,
OperationUUID: ope.UUID,
}
ascode.UpdateAsCodeResult(ctx, api.mustDB(), api.Cache, api.GoRoutines, *p, *existingWorkflow, *rootApp, ed, consumer)

bulk.Operations[i].Status = sdk.OperationStatusDone
if err := workflowtemplate.UpdateBulk(api.mustDB(), &bulk); err != nil {
log.Error(ctx, "%v", err)
return
}

continue
}
}

mods := []workflowtemplate.TemplateRequestModifierFunc{
workflowtemplate.TemplateRequestModifiers.DefaultKeys(*p),
}
_, wti, err = workflowtemplate.CheckAndExecuteTemplate(ctx, api.mustDB(), api.Cache, *consumer, *p, &data, mods...)
if err != nil {
if errD := errorDefer(err); errD != nil {
log.Error(ctx, "%v", errD)
return
}
continue
}

_, wkf, _, _, err := workflow.Push(ctx, api.mustDB(), api.Cache, p, data, nil, consumer, project.DecryptWithBuiltinKey)
if err != nil {
if errD := errorDefer(sdk.WrapError(err, "cannot push generated workflow")); errD != nil {
log.Error(ctx, "%v", errD)
return
}
continue
}

if err := workflowtemplate.UpdateTemplateInstanceWithWorkflow(ctx, api.mustDB(), *wkf, *consumer, wti); err != nil {
if errD := errorDefer(err); errD != nil {
log.Error(ctx, "%v", errD)
return
}
continue
}

bulk.Operations[i].Status = sdk.OperationStatusDone
if err := workflowtemplate.UpdateBulk(api.mustDB(), &bulk); err != nil {
log.Error(ctx, "%v", err)
return
}
}
}
})

// returns created bulk
return service.WriteJSON(w, bulk, http.StatusOK)
}
Expand Down Expand Up @@ -735,8 +555,8 @@ func (api *API) getTemplateBulkHandler() service.Handler {
return err
}

b, err := workflowtemplate.GetBulkByIDAndTemplateID(api.mustDB(), id, wt.ID)
if err != nil {
b, err := workflowtemplate.GetBulkByIDAndTemplateID(ctx, api.mustDB(), id, wt.ID)
if err != nil && !sdk.ErrorIs(err, sdk.ErrNotFound) {
return err
}
if b == nil || (b.UserID != getUserConsumer(ctx).AuthConsumerUser.AuthentifiedUser.ID && !isMaintainer(ctx) && !isAdmin(ctx)) {
Expand Down
1 change: 0 additions & 1 deletion engine/api/workflow_run_craft.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ func (api *API) WorkflowRunCraft(ctx context.Context, tick time.Duration) error
}
}
}

}

func (api *API) workflowRunCraft(ctx context.Context, id int64) error {
Expand Down
Loading

0 comments on commit 0da7754

Please sign in to comment.