
feat(hatchery): add routine status #3381

Merged: 7 commits, Oct 3, 2018
2 changes: 1 addition & 1 deletion engine/api/api_routes.go
@@ -332,7 +332,7 @@ func (api *API) InitRouter() {
r.Handle("/queue/workflows/{id}/attempt", r.POST(api.postIncWorkflowJobAttemptHandler, NeedHatchery(), EnableTracing()))
r.Handle("/queue/workflows/{id}/infos", r.GET(api.getWorkflowJobHandler, NeedWorker(), EnableTracing()))
r.Handle("/queue/workflows/{permID}/vulnerability", r.POSTEXECUTE(api.postVulnerabilityReportHandler, NeedWorker(), EnableTracing()))
r.Handle("/queue/workflows/{id}/spawn/infos", r.POST(r.Asynchronous(api.postSpawnInfosWorkflowJobHandler, 1), NeedHatchery()))
r.Handle("/queue/workflows/{id}/spawn/infos", r.POST(r.Asynchronous(api.postSpawnInfosWorkflowJobHandler, 1), NeedHatchery(), EnableTracing()))
r.Handle("/queue/workflows/{permID}/result", r.POSTEXECUTE(api.postWorkflowJobResultHandler, NeedWorker(), EnableTracing()))
r.Handle("/queue/workflows/{permID}/log", r.POSTEXECUTE(r.Asynchronous(api.postWorkflowJobLogsHandler, 1), NeedWorker()))
r.Handle("/queue/workflows/log/service", r.POSTEXECUTE(r.Asynchronous(api.postWorkflowJobServiceLogsHandler, 1), NeedHatchery()))
2 changes: 1 addition & 1 deletion engine/api/workflow/dao_node_job_run_info.go
@@ -49,7 +49,7 @@ func insertNodeRunJobInfo(db gorp.SqlExecutor, info *sdk.WorkflowNodeJobRunInfo)
if n, err := db.Exec(query, info.WorkflowNodeRunID, info.WorkflowNodeJobRunID, spawnJSON, time.Now()); err != nil {
return sdk.WrapError(err, "insertNodeRunJobInfo> err while inserting spawninfos into workflow_node_run_job_info")
} else if n, _ := n.RowsAffected(); n == 0 {
return fmt.Errorf("insertNodeRunJobInfo> Unable to inerto into workflow_node_run_job_info id = %d", info.WorkflowNodeJobRunID)
return fmt.Errorf("insertNodeRunJobInfo> Unable to insert into workflow_node_run_job_info id = %d", info.WorkflowNodeJobRunID)
}

log.Debug("insertNodeRunJobInfo> on node run: %d (%d)", info.ID, info.WorkflowNodeJobRunID)
7 changes: 7 additions & 0 deletions engine/api/workflow_queue.go
@@ -348,21 +348,28 @@ func (api *API) postVulnerabilityReportHandler() service.Handler {

func (api *API) postSpawnInfosWorkflowJobHandler() service.AsynchronousHandler {
return func(ctx context.Context, r *http.Request) error {
_, next := observability.Span(ctx, "receiveSpawnInfosWorkflowJob")
id, errc := requestVarInt(r, "id")
if errc != nil {
next()
return sdk.WrapError(errc, "postSpawnInfosWorkflowJobHandler> invalid id")
}
var s []sdk.SpawnInfo
if err := UnmarshalBody(r, &s); err != nil {
next()
return sdk.WrapError(err, "postSpawnInfosWorkflowJobHandler> cannot unmarshal request")
}
observability.Current(ctx, observability.Tag(observability.TagWorkflowNodeJobRun, id))
next()

tx, errBegin := api.mustDB().Begin()
if errBegin != nil {
return sdk.WrapError(errBegin, "postSpawnInfosWorkflowJobHandler> Cannot start transaction")
}
defer tx.Rollback()

_, next = observability.Span(ctx, "workflow.AddSpawnInfosNodeJobRun")
defer next()
if err := workflow.AddSpawnInfosNodeJobRun(tx, id, s); err != nil {
return sdk.WrapError(err, "postSpawnInfosWorkflowJobHandler> Cannot save spawn info on node job run %d for %s name %s", id, getAgent(r), r.Header.Get(cdsclient.RequestedNameHeader))
}
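The handler above illustrates the span pattern this PR introduces: open a span per phase, tag the current span with identifying data, and end each span explicitly (or with defer around the last one). Below is a minimal standalone sketch of that pattern, assuming the observability helpers behave exactly as they are used in this diff; the function name, the example.* span names, and the doWork callback are illustrative, not part of the PR.

package example

import (
	"context"

	"github.com/ovh/cds/engine/api/observability"
)

// traceTwoPhases opens one span around request parsing and a second span
// around the main operation, mirroring postSpawnInfosWorkflowJobHandler.
func traceTwoPhases(ctx context.Context, jobID int64, doWork func(context.Context) error) error {
	// phase 1: parse/validate the request, ended explicitly with next()
	_, next := observability.Span(ctx, "example.parseRequest")
	// ... request parsing would happen here ...
	observability.Current(ctx, observability.Tag(observability.TagWorkflowNodeJobRun, jobID))
	next()

	// phase 2: the expensive part (the DB transaction in the real handler)
	ctx, end := observability.Span(ctx, "example.doWork")
	defer end()
	return doWork(ctx)
}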
12 changes: 11 additions & 1 deletion engine/hatchery/marathon/marathon.go
@@ -19,6 +19,7 @@ import (
"github.com/gorilla/mux"

"github.com/ovh/cds/engine/api"
"github.com/ovh/cds/engine/api/observability"
"github.com/ovh/cds/engine/api/services"
"github.com/ovh/cds/sdk"
"github.com/ovh/cds/sdk/cdsclient"
@@ -217,6 +218,9 @@ func (h *HatcheryMarathon) CanSpawn(model *sdk.Model, jobID int64, requirements
// SpawnWorker creates an application on mesos via marathon
// requirements services are not supported
func (h *HatcheryMarathon) SpawnWorker(ctx context.Context, spawnArgs hatchery.SpawnArguments) (string, error) {
ctx, end := observability.Span(ctx, "hatcheryMarathon.SpawnWorker")
defer end()

if spawnArgs.JobID > 0 {
log.Debug("spawnWorker> spawning worker %s (%s) for job %d - %s", spawnArgs.Model.Name, spawnArgs.Model.ModelDocker.Image, spawnArgs.JobID, spawnArgs.LogInfo)
} else {
@@ -351,9 +355,12 @@ func (h *HatcheryMarathon) SpawnWorker(ctx context.Context, spawnArgs hatchery.S
Labels: &h.marathonLabels,
}

_, next := observability.Span(ctx, "marathonClient.CreateApplication")
if _, err := h.marathonClient.CreateApplication(application); err != nil {
next()
return "", err
}
next()

ticker := time.NewTicker(time.Second * 5)
// ticker.Stop -> do not close goroutine..., so
@@ -377,8 +384,9 @@ func (h *HatcheryMarathon) SpawnWorker(ctx context.Context, spawnArgs hatchery.S
}()

log.Debug("spawnMarathonDockerWorker> %s worker %s spawning in progress, please wait...", logJob, application.ID)

_, next = observability.Span(ctx, "marathonClient.ApplicationDeployments")
deployments, err := h.marathonClient.ApplicationDeployments(application.ID)
next()
if err != nil {
ticker.Stop()
return "", fmt.Errorf("spawnMarathonDockerWorker> %s failed to list deployments: %s", logJob, err.Error())
@@ -389,6 +397,7 @@ func (h *HatcheryMarathon) SpawnWorker(ctx context.Context, spawnArgs hatchery.S
return "", nil
}

_, next = observability.Span(ctx, "waitDeployment")
wg := &sync.WaitGroup{}
var done bool
var successChan = make(chan bool, len(deployments))
@@ -422,6 +431,7 @@ func (h *HatcheryMarathon) SpawnWorker(ctx context.Context, spawnArgs hatchery.S
}

wg.Wait()
next()

var success = true
for b := range successChan {
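In SpawnWorker the whole call now gets a parent span closed with defer, and each Marathon API call gets a short child span that is ended on both the error and the success path so it never leaks. A sketch of that structure, under the same assumption about the observability helpers; the deployClient interface and its method signatures are placeholders, not the real Marathon client API.

package example

import (
	"context"

	"github.com/ovh/cds/engine/api/observability"
)

// deployClient stands in for the external client being traced.
type deployClient interface {
	CreateApplication() error
	ApplicationDeployments() ([]string, error)
}

// spawn wraps the whole operation in a parent span and each remote call in a
// child span; note that next() also runs on the error path before returning.
func spawn(ctx context.Context, c deployClient) ([]string, error) {
	ctx, end := observability.Span(ctx, "example.spawn")
	defer end()

	_, next := observability.Span(ctx, "example.CreateApplication")
	if err := c.CreateApplication(); err != nil {
		next()
		return nil, err
	}
	next()

	_, next = observability.Span(ctx, "example.ApplicationDeployments")
	deployments, err := c.ApplicationDeployments()
	next()
	if err != nil {
		return nil, err
	}
	return deployments, nil
}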
2 changes: 1 addition & 1 deletion sdk/hatchery/hatchery.go
@@ -114,7 +114,7 @@ func Create(h Interface) error {
)

// run the starters pool
workersStartChan, workerStartResultChan := startWorkerStarters(h)
workersStartChan, workerStartResultChan := startWorkerStarters(ctx, h)

hostname, errh := os.Hostname()
if errh != nil {
26 changes: 13 additions & 13 deletions sdk/hatchery/starter.go
@@ -50,32 +50,30 @@ func PanicDump(h Interface) func(s string) (io.WriteCloser, error) {

// Start all goroutines which manage the hatchery worker spawning routine.
// the purpose is to avoid go routines leak when there is a bunch of worker to start
func startWorkerStarters(h Interface) (chan<- workerStarterRequest, chan workerStarterResult) {
func startWorkerStarters(ctx context.Context, h Interface) (chan<- workerStarterRequest, chan workerStarterResult) {
jobs := make(chan workerStarterRequest, 1)
results := make(chan workerStarterResult, 1)

maxProv := h.Configuration().Provision.MaxConcurrentProvisioning
if maxProv < 1 {
maxProv = defaultMaxProvisioning
}
for i := 0; i < maxProv; i++ {
for workerNum := 0; workerNum < maxProv; workerNum++ {
sdk.GoRoutine("workerStarter",
func() {
workerStarter(h, jobs, results)
workerStarter(ctx, h, fmt.Sprintf("%d", workerNum), jobs, results)
},
PanicDump(h),
)
}

return jobs, results
}

func workerStarter(h Interface, jobs <-chan workerStarterRequest, results chan<- workerStarterResult) {
func workerStarter(ctx context.Context, h Interface, workerNum string, jobs <-chan workerStarterRequest, results chan<- workerStarterResult) {
for j := range jobs {
// Start a worker for a job
if m := j.registerWorkerModel; m == nil {
_, end := observability.Span(j.ctx, "hatchery.workerStarter")
//Try to start the worker
ctx2, end := observability.Span(j.ctx, "hatchery.workerStarter")
isRun, err := spawnWorkerForJob(h, j)
//Check the result
res := workerStarterResult{
@@ -84,16 +82,18 @@ func workerStarter(h Interface, jobs <-chan workerStarterRequest, results chan<-
isRun: isRun,
temptToSpawn: true,
}

_, cend := observability.Span(ctx2, "sendResult")
//Send the result back
results <- res
end()
cend()

if err != nil {
j.cancel(err.Error())
} else {
j.cancel("")
}

end()
} else { // Start a worker for registering
log.Debug("Spawning worker for register model %s", m.Name)
if atomic.LoadInt64(&nbWorkerToStart) > int64(h.Configuration().Provision.MaxConcurrentProvisioning) {
@@ -162,8 +162,9 @@ func spawnWorkerForJob(h Interface, j workerStarterRequest) (bool, error) {
log.Info("hatchery> spawnWorkerForJob> SpawnWorker> starting model %s for job %d", j.model.Name, j.id)
_, next = observability.Span(ctx, "hatchery.SpawnWorker")
workerName, errSpawn := h.SpawnWorker(j.ctx, SpawnArguments{Model: j.model, IsWorkflowJob: j.isWorkflowJob, JobID: j.id, Requirements: j.requirements, LogInfo: "spawn for job"})
next()
if errSpawn != nil {
next()
_, next = observability.Span(ctx, "hatchery.QueueJobSendSpawnInfo", observability.Tag("status", "errSpawn"))
log.Warning("spawnWorkerForJob> %d - cannot spawn worker %s for job %d: %s", j.timestamp, j.model.Name, j.id, errSpawn)
infos = append(infos, sdk.SpawnInfo{
RemoteTime: time.Now(),
@@ -173,10 +174,9 @@
log.Warning("spawnWorkerForJob> %d - cannot client.QueueJobSendSpawnInfo for job (err spawn)%d: %s", j.timestamp, j.id, err)
}
log.Error("hatchery %s cannot spawn worker %s for job %d: %v", h.Service().Name, j.model.Name, j.id, errSpawn)

next()
return false, nil
}
next()

infos = append(infos, sdk.SpawnInfo{
RemoteTime: time.Now(),
Expand All @@ -189,7 +189,7 @@ func spawnWorkerForJob(h Interface, j workerStarterRequest) (bool, error) {
},
})

_, next = observability.Span(ctx, "hatchery.QueueJobSendSpawnInfo")
_, next = observability.Span(ctx, "hatchery.QueueJobSendSpawnInfo", observability.Tag("status", "spawnOK"))
if err := h.CDSClient().QueueJobSendSpawnInfo(j.isWorkflowJob, j.id, infos); err != nil {
next()
log.Warning("spawnWorkerForJob> %d - cannot client.QueueJobSendSpawnInfo for job %d: %s", j.timestamp, j.id, err)
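The starter now reports the spawn-info send under the same span name in both outcomes, distinguished only by a status tag (spawnOK or errSpawn). A minimal sketch of that tagging, again assuming the span helper behaves as used above; reportSpawnInfo and its send callback are illustrative.

package example

import (
	"context"

	"github.com/ovh/cds/engine/api/observability"
)

// reportSpawnInfo reuses one span name for both outcomes and distinguishes
// them with a "status" tag, as spawnWorkerForJob does above.
func reportSpawnInfo(ctx context.Context, spawnErr error, send func() error) error {
	status := "spawnOK"
	if spawnErr != nil {
		status = "errSpawn"
	}
	_, next := observability.Span(ctx, "hatchery.QueueJobSendSpawnInfo", observability.Tag("status", status))
	defer next()
	return send()
}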