diff --git a/engine/api/api.go b/engine/api/api.go index bf0c239a61..8d32963140 100644 --- a/engine/api/api.go +++ b/engine/api/api.go @@ -650,6 +650,9 @@ func (a *API) Serve(ctx context.Context) error { sdk.GoRoutine(ctx, "authentication.SessionCleaner", func(ctx context.Context) { authentication.SessionCleaner(ctx, a.mustDB, 10*time.Second) }, a.PanicDump()) + sdk.GoRoutine(ctx, "api.WorkflowRunCraft", func(ctx context.Context) { + a.WorkflowRunCraft(ctx, 100*time.Millisecond) + }, a.PanicDump()) migrate.Add(ctx, sdk.Migration{Name: "RunsSecrets", Release: "0.47.0", Blocker: false, Automatic: true, ExecFunc: func(ctx context.Context) error { return migrate.RunsSecrets(ctx, a.DBConnectionFactory.GetDBMap(gorpmapping.Mapper)) diff --git a/engine/api/operation/operation.go b/engine/api/operation/operation.go index 841ec884e8..ea8b1dc992 100644 --- a/engine/api/operation/operation.go +++ b/engine/api/operation/operation.go @@ -9,9 +9,9 @@ import ( "github.com/go-gorp/gorp" - "github.com/ovh/cds/engine/cache" "github.com/ovh/cds/engine/api/repositoriesmanager" "github.com/ovh/cds/engine/api/services" + "github.com/ovh/cds/engine/cache" "github.com/ovh/cds/engine/gorpmapper" "github.com/ovh/cds/sdk" "github.com/ovh/cds/sdk/exportentities" diff --git a/engine/api/project/dao.go b/engine/api/project/dao.go index 072b707801..ca4c0a5cf9 100644 --- a/engine/api/project/dao.go +++ b/engine/api/project/dao.go @@ -5,16 +5,17 @@ import ( "database/sql" "reflect" "runtime" + "strings" "time" "github.com/go-gorp/gorp" - "github.com/ovh/cds/engine/cache" "github.com/ovh/cds/engine/api/database/gorpmapping" "github.com/ovh/cds/engine/api/environment" "github.com/ovh/cds/engine/api/group" "github.com/ovh/cds/engine/api/keys" "github.com/ovh/cds/engine/api/repositoriesmanager" + "github.com/ovh/cds/engine/cache" "github.com/ovh/cds/engine/gorpmapper" "github.com/ovh/cds/sdk" "github.com/ovh/cds/sdk/log" @@ -310,6 +311,8 @@ func unwrap(ctx context.Context, db gorp.SqlExecutor, p *dbProject, opts []LoadO continue } name := runtime.FuncForPC(reflect.ValueOf(f).Pointer()).Name() + nameSplitted := strings.Split(name, "/") + name = nameSplitted[len(nameSplitted)-1] _, end = telemetry.Span(ctx, name) if err := f(db, &proj); err != nil && sdk.Cause(err) != sql.ErrNoRows { end() diff --git a/engine/api/repositories_manager_test.go b/engine/api/repositories_manager_test.go index e6c9172508..310455f52f 100644 --- a/engine/api/repositories_manager_test.go +++ b/engine/api/repositories_manager_test.go @@ -229,7 +229,7 @@ vcs_ssh_key: proj-blabla t.Log("Inserting workflow run=====") // creates a run - wr, errWR := workflow.CreateRun(db.DbMap, w1, nil, u) + wr, errWR := workflow.CreateRun(db.DbMap, w1, sdk.WorkflowRunPostHandlerOption{AuthConsumerID: consumer.ID}) assert.NoError(t, errWR) wr.Workflow = *w1 t.Log("Starting workflow run=====") diff --git a/engine/api/router_middleware_auth_permission_test.go b/engine/api/router_middleware_auth_permission_test.go index 941b7f12e7..fb16d9d8b5 100644 --- a/engine/api/router_middleware_auth_permission_test.go +++ b/engine/api/router_middleware_auth_permission_test.go @@ -23,7 +23,7 @@ import ( func Test_checkWorkflowPermissions(t *testing.T) { api, db, _ := newTestAPI(t) - wctx := testRunWorkflow(t, api, db, api.Router) + wctx := testRunWorkflow(t, api, api.Router) user := wctx.user admin, _ := assets.InsertAdminUser(t, db) maintainer, _ := assets.InsertAdminUser(t, db) diff --git a/engine/api/workflow/dao_run.go b/engine/api/workflow/dao_run.go index 8f12971754..3beb38c4ac 100644 --- a/engine/api/workflow/dao_run.go +++ b/engine/api/workflow/dao_run.go @@ -15,6 +15,7 @@ import ( "github.com/lib/pq" "go.opencensus.io/stats" + "github.com/ovh/cds/engine/api/authentication" "github.com/ovh/cds/engine/api/database/gorpmapping" "github.com/ovh/cds/sdk" "github.com/ovh/cds/sdk/log" @@ -33,7 +34,9 @@ workflow_run.last_sub_num, workflow_run.last_execution, workflow_run.to_delete, workflow_run.read_only, -workflow_run.version +workflow_run.version, +workflow_run.to_craft, +workflow_run.to_craft_opts ` // LoadRunOptions are options for loading a run (node or workflow) @@ -590,8 +593,34 @@ func InsertRunNum(db gorp.SqlExecutor, w *sdk.Workflow, num int64) error { return nil } +func LoadCratingWorkflowRunIDs(db gorp.SqlExecutor) ([]int64, error) { + query := ` + SELECT id + FROM workflow_run + WHERE to_craft = true + LIMIT 10 + ` + var ids []int64 + _, err := db.Select(&ids, query) + if err != nil { + return nil, sdk.WrapError(err, "unable to load crafting workflow runs") + } + return ids, nil +} + +func UpdateCraftedWorkflowRun(db gorp.SqlExecutor, id int64) error { + query := `UPDATE workflow_run + SET to_craft = false + WHERE id = $1 + ` + if _, err := db.Exec(query, id); err != nil { + return sdk.WrapError(err, "unable to update crafting workflow run %d", id) + } + return nil +} + // CreateRun creates a new workflow run and insert it -func CreateRun(db *gorp.DbMap, wf *sdk.Workflow, opts *sdk.WorkflowRunPostHandlerOption, ident sdk.Identifiable) (*sdk.WorkflowRun, error) { +func CreateRun(db *gorp.DbMap, wf *sdk.Workflow, opts sdk.WorkflowRunPostHandlerOption) (*sdk.WorkflowRun, error) { number, err := NextRunNumber(db, wf.ID) if err != nil { return nil, sdk.WrapError(err, "unable to get next run number") @@ -606,25 +635,31 @@ func CreateRun(db *gorp.DbMap, wf *sdk.Workflow, opts *sdk.WorkflowRunPostHandle Status: sdk.StatusPending, LastExecution: time.Now(), Tags: make([]sdk.WorkflowRunTag, 0), - Workflow: sdk.Workflow{Name: wf.Name}, + Workflow: *wf, //sdk.Workflow{Name: wf.Name}, + ToCraft: true, + ToCraftOpts: &opts, } - if opts != nil && opts.Hook != nil { + if opts.Hook != nil { if trigg, ok := opts.Hook.Payload["cds.triggered_by.username"]; ok { wr.Tag(tagTriggeredBy, trigg) } else { wr.Tag(tagTriggeredBy, "cds.hook") } } else { - wr.Tag(tagTriggeredBy, ident.GetUsername()) + c, err := authentication.LoadConsumerByID(context.Background(), db, opts.AuthConsumerID, authentication.LoadConsumerOptions.WithAuthentifiedUser, authentication.LoadConsumerOptions.WithConsumerGroups) + if err != nil { + return nil, err + } + wr.Tag(tagTriggeredBy, c.GetUsername()) } tags := wf.Metadata["default_tags"] var payload map[string]string - if opts != nil && opts.Hook != nil { + if opts.Hook != nil { payload = opts.Hook.Payload } - if opts != nil && opts.Manual != nil { + if opts.Manual != nil { e := dump.NewDefaultEncoder() e.Formatters = []dump.KeyFormatterFunc{dump.WithDefaultLowerCaseFormatter()} e.ExtraFields.DetailedMap = false diff --git a/engine/api/workflow/dao_run_test.go b/engine/api/workflow/dao_run_test.go index 6a002d3b83..76e6a3a957 100644 --- a/engine/api/workflow/dao_run_test.go +++ b/engine/api/workflow/dao_run_test.go @@ -10,6 +10,7 @@ import ( "time" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "gopkg.in/yaml.v2" "github.com/ovh/cds/engine/api/application" @@ -230,8 +231,8 @@ vcs_ssh_key: proj-blabla test.NoError(t, err) for i := 0; i < 5; i++ { - wr, errWR := workflow.CreateRun(db.DbMap, w1, nil, u) - assert.NoError(t, errWR) + wr, errWR := workflow.CreateRun(db.DbMap, w1, sdk.WorkflowRunPostHandlerOption{AuthConsumerID: consumer.ID}) + require.NoError(t, errWR) wr.Workflow = *w1 _, errWr := workflow.StartWorkflowRun(context.TODO(), db, cache, *proj, wr, &sdk.WorkflowRunPostHandlerOption{ Manual: &sdk.WorkflowNodeRunManual{ @@ -317,7 +318,7 @@ func TestPurgeWorkflowRunWithRunningStatus(t *testing.T) { test.NoError(t, err) for i := 0; i < 5; i++ { - wfr, errWR := workflow.CreateRun(db.DbMap, w1, nil, u) + wfr, errWR := workflow.CreateRun(db.DbMap, w1, sdk.WorkflowRunPostHandlerOption{AuthConsumerID: consumer.ID}) assert.NoError(t, errWR) wfr.Workflow = *w1 _, errWr := workflow.StartWorkflowRun(context.TODO(), db, cache, *proj, wfr, &sdk.WorkflowRunPostHandlerOption{ @@ -504,7 +505,7 @@ vcs_ssh_key: proj-blabla }) test.NoError(t, err) - wr, errWR := workflow.CreateRun(db.DbMap, w1, nil, u) + wr, errWR := workflow.CreateRun(db.DbMap, w1, sdk.WorkflowRunPostHandlerOption{AuthConsumerID: consumer.ID}) assert.NoError(t, errWR) wr.Workflow = *w1 _, errWr := workflow.StartWorkflowRun(context.TODO(), db, cache, *proj, wr, &sdk.WorkflowRunPostHandlerOption{ @@ -519,7 +520,7 @@ vcs_ssh_key: proj-blabla test.NoError(t, errWr) for i := 0; i < 5; i++ { - wfr, errWR := workflow.CreateRun(db.DbMap, w1, nil, u) + wfr, errWR := workflow.CreateRun(db.DbMap, w1, sdk.WorkflowRunPostHandlerOption{AuthConsumerID: consumer.ID}) assert.NoError(t, errWR) wfr.Workflow = *w1 _, errWr := workflow.StartWorkflowRun(context.TODO(), db, cache, *proj, wfr, &sdk.WorkflowRunPostHandlerOption{ @@ -696,7 +697,7 @@ vcs_ssh_key: proj-blabla test.NoError(t, err) for i := 0; i < 5; i++ { - wfr, errWR := workflow.CreateRun(db.DbMap, w1, nil, u) + wfr, errWR := workflow.CreateRun(db.DbMap, w1, sdk.WorkflowRunPostHandlerOption{AuthConsumerID: consumer.ID}) assert.NoError(t, errWR) wfr.Workflow = *w1 _, errWr := workflow.StartWorkflowRun(context.TODO(), db, cache, *proj, wfr, &sdk.WorkflowRunPostHandlerOption{ @@ -788,7 +789,7 @@ func TestPurgeWorkflowRunWithoutTags(t *testing.T) { branches := []string{"master", "master", "master", "develop", "develop", "testBr", "testBr", "testBr", "testBr", "test4"} for i := 0; i < 10; i++ { - wr, errWR := workflow.CreateRun(db.DbMap, w1, nil, u) + wr, errWR := workflow.CreateRun(db.DbMap, w1, sdk.WorkflowRunPostHandlerOption{AuthConsumerID: consumer.ID}) assert.NoError(t, errWR) wr.Workflow = *w1 _, errWr := workflow.StartWorkflowRun(context.TODO(), db, cache, *proj, wr, &sdk.WorkflowRunPostHandlerOption{ @@ -874,7 +875,7 @@ func TestPurgeWorkflowRunWithoutTagsBiggerHistoryLength(t *testing.T) { branches := []string{"master", "master", "master", "develop", "develop", "testBr", "testBr", "testBr", "testBr", "test4"} for i := 0; i < 10; i++ { - wr, errWR := workflow.CreateRun(db.DbMap, w1, nil, u) + wr, errWR := workflow.CreateRun(db.DbMap, w1, sdk.WorkflowRunPostHandlerOption{AuthConsumerID: consumer.ID}) assert.NoError(t, errWR) wr.Workflow = *w1 _, errWr := workflow.StartWorkflowRun(context.TODO(), db, cache, *proj, wr, &sdk.WorkflowRunPostHandlerOption{ diff --git a/engine/api/workflow/dao_staticfiles_test.go b/engine/api/workflow/dao_staticfiles_test.go index 2759d9419a..5b50a3c2b7 100644 --- a/engine/api/workflow/dao_staticfiles_test.go +++ b/engine/api/workflow/dao_staticfiles_test.go @@ -81,7 +81,7 @@ func TestInsertStaticFiles(t *testing.T) { }) test.NoError(t, err) - wfr, errWR := workflow.CreateRun(db.DbMap, w1, nil, u) + wfr, errWR := workflow.CreateRun(db.DbMap, w1, sdk.WorkflowRunPostHandlerOption{AuthConsumerID: consumer.ID}) assert.NoError(t, errWR) wfr.Workflow = *w1 _, errWr := workflow.StartWorkflowRun(context.TODO(), db, cache, *proj, wfr, &sdk.WorkflowRunPostHandlerOption{ diff --git a/engine/api/workflow/factory_dao.go b/engine/api/workflow/factory_dao.go index 95747edc8d..1ab983a544 100644 --- a/engine/api/workflow/factory_dao.go +++ b/engine/api/workflow/factory_dao.go @@ -609,7 +609,6 @@ func (dao WorkflowDAO) withNotifications(db gorp.SqlExecutor, ws *[]Workflow) er for x := range *ws { w := &(*ws)[x] w.Notifications = notificationsMap[w.ID] - log.Debug("workflow %d notifications: %+v", w.ID, w.Notifications) } return nil } diff --git a/engine/api/workflow/process_node_test.go b/engine/api/workflow/process_node_test.go index 21013112f9..8437593e7d 100644 --- a/engine/api/workflow/process_node_test.go +++ b/engine/api/workflow/process_node_test.go @@ -17,13 +17,13 @@ import ( "github.com/ovh/cds/engine/api/application" "github.com/ovh/cds/engine/api/authentication" "github.com/ovh/cds/engine/api/bootstrap" - "github.com/ovh/cds/engine/cache" "github.com/ovh/cds/engine/api/pipeline" "github.com/ovh/cds/engine/api/repositoriesmanager" "github.com/ovh/cds/engine/api/services" "github.com/ovh/cds/engine/api/test" "github.com/ovh/cds/engine/api/test/assets" "github.com/ovh/cds/engine/api/workflow" + "github.com/ovh/cds/engine/cache" "github.com/ovh/cds/engine/gorpmapper" "github.com/ovh/cds/sdk" "github.com/ovh/cds/sdk/cdsclient" @@ -39,6 +39,7 @@ func TestHookRunWithoutPayloadProcessNodeBuildParameter(t *testing.T) { db, cache := test.SetupPG(t, bootstrap.InitiliazeDB) u, _ := assets.InsertAdminUser(t, db) + consumer, _ := authentication.LoadConsumerByTypeAndUserID(context.TODO(), db, sdk.ConsumerLocal, u.ID, authentication.LoadConsumerOptions.WithAuthentifiedUser) webHookModel, err := workflow.LoadHookModelByName(db, sdk.WebHookModelName) assert.NoError(t, err) @@ -205,15 +206,15 @@ func TestHookRunWithoutPayloadProcessNodeBuildParameter(t *testing.T) { hookEvent.WorkflowNodeHookUUID = w.WorkflowData.Node.Hooks[0].UUID hookEvent.Payload = nil - opts := &sdk.WorkflowRunPostHandlerOption{ - Hook: &hookEvent, + opts := sdk.WorkflowRunPostHandlerOption{ + Hook: &hookEvent, + AuthConsumerID: consumer.ID, } - wr, err := workflow.CreateRun(db.DbMap, &w, opts, u) + wr, err := workflow.CreateRun(db.DbMap, &w, opts) assert.NoError(t, err) wr.Workflow = w - consumer, _ := authentication.LoadConsumerByTypeAndUserID(context.TODO(), db, sdk.ConsumerLocal, u.ID, authentication.LoadConsumerOptions.WithAuthentifiedUser) - _, errR := workflow.StartWorkflowRun(context.TODO(), db, cache, *proj, wr, opts, consumer, nil) + _, errR := workflow.StartWorkflowRun(context.TODO(), db, cache, *proj, wr, &opts, consumer, nil) assert.NoError(t, errR) assert.Equal(t, 1, len(wr.WorkflowNodeRuns)) @@ -231,6 +232,7 @@ func TestHookRunWithHashOnlyProcessNodeBuildParameter(t *testing.T) { db, cache := test.SetupPG(t, bootstrap.InitiliazeDB) u, _ := assets.InsertAdminUser(t, db) + consumer, _ := authentication.LoadConsumerByTypeAndUserID(context.TODO(), db, sdk.ConsumerLocal, u.ID, authentication.LoadConsumerOptions.WithAuthentifiedUser) webHookModel, err := workflow.LoadHookModelByName(db, sdk.WebHookModelName) assert.NoError(t, err) @@ -390,12 +392,12 @@ func TestHookRunWithHashOnlyProcessNodeBuildParameter(t *testing.T) { } opts := &sdk.WorkflowRunPostHandlerOption{ - Hook: &hookEvent, + Hook: &hookEvent, + AuthConsumerID: consumer.ID, } - wr, err := workflow.CreateRun(db.DbMap, &w, opts, u) + wr, err := workflow.CreateRun(db.DbMap, &w, *opts) assert.NoError(t, err) wr.Workflow = w - consumer, _ := authentication.LoadConsumerByTypeAndUserID(context.TODO(), db, sdk.ConsumerLocal, u.ID, authentication.LoadConsumerOptions.WithAuthentifiedUser) _, errR := workflow.StartWorkflowRun(context.TODO(), db, cache, *proj, wr, opts, consumer, nil) assert.NoError(t, errR) @@ -541,14 +543,14 @@ func TestManualRunWithPayloadProcessNodeBuildParameter(t *testing.T) { manualEvent.Payload = map[string]string{ "git.branch": "feat/branch", } - + consumer, _ := authentication.LoadConsumerByTypeAndUserID(context.TODO(), db, sdk.ConsumerLocal, u.ID, authentication.LoadConsumerOptions.WithAuthentifiedUser) opts := &sdk.WorkflowRunPostHandlerOption{ - Manual: &manualEvent, + Manual: &manualEvent, + AuthConsumerID: consumer.ID, } - wr, err := workflow.CreateRun(db.DbMap, &w, opts, u) + wr, err := workflow.CreateRun(db.DbMap, &w, *opts) assert.NoError(t, err) wr.Workflow = w - consumer, _ := authentication.LoadConsumerByTypeAndUserID(context.TODO(), db, sdk.ConsumerLocal, u.ID, authentication.LoadConsumerOptions.WithAuthentifiedUser) _, errR := workflow.StartWorkflowRun(context.TODO(), db, cache, *proj, wr, opts, consumer, nil) assert.NoError(t, errR) @@ -688,13 +690,14 @@ func TestManualRunBranchAndCommitInPayloadProcessNodeBuildParameter(t *testing.T "git.hash": "currentcommit", } + consumer, _ := authentication.LoadConsumerByTypeAndUserID(context.TODO(), db, sdk.ConsumerLocal, u.ID, authentication.LoadConsumerOptions.WithAuthentifiedUser) opts := &sdk.WorkflowRunPostHandlerOption{ - Manual: &manualEvent, + Manual: &manualEvent, + AuthConsumerID: consumer.ID, } - wr, err := workflow.CreateRun(db.DbMap, &w, opts, u) + wr, err := workflow.CreateRun(db.DbMap, &w, *opts) assert.NoError(t, err) wr.Workflow = w - consumer, _ := authentication.LoadConsumerByTypeAndUserID(context.TODO(), db, sdk.ConsumerLocal, u.ID, authentication.LoadConsumerOptions.WithAuthentifiedUser) _, errR := workflow.StartWorkflowRun(context.TODO(), db, cache, *proj, wr, opts, consumer, nil) assert.NoError(t, errR) @@ -915,14 +918,15 @@ func TestManualRunBranchAndRepositoryInPayloadProcessNodeBuildParameter(t *testi "git.repository": "richardlt/demo", } + consumer, _ := authentication.LoadConsumerByTypeAndUserID(context.TODO(), db, sdk.ConsumerLocal, u.ID, authentication.LoadConsumerOptions.WithAuthentifiedUser) opts := &sdk.WorkflowRunPostHandlerOption{ - Manual: &manualEvent, + Manual: &manualEvent, + AuthConsumerID: consumer.ID, } - wr, err := workflow.CreateRun(db.DbMap, &w, opts, u) + wr, err := workflow.CreateRun(db.DbMap, &w, *opts) assert.NoError(t, err) wr.Workflow = w - consumer, _ := authentication.LoadConsumerByTypeAndUserID(context.TODO(), db, sdk.ConsumerLocal, u.ID, authentication.LoadConsumerOptions.WithAuthentifiedUser) _, errR := workflow.StartWorkflowRun(context.TODO(), db, cache, *proj, wr, opts, consumer, nil) assert.NoError(t, errR) @@ -1149,15 +1153,15 @@ func TestManualRunBuildParameterMultiApplication(t *testing.T) { "git.branch": "feat/branch", } + consumer, _ := authentication.LoadConsumerByTypeAndUserID(context.TODO(), db, sdk.ConsumerLocal, u.ID, authentication.LoadConsumerOptions.WithAuthentifiedUser) opts := &sdk.WorkflowRunPostHandlerOption{ - Manual: &manualEvent, + Manual: &manualEvent, + AuthConsumerID: consumer.ID, } - wr, err := workflow.CreateRun(db.DbMap, &w, opts, u) + wr, err := workflow.CreateRun(db.DbMap, &w, *opts) assert.NoError(t, err) wr.Workflow = w - consumer, _ := authentication.LoadConsumerByTypeAndUserID(context.TODO(), db, sdk.ConsumerLocal, u.ID, authentication.LoadConsumerOptions.WithAuthentifiedUser) - _, errR := workflow.StartWorkflowRun(context.TODO(), db, cache, *proj, wr, opts, consumer, nil) assert.NoError(t, errR) @@ -1394,15 +1398,15 @@ func TestManualRunBuildParameterNoApplicationOnRoot(t *testing.T) { "git.branch": "feat/branch", } + consumer, _ := authentication.LoadConsumerByTypeAndUserID(context.TODO(), db, sdk.ConsumerLocal, u.ID, authentication.LoadConsumerOptions.WithAuthentifiedUser) opts := &sdk.WorkflowRunPostHandlerOption{ - Manual: &manualEvent, + Manual: &manualEvent, + AuthConsumerID: consumer.ID, } - wr, err := workflow.CreateRun(db.DbMap, &w, opts, u) + wr, err := workflow.CreateRun(db.DbMap, &w, *opts) assert.NoError(t, err) wr.Workflow = w - consumer, _ := authentication.LoadConsumerByTypeAndUserID(context.TODO(), db, sdk.ConsumerLocal, u.ID, authentication.LoadConsumerOptions.WithAuthentifiedUser) - _, errR := workflow.StartWorkflowRun(context.TODO(), db, cache, *proj, wr, opts, consumer, nil) assert.NoError(t, errR) @@ -1585,13 +1589,14 @@ func TestGitParamOnPipelineWithoutApplication(t *testing.T) { "git.branch": "feat/branch", } + consumer, _ := authentication.LoadConsumerByTypeAndUserID(context.TODO(), db, sdk.ConsumerLocal, u.ID, authentication.LoadConsumerOptions.WithAuthentifiedUser) opts := &sdk.WorkflowRunPostHandlerOption{ - Manual: &manualEvent, + Manual: &manualEvent, + AuthConsumerID: consumer.ID, } - wr, err := workflow.CreateRun(db.DbMap, &w, opts, u) + wr, err := workflow.CreateRun(db.DbMap, &w, *opts) assert.NoError(t, err) wr.Workflow = w - consumer, _ := authentication.LoadConsumerByTypeAndUserID(context.TODO(), db, sdk.ConsumerLocal, u.ID, authentication.LoadConsumerOptions.WithAuthentifiedUser) _, errR := workflow.StartWorkflowRun(context.TODO(), db, cache, *proj, wr, opts, consumer, nil) assert.NoError(t, errR) @@ -1771,13 +1776,14 @@ func TestGitParamOnApplicationWithoutRepo(t *testing.T) { "git.branch": "feat/branch", } + consumer, _ := authentication.LoadConsumerByTypeAndUserID(context.TODO(), db, sdk.ConsumerLocal, u.ID, authentication.LoadConsumerOptions.WithAuthentifiedUser) opts := &sdk.WorkflowRunPostHandlerOption{ - Manual: &manualEvent, + Manual: &manualEvent, + AuthConsumerID: consumer.ID, } - wr, err := workflow.CreateRun(db.DbMap, &w, opts, u) + wr, err := workflow.CreateRun(db.DbMap, &w, *opts) assert.NoError(t, err) wr.Workflow = w - consumer, _ := authentication.LoadConsumerByTypeAndUserID(context.TODO(), db, sdk.ConsumerLocal, u.ID, authentication.LoadConsumerOptions.WithAuthentifiedUser) _, errR := workflow.StartWorkflowRun(context.TODO(), db, cache, *proj, wr, opts, consumer, nil) assert.NoError(t, errR) @@ -1968,13 +1974,14 @@ func TestGitParamOn2ApplicationSameRepo(t *testing.T) { "my.value": "bar", } + consumer, _ := authentication.LoadConsumerByTypeAndUserID(context.TODO(), db, sdk.ConsumerLocal, u.ID, authentication.LoadConsumerOptions.WithAuthentifiedUser) opts := &sdk.WorkflowRunPostHandlerOption{ - Manual: &manualEvent, + Manual: &manualEvent, + AuthConsumerID: consumer.ID, } - wr, err := workflow.CreateRun(db.DbMap, &w, opts, u) + wr, err := workflow.CreateRun(db.DbMap, &w, *opts) assert.NoError(t, err) wr.Workflow = w - consumer, _ := authentication.LoadConsumerByTypeAndUserID(context.TODO(), db, sdk.ConsumerLocal, u.ID, authentication.LoadConsumerOptions.WithAuthentifiedUser) _, errR := workflow.StartWorkflowRun(context.TODO(), db, cache, *proj, wr, opts, consumer, nil) assert.NoError(t, errR) @@ -2179,13 +2186,14 @@ func TestGitParamWithJoin(t *testing.T) { "my.value": "bar", } + consumer, _ := authentication.LoadConsumerByTypeAndUserID(context.TODO(), db, sdk.ConsumerLocal, u.ID, authentication.LoadConsumerOptions.WithAuthentifiedUser) opts := &sdk.WorkflowRunPostHandlerOption{ - Manual: &manualEvent, + Manual: &manualEvent, + AuthConsumerID: consumer.ID, } - wr, err := workflow.CreateRun(db.DbMap, &w, opts, u) + wr, err := workflow.CreateRun(db.DbMap, &w, *opts) assert.NoError(t, err) wr.Workflow = w - consumer, _ := authentication.LoadConsumerByTypeAndUserID(context.TODO(), db, sdk.ConsumerLocal, u.ID, authentication.LoadConsumerOptions.WithAuthentifiedUser) _, errR := workflow.StartWorkflowRun(context.TODO(), db, cache, *proj, wr, opts, consumer, nil) assert.NoError(t, errR) @@ -2397,13 +2405,14 @@ func TestGitParamOn2ApplicationSameRepoWithFork(t *testing.T) { "my.value": "bar", } + consumer, _ := authentication.LoadConsumerByTypeAndUserID(context.TODO(), db, sdk.ConsumerLocal, u.ID, authentication.LoadConsumerOptions.WithAuthentifiedUser) opts := &sdk.WorkflowRunPostHandlerOption{ - Manual: &manualEvent, + Manual: &manualEvent, + AuthConsumerID: consumer.ID, } - wr, err := workflow.CreateRun(db.DbMap, &w, opts, u) + wr, err := workflow.CreateRun(db.DbMap, &w, *opts) assert.NoError(t, err) wr.Workflow = w - consumer, _ := authentication.LoadConsumerByTypeAndUserID(context.TODO(), db, sdk.ConsumerLocal, u.ID, authentication.LoadConsumerOptions.WithAuthentifiedUser) _, errR := workflow.StartWorkflowRun(context.TODO(), db, cache, *proj, wr, opts, consumer, nil) assert.NoError(t, errR) @@ -2588,13 +2597,14 @@ func TestManualRunWithPayloadAndRunCondition(t *testing.T) { "git.branch": "feat/branch", } + consumer, _ := authentication.LoadConsumerByTypeAndUserID(context.TODO(), db, sdk.ConsumerLocal, u.ID, authentication.LoadConsumerOptions.WithAuthentifiedUser) opts := &sdk.WorkflowRunPostHandlerOption{ - Manual: &manualEvent, + Manual: &manualEvent, + AuthConsumerID: consumer.ID, } - wr, err := workflow.CreateRun(db.DbMap, &w, opts, u) + wr, err := workflow.CreateRun(db.DbMap, &w, *opts) assert.NoError(t, err) wr.Workflow = w - consumer, _ := authentication.LoadConsumerByTypeAndUserID(context.TODO(), db, sdk.ConsumerLocal, u.ID, authentication.LoadConsumerOptions.WithAuthentifiedUser) _, errR := workflow.StartWorkflowRun(context.TODO(), db, cache, *proj, wr, opts, consumer, nil) assert.NoError(t, errR) diff --git a/engine/api/workflow/process_start_test.go b/engine/api/workflow/process_start_test.go index 7240961c2b..fc4155f2e4 100644 --- a/engine/api/workflow/process_start_test.go +++ b/engine/api/workflow/process_start_test.go @@ -54,7 +54,7 @@ func TestProcessJoinDefaultCondition(t *testing.T) { require.NoError(t, workflow.Insert(context.TODO(), db, cache, *proj, &wr.Workflow)) // Create run - wrr, err := workflow.CreateRun(db.DbMap, &wr.Workflow, nil, u) + wrr, err := workflow.CreateRun(db.DbMap, &wr.Workflow, sdk.WorkflowRunPostHandlerOption{AuthConsumerID: consumer.ID}) require.NoError(t, err) wr.ID = wrr.ID wr.WorkflowID = wr.Workflow.ID @@ -124,7 +124,7 @@ func TestProcessJoinCustomCondition(t *testing.T) { require.NoError(t, workflow.Insert(context.TODO(), db, cache, *proj, &wr.Workflow)) // Create run - wrr, err := workflow.CreateRun(db.DbMap, &wr.Workflow, nil, u) + wrr, err := workflow.CreateRun(db.DbMap, &wr.Workflow, sdk.WorkflowRunPostHandlerOption{AuthConsumerID: consumer.ID}) require.NoError(t, err) wr.ID = wrr.ID wr.WorkflowID = wr.Workflow.ID diff --git a/engine/api/workflow/repository.go b/engine/api/workflow/repository.go index f65fbdf447..3f74f02872 100644 --- a/engine/api/workflow/repository.go +++ b/engine/api/workflow/repository.go @@ -4,15 +4,17 @@ import ( "archive/tar" "bytes" "context" + "fmt" "path/filepath" "github.com/fsamin/go-dump" "github.com/go-gorp/gorp" + "github.com/sirupsen/logrus" - "github.com/ovh/cds/engine/cache" "github.com/ovh/cds/engine/api/keys" "github.com/ovh/cds/engine/api/operation" "github.com/ovh/cds/engine/api/workflowtemplate" + "github.com/ovh/cds/engine/cache" "github.com/ovh/cds/sdk" "github.com/ovh/cds/sdk/exportentities" "github.com/ovh/cds/sdk/log" @@ -50,9 +52,16 @@ func CreateFromRepository(ctx context.Context, db *gorp.DbMap, store cache.Store return nil, nil, sdk.WrapError(err, "unable to post repository operation") } + log.Info(ctx, "polling operation %v for workflow %s/%s", newOperation.UUID, p.Key, wf.Name) ope, err := operation.Poll(ctx, db, newOperation.UUID) if err != nil { - return nil, nil, sdk.WrapError(err, "cannot analyse repository") + isErrWithStack := sdk.IsErrorWithStack(err) + fields := logrus.Fields{} + if isErrWithStack { + fields["stack_trace"] = fmt.Sprintf("%+v", err) + } + log.ErrorWithFields(ctx, fields, "cannot analyse repository (operation %s for workflow %s/%s): %v", newOperation.UUID, p.Key, wf.Name, err) + return nil, nil, sdk.WithStack(sdk.ErrRepoAnalyzeFailed) } var uuid string diff --git a/engine/api/workflow/run_workflow_test.go b/engine/api/workflow/run_workflow_test.go index d91dd0fdc9..a5f4081778 100644 --- a/engine/api/workflow/run_workflow_test.go +++ b/engine/api/workflow/run_workflow_test.go @@ -110,10 +110,10 @@ func TestManualRun1(t *testing.T) { t.Logf("w1: %+v", w1) require.NoError(t, err) - wr, errWR := workflow.CreateRun(db.DbMap, w1, nil, u) + consumer, _ := authentication.LoadConsumerByTypeAndUserID(context.TODO(), db, sdk.ConsumerLocal, u.ID, authentication.LoadConsumerOptions.WithAuthentifiedUser) + wr, errWR := workflow.CreateRun(db.DbMap, w1, sdk.WorkflowRunPostHandlerOption{AuthConsumerID: consumer.ID}) assert.NoError(t, errWR) wr.Workflow = *w1 - consumer, _ := authentication.LoadConsumerByTypeAndUserID(context.TODO(), db, sdk.ConsumerLocal, u.ID, authentication.LoadConsumerOptions.WithAuthentifiedUser) _, errS := workflow.StartWorkflowRun(context.TODO(), db, cache, *proj, wr, &sdk.WorkflowRunPostHandlerOption{ Manual: &sdk.WorkflowNodeRunManual{ @@ -125,7 +125,7 @@ func TestManualRun1(t *testing.T) { }, consumer, nil) require.NoError(t, errS) - wr2, errWR := workflow.CreateRun(db.DbMap, w1, nil, u) + wr2, errWR := workflow.CreateRun(db.DbMap, w1, sdk.WorkflowRunPostHandlerOption{AuthConsumerID: consumer.ID}) assert.NoError(t, errWR) wr2.Workflow = *w1 _, errS = workflow.StartWorkflowRun(context.TODO(), db, cache, *proj, wr2, &sdk.WorkflowRunPostHandlerOption{ @@ -270,7 +270,7 @@ func TestManualRun2(t *testing.T) { require.NoError(t, err) consumer, _ := authentication.LoadConsumerByTypeAndUserID(context.TODO(), db, sdk.ConsumerLocal, u.ID, authentication.LoadConsumerOptions.WithAuthentifiedUser) - wr, errWR := workflow.CreateRun(db.DbMap, w1, nil, u) + wr, errWR := workflow.CreateRun(db.DbMap, w1, sdk.WorkflowRunPostHandlerOption{AuthConsumerID: consumer.ID}) assert.NoError(t, errWR) wr.Workflow = *w1 _, errS := workflow.StartWorkflowRun(context.TODO(), db, cache, *proj, wr, &sdk.WorkflowRunPostHandlerOption{ @@ -278,7 +278,7 @@ func TestManualRun2(t *testing.T) { }, consumer, nil) require.NoError(t, errS) - wr2, errWR := workflow.CreateRun(db.DbMap, w1, nil, u) + wr2, errWR := workflow.CreateRun(db.DbMap, w1, sdk.WorkflowRunPostHandlerOption{AuthConsumerID: consumer.ID}) assert.NoError(t, errWR) wr2.Workflow = *w1 _, errS = workflow.StartWorkflowRun(context.TODO(), db, cache, *proj, wr2, &sdk.WorkflowRunPostHandlerOption{ @@ -542,7 +542,7 @@ func TestManualRun3(t *testing.T) { }) require.NoError(t, err) - wr, errWR := workflow.CreateRun(db.DbMap, w1, nil, u) + wr, errWR := workflow.CreateRun(db.DbMap, w1, sdk.WorkflowRunPostHandlerOption{AuthConsumerID: consumer.ID}) require.NoError(t, errWR) wr.Workflow = *w1 _, errS := workflow.StartWorkflowRun(context.TODO(), db, cache, *proj, wr, &sdk.WorkflowRunPostHandlerOption{ @@ -851,7 +851,7 @@ func TestNoStage(t *testing.T) { require.NoError(t, err) consumer, _ := authentication.LoadConsumerByTypeAndUserID(context.TODO(), db, sdk.ConsumerLocal, u.ID, authentication.LoadConsumerOptions.WithAuthentifiedUser) - wr, errWR := workflow.CreateRun(db.DbMap, w1, nil, u) + wr, errWR := workflow.CreateRun(db.DbMap, w1, sdk.WorkflowRunPostHandlerOption{AuthConsumerID: consumer.ID}) assert.NoError(t, errWR) wr.Workflow = *w1 _, errS := workflow.StartWorkflowRun(context.TODO(), db, cache, *proj, wr, &sdk.WorkflowRunPostHandlerOption{ @@ -927,7 +927,7 @@ func TestNoJob(t *testing.T) { }) require.NoError(t, err) - wr, errWR := workflow.CreateRun(db.DbMap, w1, nil, u) + wr, errWR := workflow.CreateRun(db.DbMap, w1, sdk.WorkflowRunPostHandlerOption{AuthConsumerID: consumer.ID}) assert.NoError(t, errWR) wr.Workflow = *w1 _, errS := workflow.StartWorkflowRun(context.TODO(), db, cache, *proj, wr, &sdk.WorkflowRunPostHandlerOption{ diff --git a/engine/api/workflow_ascode_rename_test.go b/engine/api/workflow_ascode_rename_test.go index 58ea45bde5..a5b54ac080 100644 --- a/engine/api/workflow_ascode_rename_test.go +++ b/engine/api/workflow_ascode_rename_test.go @@ -349,7 +349,7 @@ version: v1.0`), var wrun sdk.WorkflowRun require.NoError(t, json.Unmarshal(rec.Body.Bytes(), &wrun)) - require.NoError(t, waitCraftinWorkflow(t, api.mustDB(), wrun.ID)) + require.NoError(t, waitCraftinWorkflow(t, api, api.mustDB(), wrun.ID)) wr, _ := workflow.LoadRunByID(db, wrun.ID, workflow.LoadRunOptions{}) assert.NotEqual(t, "Fail", wr.Status) diff --git a/engine/api/workflow_ascode_test.go b/engine/api/workflow_ascode_test.go index bd58314ee2..80a346be46 100644 --- a/engine/api/workflow_ascode_test.go +++ b/engine/api/workflow_ascode_test.go @@ -863,7 +863,7 @@ version: v1.0`), var wrun sdk.WorkflowRun require.NoError(t, json.Unmarshal(rec.Body.Bytes(), &wrun)) - require.NoError(t, waitCraftinWorkflow(t, api.mustDB(), wrun.ID)) + require.NoError(t, waitCraftinWorkflow(t, api, api.mustDB(), wrun.ID)) wr, _ := workflow.LoadRunByID(db, wrun.ID, workflow.LoadRunOptions{}) assert.NotEqual(t, "Fail", wr.Status) diff --git a/engine/api/workflow_ascode_with_hooks_test.go b/engine/api/workflow_ascode_with_hooks_test.go index 6fbe635f7f..71ae2d986d 100644 --- a/engine/api/workflow_ascode_with_hooks_test.go +++ b/engine/api/workflow_ascode_with_hooks_test.go @@ -480,7 +480,7 @@ version: v1.0`), var wrun sdk.WorkflowRun require.NoError(t, json.Unmarshal(rec.Body.Bytes(), &wrun)) - require.NoError(t, waitCraftinWorkflow(t, api.mustDB(), wrun.ID)) + require.NoError(t, waitCraftinWorkflow(t, api, api.mustDB(), wrun.ID)) wr, err := workflow.LoadRunByID(db, wrun.ID, workflow.LoadRunOptions{}) require.NoError(t, nil) require.NotEqual(t, "Fail", wr.Status) @@ -815,7 +815,7 @@ version: v1.0`), var wrun sdk.WorkflowRun require.NoError(t, json.Unmarshal(rec.Body.Bytes(), &wrun)) - require.NoError(t, waitCraftinWorkflow(t, api.mustDB(), wrun.ID)) + require.NoError(t, waitCraftinWorkflow(t, api, api.mustDB(), wrun.ID)) wr, err := workflow.LoadRunByID(db, wrun.ID, workflow.LoadRunOptions{}) require.NoError(t, err) @@ -1350,7 +1350,7 @@ version: v1.0`), var wrun sdk.WorkflowRun require.NoError(t, json.Unmarshal(rec.Body.Bytes(), &wrun)) - require.NoError(t, waitCraftinWorkflow(t, api.mustDB(), wrun.ID)) + require.NoError(t, waitCraftinWorkflow(t, api, api.mustDB(), wrun.ID)) wr, err := workflow.LoadRunByID(db, wrun.ID, workflow.LoadRunOptions{}) require.NoError(t, err) @@ -1418,7 +1418,7 @@ version: v1.0`), var wrun2 sdk.WorkflowRun require.NoError(t, json.Unmarshal(rec.Body.Bytes(), &wrun2)) - require.NoError(t, waitCraftinWorkflow(t, api.mustDB(), wrun2.ID)) + require.NoError(t, waitCraftinWorkflow(t, api, api.mustDB(), wrun2.ID)) wr, err = workflow.LoadRunByID(db, wrun2.ID, workflow.LoadRunOptions{}) require.NoError(t, err) diff --git a/engine/api/workflow_ascode_with_secrets_test.go b/engine/api/workflow_ascode_with_secrets_test.go index 5ea8764e0d..c22da12949 100644 --- a/engine/api/workflow_ascode_with_secrets_test.go +++ b/engine/api/workflow_ascode_with_secrets_test.go @@ -349,7 +349,7 @@ version: v1.0`), var wr sdk.WorkflowRun require.NoError(t, json.Unmarshal(rec.Body.Bytes(), &wr)) - require.NoError(t, waitCraftinWorkflow(t, db, wr.ID)) + require.NoError(t, waitCraftinWorkflow(t, api, db, wr.ID)) wrDB, errDB := workflow.LoadRunByID(db, wr.ID, workflow.LoadRunOptions{}) require.NoError(t, errDB) diff --git a/engine/api/workflow_event.go b/engine/api/workflow_event.go index f79b124c83..afec100ef9 100644 --- a/engine/api/workflow_event.go +++ b/engine/api/workflow_event.go @@ -5,9 +5,6 @@ import ( "github.com/ovh/cds/engine/api/notification" - "github.com/go-gorp/gorp" - - "github.com/ovh/cds/engine/cache" "github.com/ovh/cds/engine/api/event" "github.com/ovh/cds/engine/api/workflow" "github.com/ovh/cds/sdk" @@ -15,7 +12,7 @@ import ( ) // WorkflowSendEvent Send event on workflow run -func WorkflowSendEvent(ctx context.Context, db *gorp.DbMap, store cache.Store, proj sdk.Project, report *workflow.ProcessorReport) { +func (api *API) WorkflowSendEvent(ctx context.Context, proj sdk.Project, report *workflow.ProcessorReport) { if report == nil { return } @@ -23,7 +20,7 @@ func WorkflowSendEvent(ctx context.Context, db *gorp.DbMap, store cache.Store, p event.PublishWorkflowRun(ctx, wr, proj.Key) } for _, wnr := range report.Nodes() { - wr, err := workflow.LoadRunByID(db, wnr.WorkflowRunID, workflow.LoadRunOptions{ + wr, err := workflow.LoadRunByID(api.mustDB(), wnr.WorkflowRunID, workflow.LoadRunOptions{ DisableDetailledNodeRun: true, }) if err != nil { @@ -35,13 +32,13 @@ func WorkflowSendEvent(ctx context.Context, db *gorp.DbMap, store cache.Store, p if wnr.SubNumber > 0 { previousNodeRun = &wnr } else { - previousNodeRun, err = workflow.PreviousNodeRun(db, wnr, wnr.WorkflowNodeName, wr.WorkflowID) + previousNodeRun, err = workflow.PreviousNodeRun(api.mustDB(), wnr, wnr.WorkflowNodeName, wr.WorkflowID) if err != nil { log.Warning(ctx, "workflowSendEvent> Cannot load previous node run: %v", err) } } - nr, err := workflow.LoadNodeRunByID(db, wnr.ID, workflow.LoadRunOptions{ + nr, err := workflow.LoadNodeRunByID(api.mustDB(), wnr.ID, workflow.LoadRunOptions{ DisableDetailledNodeRun: false, // load build parameters, used in notif interpolate below }) if err != nil { @@ -49,26 +46,26 @@ func WorkflowSendEvent(ctx context.Context, db *gorp.DbMap, store cache.Store, p continue } - workDB, err := workflow.LoadWorkflowFromWorkflowRunID(db, wr.ID) + workDB, err := workflow.LoadWorkflowFromWorkflowRunID(api.mustDB(), wr.ID) if err != nil { log.Warning(ctx, "WorkflowSendEvent> Unable to load workflow for event: %v", err) continue } - eventsNotif := notification.GetUserWorkflowEvents(ctx, db, store, wr.Workflow.ProjectID, wr.Workflow.ProjectKey, workDB.Name, wr.Workflow.Notifications, previousNodeRun, *nr) + eventsNotif := notification.GetUserWorkflowEvents(ctx, api.mustDB(), api.Cache, wr.Workflow.ProjectID, wr.Workflow.ProjectKey, workDB.Name, wr.Workflow.Notifications, previousNodeRun, *nr) event.PublishWorkflowNodeRun(ctx, *nr, wr.Workflow, eventsNotif) e := &workflow.VCSEventMessenger{} - if err := e.SendVCSEvent(ctx, db, store, proj, *wr, wnr); err != nil { + if err := e.SendVCSEvent(ctx, api.mustDB(), api.Cache, proj, *wr, wnr); err != nil { log.Warning(ctx, "WorkflowSendEvent> Cannot send vcs notification") } } for _, jobrun := range report.Jobs() { - noderun, err := workflow.LoadNodeRunByID(db, jobrun.WorkflowNodeRunID, workflow.LoadRunOptions{}) + noderun, err := workflow.LoadNodeRunByID(api.mustDB(), jobrun.WorkflowNodeRunID, workflow.LoadRunOptions{}) if err != nil { log.Warning(ctx, "workflowSendEvent> Cannot load workflow node run %d: %s", jobrun.WorkflowNodeRunID, err) continue } - wr, err := workflow.LoadRunByID(db, noderun.WorkflowRunID, workflow.LoadRunOptions{ + wr, err := workflow.LoadRunByID(api.mustDB(), noderun.WorkflowRunID, workflow.LoadRunOptions{ WithLightTests: true, }) if err != nil { diff --git a/engine/api/workflow_hook.go b/engine/api/workflow_hook.go index e83ee8962e..307addd708 100644 --- a/engine/api/workflow_hook.go +++ b/engine/api/workflow_hook.go @@ -287,14 +287,14 @@ func (api *API) postWorkflowJobHookCallbackHandler() service.Handler { return sdk.WithStack(err) } - go WorkflowSendEvent(context.Background(), api.mustDB(), api.Cache, *proj, report) + go api.WorkflowSendEvent(context.Background(), *proj, report) - report, err = updateParentWorkflowRun(ctx, api.mustDB, api.Cache, wr) + report, err = api.updateParentWorkflowRun(ctx, wr) if err != nil { return sdk.WithStack(err) } - go WorkflowSendEvent(context.Background(), api.mustDB(), api.Cache, *proj, report) + go api.WorkflowSendEvent(context.Background(), *proj, report) return nil } diff --git a/engine/api/workflow_queue.go b/engine/api/workflow_queue.go index 9cb47ace30..47935d6ee2 100644 --- a/engine/api/workflow_queue.go +++ b/engine/api/workflow_queue.go @@ -112,7 +112,7 @@ func (api *API) postTakeWorkflowJobHandler() service.Handler { } workflow.ResyncNodeRunsWithCommits(ctx, api.mustDB(), api.Cache, *p, report) - go WorkflowSendEvent(context.Background(), api.mustDB(), api.Cache, *p, report) + go api.WorkflowSendEvent(context.Background(), *p, report) return service.WriteJSON(w, pbji, http.StatusOK) } @@ -392,7 +392,7 @@ func (api *API) postWorkflowJobResultHandler() service.Handler { telemetry.Tag(telemetry.TagProjectKey, proj.Key), ) - report, err := postJobResult(customCtx, api.mustDBWithCtx, api.Cache, proj, wk, &res) + report, err := api.postJobResult(customCtx, proj, wk, &res) if err != nil { return sdk.WrapError(err, "unable to post job result") } @@ -411,26 +411,26 @@ func (api *API) postWorkflowJobResultHandler() service.Handler { workflow.ResyncNodeRunsWithCommits(ctx, api.mustDB(), api.Cache, *proj, report) next() - go WorkflowSendEvent(context.Background(), api.mustDB(), api.Cache, *proj, report) + go api.WorkflowSendEvent(context.Background(), *proj, report) return nil } } -func postJobResult(ctx context.Context, dbFunc func(context.Context) *gorp.DbMap, store cache.Store, proj *sdk.Project, wr *sdk.Worker, res *sdk.Result) (*workflow.ProcessorReport, error) { +func (api *API) postJobResult(ctx context.Context, proj *sdk.Project, wr *sdk.Worker, res *sdk.Result) (*workflow.ProcessorReport, error) { var end func() ctx, end = telemetry.Span(ctx, "postJobResult") defer end() //Start the transaction - tx, errb := dbFunc(ctx).Begin() + tx, errb := api.mustDBWithCtx(ctx).Begin() if errb != nil { return nil, sdk.WrapError(errb, "postJobResult> Cannot begin tx") } defer tx.Rollback() // nolint //Load workflow node job run - job, errj := workflow.LoadAndLockNodeJobRunSkipLocked(ctx, tx, store, res.BuildID) + job, errj := workflow.LoadAndLockNodeJobRunSkipLocked(ctx, tx, api.Cache, res.BuildID) if errj != nil { return nil, sdk.WrapError(errj, "cannot load node run job %d", res.BuildID) } @@ -513,10 +513,7 @@ func postJobResult(ctx context.Context, dbFunc func(context.Context) *gorp.DbMap // Update action status log.Debug("postJobResult> Updating %d to %s in queue", job.ID, res.Status) - newDBFunc := func() *gorp.DbMap { - return dbFunc(context.Background()) - } - report, err := workflow.UpdateNodeJobRunStatus(ctx, tx, store, *proj, job, res.Status) + report, err := workflow.UpdateNodeJobRunStatus(ctx, tx, api.Cache, *proj, job, res.Status) if err != nil { return nil, sdk.WrapError(err, "cannot update NodeJobRun %d status", job.ID) } @@ -528,12 +525,12 @@ func postJobResult(ctx context.Context, dbFunc func(context.Context) *gorp.DbMap for i := range report.WorkflowRuns() { run := &report.WorkflowRuns()[i] - reportParent, err := updateParentWorkflowRun(ctx, newDBFunc, store, run) + reportParent, err := api.updateParentWorkflowRun(ctx, run) if err != nil { return nil, sdk.WithStack(err) } - go WorkflowSendEvent(context.Background(), dbFunc(ctx), store, *proj, reportParent) + go api.WorkflowSendEvent(context.Background(), *proj, reportParent) } return report, nil diff --git a/engine/api/workflow_queue_storage.go b/engine/api/workflow_queue_storage.go index d382b24236..112d46fb90 100644 --- a/engine/api/workflow_queue_storage.go +++ b/engine/api/workflow_queue_storage.go @@ -10,9 +10,9 @@ import ( "github.com/gorilla/mux" - "github.com/ovh/cds/engine/cache" "github.com/ovh/cds/engine/api/objectstore" "github.com/ovh/cds/engine/api/workflow" + "github.com/ovh/cds/engine/cache" "github.com/ovh/cds/engine/service" "github.com/ovh/cds/sdk" "github.com/ovh/cds/sdk/log" diff --git a/engine/api/workflow_queue_test.go b/engine/api/workflow_queue_test.go index f72eb29846..76379b7448 100644 --- a/engine/api/workflow_queue_test.go +++ b/engine/api/workflow_queue_test.go @@ -55,7 +55,10 @@ type testRunWorkflowCtx struct { model *sdk.Model } -func testRunWorkflow(t *testing.T, api *API, db gorpmapper.SqlExecutorWithTx, router *Router) testRunWorkflowCtx { +func testRunWorkflow(t *testing.T, api *API, router *Router) testRunWorkflowCtx { + db, err := api.mustDB().Begin() + require.NoError(t, err) + u, pass := assets.InsertLambdaUser(t, db) key := "proj-" + sdk.RandomString(10) proj := assets.InsertTestProject(t, db, api.Cache, key, key) @@ -80,14 +83,14 @@ func testRunWorkflow(t *testing.T, api *API, db gorpmapper.SqlExecutorWithTx, ro ProjectKey: proj.Key, Name: "pip-" + sdk.RandomString(10), } - require.NoError(t, pipeline.InsertPipeline(api.mustDB(), &pip)) + require.NoError(t, pipeline.InsertPipeline(db, &pip)) - script := assets.GetBuiltinOrPluginActionByName(t, api.mustDB(), sdk.ScriptAction) + script := assets.GetBuiltinOrPluginActionByName(t, db, sdk.ScriptAction) s := sdk.NewStage("stage-" + sdk.RandomString(10)) s.Enabled = true s.PipelineID = pip.ID - pipeline.InsertStage(api.mustDB(), s) + pipeline.InsertStage(db, s) j := &sdk.Job{ Enabled: true, Action: sdk.Action{ @@ -97,7 +100,7 @@ func testRunWorkflow(t *testing.T, api *API, db gorpmapper.SqlExecutorWithTx, ro }, }, } - pipeline.InsertJob(api.mustDB(), j, s.ID, &pip) + pipeline.InsertJob(db, j, s.ID, &pip) s.Jobs = append(s.Jobs, *j) pip.Stages = append(pip.Stages, *s) @@ -134,7 +137,7 @@ func testRunWorkflow(t *testing.T, api *API, db gorpmapper.SqlExecutorWithTx, ro Name: "env-" + sdk.RandomString(10), ProjectID: proj.ID, } - if err := environment.InsertEnvironment(api.mustDB(), env); err != nil { + if err := environment.InsertEnvironment(db, env); err != nil { t.Fatal(err) } @@ -175,14 +178,15 @@ func testRunWorkflow(t *testing.T, api *API, db gorpmapper.SqlExecutorWithTx, ro }, } - proj2, errP := project.Load(context.TODO(), api.mustDB(), proj.Key, project.LoadOptions.WithPipelines, project.LoadOptions.WithGroups) + proj2, errP := project.Load(context.TODO(), db, proj.Key, project.LoadOptions.WithPipelines, project.LoadOptions.WithGroups) require.NoError(t, errP) require.NoError(t, workflow.Insert(context.TODO(), db, api.Cache, *proj2, &w)) - w1, err := workflow.Load(context.TODO(), api.mustDB(), api.Cache, *proj, w.Name, workflow.LoadOptions{}) + w1, err := workflow.Load(context.TODO(), db, api.Cache, *proj, w.Name, workflow.LoadOptions{}) require.NoError(t, err) log.Debug("workflow %d groups: %+v", w1.ID, w1.Groups) + require.NoError(t, db.Commit()) //Prepare request vars := map[string]string{ @@ -208,6 +212,8 @@ func testRunWorkflow(t *testing.T, api *API, db gorpmapper.SqlExecutorWithTx, ro t.FailNow() } + waitCraftinWorkflow(t, api, api.mustDB(), wr.ID) + // Wait building status cpt := 0 for { @@ -372,7 +378,7 @@ func TestGetWorkflowJobQueueHandler(t *testing.T) { _, jwt := assets.InsertAdminUser(t, db) t.Log("checkin as a user") - ctx := testRunWorkflow(t, api, db, router) + ctx := testRunWorkflow(t, api, router) testGetWorkflowJobAsRegularUser(t, api, router, jwt, &ctx) assert.NotNil(t, ctx.job) @@ -430,7 +436,7 @@ func TestGetWorkflowJobQueueHandler(t *testing.T) { func Test_postTakeWorkflowJobHandler(t *testing.T) { api, db, router := newTestAPI(t) - ctx := testRunWorkflow(t, api, db, router) + ctx := testRunWorkflow(t, api, router) testGetWorkflowJobAsWorker(t, api, db, router, &ctx) require.NotNil(t, ctx.job) @@ -486,7 +492,7 @@ func Test_postTakeWorkflowInvalidJobHandler(t *testing.T) { _ = services.Delete(db, s) }() - ctx := testRunWorkflow(t, api, db, router) + ctx := testRunWorkflow(t, api, router) testGetWorkflowJobAsWorker(t, api, db, router, &ctx) require.NotNil(t, ctx.job) @@ -526,7 +532,7 @@ func Test_postTakeWorkflowInvalidJobHandler(t *testing.T) { func Test_postBookWorkflowJobHandler(t *testing.T) { api, db, router := newTestAPI(t) - ctx := testRunWorkflow(t, api, db, router) + ctx := testRunWorkflow(t, api, router) testGetWorkflowJobAsHatchery(t, api, db, router, &ctx) assert.NotNil(t, ctx.job) @@ -553,7 +559,7 @@ func Test_postWorkflowJobResultHandler(t *testing.T) { _ = services.Delete(db, s) }() - ctx := testRunWorkflow(t, api, db, router) + ctx := testRunWorkflow(t, api, router) testGetWorkflowJobAsWorker(t, api, db, router, &ctx) assert.NotNil(t, ctx.job) @@ -637,7 +643,7 @@ func Test_postWorkflowJobTestsResultsHandler(t *testing.T) { _ = services.Delete(db, s) }() - ctx := testRunWorkflow(t, api, db, router) + ctx := testRunWorkflow(t, api, router) testGetWorkflowJobAsWorker(t, api, db, router, &ctx) assert.NotNil(t, ctx.job) @@ -746,7 +752,7 @@ func Test_postWorkflowJobArtifactHandler(t *testing.T) { _ = services.Delete(db, s) }() - ctx := testRunWorkflow(t, api, db, router) + ctx := testRunWorkflow(t, api, router) testGetWorkflowJobAsWorker(t, api, db, router, &ctx) assert.NotNil(t, ctx.job) @@ -897,7 +903,7 @@ func Test_postWorkflowJobStaticFilesHandler(t *testing.T) { _ = services.Delete(db, s) }() - ctx := testRunWorkflow(t, api, db, router) + ctx := testRunWorkflow(t, api, router) testGetWorkflowJobAsWorker(t, api, db, router, &ctx) require.NotNil(t, ctx.job) @@ -1047,7 +1053,7 @@ func TestWorkerPrivateKey(t *testing.T) { workflowDeepPipeline, err := workflow.LoadByID(context.TODO(), db, api.Cache, *p, w.ID, workflow.LoadOptions{DeepPipeline: true}) assert.NoError(t, err) - wrDB, errwr := workflow.CreateRun(api.mustDB(), workflowDeepPipeline, nil, u) + wrDB, errwr := workflow.CreateRun(api.mustDB(), workflowDeepPipeline, sdk.WorkflowRunPostHandlerOption{AuthConsumerID: consumer.ID}) assert.NoError(t, errwr) wrDB.Workflow = *workflowDeepPipeline @@ -1159,7 +1165,7 @@ func TestPostVulnerabilityReportHandler(t *testing.T) { workflowDeepPipeline, err := workflow.LoadByID(context.TODO(), db, api.Cache, *p, w.ID, workflow.LoadOptions{DeepPipeline: true}) assert.NoError(t, err) - wrDB, errwr := workflow.CreateRun(api.mustDB(), workflowDeepPipeline, nil, u) + wrDB, errwr := workflow.CreateRun(api.mustDB(), workflowDeepPipeline, sdk.WorkflowRunPostHandlerOption{AuthConsumerID: consumer.ID}) assert.NoError(t, errwr) wrDB.Workflow = *workflowDeepPipeline @@ -1392,15 +1398,15 @@ func TestInsertNewCodeCoverageReport(t *testing.T) { }, ) + consumer, _ := authentication.LoadConsumerByTypeAndUserID(context.TODO(), db, sdk.ConsumerLocal, u.ID, authentication.LoadConsumerOptions.WithAuthentifiedUser) + // Create previous run on default branch - wrDB, errwr := workflow.CreateRun(api.mustDB(), &w, nil, u) + wrDB, errwr := workflow.CreateRun(api.mustDB(), &w, sdk.WorkflowRunPostHandlerOption{AuthConsumerID: consumer.ID}) assert.NoError(t, errwr) workflowWithDeepPipeline, err := workflow.LoadByID(context.TODO(), db, api.Cache, *proj, w.ID, workflow.LoadOptions{DeepPipeline: true}) assert.NoError(t, err) - consumer, _ := authentication.LoadConsumerByTypeAndUserID(context.TODO(), db, sdk.ConsumerLocal, u.ID, authentication.LoadConsumerOptions.WithAuthentifiedUser) - wrDB.Workflow = *workflowWithDeepPipeline _, errmr := workflow.StartWorkflowRun(context.Background(), db, api.Cache, *p, wrDB, &sdk.WorkflowRunPostHandlerOption{ Manual: &sdk.WorkflowNodeRunManual{ @@ -1414,7 +1420,7 @@ func TestInsertNewCodeCoverageReport(t *testing.T) { assert.NoError(t, errmr) // Create previous run on a branch - wrCB, errwr2 := workflow.CreateRun(api.mustDB(), &w, nil, u) + wrCB, errwr2 := workflow.CreateRun(api.mustDB(), &w, sdk.WorkflowRunPostHandlerOption{AuthConsumerID: consumer.ID}) assert.NoError(t, errwr2) wrCB.Workflow = w _, errmr = workflow.StartWorkflowRun(context.Background(), db, api.Cache, *p, wrCB, &sdk.WorkflowRunPostHandlerOption{ @@ -1470,7 +1476,7 @@ func TestInsertNewCodeCoverageReport(t *testing.T) { // Run test // Create a workflow run - wrToTest, errwr3 := workflow.CreateRun(api.mustDB(), &w, nil, u) + wrToTest, errwr3 := workflow.CreateRun(api.mustDB(), &w, sdk.WorkflowRunPostHandlerOption{AuthConsumerID: consumer.ID}) assert.NoError(t, errwr3) wrToTest.Workflow = *workflowWithDeepPipeline @@ -1535,7 +1541,7 @@ func Test_postWorkflowJobSetVersionHandler(t *testing.T) { _ = services.Delete(db, s) }() - ctx := testRunWorkflow(t, api, db, router) + ctx := testRunWorkflow(t, api, router) testGetWorkflowJobAsWorker(t, api, db, router, &ctx) require.NotNil(t, ctx.job) diff --git a/engine/api/workflow_run.go b/engine/api/workflow_run.go index e8ed4b6ab1..f2457bd43b 100644 --- a/engine/api/workflow_run.go +++ b/engine/api/workflow_run.go @@ -16,7 +16,7 @@ import ( "github.com/sirupsen/logrus" "github.com/ovh/cds/engine/api/ascode" - "github.com/ovh/cds/engine/cache" + "github.com/ovh/cds/engine/api/authentication" "github.com/ovh/cds/engine/api/event" "github.com/ovh/cds/engine/api/integration" "github.com/ovh/cds/engine/api/objectstore" @@ -365,12 +365,12 @@ func (api *API) stopWorkflowRunHandler() service.Handler { return sdk.WrapError(err, "unable to load project") } - report, err := stopWorkflowRun(ctx, api.mustDB, api.Cache, proj, run, getAPIConsumer(ctx), 0) + report, err := api.stopWorkflowRun(ctx, proj, run, 0) if err != nil { return sdk.WrapError(err, "unable to stop workflow") } - go WorkflowSendEvent(context.Background(), api.mustDB(), api.Cache, *proj, report) + go api.WorkflowSendEvent(context.Background(), *proj, report) go func(ID int64) { wRun, err := workflow.LoadRunByID(api.mustDB(), ID, workflow.LoadRunOptions{DisableDetailledNodeRun: true}) @@ -402,11 +402,11 @@ func (api *API) stopWorkflowRunHandler() service.Handler { } } -func stopWorkflowRun(ctx context.Context, dbFunc func() *gorp.DbMap, store cache.Store, p *sdk.Project, - run *sdk.WorkflowRun, ident sdk.Identifiable, parentWorkflowRunID int64) (*workflow.ProcessorReport, error) { +func (api *API) stopWorkflowRun(ctx context.Context, p *sdk.Project, run *sdk.WorkflowRun, parentWorkflowRunID int64) (*workflow.ProcessorReport, error) { + ident := getAPIConsumer(ctx) report := new(workflow.ProcessorReport) - tx, err := dbFunc().Begin() + tx, err := api.mustDB().Begin() if err != nil { return nil, sdk.WrapError(err, "unable to create transaction") } @@ -431,7 +431,7 @@ func stopWorkflowRun(ctx context.Context, dbFunc func() *gorp.DbMap, store cache continue } - r1, err := workflow.StopWorkflowNodeRun(ctx, dbFunc, store, *p, *run, wnr, stopInfos) + r1, err := workflow.StopWorkflowNodeRun(ctx, api.mustDB, api.Cache, *p, *run, wnr, stopInfos) if err != nil { return nil, sdk.WrapError(err, "unable to stop workflow node run %d", wnr.ID) } @@ -445,7 +445,7 @@ func stopWorkflowRun(ctx context.Context, dbFunc func() *gorp.DbMap, store cache } model, has := run.Workflow.OutGoingHookModels[wnr.OutgoingHook.HookModelID] if !has { - m, errM := workflow.LoadOutgoingHookModelByID(dbFunc(), wnr.OutgoingHook.HookModelID) + m, errM := workflow.LoadOutgoingHookModelByID(api.mustDB(), wnr.OutgoingHook.HookModelID) if errM != nil { log.Error(ctx, "stopWorkflowRun> Unable to load outgoing hook model: %v", errM) continue @@ -458,19 +458,19 @@ func stopWorkflowRun(ctx context.Context, dbFunc func() *gorp.DbMap, store cache targetProject := wnr.OutgoingHook.Config[sdk.HookConfigTargetProject].Value targetWorkflow := wnr.OutgoingHook.Config[sdk.HookConfigTargetWorkflow].Value - targetRun, errL := workflow.LoadRun(ctx, dbFunc(), targetProject, targetWorkflow, *wnr.Callback.WorkflowRunNumber, workflow.LoadRunOptions{}) + targetRun, errL := workflow.LoadRun(ctx, api.mustDB(), targetProject, targetWorkflow, *wnr.Callback.WorkflowRunNumber, workflow.LoadRunOptions{}) if errL != nil { log.Error(ctx, "stopWorkflowRun> Unable to load last workflow run: %v", errL) continue } - targetProj, errP := project.Load(ctx, dbFunc(), targetProject) + targetProj, errP := project.Load(ctx, api.mustDB(), targetProject) if errP != nil { log.Error(ctx, "stopWorkflowRun> Unable to load project %v", errP) continue } - r2, err := stopWorkflowRun(ctx, dbFunc, store, targetProj, targetRun, ident, run.ID) + r2, err := api.stopWorkflowRun(ctx, targetProj, targetRun, run.ID) if err != nil { log.Error(ctx, "stopWorkflowRun> Unable to stop workflow %v", err) continue @@ -493,23 +493,23 @@ func stopWorkflowRun(ctx context.Context, dbFunc func() *gorp.DbMap, store cache } if parentWorkflowRunID == 0 { - report, err := updateParentWorkflowRun(ctx, dbFunc, store, run) + report, err := api.updateParentWorkflowRun(ctx, run) if err != nil { return nil, sdk.WithStack(err) } - go WorkflowSendEvent(context.Background(), dbFunc(), store, *p, report) + go api.WorkflowSendEvent(context.Background(), *p, report) } return report, nil } -func updateParentWorkflowRun(ctx context.Context, dbFunc func() *gorp.DbMap, store cache.Store, run *sdk.WorkflowRun) (*workflow.ProcessorReport, error) { +func (api *API) updateParentWorkflowRun(ctx context.Context, run *sdk.WorkflowRun) (*workflow.ProcessorReport, error) { if !run.HasParentWorkflow() { return nil, nil } parentProj, err := project.Load(context.Background(), - dbFunc(), run.RootRun().HookEvent.ParentWorkflow.Key, + api.mustDB(), run.RootRun().HookEvent.ParentWorkflow.Key, project.LoadOptions.WithVariables, project.LoadOptions.WithIntegrations, project.LoadOptions.WithApplicationVariables, @@ -520,7 +520,7 @@ func updateParentWorkflowRun(ctx context.Context, dbFunc func() *gorp.DbMap, sto } parentWR, err := workflow.LoadRun(ctx, - dbFunc(), + api.mustDB(), run.RootRun().HookEvent.ParentWorkflow.Key, run.RootRun().HookEvent.ParentWorkflow.Name, run.RootRun().HookEvent.ParentWorkflow.Run, @@ -531,11 +531,11 @@ func updateParentWorkflowRun(ctx context.Context, dbFunc func() *gorp.DbMap, sto return nil, sdk.WrapError(err, "unable to load parent run: %v", run.RootRun().HookEvent) } - report, err := workflow.UpdateParentWorkflowRun(ctx, dbFunc, store, run, *parentProj, parentWR) + report, err := workflow.UpdateParentWorkflowRun(ctx, api.mustDB, api.Cache, run, *parentProj, parentWR) if err != nil { return nil, sdk.WithStack(err) } - go WorkflowSendEvent(context.Background(), dbFunc(), store, *parentProj, report) + go api.WorkflowSendEvent(context.Background(), *parentProj, report) return report, nil } @@ -712,7 +712,7 @@ func (api *API) stopWorkflowNodeRunHandler() service.Handler { } sdk.GoRoutine(context.Background(), fmt.Sprintf("stopWorkflowNodeRunHandler-%d", workflowNodeRunID), func(ctx context.Context) { - WorkflowSendEvent(context.Background(), api.mustDB(), api.Cache, *p, r1) + api.WorkflowSendEvent(context.Background(), *p, r1) }) tx, err := api.mustDB().Begin() @@ -746,7 +746,7 @@ func (api *API) stopWorkflowNodeRunHandler() service.Handler { } sdk.GoRoutine(context.Background(), fmt.Sprintf("stopWorkflowNodeRunHandler-%d-resync-run-%d", workflowNodeRunID, workflowRun.ID), func(ctx context.Context) { - WorkflowSendEvent(context.Background(), api.mustDB(), api.Cache, *p, r2) + api.WorkflowSendEvent(context.Background(), *p, r2) }) go func(ID int64) { @@ -820,11 +820,11 @@ func (api *API) postWorkflowRunHandler() service.Handler { return sdk.WrapError(errP, "cannot load project") } - // GET BODY - opts := &sdk.WorkflowRunPostHandlerOption{} - if err := service.UnmarshalBody(r, opts); err != nil { + opts := sdk.WorkflowRunPostHandlerOption{} + if err := service.UnmarshalBody(r, &opts); err != nil { return err } + opts.AuthConsumerID = getAPIConsumer(ctx).ID // Request check if opts.Manual != nil && opts.Manual.OnlyFailedJobs && opts.Manual.Resync { @@ -841,7 +841,6 @@ func (api *API) postWorkflowRunHandler() service.Handler { } } - c := getAPIConsumer(ctx) // To handle conditions on hooks if opts.Hook != nil { hook, errH := workflow.LoadHookByUUID(api.mustDB(), opts.Hook.WorkflowNodeHookUUID) @@ -880,7 +879,7 @@ func (api *API) postWorkflowRunHandler() service.Handler { return sdk.NewErrorFrom(sdk.ErrForbidden, "this workflow execution is on read only mode, it cannot be run anymore") } - if opts != nil && opts.Manual != nil && opts.Manual.Resync { + if opts.Manual != nil && opts.Manual.Resync { log.Debug("Resync workflow %d for run %d", lastRun.Workflow.ID, lastRun.ID) if err := workflow.Resync(ctx, api.mustDB(), api.Cache, *p, lastRun); err != nil { return err @@ -905,17 +904,22 @@ func (api *API) postWorkflowRunHandler() service.Handler { } lastRun.Status = sdk.StatusWaiting + // Workflow Run initialization + sdk.GoRoutine(context.Background(), fmt.Sprintf("api.initWorkflowRun-%d", lastRun.ID), func(ctx context.Context) { + api.initWorkflowRun(ctx, p.Key, wf, lastRun, opts) + }, api.PanicDump()) + } else { - var errWf error - wf, errWf = workflow.Load(ctx, api.mustDB(), api.Cache, *p, name, workflow.LoadOptions{ + var err error + wf, err = workflow.Load(ctx, api.mustDB(), api.Cache, *p, name, workflow.LoadOptions{ DeepPipeline: true, WithAsCodeUpdateEvent: true, WithIcon: true, WithIntegrations: true, WithTemplate: true, }) - if errWf != nil { - return sdk.WrapError(errWf, "unable to load workflow %s", name) + if err != nil { + return sdk.WrapError(err, "unable to load workflow %s", name) } // Check node permission @@ -925,24 +929,32 @@ func (api *API) postWorkflowRunHandler() service.Handler { // CREATE WORKFLOW RUN var errCreateRun error - lastRun, errCreateRun = workflow.CreateRun(api.mustDB(), wf, opts, c) + lastRun, errCreateRun = workflow.CreateRun(api.mustDB(), wf, opts) if errCreateRun != nil { return errCreateRun } } - // Workflow Run initialization - sdk.GoRoutine(context.Background(), fmt.Sprintf("api.initWorkflowRun-%d", lastRun.ID), func(ctx context.Context) { - api.initWorkflowRun(ctx, p.Key, wf, lastRun, opts, c) - }, api.PanicDump()) - return service.WriteJSON(w, lastRun, http.StatusAccepted) } } -func (api *API) initWorkflowRun(ctx context.Context, projKey string, wf *sdk.Workflow, wfRun *sdk.WorkflowRun, opts *sdk.WorkflowRunPostHandlerOption, u *sdk.AuthConsumer) { +func (api *API) initWorkflowRun(ctx context.Context, projKey string, wf *sdk.Workflow, wfRun *sdk.WorkflowRun, opts sdk.WorkflowRunPostHandlerOption) { + ctx, end := telemetry.Span(ctx, "api.initWorkflowRun", + telemetry.Tag(telemetry.TagProjectKey, projKey), + telemetry.Tag(telemetry.TagWorkflow, wf.Name), + ) + defer end() + var asCodeInfosMsg []sdk.Message - report := new(workflow.ProcessorReport) + var report = new(workflow.ProcessorReport) + + u, err := authentication.LoadConsumerByID(ctx, api.mustDB(), opts.AuthConsumerID, authentication.LoadConsumerOptions.WithAuthentifiedUser, authentication.LoadConsumerOptions.WithConsumerGroups) + if err != nil { + r := failInitWorkflowRun(ctx, api.mustDB(), wfRun, err) + report.Merge(ctx, r) + return + } p, err := project.Load(ctx, api.mustDB(), projKey, project.LoadOptions.WithVariables, @@ -956,7 +968,7 @@ func (api *API) initWorkflowRun(ctx context.Context, projKey string, wf *sdk.Wor } defer func() { - go WorkflowSendEvent(context.Background(), api.mustDB(), api.Cache, *p, report) + go api.WorkflowSendEvent(context.Background(), *p, report) }() var workflowSecrets *workflow.PushSecrets @@ -1008,7 +1020,7 @@ func (api *API) initWorkflowRun(ctx context.Context, projKey string, wf *sdk.Wor log.Debug("workflow.CreateFromRepository> %s", wf.Name) oldWf := *wf var asCodeInfosMsg []sdk.Message - workflowSecrets, asCodeInfosMsg, err = workflow.CreateFromRepository(ctx, api.mustDB(), api.Cache, p1, wf, *opts, *u, project.DecryptWithBuiltinKey) + workflowSecrets, asCodeInfosMsg, err = workflow.CreateFromRepository(ctx, api.mustDB(), api.Cache, p1, wf, opts, *u, project.DecryptWithBuiltinKey) infos := make([]sdk.SpawnMsg, len(asCodeInfosMsg)) for i, msg := range asCodeInfosMsg { infos[i] = sdk.SpawnMsg{ @@ -1051,7 +1063,7 @@ func (api *API) initWorkflowRun(ctx context.Context, projKey string, wf *sdk.Wor return } - r, err := workflow.StartWorkflowRun(ctx, tx, api.Cache, *p, wfRun, opts, u, asCodeInfosMsg) + r, err := workflow.StartWorkflowRun(ctx, tx, api.Cache, *p, wfRun, &opts, u, asCodeInfosMsg) report.Merge(ctx, r) if err != nil { _ = tx.Rollback() @@ -1092,11 +1104,11 @@ func (api *API) initWorkflowRun(ctx context.Context, projKey string, wf *sdk.Wor // Update parent for i := range report.WorkflowRuns() { run := &report.WorkflowRuns()[i] - reportParent, err := updateParentWorkflowRun(ctx, api.mustDB, api.Cache, run) + reportParent, err := api.updateParentWorkflowRun(ctx, run) if err != nil { log.Error(ctx, "unable to update parent workflow run: %v", err) } - go WorkflowSendEvent(context.Background(), api.mustDB(), api.Cache, *p, reportParent) + go api.WorkflowSendEvent(context.Background(), *p, reportParent) } } @@ -1249,7 +1261,7 @@ func failInitWorkflowRun(ctx context.Context, db *gorp.DbMap, wfRun *sdk.Workflo if isErrWithStack { fields["stack_trace"] = fmt.Sprintf("%+v", err) } - log.ErrorWithFields(ctx, fields, "%s", err) + log.ErrorWithFields(ctx, fields, "failInitWorkflowRun error: %v", err) wfRun.Status = sdk.StatusFail info = sdk.SpawnMsg{ ID: sdk.MsgWorkflowError.ID, diff --git a/engine/api/workflow_run_craft.go b/engine/api/workflow_run_craft.go new file mode 100644 index 0000000000..edd830a721 --- /dev/null +++ b/engine/api/workflow_run_craft.go @@ -0,0 +1,117 @@ +package api + +import ( + "context" + "strconv" + "time" + + "github.com/ovh/cds/engine/api/project" + "github.com/ovh/cds/engine/api/workflow" + "github.com/ovh/cds/engine/cache" + "github.com/ovh/cds/sdk" + "github.com/ovh/cds/sdk/log" + "github.com/ovh/cds/sdk/telemetry" + "github.com/pkg/errors" + "go.opencensus.io/trace" +) + +func (api *API) WorkflowRunCraft(ctx context.Context, tick time.Duration) error { + ticker := time.NewTicker(tick) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return ctx.Err() + case <-ticker.C: + ids, err := workflow.LoadCratingWorkflowRunIDs(api.mustDB()) + if err != nil { + log.Error(ctx, "WorkflowRunCraft> unable to start tx: %v", err) + continue + } + for _, id := range ids { + sdk.GoRoutine( + ctx, + "workflowRunCraft-"+strconv.FormatInt(id, 10), + func(ctx context.Context) { + ctx, span := telemetry.New(ctx, api, "api.workflowRunCraft", nil, trace.SpanKindUnspecified) + defer span.End() + if err := api.workflowRunCraft(ctx, id); err != nil { + log.Error(ctx, "WorkflowRunCraft> error on workflow run %d: %v", id, err) + } + }, + api.PanicDump(), + ) + } + } + } + +} + +func (api *API) workflowRunCraft(ctx context.Context, id int64) error { + _, next := telemetry.Span(ctx, "api.workflowRunCraft.lock") + lockKey := cache.Key("api:workflowRunCraft", strconv.FormatInt(id, 10)) + b, err := api.Cache.Lock(lockKey, 5*time.Minute, 0, 1) + if err != nil { + next() + return err + } + if !b { + log.Debug("api.workflowRunCraft> run %d is locked in cache", id) + next() + return nil + } + next() + defer func() { + _ = api.Cache.Unlock(lockKey) + }() + + _, next = telemetry.Span(ctx, "api.workflowRunCraft.LoadRunByID") + run, err := workflow.LoadRunByID(api.mustDB(), id, workflow.LoadRunOptions{}) + if sdk.ErrorIs(err, sdk.ErrNotFound) { + next() + return nil + } + if err != nil { + next() + return sdk.WrapError(err, "unable to load workflow run %d", id) + } + next() + + if !run.ToCraft { + return nil + } + + if run.ToCraftOpts == nil { + return errors.New("unable to craft workflow run without options...") + } + + _, next = telemetry.Span(ctx, "api.workflowRunCraft.LoadProjectByID") + proj, err := project.LoadByID(api.mustDB(), run.ProjectID, + project.LoadOptions.WithVariables, + project.LoadOptions.WithIntegrations) + if err != nil { + next() + return sdk.WrapError(err, "unable to load project %d", run.ProjectID) + } + next() + + wf, err := workflow.LoadByID(ctx, api.mustDB(), api.Cache, *proj, run.WorkflowID, workflow.LoadOptions{ + DeepPipeline: true, + WithAsCodeUpdateEvent: true, + WithIcon: true, + WithIntegrations: true, + WithTemplate: true, + }) + if err != nil { + return sdk.WrapError(err, "unable to load workflow %d", run.WorkflowID) + } + + log.Debug("api.workflowRunCraft> crafting workflow %s/%s #%d.%d (%d)", proj.Key, wf.Name, run.Number, run.LastSubNumber, run.ID) + + api.initWorkflowRun(ctx, proj.Key, wf, run, *run.ToCraftOpts) + + log.Info(ctx, "api.workflowRunCraft> workflow %s/%s #%d.%d (%d) crafted", proj.Key, wf.Name, run.Number, run.LastSubNumber, run.ID) + + return workflow.UpdateCraftedWorkflowRun(api.mustDB(), run.ID) +} diff --git a/engine/api/workflow_run_test.go b/engine/api/workflow_run_test.go index 0d7214fc63..261e7df271 100644 --- a/engine/api/workflow_run_test.go +++ b/engine/api/workflow_run_test.go @@ -119,7 +119,7 @@ func Test_getWorkflowNodeRunHistoryHandler(t *testing.T) { w1, err := workflow.Load(context.TODO(), db, api.Cache, *proj, "test_1", workflow.LoadOptions{}) require.NoError(t, err) - wrCreate, err := workflow.CreateRun(api.mustDB(), w1, nil, u) + wrCreate, err := workflow.CreateRun(api.mustDB(), w1, sdk.WorkflowRunPostHandlerOption{AuthConsumerID: consumer.ID}) assert.NoError(t, err) wrCreate.Workflow = *w1 _, errMR := workflow.StartWorkflowRun(context.TODO(), db, api.Cache, *proj, wrCreate, &sdk.WorkflowRunPostHandlerOption{ @@ -247,7 +247,7 @@ func Test_getWorkflowRunsHandler(t *testing.T) { require.NoError(t, err) for i := 0; i < 10; i++ { - wr, err := workflow.CreateRun(api.mustDB(), w1, nil, u) + wr, err := workflow.CreateRun(api.mustDB(), w1, sdk.WorkflowRunPostHandlerOption{AuthConsumerID: consumer.ID}) assert.NoError(t, err) wr.Workflow = *w1 _, err = workflow.StartWorkflowRun(context.TODO(), db, api.Cache, *proj, wr, &sdk.WorkflowRunPostHandlerOption{ @@ -401,7 +401,7 @@ func Test_getWorkflowRunsHandlerWithFilter(t *testing.T) { w1, err := workflow.Load(context.TODO(), api.mustDB(), api.Cache, *proj, "test_1", workflow.LoadOptions{}) require.NoError(t, err) - wr, err := workflow.CreateRun(api.mustDB(), w1, nil, u) + wr, err := workflow.CreateRun(api.mustDB(), w1, sdk.WorkflowRunPostHandlerOption{AuthConsumerID: consumer.ID}) assert.NoError(t, err) wr.Workflow = *w1 _, err = workflow.StartWorkflowRun(context.TODO(), db, api.Cache, *proj, wr, &sdk.WorkflowRunPostHandlerOption{ @@ -518,7 +518,7 @@ func Test_getLatestWorkflowRunHandler(t *testing.T) { require.NoError(t, err) for i := 0; i < 10; i++ { - wr, err := workflow.CreateRun(api.mustDB(), w1, nil, u) + wr, err := workflow.CreateRun(api.mustDB(), w1, sdk.WorkflowRunPostHandlerOption{AuthConsumerID: consumer.ID}) wr.Workflow = *w1 assert.NoError(t, err) _, err = workflow.StartWorkflowRun(context.TODO(), db, api.Cache, *proj, wr, &sdk.WorkflowRunPostHandlerOption{ @@ -652,7 +652,7 @@ func Test_getWorkflowRunHandler(t *testing.T) { require.NoError(t, err) for i := 0; i < 10; i++ { - wr, err := workflow.CreateRun(api.mustDB(), w1, nil, u) + wr, err := workflow.CreateRun(api.mustDB(), w1, sdk.WorkflowRunPostHandlerOption{AuthConsumerID: consumer.ID}) assert.NoError(t, err) wr.Workflow = *w1 _, err = workflow.StartWorkflowRun(context.TODO(), db, api.Cache, *proj, wr, &sdk.WorkflowRunPostHandlerOption{ @@ -776,7 +776,7 @@ func Test_getWorkflowNodeRunHandler(t *testing.T) { w1, err := workflow.Load(context.TODO(), api.mustDB(), api.Cache, *proj, "test_1", workflow.LoadOptions{}) require.NoError(t, err) - wr, err := workflow.CreateRun(api.mustDB(), w1, nil, u) + wr, err := workflow.CreateRun(api.mustDB(), w1, sdk.WorkflowRunPostHandlerOption{AuthConsumerID: consumer.ID}) assert.NoError(t, err) wr.Workflow = *w1 _, err = workflow.StartWorkflowRun(context.TODO(), db, api.Cache, *proj2, wr, &sdk.WorkflowRunPostHandlerOption{ @@ -1096,7 +1096,7 @@ func Test_postWorkflowRunHandler(t *testing.T) { assert.Equal(t, int64(1), wr.Number) // wait for the workflow to finish crafting - assert.NoError(t, waitCraftinWorkflow(t, db, wr.ID)) + assert.NoError(t, waitCraftinWorkflow(t, api, db, wr.ID)) lastRun, err := workflow.LoadLastRun(api.mustDB(), proj.Key, w1.Name, workflow.LoadRunOptions{}) test.NoError(t, err) @@ -1151,17 +1151,22 @@ func Test_postWorkflowRunHandler(t *testing.T) { } -func waitCraftinWorkflow(t *testing.T, db gorp.SqlExecutor, id int64) error { +func waitCraftinWorkflow(t *testing.T, api *API, db gorp.SqlExecutor, id int64) error { + t.Logf("(%v) waitCraftingWorkflow %d", time.Now(), id) ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() - tick := time.NewTicker(100 * time.Millisecond) + + go api.WorkflowRunCraft(ctx, 10*time.Millisecond) + + tick := time.NewTicker(1 * time.Second) defer tick.Stop() for { select { case <-ctx.Done(): + t.Logf("(%v) exiting waitCraftingWorkflow %d", time.Now(), id) return ctx.Err() case <-tick.C: - w, _ := workflow.LoadRunByID(db, id, workflow.LoadRunOptions{}) + w, _ := workflow.LoadRunByID(api.mustDB(), id, workflow.LoadRunOptions{}) if w == nil { continue } @@ -1171,7 +1176,6 @@ func waitCraftinWorkflow(t *testing.T, db gorp.SqlExecutor, id int64) error { return nil } } - } /** @@ -1370,6 +1374,10 @@ func Test_postWorkflowRunAsyncFailedHandler(t *testing.T) { router.Mux.ServeHTTP(rec, req) require.Equal(t, 202, rec.Code) + lastRun, err := workflow.LoadLastRun(api.mustDB(), proj.Key, w.Name, workflow.LoadRunOptions{}) + test.NoError(t, err) + waitCraftinWorkflow(t, api, db, lastRun.ID) + cpt := 0 for { t.Logf("Attempt getWorkflowRunHandler #%d", cpt) @@ -1818,6 +1826,10 @@ func Test_postWorkflowRunHandlerHookWithMutex(t *testing.T) { defer req.Body.Close() assert.Equal(t, 202, rec.Code) + lastRun, err := workflow.LoadLastRun(api.mustDB(), proj.Key, w.Name, workflow.LoadRunOptions{}) + test.NoError(t, err) + waitCraftinWorkflow(t, api, db, lastRun.ID) + req2 := assets.NewAuthentifiedRequest(t, u, pass, "POST", uri, opts) //Do the request, start a new run @@ -1829,10 +1841,14 @@ func Test_postWorkflowRunHandlerHookWithMutex(t *testing.T) { defer req2.Body.Close() assert.Equal(t, 202, rec2.Code) + lastRun, err = workflow.LoadLastRun(api.mustDB(), proj.Key, w.Name, workflow.LoadRunOptions{}) + test.NoError(t, err) + waitCraftinWorkflow(t, api, db, lastRun.ID) + // it's an async call, wait a bit the let cds take care of the previous request time.Sleep(3 * time.Second) - lastRun, err := workflow.LoadLastRun(api.mustDB(), proj.Key, w1.Name, workflow.LoadRunOptions{}) + lastRun, err = workflow.LoadLastRun(api.mustDB(), proj.Key, w1.Name, workflow.LoadRunOptions{}) test.NoError(t, err) assert.Equal(t, int64(2), lastRun.Number) assert.Equal(t, sdk.StatusBuilding, lastRun.Status) @@ -1882,6 +1898,10 @@ func Test_postWorkflowRunHandlerMutexRelease(t *testing.T) { router.Mux.ServeHTTP(rec, req) require.Equal(t, 202, rec.Code) + lastRun, err := workflow.LoadLastRun(api.mustDB(), proj.Key, wkf.Name, workflow.LoadRunOptions{}) + test.NoError(t, err) + waitCraftinWorkflow(t, api, db, lastRun.ID) + var try int for { if try > 10 { @@ -1924,6 +1944,10 @@ func Test_postWorkflowRunHandlerMutexRelease(t *testing.T) { router.Mux.ServeHTTP(rec, req) require.Equal(t, 202, rec.Code) + lastRun, err = workflow.LoadLastRun(api.mustDB(), proj.Key, wkf.Name, workflow.LoadRunOptions{}) + test.NoError(t, err) + waitCraftinWorkflow(t, api, db, lastRun.ID) + try = 0 for { if try > 10 { @@ -1969,6 +1993,10 @@ func Test_postWorkflowRunHandlerMutexRelease(t *testing.T) { router.Mux.ServeHTTP(rec, req) require.Equal(t, 202, rec.Code) + lastRun, err = workflow.LoadLastRun(api.mustDB(), proj.Key, wkf.Name, workflow.LoadRunOptions{}) + test.NoError(t, err) + waitCraftinWorkflow(t, api, db, lastRun.ID) + try = 0 for { if try > 10 { @@ -2261,7 +2289,7 @@ func Test_postWorkflowRunHandlerHook(t *testing.T) { require.NoError(t, json.Unmarshal(rec.Body.Bytes(), wr)) assert.Equal(t, int64(1), wr.Number) - assert.NoError(t, waitCraftinWorkflow(t, db, wr.ID)) + assert.NoError(t, waitCraftinWorkflow(t, api, db, wr.ID)) lastRun, err := workflow.LoadLastRun(api.mustDB(), proj.Key, w1.Name, workflow.LoadRunOptions{}) test.NoError(t, err) assert.NotNil(t, lastRun.RootRun()) @@ -2423,10 +2451,14 @@ func Test_postWorkflowRunHandler_ConditionNotOK(t *testing.T) { assert.Equal(t, 202, rec.Code) + lastRun, err := workflow.LoadLastRun(api.mustDB(), proj.Key, w.Name, workflow.LoadRunOptions{}) + test.NoError(t, err) + waitCraftinWorkflow(t, api, db, lastRun.ID) + // it's an async call, wait a bit the let cds take care of the previous request time.Sleep(3 * time.Second) - lastRun, err := workflow.LoadLastRun(api.mustDB(), proj.Key, w.Name, workflow.LoadRunOptions{}) + lastRun, err = workflow.LoadLastRun(api.mustDB(), proj.Key, w.Name, workflow.LoadRunOptions{}) test.NoError(t, err) assert.Equal(t, int64(1), lastRun.Number) assert.Equal(t, sdk.StatusNeverBuilt, lastRun.Status) @@ -2600,7 +2632,7 @@ func initGetWorkflowNodeRunJobTest(t *testing.T, api *API, db gorpmapper.SqlExec }) require.NoError(t, err) - wr, err := workflow.CreateRun(api.mustDB(), w1, nil, u) + wr, err := workflow.CreateRun(api.mustDB(), w1, sdk.WorkflowRunPostHandlerOption{AuthConsumerID: consumer.ID}) assert.NoError(t, err) wr.Workflow = *w1 _, err = workflow.StartWorkflowRun(context.TODO(), db, api.Cache, *proj, wr, &sdk.WorkflowRunPostHandlerOption{ @@ -2790,7 +2822,7 @@ func Test_deleteWorkflowRunsBranchHandler(t *testing.T) { w1, err := workflow.Load(context.TODO(), api.mustDB(), api.Cache, *proj, "test_1", workflow.LoadOptions{}) require.NoError(t, err) - wr, err := workflow.CreateRun(api.mustDB(), w1, nil, u) + wr, err := workflow.CreateRun(api.mustDB(), w1, sdk.WorkflowRunPostHandlerOption{AuthConsumerID: consumer.ID}) assert.NoError(t, err) wr.Workflow = *w1 wr.Tag("git.branch", "master") @@ -2857,6 +2889,7 @@ func Test_deleteWorkflowRunHandler(t *testing.T) { u, pass := assets.InsertAdminUser(t, db) key := sdk.RandomString(10) proj := assets.InsertTestProject(t, db, api.Cache, key, key) + consumer, _ := authentication.LoadConsumerByTypeAndUserID(context.TODO(), db, sdk.ConsumerLocal, u.ID, authentication.LoadConsumerOptions.WithAuthentifiedUser) //First pipeline pip := sdk.Pipeline{ @@ -2934,7 +2967,7 @@ func Test_deleteWorkflowRunHandler(t *testing.T) { w1, err := workflow.Load(context.TODO(), api.mustDB(), api.Cache, *proj, "test_1", workflow.LoadOptions{}) require.NoError(t, err) - wr, err := workflow.CreateRun(api.mustDB(), w1, nil, u) + wr, err := workflow.CreateRun(api.mustDB(), w1, sdk.WorkflowRunPostHandlerOption{AuthConsumerID: consumer.ID}) assert.NoError(t, err) //Prepare request vars := map[string]string{ @@ -3003,6 +3036,7 @@ func Test_postWorkflowRunHandlerRestartOnlyFailed(t *testing.T) { u, pass := assets.InsertAdminUser(t, db) key := sdk.RandomString(10) proj := assets.InsertTestProject(t, db, api.Cache, key, key) + consumer, _ := authentication.LoadConsumerByTypeAndUserID(context.TODO(), db, sdk.ConsumerLocal, u.ID, authentication.LoadConsumerOptions.WithAuthentifiedUser) pip := sdk.Pipeline{ ProjectID: proj.ID, @@ -3064,7 +3098,7 @@ func Test_postWorkflowRunHandlerRestartOnlyFailed(t *testing.T) { uri := router.GetRoute("POST", api.postWorkflowRunHandler, vars) test.NotEmpty(t, uri) - opts := &sdk.WorkflowRunPostHandlerOption{ + opts := sdk.WorkflowRunPostHandlerOption{ Manual: &sdk.WorkflowNodeRunManual{ OnlyFailedJobs: false, Resync: false, @@ -3082,7 +3116,7 @@ func Test_postWorkflowRunHandlerRestartOnlyFailed(t *testing.T) { assert.Equal(t, int64(1), wr.Number) // wait for the workflow to finish crafting - assert.NoError(t, waitCraftinWorkflow(t, db, wr.ID)) + assert.NoError(t, waitCraftinWorkflow(t, api, db, wr.ID)) wrr, _ := workflow.LoadRun(context.TODO(), db, proj2.Key, w1.Name, 1, workflow.LoadRunOptions{}) assert.Equal(t, sdk.StatusBuilding, wrr.Status) @@ -3120,17 +3154,16 @@ func Test_postWorkflowRunHandlerRestartOnlyFailed(t *testing.T) { } assert.NoError(t, workflow.UpdateNodeRun(db, nr)) - opts = &sdk.WorkflowRunPostHandlerOption{ + opts = sdk.WorkflowRunPostHandlerOption{ Manual: &sdk.WorkflowNodeRunManual{ OnlyFailedJobs: true, Resync: false, }, - FromNodeIDs: []int64{w1.WorkflowData.Node.ID}, - Number: &wrr.Number, + FromNodeIDs: []int64{w1.WorkflowData.Node.ID}, + Number: &wrr.Number, + AuthConsumerID: consumer.ID, } - api.initWorkflowRun(context.TODO(), proj2.Key, &wrr.Workflow, wrr, opts, &sdk.AuthConsumer{ - AuthentifiedUser: u, - }) + api.initWorkflowRun(context.TODO(), proj2.Key, &wrr.Workflow, wrr, opts) wrr, _ = workflow.LoadRun(context.TODO(), db, proj2.Key, w1.Name, 1, workflow.LoadRunOptions{}) @@ -3226,7 +3259,7 @@ func Test_postWorkflowRunHandlerRestartResync(t *testing.T) { assert.Equal(t, int64(1), wr.Number) // wait for the workflow to finish crafting - assert.NoError(t, waitCraftinWorkflow(t, db, wr.ID)) + assert.NoError(t, waitCraftinWorkflow(t, api, db, wr.ID)) wrr, _ := workflow.LoadRun(context.TODO(), db, proj2.Key, w1.Name, 1, workflow.LoadRunOptions{}) assert.Equal(t, sdk.StatusBuilding, wrr.Status) diff --git a/engine/api/workflow_test.go b/engine/api/workflow_test.go index 460ead8432..3488d86ca7 100644 --- a/engine/api/workflow_test.go +++ b/engine/api/workflow_test.go @@ -217,7 +217,7 @@ func Test_getWorkflowNotificationsConditionsHandler(t *testing.T) { w1, err := workflow.Load(context.TODO(), db, api.Cache, *proj, "test_1", workflow.LoadOptions{}) test.NoError(t, err) - wrCreate, err := workflow.CreateRun(api.mustDB(), w1, nil, u) + wrCreate, err := workflow.CreateRun(api.mustDB(), w1, sdk.WorkflowRunPostHandlerOption{AuthConsumerID: consumer.ID}) assert.NoError(t, err) wrCreate.Workflow = *w1 _, errMR := workflow.StartWorkflowRun(context.TODO(), db, api.Cache, *proj, wrCreate, &sdk.WorkflowRunPostHandlerOption{ @@ -1897,7 +1897,9 @@ func Test_getSearchWorkflowHandler(t *testing.T) { // Run the workflow consumer, _ := authentication.LoadConsumerByTypeAndUserID(context.TODO(), api.mustDB(), sdk.ConsumerLocal, u.ID, authentication.LoadConsumerOptions.WithAuthentifiedUser) - wr, err := workflow.CreateRun(api.mustDB(), &wf, nil, admin) + consumerAdmin, _ := authentication.LoadConsumerByTypeAndUserID(context.TODO(), api.mustDB(), sdk.ConsumerLocal, admin.ID, authentication.LoadConsumerOptions.WithAuthentifiedUser) + + wr, err := workflow.CreateRun(api.mustDB(), &wf, sdk.WorkflowRunPostHandlerOption{AuthConsumerID: consumerAdmin.ID}) assert.NoError(t, err) wr.Workflow = wf wr.Tag("git.branch", "master") diff --git a/engine/sql/api/214_workflow_run_crafting.sql b/engine/sql/api/214_workflow_run_crafting.sql new file mode 100644 index 0000000000..9f65a6b5eb --- /dev/null +++ b/engine/sql/api/214_workflow_run_crafting.sql @@ -0,0 +1,15 @@ +-- +migrate Up + +ALTER TABLE "workflow_run" ADD COLUMN to_craft BOOLEAN; +UPDATE "workflow_run" set to_craft = false; +ALTER TABLE "workflow_run" ALTER COLUMN to_craft SET DEFAULT false; + +ALTER TABLE "workflow_run" ADD COLUMN to_craft_opts JSONB; + +SELECT create_index('workflow_run', 'IDX_WORKFLOW_RUN_TO_CRAFT', 'id,to_craft'); + +-- +migrate Down +ALTER TABLE "workflow_run" DROP COLUMN to_craft; +ALTER TABLE "workflow_run" DROP COLUMN to_craft_opts; + + diff --git a/sdk/error.go b/sdk/error.go index 69a06ea103..d4bdf52b54 100644 --- a/sdk/error.go +++ b/sdk/error.go @@ -198,6 +198,7 @@ var ( ErrUnsupportedMediaType = Error{ID: 188, Status: http.StatusUnsupportedMediaType} ErrNothingToPush = Error{ID: 189, Status: http.StatusBadRequest} ErrWorkerErrorCommand = Error{ID: 190, Status: http.StatusBadRequest} + ErrRepoAnalyzeFailed = Error{ID: 191, Status: http.StatusInternalServerError} ) var errorsAmericanEnglish = map[int]string{ @@ -378,6 +379,7 @@ var errorsAmericanEnglish = map[int]string{ ErrUnsupportedMediaType.ID: "Request format invalid", ErrNothingToPush.ID: "No diff to push", ErrWorkerErrorCommand.ID: "Worker command in error", + ErrRepoAnalyzeFailed.ID: "Unable to analyse repository", } var errorsFrench = map[int]string{ @@ -558,6 +560,7 @@ var errorsFrench = map[int]string{ ErrUnsupportedMediaType.ID: "Le format de la requête est invalide", ErrNothingToPush.ID: "Aucune modification à pousser", ErrWorkerErrorCommand.ID: "Commande du worker en erreur", + ErrRepoAnalyzeFailed.ID: "L'analyse du repository a echoué", } // Error type. diff --git a/sdk/telemetry/tracing_utils.go b/sdk/telemetry/tracing_utils.go index ad360f9a87..dd32391453 100644 --- a/sdk/telemetry/tracing_utils.go +++ b/sdk/telemetry/tracing_utils.go @@ -14,9 +14,15 @@ func New(ctx context.Context, s Service, name string, sampler trace.Sampler, spa if exp == nil { return ctx, nil } - return trace.StartSpan(ctx, name, + ctx, span := trace.StartSpan(ctx, name, trace.WithSampler(sampler), trace.WithSpanKind(spanKind)) + ctx = SpanContextToContext(ctx, span.SpanContext()) + ctx = ContextWithTag(ctx, + TagServiceType, s.Type(), + TagServiceName, s.Name(), + ) + return ctx, span } // Start may start a tracing span diff --git a/sdk/workflow_run.go b/sdk/workflow_run.go index 58cc7c4a32..277dbe5d06 100644 --- a/sdk/workflow_run.go +++ b/sdk/workflow_run.go @@ -1,6 +1,8 @@ package sdk import ( + "database/sql/driver" + json "encoding/json" "fmt" "net/url" "sort" @@ -56,6 +58,8 @@ type WorkflowRun struct { Header WorkflowRunHeaders `json:"header,omitempty" db:"-"` URLs URL `json:"urls" yaml:"-" db:"-" cli:"-"` ReadOnly bool `json:"read_only" yaml:"-" db:"read_only" cli:"-"` + ToCraft bool `json:"-" yaml:"-" db:"to_craft" cli:"-"` + ToCraftOpts *WorkflowRunPostHandlerOption `json:"-" yaml:"-" db:"to_craft_opts" cli:"-"` } type WorkflowRunSecret struct { @@ -77,10 +81,29 @@ type WorkflowNodeRunRelease struct { // WorkflowRunPostHandlerOption contains the body content for launch a workflow type WorkflowRunPostHandlerOption struct { - Hook *WorkflowNodeRunHookEvent `json:"hook,omitempty"` - Manual *WorkflowNodeRunManual `json:"manual,omitempty"` - Number *int64 `json:"number,omitempty"` - FromNodeIDs []int64 `json:"from_nodes,omitempty"` + Hook *WorkflowNodeRunHookEvent `json:"hook,omitempty"` + Manual *WorkflowNodeRunManual `json:"manual,omitempty"` + Number *int64 `json:"number,omitempty"` + FromNodeIDs []int64 `json:"from_nodes,omitempty"` + AuthConsumerID string `json:"auth_consumer,omitempty"` +} + +// Value returns driver.Value from WorkflowRunPostHandlerOption. +func (a *WorkflowRunPostHandlerOption) Value() (driver.Value, error) { + j, err := json.Marshal(a) + return j, WrapError(err, "cannot marshal Author") +} + +// Scan WorkflowRunPostHandlerOption. +func (a *WorkflowRunPostHandlerOption) Scan(src interface{}) error { + if src == nil { + return nil + } + source, ok := src.([]byte) + if !ok { + return WithStack(fmt.Errorf("type assertion .([]byte) failed (%T)", src)) + } + return WrapError(json.Unmarshal(source, a), "cannot unmarshal WorkflowRunPostHandlerOption") } //WorkflowRunNumber contains a workflow run number