Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(api): craft workflow runs #5355

Merged
merged 6 commits into from
Aug 20, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions engine/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -650,6 +650,9 @@ func (a *API) Serve(ctx context.Context) error {
sdk.GoRoutine(ctx, "authentication.SessionCleaner", func(ctx context.Context) {
authentication.SessionCleaner(ctx, a.mustDB, 10*time.Second)
}, a.PanicDump())
sdk.GoRoutine(ctx, "api.WorkflowRunCraft", func(ctx context.Context) {
a.WorkflowRunCraft(ctx, 100*time.Millisecond)
}, a.PanicDump())

migrate.Add(ctx, sdk.Migration{Name: "RunsSecrets", Release: "0.47.0", Blocker: false, Automatic: true, ExecFunc: func(ctx context.Context) error {
return migrate.RunsSecrets(ctx, a.DBConnectionFactory.GetDBMap(gorpmapping.Mapper))
Expand Down
2 changes: 1 addition & 1 deletion engine/api/operation/operation.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@ import (

"github.com/go-gorp/gorp"

"github.com/ovh/cds/engine/cache"
"github.com/ovh/cds/engine/api/repositoriesmanager"
"github.com/ovh/cds/engine/api/services"
"github.com/ovh/cds/engine/cache"
"github.com/ovh/cds/engine/gorpmapper"
"github.com/ovh/cds/sdk"
"github.com/ovh/cds/sdk/exportentities"
Expand Down
5 changes: 4 additions & 1 deletion engine/api/project/dao.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,17 @@ import (
"database/sql"
"reflect"
"runtime"
"strings"
"time"

"github.com/go-gorp/gorp"

"github.com/ovh/cds/engine/cache"
"github.com/ovh/cds/engine/api/database/gorpmapping"
"github.com/ovh/cds/engine/api/environment"
"github.com/ovh/cds/engine/api/group"
"github.com/ovh/cds/engine/api/keys"
"github.com/ovh/cds/engine/api/repositoriesmanager"
"github.com/ovh/cds/engine/cache"
"github.com/ovh/cds/engine/gorpmapper"
"github.com/ovh/cds/sdk"
"github.com/ovh/cds/sdk/log"
Expand Down Expand Up @@ -310,6 +311,8 @@ func unwrap(ctx context.Context, db gorp.SqlExecutor, p *dbProject, opts []LoadO
continue
}
name := runtime.FuncForPC(reflect.ValueOf(f).Pointer()).Name()
nameSplitted := strings.Split(name, "/")
name = nameSplitted[len(nameSplitted)-1]
_, end = telemetry.Span(ctx, name)
if err := f(db, &proj); err != nil && sdk.Cause(err) != sql.ErrNoRows {
end()
Expand Down
2 changes: 1 addition & 1 deletion engine/api/repositories_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ vcs_ssh_key: proj-blabla
t.Log("Inserting workflow run=====")

// creates a run
wr, errWR := workflow.CreateRun(db.DbMap, w1, nil, u)
wr, errWR := workflow.CreateRun(db.DbMap, w1, sdk.WorkflowRunPostHandlerOption{AuthConsumerID: consumer.ID})
assert.NoError(t, errWR)
wr.Workflow = *w1
t.Log("Starting workflow run=====")
Expand Down
2 changes: 1 addition & 1 deletion engine/api/router_middleware_auth_permission_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (
func Test_checkWorkflowPermissions(t *testing.T) {
api, db, _ := newTestAPI(t)

wctx := testRunWorkflow(t, api, db, api.Router)
wctx := testRunWorkflow(t, api, api.Router)
user := wctx.user
admin, _ := assets.InsertAdminUser(t, db)
maintainer, _ := assets.InsertAdminUser(t, db)
Expand Down
49 changes: 42 additions & 7 deletions engine/api/workflow/dao_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/lib/pq"
"go.opencensus.io/stats"

"github.com/ovh/cds/engine/api/authentication"
"github.com/ovh/cds/engine/api/database/gorpmapping"
"github.com/ovh/cds/sdk"
"github.com/ovh/cds/sdk/log"
Expand All @@ -33,7 +34,9 @@ workflow_run.last_sub_num,
workflow_run.last_execution,
workflow_run.to_delete,
workflow_run.read_only,
workflow_run.version
workflow_run.version,
workflow_run.to_craft,
workflow_run.to_craft_opts
`

// LoadRunOptions are options for loading a run (node or workflow)
Expand Down Expand Up @@ -590,8 +593,34 @@ func InsertRunNum(db gorp.SqlExecutor, w *sdk.Workflow, num int64) error {
return nil
}

func LoadCratingWorkflowRunIDs(db gorp.SqlExecutor) ([]int64, error) {
query := `
SELECT id
FROM workflow_run
WHERE to_craft = true
LIMIT 10
`
var ids []int64
_, err := db.Select(&ids, query)
if err != nil {
return nil, sdk.WrapError(err, "unable to load crafting workflow runs")
}
return ids, nil
}

func UpdateCraftedWorkflowRun(db gorp.SqlExecutor, id int64) error {
query := `UPDATE workflow_run
SET to_craft = false
WHERE id = $1
`
if _, err := db.Exec(query, id); err != nil {
return sdk.WrapError(err, "unable to update crafting workflow run %d", id)
}
return nil
}

// CreateRun creates a new workflow run and insert it
func CreateRun(db *gorp.DbMap, wf *sdk.Workflow, opts *sdk.WorkflowRunPostHandlerOption, ident sdk.Identifiable) (*sdk.WorkflowRun, error) {
func CreateRun(db *gorp.DbMap, wf *sdk.Workflow, opts sdk.WorkflowRunPostHandlerOption) (*sdk.WorkflowRun, error) {
number, err := NextRunNumber(db, wf.ID)
if err != nil {
return nil, sdk.WrapError(err, "unable to get next run number")
Expand All @@ -606,25 +635,31 @@ func CreateRun(db *gorp.DbMap, wf *sdk.Workflow, opts *sdk.WorkflowRunPostHandle
Status: sdk.StatusPending,
LastExecution: time.Now(),
Tags: make([]sdk.WorkflowRunTag, 0),
Workflow: sdk.Workflow{Name: wf.Name},
Workflow: *wf, //sdk.Workflow{Name: wf.Name},
ToCraft: true,
ToCraftOpts: &opts,
}

if opts != nil && opts.Hook != nil {
if opts.Hook != nil {
if trigg, ok := opts.Hook.Payload["cds.triggered_by.username"]; ok {
wr.Tag(tagTriggeredBy, trigg)
} else {
wr.Tag(tagTriggeredBy, "cds.hook")
}
} else {
wr.Tag(tagTriggeredBy, ident.GetUsername())
c, err := authentication.LoadConsumerByID(context.Background(), db, opts.AuthConsumerID, authentication.LoadConsumerOptions.WithAuthentifiedUser, authentication.LoadConsumerOptions.WithConsumerGroups)
if err != nil {
return nil, err
}
wr.Tag(tagTriggeredBy, c.GetUsername())
}

tags := wf.Metadata["default_tags"]
var payload map[string]string
if opts != nil && opts.Hook != nil {
if opts.Hook != nil {
payload = opts.Hook.Payload
}
if opts != nil && opts.Manual != nil {
if opts.Manual != nil {
e := dump.NewDefaultEncoder()
e.Formatters = []dump.KeyFormatterFunc{dump.WithDefaultLowerCaseFormatter()}
e.ExtraFields.DetailedMap = false
Expand Down
17 changes: 9 additions & 8 deletions engine/api/workflow/dao_run_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"gopkg.in/yaml.v2"

"github.com/ovh/cds/engine/api/application"
Expand Down Expand Up @@ -230,8 +231,8 @@ vcs_ssh_key: proj-blabla
test.NoError(t, err)

for i := 0; i < 5; i++ {
wr, errWR := workflow.CreateRun(db.DbMap, w1, nil, u)
assert.NoError(t, errWR)
wr, errWR := workflow.CreateRun(db.DbMap, w1, sdk.WorkflowRunPostHandlerOption{AuthConsumerID: consumer.ID})
require.NoError(t, errWR)
wr.Workflow = *w1
_, errWr := workflow.StartWorkflowRun(context.TODO(), db, cache, *proj, wr, &sdk.WorkflowRunPostHandlerOption{
Manual: &sdk.WorkflowNodeRunManual{
Expand Down Expand Up @@ -317,7 +318,7 @@ func TestPurgeWorkflowRunWithRunningStatus(t *testing.T) {
test.NoError(t, err)

for i := 0; i < 5; i++ {
wfr, errWR := workflow.CreateRun(db.DbMap, w1, nil, u)
wfr, errWR := workflow.CreateRun(db.DbMap, w1, sdk.WorkflowRunPostHandlerOption{AuthConsumerID: consumer.ID})
assert.NoError(t, errWR)
wfr.Workflow = *w1
_, errWr := workflow.StartWorkflowRun(context.TODO(), db, cache, *proj, wfr, &sdk.WorkflowRunPostHandlerOption{
Expand Down Expand Up @@ -504,7 +505,7 @@ vcs_ssh_key: proj-blabla
})
test.NoError(t, err)

wr, errWR := workflow.CreateRun(db.DbMap, w1, nil, u)
wr, errWR := workflow.CreateRun(db.DbMap, w1, sdk.WorkflowRunPostHandlerOption{AuthConsumerID: consumer.ID})
assert.NoError(t, errWR)
wr.Workflow = *w1
_, errWr := workflow.StartWorkflowRun(context.TODO(), db, cache, *proj, wr, &sdk.WorkflowRunPostHandlerOption{
Expand All @@ -519,7 +520,7 @@ vcs_ssh_key: proj-blabla
test.NoError(t, errWr)

for i := 0; i < 5; i++ {
wfr, errWR := workflow.CreateRun(db.DbMap, w1, nil, u)
wfr, errWR := workflow.CreateRun(db.DbMap, w1, sdk.WorkflowRunPostHandlerOption{AuthConsumerID: consumer.ID})
assert.NoError(t, errWR)
wfr.Workflow = *w1
_, errWr := workflow.StartWorkflowRun(context.TODO(), db, cache, *proj, wfr, &sdk.WorkflowRunPostHandlerOption{
Expand Down Expand Up @@ -696,7 +697,7 @@ vcs_ssh_key: proj-blabla
test.NoError(t, err)

for i := 0; i < 5; i++ {
wfr, errWR := workflow.CreateRun(db.DbMap, w1, nil, u)
wfr, errWR := workflow.CreateRun(db.DbMap, w1, sdk.WorkflowRunPostHandlerOption{AuthConsumerID: consumer.ID})
assert.NoError(t, errWR)
wfr.Workflow = *w1
_, errWr := workflow.StartWorkflowRun(context.TODO(), db, cache, *proj, wfr, &sdk.WorkflowRunPostHandlerOption{
Expand Down Expand Up @@ -788,7 +789,7 @@ func TestPurgeWorkflowRunWithoutTags(t *testing.T) {

branches := []string{"master", "master", "master", "develop", "develop", "testBr", "testBr", "testBr", "testBr", "test4"}
for i := 0; i < 10; i++ {
wr, errWR := workflow.CreateRun(db.DbMap, w1, nil, u)
wr, errWR := workflow.CreateRun(db.DbMap, w1, sdk.WorkflowRunPostHandlerOption{AuthConsumerID: consumer.ID})
assert.NoError(t, errWR)
wr.Workflow = *w1
_, errWr := workflow.StartWorkflowRun(context.TODO(), db, cache, *proj, wr, &sdk.WorkflowRunPostHandlerOption{
Expand Down Expand Up @@ -874,7 +875,7 @@ func TestPurgeWorkflowRunWithoutTagsBiggerHistoryLength(t *testing.T) {

branches := []string{"master", "master", "master", "develop", "develop", "testBr", "testBr", "testBr", "testBr", "test4"}
for i := 0; i < 10; i++ {
wr, errWR := workflow.CreateRun(db.DbMap, w1, nil, u)
wr, errWR := workflow.CreateRun(db.DbMap, w1, sdk.WorkflowRunPostHandlerOption{AuthConsumerID: consumer.ID})
assert.NoError(t, errWR)
wr.Workflow = *w1
_, errWr := workflow.StartWorkflowRun(context.TODO(), db, cache, *proj, wr, &sdk.WorkflowRunPostHandlerOption{
Expand Down
2 changes: 1 addition & 1 deletion engine/api/workflow/dao_staticfiles_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func TestInsertStaticFiles(t *testing.T) {
})
test.NoError(t, err)

wfr, errWR := workflow.CreateRun(db.DbMap, w1, nil, u)
wfr, errWR := workflow.CreateRun(db.DbMap, w1, sdk.WorkflowRunPostHandlerOption{AuthConsumerID: consumer.ID})
assert.NoError(t, errWR)
wfr.Workflow = *w1
_, errWr := workflow.StartWorkflowRun(context.TODO(), db, cache, *proj, wfr, &sdk.WorkflowRunPostHandlerOption{
Expand Down
1 change: 0 additions & 1 deletion engine/api/workflow/factory_dao.go
Original file line number Diff line number Diff line change
Expand Up @@ -609,7 +609,6 @@ func (dao WorkflowDAO) withNotifications(db gorp.SqlExecutor, ws *[]Workflow) er
for x := range *ws {
w := &(*ws)[x]
w.Notifications = notificationsMap[w.ID]
log.Debug("workflow %d notifications: %+v", w.ID, w.Notifications)
}
return nil
}
Expand Down
Loading