Skip to content

Commit

Permalink
feat(api): add hatchery and worker details in job run (#5101)
Browse files Browse the repository at this point in the history
Signed-off-by: francois  samin <[email protected]>
  • Loading branch information
fsamin authored Apr 3, 2020
1 parent 5386f91 commit 11d98eb
Show file tree
Hide file tree
Showing 9 changed files with 88 additions and 14 deletions.
4 changes: 1 addition & 3 deletions engine/api/services/dao.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,9 @@ import (
"context"
"time"

"github.com/ovh/cds/engine/api/worker"

"github.com/go-gorp/gorp"

"github.com/ovh/cds/engine/api/database/gorpmapping"
"github.com/ovh/cds/engine/api/worker"
"github.com/ovh/cds/sdk"
"github.com/ovh/cds/sdk/log"
)
Expand Down
6 changes: 3 additions & 3 deletions engine/api/workflow/execute_node_job_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,8 +129,6 @@ func UpdateNodeJobRunStatus(ctx context.Context, db gorp.SqlExecutor, store cach

report := new(ProcessorReport)

log.Debug("UpdateNodeJobRunStatus> job.ID=%d status=%s", job.ID, status)

_, next := observability.Span(ctx, "workflow.LoadRunByID")
nodeRun, errLoad := LoadNodeRunByID(db, job.WorkflowNodeRunID, LoadRunOptions{})
next()
Expand Down Expand Up @@ -238,7 +236,7 @@ func PrepareSpawnInfos(infos []sdk.SpawnInfo) []sdk.SpawnInfo {

// TakeNodeJobRun Take an a job run for update
func TakeNodeJobRun(ctx context.Context, db gorp.SqlExecutor, store cache.Store, proj sdk.Project, jobID int64,
workerModel, workerName, workerID string, infos []sdk.SpawnInfo) (*sdk.WorkflowNodeJobRun, *ProcessorReport, error) {
workerModel, workerName, workerID string, infos []sdk.SpawnInfo, hatcheryName string) (*sdk.WorkflowNodeJobRun, *ProcessorReport, error) {
var end func()
ctx, end = observability.Span(ctx, "workflow.TakeNodeJobRun")
defer end()
Expand Down Expand Up @@ -267,6 +265,8 @@ func TakeNodeJobRun(ctx context.Context, db gorp.SqlExecutor, store cache.Store,
return nil, report, err
}

job.HatcheryName = hatcheryName
job.WorkerName = workerName
job.Model = workerModel
job.Job.WorkerName = workerName
job.Job.WorkerID = workerID
Expand Down
7 changes: 5 additions & 2 deletions engine/api/workflow/execute_node_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,6 @@ func syncTakeJobInNodeRun(ctx context.Context, db gorp.SqlExecutor, n *sdk.Workf
_, end := observability.Span(ctx, "workflow.syncTakeJobInNodeRun")
defer end()

log.Debug("workflow.syncTakeJobInNodeRun> job parameters= %+v", j.Parameters)

report := new(ProcessorReport)

//If status is not waiting neither build: nothing to do
Expand All @@ -70,6 +68,8 @@ func syncTakeJobInNodeRun(ctx context.Context, db gorp.SqlExecutor, n *sdk.Workf
rj.Model = j.Model
rj.ModelType = j.ModelType
rj.ContainsService = j.ContainsService
rj.WorkerName = j.WorkerName
rj.HatcheryName = j.HatcheryName
rj.Job = j.Job
rj.Header = j.Header
rj.Parameters = j.Parameters
Expand Down Expand Up @@ -635,6 +635,9 @@ func syncStage(ctx context.Context, db gorp.SqlExecutor, store cache.Store, stag
runJob.ModelType = runJobDB.ModelType
runJob.ContainsService = runJobDB.ContainsService
runJob.Job = runJobDB.Job
runJob.Model = runJobDB.Model
runJob.WorkerName = runJobDB.WorkerName
runJob.HatcheryName = runJobDB.HatcheryName
}
}
}
Expand Down
9 changes: 9 additions & 0 deletions engine/api/workflow/gorp_model.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"time"

"github.com/ovh/cds/engine/api/database/gorpmapping"
"github.com/ovh/cds/sdk/log"

"github.com/ovh/cds/sdk"
)
Expand Down Expand Up @@ -79,6 +80,8 @@ type JobRun struct {
ContainsService bool `db:"contains_service"`
ModelType sql.NullString `db:"model_type"`
Header sql.NullString `db:"header"`
HatcheryName string `db:"hatchery_name"`
WorkerName string `db:"worker_name"`
}

// ToJobRun transform the JobRun with data of the provided sdk.WorkflowNodeJobRun
Expand All @@ -103,6 +106,9 @@ func (j *JobRun) ToJobRun(jr *sdk.WorkflowNodeJobRun) (err error) {
j.ModelType = sql.NullString{Valid: true, String: string(jr.ModelType)}
j.ContainsService = jr.ContainsService
j.ExecGroups, err = gorpmapping.JSONToNullString(jr.ExecGroups)
j.WorkerName = jr.WorkerName
j.HatcheryName = jr.HatcheryName
log.Debug("ToJobRun> %+v", j)
if err != nil {
return sdk.WrapError(err, "column exec_groups")
}
Expand Down Expand Up @@ -131,6 +137,9 @@ func (j JobRun) WorkflowNodeRunJob() (sdk.WorkflowNodeJobRun, error) {
Done: j.Done,
BookedBy: j.BookedBy,
ContainsService: j.ContainsService,
HatcheryName: j.HatcheryName,
WorkerName: j.WorkerName,
Model: j.Model,
}
if err := gorpmapping.JSONNullString(j.Job, &jr.Job); err != nil {
return jr, sdk.WrapError(err, "column job")
Expand Down
35 changes: 33 additions & 2 deletions engine/api/workflow/run_workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -605,15 +605,16 @@ queueRun:
}

//TakeNodeJobRun
takenJob, _, _ := workflow.TakeNodeJobRun(context.TODO(), db, cache, *proj, j.ID, "model", "worker", "1", []sdk.SpawnInfo{
takenJobID := j.ID
takenJob, _, _ := workflow.TakeNodeJobRun(context.TODO(), db, cache, *proj, takenJobID, "model", "worker", "1", []sdk.SpawnInfo{
{
APITime: time.Now(),
RemoteTime: time.Now(),
Message: sdk.SpawnMsg{
ID: sdk.MsgSpawnInfoJobTaken.ID,
},
},
})
}, "hatchery_name")

//Load workflow node run
nodeRun, err := workflow.LoadNodeRunByID(db, takenJob.WorkflowNodeRunID, workflow.LoadRunOptions{})
Expand Down Expand Up @@ -647,6 +648,12 @@ queueRun:
t.FailNow()
}

j, err = workflow.LoadNodeJobRun(context.TODO(), db, cache, j.ID)
require.NoError(t, err)
assert.Equal(t, "hatchery_name", j.HatcheryName)
assert.NotEmpty(t, j.WorkerName)
assert.NotEmpty(t, j.Model)

//TestUpdateNodeJobRunStatus
_, err = workflow.UpdateNodeJobRunStatus(context.TODO(), db, cache, *proj, j, sdk.StatusSuccess)
assert.NoError(t, err)
Expand All @@ -655,6 +662,30 @@ queueRun:
t.FailNow()
}

workflowRun, err = workflow.LoadRunByID(db, wr.ID, workflow.LoadRunOptions{})
require.NoError(t, err)
var jobRunFound bool
checkJobRun:
for _, noderuns := range workflowRun.WorkflowNodeRuns {
for _, noderun := range noderuns {
for _, stage := range noderun.Stages {
for _, jobrun := range stage.RunJobs {
t.Logf("checking job %d with %d", jobrun.ID, takenJobID)
if jobrun.ID == j.ID {
assert.Equal(t, "hatchery_name", jobrun.HatcheryName)
assert.NotEmpty(t, jobrun.WorkerName)
assert.NotEmpty(t, jobrun.Model)
jobRunFound = true
break checkJobRun
}
}
}
}
}
if !jobRunFound {
t.Fatalf("unable to retrieve job run in the workflow run")
}

logs, err := workflow.LoadLogs(db, takenJob.ID)
assert.NoError(t, err)
if t.Failed() {
Expand Down
22 changes: 19 additions & 3 deletions engine/api/workflow_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/ovh/venom"
"github.com/sguiheux/go-coverage"

"github.com/ovh/cds/engine/api/authentication"
"github.com/ovh/cds/engine/api/cache"
"github.com/ovh/cds/engine/api/event"
"github.com/ovh/cds/engine/api/group"
Expand Down Expand Up @@ -39,6 +40,21 @@ func (api *API) postTakeWorkflowJobHandler() service.Handler {
return sdk.WithStack(sdk.ErrForbidden)
}

consumer := getAPIConsumer(ctx)
// Locking for the parent consumer
var hatcheryName string
if consumer.ParentID != nil {
parentConsumer, err := authentication.LoadConsumerByID(ctx, api.mustDB(), *consumer.ParentID)
if err != nil {
return err
}
s, err := services.LoadByConsumerID(ctx, api.mustDB(), parentConsumer.ID)
if err != nil {
return err
}
hatcheryName = s.Name
}

wk, err := worker.LoadByID(ctx, api.mustDB(), getAPIConsumer(ctx).Worker.ID)
if err != nil {
return err
Expand Down Expand Up @@ -77,7 +93,7 @@ func (api *API) postTakeWorkflowJobHandler() service.Handler {
}

pbji := &sdk.WorkflowNodeJobRunData{}
report, err := takeJob(ctx, api.mustDB, api.Cache, p, id, workerModelName, pbji, wk)
report, err := takeJob(ctx, api.mustDB, api.Cache, p, id, workerModelName, pbji, wk, hatcheryName)
if err != nil {
return sdk.WrapError(err, "cannot takeJob nodeJobRunID:%d", id)
}
Expand All @@ -89,7 +105,7 @@ func (api *API) postTakeWorkflowJobHandler() service.Handler {
}
}

func takeJob(ctx context.Context, dbFunc func() *gorp.DbMap, store cache.Store, p *sdk.Project, id int64, workerModel string, wnjri *sdk.WorkflowNodeJobRunData, wk *sdk.Worker) (*workflow.ProcessorReport, error) {
func takeJob(ctx context.Context, dbFunc func() *gorp.DbMap, store cache.Store, p *sdk.Project, id int64, workerModel string, wnjri *sdk.WorkflowNodeJobRunData, wk *sdk.Worker, hatcheryName string) (*workflow.ProcessorReport, error) {
// Start a tx
tx, errBegin := dbFunc().Begin()
if errBegin != nil {
Expand All @@ -110,7 +126,7 @@ func takeJob(ctx context.Context, dbFunc func() *gorp.DbMap, store cache.Store,
}

// Take node job run
job, report, err := workflow.TakeNodeJobRun(ctx, tx, store, *p, id, workerModel, wk.Name, wk.ID, infos)
job, report, err := workflow.TakeNodeJobRun(ctx, tx, store, *p, id, workerModel, wk.Name, wk.ID, infos, hatcheryName)
if err != nil {
return nil, sdk.WrapError(err, "cannot take job %d", id)
}
Expand Down
7 changes: 6 additions & 1 deletion engine/api/workflow_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ type testRunWorkflowCtx struct {
workerToken string
hatchery *sdk.Service
hatcheryToken string
model *sdk.Model
}

func testRunWorkflow(t *testing.T, api *API, router *Router) testRunWorkflowCtx {
Expand Down Expand Up @@ -329,6 +330,7 @@ func testRegisterWorker(t *testing.T, api *API, router *Router, ctx *testRunWork
w, workerJWT := RegisterWorker(t, api, g.ID, model.Name)
ctx.workerToken = workerJWT
ctx.worker = w
ctx.model = model
}

func testRegisterHatchery(t *testing.T, api *API, router *Router, ctx *testRunWorkflowCtx) {
Expand Down Expand Up @@ -451,7 +453,10 @@ func Test_postTakeWorkflowJobHandler(t *testing.T) {

run, err := workflow.LoadNodeJobRun(context.TODO(), api.mustDB(), api.Cache, ctx.job.ID)
require.NoError(t, err)
require.Equal(t, "Building", run.Status)
assert.Equal(t, "Building", run.Status)
assert.Equal(t, ctx.model.Name, run.Model)
assert.Equal(t, ctx.worker.Name, run.WorkerName)
assert.NotEmpty(t, run.HatcheryName)
}

func Test_postBookWorkflowJobHandler(t *testing.T) {
Expand Down
7 changes: 7 additions & 0 deletions engine/sql/194_workflow_node_job_run_infos.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
-- +migrate Up
ALTER TABLE workflow_node_run_job ADD COLUMN hatchery_name TEXT;
ALTER TABLE workflow_node_run_job ADD COLUMN worker_name TEXT;

-- +migrate Down
ALTER TABLE workflow_node_run_job DROP COLUMN hatchery_name;
ALTER TABLE workflow_node_run_job DROP COLUMN worker_name;
5 changes: 5 additions & 0 deletions sdk/workflow_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -372,6 +372,8 @@ type WorkflowNodeJobRun struct {
IntegrationPluginBinaries []GRPCPluginBinary `json:"integration_plugin_binaries,omitempty"`
Header WorkflowRunHeaders `json:"header,omitempty"`
ContainsService bool `json:"contains_service,omitempty"`
HatcheryName string `json:"hatchery_name,omitempty"`
WorkerName string `json:"worker_name,omitempty"`
}

// WorkflowNodeJobRunSummary is a light representation of WorkflowNodeJobRun for CDS event
Expand All @@ -384,6 +386,9 @@ type WorkflowNodeJobRunSummary struct {
Done int64 `json:"done,omitempty"`
Job ExecutedJobSummary `json:"job_summary,omitempty"`
SpawnInfos []SpawnInfo `json:"spawninfos"`
HatcheryName string `json:"hatchery_name,omitempty"`
WorkerName string `json:"worker_name,omitempty"`
WorkerModelName string `json:"worker_model_name,omitempty"`
}

// ToSummary transforms a WorkflowNodeJobRun into a WorkflowNodeJobRunSummary
Expand Down

0 comments on commit 11d98eb

Please sign in to comment.