Skip to content

Commit

Permalink
feat(ui): workflow run v2 switch attempt (#6908)
Browse files Browse the repository at this point in the history
Signed-off-by: richardlt <[email protected]>
  • Loading branch information
richardlt authored Mar 20, 2024
1 parent 68776e0 commit 06d4fe3
Show file tree
Hide file tree
Showing 17 changed files with 205 additions and 180 deletions.
1 change: 1 addition & 0 deletions engine/api/api_routes.go
Original file line number Diff line number Diff line change
Expand Up @@ -493,6 +493,7 @@ func (api *API) InitRouter() {
r.Handle("/v2/project/{projectKey}/run/{runIdentifier}/infos", nil, r.GETv2(api.getWorkflowRunInfoV2Handler))
r.Handle("/v2/project/{projectKey}/run/{runIdentifier}/stop", nil, r.POSTv2(api.postStopWorkflowRunHandler))
r.Handle("/v2/project/{projectKey}/run/{runIdentifier}/job", nil, r.GETv2(api.getWorkflowRunJobsV2Handler))
r.Handle("/v2/project/{projectKey}/run/{runIdentifier}/result", nil, r.GETv2(api.getWorkflowRunResultsV2Handler))
r.Handle("/v2/project/{projectKey}/run/{runIdentifier}/job/{jobIdentifier}", nil, r.GETv2(api.getWorkflowRunJobHandler))
r.Handle("/v2/project/{projectKey}/run/{runIdentifier}/job/{jobIdentifier}/infos", nil, r.GETv2(api.getWorkflowRunJobInfosHandler))
r.Handle("/v2/project/{projectKey}/run/{runIdentifier}/job/{jobIdentifier}/run", nil, r.PUTv2(api.putWorkflowRunJobV2Handler))
Expand Down
2 changes: 1 addition & 1 deletion engine/api/v2_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -356,7 +356,7 @@ func (api *API) getJobRunResultsHandler() ([]service.RbacChecker, service.Handle
return err
}

runResults, err := workflow_v2.LoadRunResults(ctx, api.mustDB(), runJob.WorkflowRunID, runJob.RunAttempt)
runResults, err := workflow_v2.LoadRunResultsByRunID(ctx, api.mustDB(), runJob.WorkflowRunID, runJob.RunAttempt)
if err != nil {
return err
}
Expand Down
9 changes: 7 additions & 2 deletions engine/api/v2_queue_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func (api *API) postV2WorkerTakeJobHandler() ([]service.RbacChecker, service.Han
return sdk.NewErrorFrom(sdk.ErrForbidden, "unable take the job %s, current status %s", jobRunID, jobRun.Status)
}

run, err := workflow_v2.LoadRunByID(ctx, api.mustDB(), jobRun.WorkflowRunID, workflow_v2.WithRunResults)
run, err := workflow_v2.LoadRunByID(ctx, api.mustDB(), jobRun.WorkflowRunID)
if err != nil {
return err
}
Expand Down Expand Up @@ -139,6 +139,7 @@ func buildSensitiveData(value string) []string {
datas = append(datas, sdk.OneLineValue(value))
return datas
}

func computeRunJobContext(ctx context.Context, db gorpmapper.SqlExecutorWithTx, proj *sdk.Project, vcs *sdk.VCSProject, vss []sdk.ProjectVariableSet, run sdk.V2WorkflowRun, jobRun sdk.V2WorkflowRunJob, wk sdk.V2Worker) (*sdk.WorkflowRunJobsContext, []string, error) {
contexts := &sdk.WorkflowRunJobsContext{}
contexts.CDS = run.Contexts.CDS
Expand Down Expand Up @@ -219,8 +220,12 @@ func computeRunJobContext(ctx context.Context, db gorpmapper.SqlExecutorWithTx,
contexts.Env[k] = v
}

runResults, err := workflow_v2.LoadRunResultsByRunID(ctx, db, run.ID, run.RunAttempt)
if err != nil {
return nil, nil, err
}
runResultMap := make(map[string][]sdk.V2WorkflowRunResultVariableDetail)
for _, rr := range run.Results {
for _, rr := range runResults {
if rr.Type != sdk.V2WorkflowRunResultTypeVariable {
continue
}
Expand Down
69 changes: 61 additions & 8 deletions engine/api/v2_workflow_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ func (api *API) getWorkflowRunJobsV2Handler() ([]service.RbacChecker, service.Ha
vars := mux.Vars(req)
pKey := vars["projectKey"]
runIdentifier := vars["runIdentifier"]

if !sdk.IsValidUUID(runIdentifier) {
return sdk.WithStack(sdk.ErrInvalidRunIdentifier)
}
Expand Down Expand Up @@ -69,6 +70,52 @@ func (api *API) getWorkflowRunJobsV2Handler() ([]service.RbacChecker, service.Ha
if err != nil {
return err
}

return service.WriteJSON(w, runJobs, http.StatusOK)
}
}

func (api *API) getWorkflowRunResultsV2Handler() ([]service.RbacChecker, service.Handler) {
return service.RBAC(api.projectRead),
func(ctx context.Context, w http.ResponseWriter, req *http.Request) error {
vars := mux.Vars(req)
pKey := vars["projectKey"]
runIdentifier := vars["runIdentifier"]

if !sdk.IsValidUUID(runIdentifier) {
return sdk.WithStack(sdk.ErrInvalidRunIdentifier)
}

u := getUserConsumer(ctx)
if u == nil {
return sdk.WithStack(sdk.ErrForbidden)
}

proj, err := project.Load(ctx, api.mustDB(), pKey)
if err != nil {
return err
}

wr, err := workflow_v2.LoadRunByProjectKeyAndID(ctx, api.mustDB(), proj.Key, runIdentifier)
if err != nil {
return err
}

attemptS := FormString(req, "attempt")

attempt := wr.RunAttempt
if attemptS != "" {
attempt, err = strconv.ParseInt(attemptS, 10, 64)
if err != nil {
return err
}
}

runJobs, err := workflow_v2.LoadRunResultsByRunID(ctx, api.mustDB(), wr.ID, attempt)
if err != nil {
return err
}

return service.WriteJSON(w, runJobs, http.StatusOK)
}
}
Expand Down Expand Up @@ -403,10 +450,11 @@ func (api *API) getWorkflowRunV2Handler() ([]service.RbacChecker, service.Handle
return err
}

wr, err := workflow_v2.LoadRunByProjectKeyAndID(ctx, api.mustDB(), proj.Key, runIdentifier, workflow_v2.WithRunResults)
wr, err := workflow_v2.LoadRunByProjectKeyAndID(ctx, api.mustDB(), proj.Key, runIdentifier)
if err != nil {
return err
}

return service.WriteJSON(w, wr, http.StatusOK)
}
}
Expand Down Expand Up @@ -849,7 +897,7 @@ func (api *API) putWorkflowRunJobV2Handler() ([]service.RbacChecker, service.Han
return err
}

wr, err := workflow_v2.LoadRunByProjectKeyAndID(ctx, api.mustDB(), proj.Key, runIdentifier, workflow_v2.WithRunResults)
wr, err := workflow_v2.LoadRunByProjectKeyAndID(ctx, api.mustDB(), proj.Key, runIdentifier)
if err != nil {
return err
}
Expand All @@ -865,11 +913,6 @@ func (api *API) putWorkflowRunJobV2Handler() ([]service.RbacChecker, service.Han
return err
}

runJobs, err := workflow_v2.LoadRunJobsByRunID(ctx, api.mustDB(), wr.ID, wr.RunAttempt)
if err != nil {
return err
}

var jobToRun *sdk.V2WorkflowRunJob
if sdk.IsValidUUID(jobIdentifier) {
jobToRun, err = workflow_v2.LoadRunJobByRunIDAndID(ctx, api.mustDB(), wr.ID, jobIdentifier)
Expand Down Expand Up @@ -927,9 +970,19 @@ func (api *API) putWorkflowRunJobV2Handler() ([]service.RbacChecker, service.Han
}
}

runJobs, err := workflow_v2.LoadRunJobsByRunID(ctx, api.mustDB(), wr.ID, wr.RunAttempt)
if err != nil {
return err
}

runResults, err := workflow_v2.LoadRunResultsByRunID(ctx, api.mustDB(), wr.ID, wr.RunAttempt)
if err != nil {
return err
}

// Check gate condition
// retrieve previous jobs context
runJobsContexts := computeExistingRunJobContexts(*wr, runJobs)
runJobsContexts := computeExistingRunJobContexts(runJobs, runResults)
jobContext := buildContextForJob(ctx, wr.WorkflowData.Workflow.Jobs, runJobsContexts, wr.Contexts, jobToRun.JobID)
jobContext.Gate = inputs
bts, err := json.Marshal(jobContext)
Expand Down
13 changes: 9 additions & 4 deletions engine/api/v2_workflow_run_engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ func (api *API) workflowRunV2Trigger(ctx context.Context, wrEnqueue sdk.V2Workfl
}()

// Load run by id
run, err := workflow_v2.LoadRunByID(ctx, api.mustDB(), wrEnqueue.RunID, workflow_v2.WithRunResults)
run, err := workflow_v2.LoadRunByID(ctx, api.mustDB(), wrEnqueue.RunID)
if err != nil {
if sdk.ErrorIs(err, sdk.ErrNotFound) {
return nil
Expand Down Expand Up @@ -312,7 +312,7 @@ func (api *API) synchronizeRunResults(ctx context.Context, db gorp.SqlExecutor,
}

// Synchronize workflow runs
runResults, err := workflow_v2.LoadRunResults(ctx, db, runID, run.RunAttempt)
runResults, err := workflow_v2.LoadRunResultsByRunID(ctx, db, runID, run.RunAttempt)
if err != nil {
return err
}
Expand Down Expand Up @@ -666,12 +666,17 @@ func retrieveJobToQueue(ctx context.Context, db *gorp.DbMap, run *sdk.V2Workflow
runInfos := make([]sdk.V2WorkflowRunInfo, 0)
jobToQueue := make(map[string]sdk.V2Job)

// Load run_jobs
runJobs, err := workflow_v2.LoadRunJobsByRunID(ctx, db, run.ID, run.RunAttempt)
if err != nil {
return nil, nil, nil, sdk.WrapError(err, "unable to load workflow run jobs for run %s", wrEnqueue.RunID)
}
runJobsContexts := computeExistingRunJobContexts(*run, runJobs)

runResults, err := workflow_v2.LoadRunResultsByRunID(ctx, db, run.ID, run.RunAttempt)
if err != nil {
return nil, nil, nil, sdk.WrapError(err, "unable to load workflow run results for run %s", wrEnqueue.RunID)
}

runJobsContexts := computeExistingRunJobContexts(runJobs, runResults)

// temp map of run jobs
allrunJobsMap := make(map[string]sdk.V2WorkflowRunJob)
Expand Down
5 changes: 3 additions & 2 deletions engine/api/v2_workflow_run_engine_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,13 @@ package api

import (
"context"

"github.com/ovh/cds/sdk"
)

func computeExistingRunJobContexts(run sdk.V2WorkflowRun, runJobs []sdk.V2WorkflowRunJob) sdk.JobsResultContext {
func computeExistingRunJobContexts(runJobs []sdk.V2WorkflowRunJob, runResults []sdk.V2WorkflowRunResult) sdk.JobsResultContext {
runResultMap := make(map[string][]sdk.V2WorkflowRunResultVariableDetail)
for _, rr := range run.Results {
for _, rr := range runResults {
if rr.Type != sdk.V2WorkflowRunResultTypeVariable {
continue
}
Expand Down
102 changes: 0 additions & 102 deletions engine/api/workflow_v2/dao_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package workflow_v2

import (
"context"
"fmt"
"time"

"github.com/go-gorp/gorp"
Expand All @@ -15,106 +14,6 @@ import (
"github.com/rockbears/log"
)

func WithRunResults(ctx context.Context, _ *gorpmapper.Mapper, db gorp.SqlExecutor, i interface{}) error {
switch target := i.(type) {
case *[]dbWorkflowRun:
var ids []string
for _, r := range *target {
ids = append(ids, r.ID)
}
allResults, err := loadRunResultsByRunIDs(ctx, db, ids...)
if err != nil {
return err
}
for i := range *target {
r := &(*target)[i]
if results, has := allResults[r.ID]; has {
var runResults []sdk.V2WorkflowRunResult
for _, runResult := range results {
if r.RunAttempt != runResult.RunAttempt {
continue
}
runResults = append(runResults, runResult.V2WorkflowRunResult)
}
r.Results = runResults
}
}
case []sdk.V2WorkflowRun:
var ids []string
for _, r := range target {
ids = append(ids, r.ID)
}
allResults, err := loadRunResultsByRunIDs(ctx, db, ids...)
if err != nil {
return err
}
for i := range target {
r := &target[i]
if results, has := allResults[r.ID]; has {
var runResults []sdk.V2WorkflowRunResult
for _, runResult := range results {
if r.RunAttempt != runResult.RunAttempt {
continue
}
runResults = append(runResults, runResult.V2WorkflowRunResult)
}
r.Results = runResults
}
}
case *sdk.V2WorkflowRun:
allResults, err := loadRunResultsByRunIDs(ctx, db, target.ID)
if err != nil {
return err
}
if results, has := allResults[target.ID]; has {
var runResults []sdk.V2WorkflowRunResult
for _, r := range results {
if target.RunAttempt != r.RunAttempt {
continue
}
runResults = append(runResults, r.V2WorkflowRunResult)
}
target.Results = runResults
}
case *dbWorkflowRun:
allResults, err := loadRunResultsByRunIDs(ctx, db, target.ID)
if err != nil {
return err
}
if results, has := allResults[target.ID]; has {
var runResult []sdk.V2WorkflowRunResult
for _, r := range results {
if target.RunAttempt != r.RunAttempt {
continue
}
runResult = append(runResult, r.V2WorkflowRunResult)
}
target.Results = runResult
}
default:
panic(fmt.Sprintf("WithRunResults: unsupported target %T", i))
}

return nil
}

func loadRunResultsByRunIDs(ctx context.Context, db gorp.SqlExecutor, runIDs ...string) (map[string][]dbV2WorkflowRunResult, error) {
query := gorpmapping.NewQuery(`
select * from v2_workflow_run_result where workflow_run_id = ANY($1::uuid[]) order by workflow_run_id, issued_at ASC
`).Args(pq.StringArray(runIDs))
var result []dbV2WorkflowRunResult
if err := gorpmapping.GetAll(ctx, db, query, &result); err != nil {
return nil, sdk.WrapError(err, "unable to load run results for %v", runIDs)
}
var mapRes = make(map[string][]dbV2WorkflowRunResult)
for _, r := range result {
res := mapRes[r.WorkflowRunID]
res = append(res, r)
mapRes[r.WorkflowRunID] = res
}
return mapRes, nil
}

func getRuns(ctx context.Context, db gorp.SqlExecutor, query gorpmapping.Query, opts ...gorpmapper.GetOptionFunc) ([]sdk.V2WorkflowRun, error) {
var dbWkfRuns []dbWorkflowRun
if err := gorpmapping.GetAll(ctx, db, query, &dbWkfRuns, opts...); err != nil {
Expand Down Expand Up @@ -162,7 +61,6 @@ func WorkflowRunNextNumber(db gorp.SqlExecutor, repoID, workflowName string) (in
return 0, sdk.WrapError(err, "nextRunNumber")
}
return i, nil

}

func InsertRun(ctx context.Context, db gorpmapper.SqlExecutorWithTx, wr *sdk.V2WorkflowRun) error {
Expand Down
6 changes: 5 additions & 1 deletion engine/api/workflow_v2/dao_run_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,11 @@ func UpdateJobRun(ctx context.Context, db gorpmapper.SqlExecutorWithTx, wrj *sdk
func LoadRunJobsByRunID(ctx context.Context, db gorp.SqlExecutor, runID string, runAttempt int64) ([]sdk.V2WorkflowRunJob, error) {
ctx, next := telemetry.Span(ctx, "LoadRunJobsByRunID")
defer next()
query := gorpmapping.NewQuery("SELECT * from v2_workflow_run_job WHERE workflow_run_id = $1 AND run_attempt = $2").Args(runID, runAttempt)
query := gorpmapping.NewQuery(`
SELECT *
FROM v2_workflow_run_job
WHERE workflow_run_id = $1 AND run_attempt = $2
`).Args(runID, runAttempt)
return getAllRunJobs(ctx, db, query)
}

Expand Down
Loading

0 comments on commit 06d4fe3

Please sign in to comment.