Skip to content

Commit

Permalink
feat(worker): exit after checking or running a job if ttl is null (#3204)
Browse files Browse the repository at this point in the history
  • Loading branch information
fsamin authored and yesnault committed Aug 16, 2018
1 parent b8f0dc1 commit 007a627
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 16 deletions.
2 changes: 1 addition & 1 deletion engine/hatchery/swarm/swarm.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func (h *HatcherySwarm) Init() error {
}
h.dockerClients["default"] = &dockerClient{
Client: *d,
MaxContainers: h.Configuration().Provision.MaxWorker,
MaxContainers: h.Config.MaxContainers,
name: "default",
}
log.Info("hatchery> swarm> connected to default docker engine")
Expand Down
53 changes: 39 additions & 14 deletions engine/worker/cmd_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,14 +53,16 @@ func runCmd(w *currentWorker) func(cmd *cobra.Command, args []string) {
log.Info("auto-update: %t", w.autoUpdate)
log.Info("single-use: %t", w.singleUse)

w.initServer(ctx)
httpServerCtx, stopHTTPServer := context.WithCancel(context.Background())
w.initServer(httpServerCtx)

// Gracefully shutdown connections
c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt, syscall.SIGTERM, syscall.SIGKILL)
defer func() {
log.Info("Run signal.Stop. My hostname is %s", hostname)
log.Info("Run signal.Stop. Hostname: %s", hostname)
signal.Stop(c)
stopHTTPServer()
cancel()
}()

Expand All @@ -74,9 +76,9 @@ func runCmd(w *currentWorker) func(cmd *cobra.Command, args []string) {
}
}()

time.AfterFunc(time.Duration(FlagInt(cmd, flagTTL))*time.Minute, func() {
ttl := FlagInt(cmd, flagTTL)
time.AfterFunc(time.Duration(ttl)*time.Minute, func() {
if w.nbActionsDone == 0 {
log.Debug("Suicide")
cancel()
}
})
Expand Down Expand Up @@ -133,18 +135,19 @@ func runCmd(w *currentWorker) func(cmd *cobra.Command, args []string) {

go func(ctx context.Context, exceptID *int64) {
if err := w.client.QueuePolling(ctx, wjobs, pbjobs, errs, 2*time.Second, 0, exceptID); err != nil {
log.Error("Queues polling stopped: %v", err)
log.Info("Queues polling stopped: %v", err)
}
}(ctx, &exceptJobID)

//Definition of the function which must be called to stop the worker
var endFunc = func() {
log.Info("Enter endFunc")
log.Info("Stopping the worker")
w.drainLogsAndCloseLogger(ctx)
registerTick.Stop()
updateTick.Stop()
w.unregister()
cancel()
stopHTTPServer()

if FlagBool(cmd, flagForceExit) {
log.Info("Exiting worker with force_exit true")
Expand Down Expand Up @@ -207,17 +210,27 @@ func runCmd(w *currentWorker) func(cmd *cobra.Command, args []string) {
if canWorkOnAnotherJob {
continue
}
} else {
} else if ttl > 0 {
if err := w.client.WorkerSetStatus(sdk.StatusWaiting); err != nil {
log.Error("WorkerSetStatus> error on WorkerSetStatus(sdk.StatusWaiting): %s", err)
}
log.Debug("Unable to run this pipeline build job, requirements not OK, let's continue %d%s", j.ID, t)
log.Debug("Unable to run pipeline build job %d, requirements not OK, let's continue %s", j.ID, t)
continue
}

var continueTakeJob bool
if !w.singleUse {
//Continue
log.Debug("PipelineBuildJob is done. single_use to false, keep worker alive")
continueTakeJob = true
}
// If the bookedJob has been processed and the TTL is null, the worker has to stop
if j.ID != w.bookedPBJobID && ttl == 0 {
log.Debug("PipelineBuildJob is done. ttl not null, keep worker alive")
continueTakeJob = true
}

if continueTakeJob {
//Continue
if err := w.client.WorkerSetStatus(sdk.StatusWaiting); err != nil {
log.Error("WorkerSetStatus> error on WorkerSetStatus(sdk.StatusWaiting): %s", err)
}
Expand Down Expand Up @@ -254,17 +267,29 @@ func runCmd(w *currentWorker) func(cmd *cobra.Command, args []string) {
continue
}
}
} else {
} else if ttl > 0 {
// If requirements are KO and the ttl > 0, keep alive
if err := w.client.WorkerSetStatus(sdk.StatusWaiting); err != nil {
log.Error("WorkerSetStatus> error on WorkerSetStatus(sdk.StatusWaiting): %s", err)
}
log.Debug("Unable to run this workflow job, requirements not ok, let's continue %d%s", j.ID, t)
log.Debug("Unable to run this job %d%s, requirements not ok. let's continue", j.ID, t)
continue
}

if !w.singleUse {
var continueTakeJob = true

// If the worker is "single use": unregister and exit the worker
if w.singleUse {
continueTakeJob = false
}

// If the TTL is null: unregister and exit the worker
if ttl == 0 {
continueTakeJob = false
}

if continueTakeJob {
//Continue
log.Debug("Job is done. single_use to false, keep worker alive")
if err := w.client.WorkerSetStatus(sdk.StatusWaiting); err != nil {
log.Error("WorkerSetStatus> error on WorkerSetStatus(sdk.StatusWaiting): %s", err)
}
Expand Down Expand Up @@ -393,7 +418,7 @@ func (w *currentWorker) doRegister() error {
log.Info("Cannot register: %s", err)
return err
}
log.Debug("I am registered, with groupID:%d and model:%v", w.groupID, w.model)
log.Debug("Registered (groupID:%d, model:%v)", w.groupID, w.model.Name)
}
return nil
}
2 changes: 1 addition & 1 deletion engine/worker/requirement.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func checkRequirements(w *currentWorker, a *sdk.Action, execGroups []sdk.Group,
}
}

log.Debug("checkRequirements> checkRequirements:%t errRequirements:%s", requirementsOK, errRequirements)
log.Debug("checkRequirements> checkRequirements:%t errRequirements:%v", requirementsOK, errRequirements)
return requirementsOK, errRequirements
}

Expand Down

0 comments on commit 007a627

Please sign in to comment.