Skip to content

Commit

Permalink
fix(worker): heartbeat (#3224)
Browse files Browse the repository at this point in the history
and add lastbeat in cdsctl worker list

Signed-off-by: Yvonnick Esnault <[email protected]>
  • Loading branch information
yesnault authored Aug 20, 2018
1 parent 64cc5f2 commit c7f0753
Show file tree
Hide file tree
Showing 11 changed files with 2,683 additions and 866 deletions.
2 changes: 1 addition & 1 deletion engine/api/artifact.go
Original file line number Diff line number Diff line change
Expand Up @@ -449,6 +449,6 @@ func generateHash() (string, error) {
str := hex.EncodeToString(bs)
token := []byte(str)[0:size]

log.Debug("generateHash> new generated id: %s", token)
log.Debug("api> generateHash> new generated id: %s", token)
return string(token), nil
}
2 changes: 1 addition & 1 deletion engine/api/hatchery/hatchery.go
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,6 @@ func generateID() (string, error) {
str := hex.EncodeToString(bs)
token := []byte(str)[0:size]

log.Debug("generateID> new generated id: %s", token)
log.Debug("hatchery> generateID> new generated id: %s", token)
return string(token), nil
}
4 changes: 2 additions & 2 deletions engine/api/hook/hook.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,13 +227,13 @@ func generateHash() (string, error) {
bs := make([]byte, size)
_, err := rand.Read(bs)
if err != nil {
log.Error("generateID: rand.Read failed: %s\n", err)
log.Error("hook> generateID: rand.Read failed: %s\n", err)
return "", err
}
str := hex.EncodeToString(bs)
token := []byte(str)[0:size]

log.Debug("generateID: new generated id: %s\n", token)
log.Debug("hook> generateID: new generated id: %s\n", token)
return string(token), nil
}

Expand Down
4 changes: 2 additions & 2 deletions engine/api/worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,13 +230,13 @@ func generateID() (string, error) {
size := 64
bs := make([]byte, size)
if _, err := rand.Read(bs); err != nil {
log.Error("generateID: rand.Read failed: %s", err)
log.Error("api> worker> generateID: rand.Read failed: %s", err)
return "", err
}
str := hex.EncodeToString(bs)
token := []byte(str)[0:size]

log.Debug("generateID: new generated id: %s", token)
log.Debug("api> worker> generateID: new generated id: %s", token)
return string(token), nil
}

Expand Down
4 changes: 2 additions & 2 deletions engine/vcs/github/oauth_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,13 @@ func generateHash() (string, error) {
size := 128
bs := make([]byte, size)
if _, err := rand.Read(bs); err != nil {
log.Error("generateID: rand.Read failed: %s\n", err)
log.Error("vcs> github> generateID: rand.Read failed: %s\n", err)
return "", err
}
str := hex.EncodeToString(bs)
token := []byte(str)[0:size]

log.Debug("generateID: new generated id: %s\n", token)
log.Debug("vcs> github> generateID: new generated id: %s\n", token)
return string(token), nil
}

Expand Down
4 changes: 2 additions & 2 deletions engine/vcs/gitlab/oauth_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,13 @@ func generateHash() (string, error) {
size := 128
bs := make([]byte, size)
if _, err := rand.Read(bs); err != nil {
log.Error("generateID: rand.Read failed: %s\n", err)
log.Error("vcs> gitlab> generateID: rand.Read failed: %s\n", err)
return "", err
}
str := hex.EncodeToString(bs)
token := []byte(str)[0:size]

log.Debug("generateID: new generated id: %s\n", token)
log.Debug("vcs> gitlab> generateID: new generated id: %s\n", token)
return string(token), nil
}

Expand Down
36 changes: 24 additions & 12 deletions engine/worker/cmd_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ func runCmd(w *currentWorker) func(cmd *cobra.Command, args []string) {

//Register every 10 seconds
registerTick := time.NewTicker(10 * time.Second)
refreshTick := time.NewTicker(30 * time.Second)

updateTick := time.NewTicker(5 * time.Minute)

Expand Down Expand Up @@ -144,6 +145,7 @@ func runCmd(w *currentWorker) func(cmd *cobra.Command, args []string) {
log.Info("Stopping the worker")
w.drainLogsAndCloseLogger(ctx)
registerTick.Stop()
refreshTick.Stop()
updateTick.Stop()
w.unregister()
cancel()
Expand Down Expand Up @@ -187,9 +189,18 @@ func runCmd(w *currentWorker) func(cmd *cobra.Command, args []string) {
select {
case <-ctx.Done():
return
case <-refreshTick.C:
if err := w.client.WorkerRefresh(); err != nil {
log.Error("Heartbeat failed: %v", err)
nbErrors++
if nbErrors == 5 {
errs <- err
}
}
nbErrors = 0
case <-registerTick.C:
if err := w.doRegister(); err != nil {
log.Error("Heartbeat failed: %v", err)
log.Error("Register failed: %v", err)
nbErrors++
if nbErrors == 5 {
errs <- err
Expand Down Expand Up @@ -426,17 +437,18 @@ func (w *currentWorker) doRegister() error {
info = fmt.Sprintf(", I was born to work on workflow node job %d", w.bookedWJobID)
}
log.Info("Registering on CDS engine%s Version:%s", info, sdk.VERSION)
}
form := sdk.WorkerRegistrationForm{
Name: w.status.Name,
Token: w.token,
Hatchery: w.hatchery.id,
HatcheryName: w.hatchery.name,
ModelID: w.model.ID,
}
if err := w.register(form); err != nil {
log.Error("Cannot register: %s", err)
return err

form := sdk.WorkerRegistrationForm{
Name: w.status.Name,
Token: w.token,
Hatchery: w.hatchery.id,
HatcheryName: w.hatchery.name,
ModelID: w.model.ID,
}
if err := w.register(form); err != nil {
log.Error("Cannot register: %s", err)
return err
}
}
return nil
}
8 changes: 8 additions & 0 deletions sdk/cdsclient/client_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,14 @@ func (c *client) WorkerDisable(id string) error {
return nil
}

func (c *client) WorkerRefresh() error {
url := fmt.Sprintf("/worker/refresh")
if _, err := c.PostJSON(url, nil, nil); err != nil {
return err
}
return nil
}

func (c *client) WorkerRegister(r sdk.WorkerRegistrationForm) (*sdk.Worker, bool, error) {
var w sdk.Worker
code, err := c.PostJSON("/worker", r, &w)
Expand Down
1 change: 1 addition & 0 deletions sdk/cdsclient/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,7 @@ type UserClient interface {
type WorkerClient interface {
WorkerModelBook(id int64) error
WorkerList() ([]sdk.Worker, error)
WorkerRefresh() error
WorkerDisable(id string) error
WorkerModelAdd(name, modelType, patternName string, dockerModel *sdk.ModelDocker, vmModel *sdk.ModelVirtualMachine, groupID int64) (sdk.Model, error)
WorkerModelUpdate(ID int64, name string, modelType string, dockerModel *sdk.ModelDocker, vmModel *sdk.ModelVirtualMachine, groupID int64) (sdk.Model, error)
Expand Down
2 changes: 1 addition & 1 deletion sdk/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
type Worker struct {
ID string `json:"id" cli:"-"`
Name string `json:"name" cli:"name,key"`
LastBeat time.Time `json:"-" cli:"-"`
LastBeat time.Time `json:"lastbeat" cli:"lastbeat"`
GroupID int64 `json:"group_id" cli:"-"`
ModelID int64 `json:"model_id" cli:"-"`
ActionBuildID int64 `json:"action_build_id" cli:"-"`
Expand Down
Loading

0 comments on commit c7f0753

Please sign in to comment.