Skip to content

Commit

Permalink
feat(api): craft workflow runs
Browse files Browse the repository at this point in the history
Signed-off-by: francois  samin <[email protected]>
  • Loading branch information
fsamin committed Jul 31, 2020
1 parent 488401b commit e50bf74
Show file tree
Hide file tree
Showing 30 changed files with 472 additions and 213 deletions.
5 changes: 4 additions & 1 deletion engine/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ import (
"github.com/ovh/cds/engine/api/authentication/local"
"github.com/ovh/cds/engine/api/bootstrap"
"github.com/ovh/cds/engine/api/broadcast"
"github.com/ovh/cds/engine/cache"
"github.com/ovh/cds/engine/api/database/gorpmapping"
"github.com/ovh/cds/engine/api/event"
"github.com/ovh/cds/engine/api/integration"
Expand All @@ -46,6 +45,7 @@ import (
"github.com/ovh/cds/engine/api/worker"
"github.com/ovh/cds/engine/api/workermodel"
"github.com/ovh/cds/engine/api/workflow"
"github.com/ovh/cds/engine/cache"
"github.com/ovh/cds/engine/database"
"github.com/ovh/cds/engine/featureflipping"
"github.com/ovh/cds/engine/gorpmapper"
Expand Down Expand Up @@ -650,6 +650,9 @@ func (a *API) Serve(ctx context.Context) error {
sdk.GoRoutine(ctx, "authentication.SessionCleaner", func(ctx context.Context) {
authentication.SessionCleaner(ctx, a.mustDB, 10*time.Second)
}, a.PanicDump())
sdk.GoRoutine(ctx, "api.WorkflowRunCraft", func(ctx context.Context) {
a.WorkflowRunCraft(ctx, 100*time.Millisecond)
}, a.PanicDump())

migrate.Add(ctx, sdk.Migration{Name: "RunsSecrets", Release: "0.47.0", Blocker: false, Automatic: true, ExecFunc: func(ctx context.Context) error {
return migrate.RunsSecrets(ctx, a.DBConnectionFactory.GetDBMap(gorpmapping.Mapper))
Expand Down
2 changes: 1 addition & 1 deletion engine/api/operation/operation.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@ import (

"github.com/go-gorp/gorp"

"github.com/ovh/cds/engine/cache"
"github.com/ovh/cds/engine/api/repositoriesmanager"
"github.com/ovh/cds/engine/api/services"
"github.com/ovh/cds/engine/cache"
"github.com/ovh/cds/engine/gorpmapper"
"github.com/ovh/cds/sdk"
"github.com/ovh/cds/sdk/exportentities"
Expand Down
5 changes: 4 additions & 1 deletion engine/api/project/dao.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,17 @@ import (
"database/sql"
"reflect"
"runtime"
"strings"
"time"

"github.com/go-gorp/gorp"

"github.com/ovh/cds/engine/cache"
"github.com/ovh/cds/engine/api/database/gorpmapping"
"github.com/ovh/cds/engine/api/environment"
"github.com/ovh/cds/engine/api/group"
"github.com/ovh/cds/engine/api/keys"
"github.com/ovh/cds/engine/api/repositoriesmanager"
"github.com/ovh/cds/engine/cache"
"github.com/ovh/cds/engine/gorpmapper"
"github.com/ovh/cds/sdk"
"github.com/ovh/cds/sdk/log"
Expand Down Expand Up @@ -310,6 +311,8 @@ func unwrap(ctx context.Context, db gorp.SqlExecutor, p *dbProject, opts []LoadO
continue
}
name := runtime.FuncForPC(reflect.ValueOf(f).Pointer()).Name()
nameSplitted := strings.Split(name, "/")
name = nameSplitted[len(nameSplitted)-1]
_, end = telemetry.Span(ctx, name)
if err := f(db, &proj); err != nil && sdk.Cause(err) != sql.ErrNoRows {
end()
Expand Down
2 changes: 1 addition & 1 deletion engine/api/repositories_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ vcs_ssh_key: proj-blabla
t.Log("Inserting workflow run=====")

// creates a run
wr, errWR := workflow.CreateRun(db.DbMap, w1, nil, u)
wr, errWR := workflow.CreateRun(db.DbMap, w1, sdk.WorkflowRunPostHandlerOption{AuthConsumer: consumer})
assert.NoError(t, errWR)
wr.Workflow = *w1
t.Log("Starting workflow run=====")
Expand Down
2 changes: 1 addition & 1 deletion engine/api/router_middleware_auth_permission_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (
func Test_checkWorkflowPermissions(t *testing.T) {
api, db, _ := newTestAPI(t)

wctx := testRunWorkflow(t, api, db, api.Router)
wctx := testRunWorkflow(t, api, api.Router)
user := wctx.user
admin, _ := assets.InsertAdminUser(t, db)
maintainer, _ := assets.InsertAdminUser(t, db)
Expand Down
44 changes: 37 additions & 7 deletions engine/api/workflow/dao_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,9 @@ workflow_run.status,
workflow_run.last_sub_num,
workflow_run.last_execution,
workflow_run.to_delete,
workflow_run.read_only
workflow_run.read_only,
workflow_run.to_craft,
workflow_run.to_craft_opts
`

// LoadRunOptions are options for loading a run (node or workflow)
Expand Down Expand Up @@ -589,8 +591,34 @@ func InsertRunNum(db gorp.SqlExecutor, w *sdk.Workflow, num int64) error {
return nil
}

func LoadCratingWorkflowRunIDs(db gorp.SqlExecutor) ([]int64, error) {
query := `
SELECT id
FROM workflow_run
WHERE to_craft = true
LIMIT 10
`
var ids []int64
_, err := db.Select(&ids, query)
if err != nil {
return nil, sdk.WrapError(err, "unable to load crafting workflow runs")
}
return ids, nil
}

func UpdateCraftedWorkflowRun(db gorp.SqlExecutor, id int64) error {
query := `UPDATE workflow_run
SET to_craft = false
WHERE id = $1
`
if _, err := db.Exec(query, id); err != nil {
return sdk.WrapError(err, "unable to update crafting workflow run %d", id)
}
return nil
}

// CreateRun creates a new workflow run and insert it
func CreateRun(db *gorp.DbMap, wf *sdk.Workflow, opts *sdk.WorkflowRunPostHandlerOption, ident sdk.Identifiable) (*sdk.WorkflowRun, error) {
func CreateRun(db *gorp.DbMap, wf *sdk.Workflow, opts sdk.WorkflowRunPostHandlerOption) (*sdk.WorkflowRun, error) {
number, err := NextRunNumber(db, wf.ID)
if err != nil {
return nil, sdk.WrapError(err, "unable to get next run number")
Expand All @@ -605,25 +633,27 @@ func CreateRun(db *gorp.DbMap, wf *sdk.Workflow, opts *sdk.WorkflowRunPostHandle
Status: sdk.StatusPending,
LastExecution: time.Now(),
Tags: make([]sdk.WorkflowRunTag, 0),
Workflow: sdk.Workflow{Name: wf.Name},
Workflow: *wf, //sdk.Workflow{Name: wf.Name},
ToCraft: true,
ToCraftOpts: &opts,
}

if opts != nil && opts.Hook != nil {
if opts.Hook != nil {
if trigg, ok := opts.Hook.Payload["cds.triggered_by.username"]; ok {
wr.Tag(tagTriggeredBy, trigg)
} else {
wr.Tag(tagTriggeredBy, "cds.hook")
}
} else {
wr.Tag(tagTriggeredBy, ident.GetUsername())
wr.Tag(tagTriggeredBy, opts.AuthConsumer.GetUsername())
}

tags := wf.Metadata["default_tags"]
var payload map[string]string
if opts != nil && opts.Hook != nil {
if opts.Hook != nil {
payload = opts.Hook.Payload
}
if opts != nil && opts.Manual != nil {
if opts.Manual != nil {
e := dump.NewDefaultEncoder()
e.Formatters = []dump.KeyFormatterFunc{dump.WithDefaultLowerCaseFormatter()}
e.ExtraFields.DetailedMap = false
Expand Down
14 changes: 7 additions & 7 deletions engine/api/workflow/dao_run_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ vcs_ssh_key: proj-blabla
test.NoError(t, err)

for i := 0; i < 5; i++ {
wr, errWR := workflow.CreateRun(db.DbMap, w1, nil, u)
wr, errWR := workflow.CreateRun(db.DbMap, w1, sdk.WorkflowRunPostHandlerOption{AuthConsumer: consumer})
assert.NoError(t, errWR)
wr.Workflow = *w1
_, errWr := workflow.StartWorkflowRun(context.TODO(), db, cache, *proj, wr, &sdk.WorkflowRunPostHandlerOption{
Expand Down Expand Up @@ -317,7 +317,7 @@ func TestPurgeWorkflowRunWithRunningStatus(t *testing.T) {
test.NoError(t, err)

for i := 0; i < 5; i++ {
wfr, errWR := workflow.CreateRun(db.DbMap, w1, nil, u)
wfr, errWR := workflow.CreateRun(db.DbMap, w1, sdk.WorkflowRunPostHandlerOption{AuthConsumer: consumer})
assert.NoError(t, errWR)
wfr.Workflow = *w1
_, errWr := workflow.StartWorkflowRun(context.TODO(), db, cache, *proj, wfr, &sdk.WorkflowRunPostHandlerOption{
Expand Down Expand Up @@ -504,7 +504,7 @@ vcs_ssh_key: proj-blabla
})
test.NoError(t, err)

wr, errWR := workflow.CreateRun(db.DbMap, w1, nil, u)
wr, errWR := workflow.CreateRun(db.DbMap, w1, sdk.WorkflowRunPostHandlerOption{AuthConsumer: consumer})
assert.NoError(t, errWR)
wr.Workflow = *w1
_, errWr := workflow.StartWorkflowRun(context.TODO(), db, cache, *proj, wr, &sdk.WorkflowRunPostHandlerOption{
Expand All @@ -519,7 +519,7 @@ vcs_ssh_key: proj-blabla
test.NoError(t, errWr)

for i := 0; i < 5; i++ {
wfr, errWR := workflow.CreateRun(db.DbMap, w1, nil, u)
wfr, errWR := workflow.CreateRun(db.DbMap, w1, sdk.WorkflowRunPostHandlerOption{AuthConsumer: consumer})
assert.NoError(t, errWR)
wfr.Workflow = *w1
_, errWr := workflow.StartWorkflowRun(context.TODO(), db, cache, *proj, wfr, &sdk.WorkflowRunPostHandlerOption{
Expand Down Expand Up @@ -696,7 +696,7 @@ vcs_ssh_key: proj-blabla
test.NoError(t, err)

for i := 0; i < 5; i++ {
wfr, errWR := workflow.CreateRun(db.DbMap, w1, nil, u)
wfr, errWR := workflow.CreateRun(db.DbMap, w1, sdk.WorkflowRunPostHandlerOption{AuthConsumer: consumer})
assert.NoError(t, errWR)
wfr.Workflow = *w1
_, errWr := workflow.StartWorkflowRun(context.TODO(), db, cache, *proj, wfr, &sdk.WorkflowRunPostHandlerOption{
Expand Down Expand Up @@ -788,7 +788,7 @@ func TestPurgeWorkflowRunWithoutTags(t *testing.T) {

branches := []string{"master", "master", "master", "develop", "develop", "testBr", "testBr", "testBr", "testBr", "test4"}
for i := 0; i < 10; i++ {
wr, errWR := workflow.CreateRun(db.DbMap, w1, nil, u)
wr, errWR := workflow.CreateRun(db.DbMap, w1, sdk.WorkflowRunPostHandlerOption{AuthConsumer: consumer})
assert.NoError(t, errWR)
wr.Workflow = *w1
_, errWr := workflow.StartWorkflowRun(context.TODO(), db, cache, *proj, wr, &sdk.WorkflowRunPostHandlerOption{
Expand Down Expand Up @@ -874,7 +874,7 @@ func TestPurgeWorkflowRunWithoutTagsBiggerHistoryLength(t *testing.T) {

branches := []string{"master", "master", "master", "develop", "develop", "testBr", "testBr", "testBr", "testBr", "test4"}
for i := 0; i < 10; i++ {
wr, errWR := workflow.CreateRun(db.DbMap, w1, nil, u)
wr, errWR := workflow.CreateRun(db.DbMap, w1, sdk.WorkflowRunPostHandlerOption{AuthConsumer: consumer})
assert.NoError(t, errWR)
wr.Workflow = *w1
_, errWr := workflow.StartWorkflowRun(context.TODO(), db, cache, *proj, wr, &sdk.WorkflowRunPostHandlerOption{
Expand Down
2 changes: 1 addition & 1 deletion engine/api/workflow/dao_staticfiles_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func TestInsertStaticFiles(t *testing.T) {
})
test.NoError(t, err)

wfr, errWR := workflow.CreateRun(db.DbMap, w1, nil, u)
wfr, errWR := workflow.CreateRun(db.DbMap, w1, sdk.WorkflowRunPostHandlerOption{AuthConsumer: consumer})
assert.NoError(t, errWR)
wfr.Workflow = *w1
_, errWr := workflow.StartWorkflowRun(context.TODO(), db, cache, *proj, wfr, &sdk.WorkflowRunPostHandlerOption{
Expand Down
1 change: 0 additions & 1 deletion engine/api/workflow/factory_dao.go
Original file line number Diff line number Diff line change
Expand Up @@ -609,7 +609,6 @@ func (dao WorkflowDAO) withNotifications(db gorp.SqlExecutor, ws *[]Workflow) er
for x := range *ws {
w := &(*ws)[x]
w.Notifications = notificationsMap[w.ID]
log.Debug("workflow %d notifications: %+v", w.ID, w.Notifications)
}
return nil
}
Expand Down
Loading

0 comments on commit e50bf74

Please sign in to comment.