Skip to content

Commit

Permalink
fix(api): craft workflow runs (#5355)
Browse files Browse the repository at this point in the history
  • Loading branch information
fsamin authored Aug 20, 2020
1 parent bcf8355 commit a111301
Show file tree
Hide file tree
Showing 30 changed files with 484 additions and 213 deletions.
3 changes: 3 additions & 0 deletions engine/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -650,6 +650,9 @@ func (a *API) Serve(ctx context.Context) error {
sdk.GoRoutine(ctx, "authentication.SessionCleaner", func(ctx context.Context) {
authentication.SessionCleaner(ctx, a.mustDB, 10*time.Second)
}, a.PanicDump())
sdk.GoRoutine(ctx, "api.WorkflowRunCraft", func(ctx context.Context) {
a.WorkflowRunCraft(ctx, 100*time.Millisecond)
}, a.PanicDump())

migrate.Add(ctx, sdk.Migration{Name: "RunsSecrets", Release: "0.47.0", Blocker: false, Automatic: true, ExecFunc: func(ctx context.Context) error {
return migrate.RunsSecrets(ctx, a.DBConnectionFactory.GetDBMap(gorpmapping.Mapper))
Expand Down
2 changes: 1 addition & 1 deletion engine/api/operation/operation.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@ import (

"github.com/go-gorp/gorp"

"github.com/ovh/cds/engine/cache"
"github.com/ovh/cds/engine/api/repositoriesmanager"
"github.com/ovh/cds/engine/api/services"
"github.com/ovh/cds/engine/cache"
"github.com/ovh/cds/engine/gorpmapper"
"github.com/ovh/cds/sdk"
"github.com/ovh/cds/sdk/exportentities"
Expand Down
5 changes: 4 additions & 1 deletion engine/api/project/dao.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,17 @@ import (
"database/sql"
"reflect"
"runtime"
"strings"
"time"

"github.com/go-gorp/gorp"

"github.com/ovh/cds/engine/cache"
"github.com/ovh/cds/engine/api/database/gorpmapping"
"github.com/ovh/cds/engine/api/environment"
"github.com/ovh/cds/engine/api/group"
"github.com/ovh/cds/engine/api/keys"
"github.com/ovh/cds/engine/api/repositoriesmanager"
"github.com/ovh/cds/engine/cache"
"github.com/ovh/cds/engine/gorpmapper"
"github.com/ovh/cds/sdk"
"github.com/ovh/cds/sdk/log"
Expand Down Expand Up @@ -310,6 +311,8 @@ func unwrap(ctx context.Context, db gorp.SqlExecutor, p *dbProject, opts []LoadO
continue
}
name := runtime.FuncForPC(reflect.ValueOf(f).Pointer()).Name()
nameSplitted := strings.Split(name, "/")
name = nameSplitted[len(nameSplitted)-1]
_, end = telemetry.Span(ctx, name)
if err := f(db, &proj); err != nil && sdk.Cause(err) != sql.ErrNoRows {
end()
Expand Down
2 changes: 1 addition & 1 deletion engine/api/repositories_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ vcs_ssh_key: proj-blabla
t.Log("Inserting workflow run=====")

// creates a run
wr, errWR := workflow.CreateRun(db.DbMap, w1, nil, u)
wr, errWR := workflow.CreateRun(db.DbMap, w1, sdk.WorkflowRunPostHandlerOption{AuthConsumerID: consumer.ID})
assert.NoError(t, errWR)
wr.Workflow = *w1
t.Log("Starting workflow run=====")
Expand Down
2 changes: 1 addition & 1 deletion engine/api/router_middleware_auth_permission_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (
func Test_checkWorkflowPermissions(t *testing.T) {
api, db, _ := newTestAPI(t)

wctx := testRunWorkflow(t, api, db, api.Router)
wctx := testRunWorkflow(t, api, api.Router)
user := wctx.user
admin, _ := assets.InsertAdminUser(t, db)
maintainer, _ := assets.InsertAdminUser(t, db)
Expand Down
49 changes: 42 additions & 7 deletions engine/api/workflow/dao_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/lib/pq"
"go.opencensus.io/stats"

"github.com/ovh/cds/engine/api/authentication"
"github.com/ovh/cds/engine/api/database/gorpmapping"
"github.com/ovh/cds/sdk"
"github.com/ovh/cds/sdk/log"
Expand All @@ -33,7 +34,9 @@ workflow_run.last_sub_num,
workflow_run.last_execution,
workflow_run.to_delete,
workflow_run.read_only,
workflow_run.version
workflow_run.version,
workflow_run.to_craft,
workflow_run.to_craft_opts
`

// LoadRunOptions are options for loading a run (node or workflow)
Expand Down Expand Up @@ -590,8 +593,34 @@ func InsertRunNum(db gorp.SqlExecutor, w *sdk.Workflow, num int64) error {
return nil
}

func LoadCratingWorkflowRunIDs(db gorp.SqlExecutor) ([]int64, error) {
query := `
SELECT id
FROM workflow_run
WHERE to_craft = true
LIMIT 10
`
var ids []int64
_, err := db.Select(&ids, query)
if err != nil {
return nil, sdk.WrapError(err, "unable to load crafting workflow runs")
}
return ids, nil
}

func UpdateCraftedWorkflowRun(db gorp.SqlExecutor, id int64) error {
query := `UPDATE workflow_run
SET to_craft = false
WHERE id = $1
`
if _, err := db.Exec(query, id); err != nil {
return sdk.WrapError(err, "unable to update crafting workflow run %d", id)
}
return nil
}

// CreateRun creates a new workflow run and insert it
func CreateRun(db *gorp.DbMap, wf *sdk.Workflow, opts *sdk.WorkflowRunPostHandlerOption, ident sdk.Identifiable) (*sdk.WorkflowRun, error) {
func CreateRun(db *gorp.DbMap, wf *sdk.Workflow, opts sdk.WorkflowRunPostHandlerOption) (*sdk.WorkflowRun, error) {
number, err := NextRunNumber(db, wf.ID)
if err != nil {
return nil, sdk.WrapError(err, "unable to get next run number")
Expand All @@ -606,25 +635,31 @@ func CreateRun(db *gorp.DbMap, wf *sdk.Workflow, opts *sdk.WorkflowRunPostHandle
Status: sdk.StatusPending,
LastExecution: time.Now(),
Tags: make([]sdk.WorkflowRunTag, 0),
Workflow: sdk.Workflow{Name: wf.Name},
Workflow: *wf, //sdk.Workflow{Name: wf.Name},
ToCraft: true,
ToCraftOpts: &opts,
}

if opts != nil && opts.Hook != nil {
if opts.Hook != nil {
if trigg, ok := opts.Hook.Payload["cds.triggered_by.username"]; ok {
wr.Tag(tagTriggeredBy, trigg)
} else {
wr.Tag(tagTriggeredBy, "cds.hook")
}
} else {
wr.Tag(tagTriggeredBy, ident.GetUsername())
c, err := authentication.LoadConsumerByID(context.Background(), db, opts.AuthConsumerID, authentication.LoadConsumerOptions.WithAuthentifiedUser, authentication.LoadConsumerOptions.WithConsumerGroups)
if err != nil {
return nil, err
}
wr.Tag(tagTriggeredBy, c.GetUsername())
}

tags := wf.Metadata["default_tags"]
var payload map[string]string
if opts != nil && opts.Hook != nil {
if opts.Hook != nil {
payload = opts.Hook.Payload
}
if opts != nil && opts.Manual != nil {
if opts.Manual != nil {
e := dump.NewDefaultEncoder()
e.Formatters = []dump.KeyFormatterFunc{dump.WithDefaultLowerCaseFormatter()}
e.ExtraFields.DetailedMap = false
Expand Down
17 changes: 9 additions & 8 deletions engine/api/workflow/dao_run_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"gopkg.in/yaml.v2"

"github.com/ovh/cds/engine/api/application"
Expand Down Expand Up @@ -230,8 +231,8 @@ vcs_ssh_key: proj-blabla
test.NoError(t, err)

for i := 0; i < 5; i++ {
wr, errWR := workflow.CreateRun(db.DbMap, w1, nil, u)
assert.NoError(t, errWR)
wr, errWR := workflow.CreateRun(db.DbMap, w1, sdk.WorkflowRunPostHandlerOption{AuthConsumerID: consumer.ID})
require.NoError(t, errWR)
wr.Workflow = *w1
_, errWr := workflow.StartWorkflowRun(context.TODO(), db, cache, *proj, wr, &sdk.WorkflowRunPostHandlerOption{
Manual: &sdk.WorkflowNodeRunManual{
Expand Down Expand Up @@ -317,7 +318,7 @@ func TestPurgeWorkflowRunWithRunningStatus(t *testing.T) {
test.NoError(t, err)

for i := 0; i < 5; i++ {
wfr, errWR := workflow.CreateRun(db.DbMap, w1, nil, u)
wfr, errWR := workflow.CreateRun(db.DbMap, w1, sdk.WorkflowRunPostHandlerOption{AuthConsumerID: consumer.ID})
assert.NoError(t, errWR)
wfr.Workflow = *w1
_, errWr := workflow.StartWorkflowRun(context.TODO(), db, cache, *proj, wfr, &sdk.WorkflowRunPostHandlerOption{
Expand Down Expand Up @@ -504,7 +505,7 @@ vcs_ssh_key: proj-blabla
})
test.NoError(t, err)

wr, errWR := workflow.CreateRun(db.DbMap, w1, nil, u)
wr, errWR := workflow.CreateRun(db.DbMap, w1, sdk.WorkflowRunPostHandlerOption{AuthConsumerID: consumer.ID})
assert.NoError(t, errWR)
wr.Workflow = *w1
_, errWr := workflow.StartWorkflowRun(context.TODO(), db, cache, *proj, wr, &sdk.WorkflowRunPostHandlerOption{
Expand All @@ -519,7 +520,7 @@ vcs_ssh_key: proj-blabla
test.NoError(t, errWr)

for i := 0; i < 5; i++ {
wfr, errWR := workflow.CreateRun(db.DbMap, w1, nil, u)
wfr, errWR := workflow.CreateRun(db.DbMap, w1, sdk.WorkflowRunPostHandlerOption{AuthConsumerID: consumer.ID})
assert.NoError(t, errWR)
wfr.Workflow = *w1
_, errWr := workflow.StartWorkflowRun(context.TODO(), db, cache, *proj, wfr, &sdk.WorkflowRunPostHandlerOption{
Expand Down Expand Up @@ -696,7 +697,7 @@ vcs_ssh_key: proj-blabla
test.NoError(t, err)

for i := 0; i < 5; i++ {
wfr, errWR := workflow.CreateRun(db.DbMap, w1, nil, u)
wfr, errWR := workflow.CreateRun(db.DbMap, w1, sdk.WorkflowRunPostHandlerOption{AuthConsumerID: consumer.ID})
assert.NoError(t, errWR)
wfr.Workflow = *w1
_, errWr := workflow.StartWorkflowRun(context.TODO(), db, cache, *proj, wfr, &sdk.WorkflowRunPostHandlerOption{
Expand Down Expand Up @@ -788,7 +789,7 @@ func TestPurgeWorkflowRunWithoutTags(t *testing.T) {

branches := []string{"master", "master", "master", "develop", "develop", "testBr", "testBr", "testBr", "testBr", "test4"}
for i := 0; i < 10; i++ {
wr, errWR := workflow.CreateRun(db.DbMap, w1, nil, u)
wr, errWR := workflow.CreateRun(db.DbMap, w1, sdk.WorkflowRunPostHandlerOption{AuthConsumerID: consumer.ID})
assert.NoError(t, errWR)
wr.Workflow = *w1
_, errWr := workflow.StartWorkflowRun(context.TODO(), db, cache, *proj, wr, &sdk.WorkflowRunPostHandlerOption{
Expand Down Expand Up @@ -874,7 +875,7 @@ func TestPurgeWorkflowRunWithoutTagsBiggerHistoryLength(t *testing.T) {

branches := []string{"master", "master", "master", "develop", "develop", "testBr", "testBr", "testBr", "testBr", "test4"}
for i := 0; i < 10; i++ {
wr, errWR := workflow.CreateRun(db.DbMap, w1, nil, u)
wr, errWR := workflow.CreateRun(db.DbMap, w1, sdk.WorkflowRunPostHandlerOption{AuthConsumerID: consumer.ID})
assert.NoError(t, errWR)
wr.Workflow = *w1
_, errWr := workflow.StartWorkflowRun(context.TODO(), db, cache, *proj, wr, &sdk.WorkflowRunPostHandlerOption{
Expand Down
2 changes: 1 addition & 1 deletion engine/api/workflow/dao_staticfiles_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func TestInsertStaticFiles(t *testing.T) {
})
test.NoError(t, err)

wfr, errWR := workflow.CreateRun(db.DbMap, w1, nil, u)
wfr, errWR := workflow.CreateRun(db.DbMap, w1, sdk.WorkflowRunPostHandlerOption{AuthConsumerID: consumer.ID})
assert.NoError(t, errWR)
wfr.Workflow = *w1
_, errWr := workflow.StartWorkflowRun(context.TODO(), db, cache, *proj, wfr, &sdk.WorkflowRunPostHandlerOption{
Expand Down
1 change: 0 additions & 1 deletion engine/api/workflow/factory_dao.go
Original file line number Diff line number Diff line change
Expand Up @@ -609,7 +609,6 @@ func (dao WorkflowDAO) withNotifications(db gorp.SqlExecutor, ws *[]Workflow) er
for x := range *ws {
w := &(*ws)[x]
w.Notifications = notificationsMap[w.ID]
log.Debug("workflow %d notifications: %+v", w.ID, w.Notifications)
}
return nil
}
Expand Down
Loading

0 comments on commit a111301

Please sign in to comment.