Skip to content

Commit

Permalink
fix(services): heartbeat when the service was fully started (#5673)
Browse files Browse the repository at this point in the history
  • Loading branch information
richardlt authored Feb 5, 2021
1 parent 0088472 commit bd4f0ce
Show file tree
Hide file tree
Showing 13 changed files with 262 additions and 225 deletions.
4 changes: 2 additions & 2 deletions engine/cdn/cdn.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,8 +108,8 @@ func (s *Service) CheckConfiguration(config interface{}) error {
return nil
}

func (s *Service) Start(ctx context.Context, cfg cdsclient.ServiceConfig) error {
if err := s.Common.Start(ctx, cfg); err != nil {
func (s *Service) Start(ctx context.Context) error {
if err := s.Common.Start(ctx); err != nil {
return err
}

Expand Down
48 changes: 21 additions & 27 deletions engine/cmd_start.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package main

import (
"context"
"encoding/json"
"fmt"
"os"
"os/signal"
Expand Down Expand Up @@ -306,7 +305,9 @@ See $ engine config command for more details.

wg.Add(1)
go func(srv serviceConf) {
start(ctx, srv.service, srv.cfg, srv.arg)
if err := start(ctx, srv.service, srv.arg, srv.cfg); err != nil {
log.Error(ctx, "%s> service has been stopped: %+v", srv.arg, err)
}
wg.Done()
}(s)

Expand Down Expand Up @@ -336,50 +337,43 @@ func unregisterServices(ctx context.Context, serviceConfs []serviceConf) {
}
}

func start(ctx context.Context, s service.Service, cfg interface{}, serviceName string) {
if err := serve(ctx, s, serviceName, cfg); err != nil {
fmt.Printf("%s> Service has been stopped: %+v\n", serviceName, err)
}
}

func serve(c context.Context, s service.Service, serviceName string, cfg interface{}) error {
func start(c context.Context, s service.Service, serviceName string, cfg interface{}) error {
ctx, cancel := context.WithCancel(c)
defer cancel()

ctx = context.WithValue(ctx, cdslog.Service, serviceName)
x, err := s.Init(cfg)
srvConfig, err := s.Init(cfg)
if err != nil {
return err
}

// first signin
if err := s.Start(ctx, x); err != nil {
log.Error(ctx, "%s> Unable to start service: %+v", serviceName, err)
return err
// Signin and register to CDS api
if err := s.Signin(c, srvConfig); err != nil {
return sdk.WrapError(err, "unable to signin: %s", serviceName)
}
log.Info(ctx, "%s> Service signed in", serviceName)

var srvConfig sdk.ServiceConfig
b, _ := json.Marshal(cfg)
json.Unmarshal(b, &srvConfig) // nolint

// then register
if err := s.Register(c, srvConfig); err != nil {
log.Error(ctx, "%s> Unable to register: %v", serviceName, err)
return err
// Register to CDS api
if err := s.Register(c, cfg); err != nil {
return sdk.WrapError(err, "unable to register: %s", serviceName)
}
log.Info(ctx, "%s> Service registered", serviceName)

// then start the service
if err := s.Start(ctx); err != nil {
return sdk.WrapError(err, "unable to start service: %s", serviceName)
}

go func() {
if err := s.Heartbeat(ctx, s.Status); err != nil {
log.Error(ctx, "%s> Error heartbeat: %+v", err)
if err := s.Serve(ctx); err != nil {
log.Error(ctx, "%s> Error serve: %+v", serviceName, err)
cancel()
}
}()

// finally start the heartbeat goroutine
go func() {
if err := s.Serve(ctx); err != nil {
log.Error(ctx, "%s> Error serve: %+v", serviceName, err)
if err := s.Heartbeat(ctx, s.Status); err != nil {
log.Error(ctx, "%s> Error heartbeat: %+v", serviceName, err)
cancel()
}
}()
Expand Down
21 changes: 9 additions & 12 deletions engine/hatchery/hatchery_local_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package hatchery_test

import (
"context"
"encoding/json"
"os"
"os/exec"
"strings"
Expand Down Expand Up @@ -57,6 +56,7 @@ func TestHatcheryLocal(t *testing.T) {
require.NoError(t, h.ApplyConfiguration(cfg))

srvCfg, err := h.Init(cfg)
require.NoError(t, err)
require.NotNil(t, srvCfg)
t.Logf("service config: %+v", srvCfg)

Expand All @@ -67,24 +67,21 @@ func TestHatcheryLocal(t *testing.T) {
return nil
}

require.NoError(t, h.Start(context.TODO(), srvCfg))
require.NoError(t, h.Signin(context.TODO(), srvCfg))
require.NoError(t, h.Register(context.TODO(), cfg))
require.NoError(t, h.Start(context.TODO()))

var srvConfig sdk.ServiceConfig
b, _ := json.Marshal(cfg)
json.Unmarshal(b, &srvConfig) // nolint

require.NoError(t, h.Register(context.Background(), srvConfig))
// Wait 30 sec to let the queue polling exec run one time
serveCtx, cancel := context.WithTimeout(context.TODO(), 30*time.Second)
defer cancel()
err = h.Serve(serveCtx)
require.Contains(t, err.Error(), "Server closed")

heartbeatCtx, cancel := context.WithTimeout(context.TODO(), 1*time.Second)
defer cancel()
err = h.Heartbeat(heartbeatCtx, h.Status)
require.Contains(t, err.Error(), "context deadline exceeded")

serveCtx, cancel := context.WithTimeout(context.TODO(), 30*time.Second)
defer cancel()
err = h.Serve(serveCtx)
require.Contains(t, err.Error(), "context deadline exceeded")

// Mock assertions

t.Logf("Checking mock assertions")
Expand Down
5 changes: 5 additions & 0 deletions engine/hatchery/kubernetes/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,11 @@ func (h *HatcheryKubernetes) CheckConfiguration(cfg interface{}) error {
return nil
}

// Start hands this Kubernetes hatchery to hatchery.Create and returns the
// error from that call unchanged.
// NOTE(review): the original comment says this "inits client and routines for
// hatchery"; that work would happen inside hatchery.Create, which is not
// visible here — confirm against sdk/hatchery.
func (h *HatcheryKubernetes) Start(ctx context.Context) error {
return hatchery.Create(ctx, h)
}

// Serve starts the hatchery server
func (h *HatcheryKubernetes) Serve(ctx context.Context) error {
return h.CommonServe(ctx, h)
Expand Down
12 changes: 9 additions & 3 deletions engine/hatchery/local/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/ovh/cds/engine/service"
"github.com/ovh/cds/sdk"
"github.com/ovh/cds/sdk/cdsclient"
"github.com/ovh/cds/sdk/hatchery"
)

// New instantiates a new local hatchery
Expand Down Expand Up @@ -114,20 +115,25 @@ func (h *HatcheryLocal) CheckConfiguration(cfg interface{}) error {
return nil
}

// Start hands this local hatchery to hatchery.Create and returns the error
// from that call unchanged.
// NOTE(review): the original comment says this "inits client and routines for
// hatchery"; that work would happen inside hatchery.Create, which is not
// visible here — confirm against sdk/hatchery.
func (h *HatcheryLocal) Start(ctx context.Context) error {
return hatchery.Create(ctx, h)
}

// Serve starts the hatchery server
func (h *HatcheryLocal) Serve(ctx context.Context) error {
h.BasedirDedicated = filepath.Dir(filepath.Join(h.Config.Basedir, h.Configuration().Name))
if ok, err := sdk.DirectoryExists(h.BasedirDedicated); !ok {
log.Debug(ctx, "creating directory %s", h.BasedirDedicated)
if err := os.MkdirAll(h.BasedirDedicated, 0700); err != nil {
return sdk.WrapError(err, "error while creating directory %s", h.BasedirDedicated)
return sdk.NewErrorFrom(err, "error while creating directory %s", h.BasedirDedicated)
}
} else if err != nil {
return fmt.Errorf("Invalid basedir: %v", err)
return sdk.NewErrorFrom(err, "invalid basedir")
}

if err := h.downloadWorker(); err != nil {
return fmt.Errorf("Cannot download worker binary from api: %v", err)
return sdk.NewErrorFrom(err, "cannot download worker binary from api")
}

return h.CommonServe(ctx, h)
Expand Down
5 changes: 5 additions & 0 deletions engine/hatchery/marathon/marathon.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,11 @@ func (h *HatcheryMarathon) CheckConfiguration(cfg interface{}) error {
return nil
}

// Start hands this Marathon hatchery to hatchery.Create and returns the error
// from that call unchanged.
// NOTE(review): the original comment says this "inits client and routines for
// hatchery"; that work would happen inside hatchery.Create, which is not
// visible here — confirm against sdk/hatchery.
func (h *HatcheryMarathon) Start(ctx context.Context) error {
return hatchery.Create(ctx, h)
}

// Serve starts the hatchery server
func (h *HatcheryMarathon) Serve(ctx context.Context) error {
return h.CommonServe(ctx, h)
Expand Down
5 changes: 5 additions & 0 deletions engine/hatchery/openstack/openstack.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,11 @@ func (h *HatcheryOpenstack) CheckConfiguration(cfg interface{}) error {
return nil
}

// Start hands this OpenStack hatchery to hatchery.Create and returns the error
// from that call unchanged.
// NOTE(review): the original comment says this "inits client and routines for
// hatchery"; that work would happen inside hatchery.Create, which is not
// visible here — confirm against sdk/hatchery.
func (h *HatcheryOpenstack) Start(ctx context.Context) error {
return hatchery.Create(ctx, h)
}

// Serve starts the hatchery server
func (h *HatcheryOpenstack) Serve(ctx context.Context) error {
return h.CommonServe(ctx, h)
Expand Down
26 changes: 10 additions & 16 deletions engine/hatchery/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,26 +66,20 @@ func (c *Common) CommonServe(ctx context.Context, h hatchery.Interface) error {
MaxHeaderBytes: 1 << 20,
}

h.GetGoRoutines().Exec(ctx, "hatchery.httpserver", func(ctx context.Context) {
// Gracefully shutdown the http server
h.GetGoRoutines().Exec(ctx, "hatchery.httpserver-shutdown", func(ctx context.Context) {
<-ctx.Done()
log.Info(ctx, "%s> Shutdown HTTP Server", c.Name())
_ = server.Shutdown(ctx)
})

// Start the http server
log.Info(ctx, "%s> Starting HTTP Server on port %d", c.Name(), h.Configuration().HTTP.Port)
if err := server.ListenAndServe(); err != nil {
log.Error(ctx, "%s> Listen and serve failed: %v", c.Name(), err)
}
// Gracefully shutdown the http server
h.GetGoRoutines().Exec(ctx, "hatchery.httpserver-shutdown", func(ctx context.Context) {
<-ctx.Done()
log.Info(ctx, "%s> Shutdown HTTP Server", c.Name())
_ = server.Shutdown(ctx)
})

if err := hatchery.Create(ctx, h); err != nil {
return err
// Start the http server
log.Info(ctx, "%s> Starting HTTP Server on port %d", c.Name(), h.Configuration().HTTP.Port)
if err := server.ListenAndServe(); err != nil {
return sdk.WrapError(err, "listen and serve failed: %s", c.Name())
}

return ctx.Err()
return nil
}

func (c *Common) initRouter(ctx context.Context, h hatchery.Interface) {
Expand Down
5 changes: 5 additions & 0 deletions engine/hatchery/swarm/swarm.go
Original file line number Diff line number Diff line change
Expand Up @@ -603,6 +603,11 @@ func (h *HatcherySwarm) GetLogger() *logrus.Logger {
return h.ServiceLogger
}

// Start hands this Swarm hatchery to hatchery.Create and returns the error
// from that call unchanged.
// NOTE(review): the original comment says this "inits client and routines for
// hatchery"; that work would happen inside hatchery.Create, which is not
// visible here — confirm against sdk/hatchery.
func (h *HatcherySwarm) Start(ctx context.Context) error {
return hatchery.Create(ctx, h)
}

// Serve starts the hatchery server
func (h *HatcherySwarm) Serve(ctx context.Context) error {
return h.CommonServe(ctx, h)
Expand Down
5 changes: 5 additions & 0 deletions engine/hatchery/vsphere/vsphere.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,11 @@ func (h *HatcheryVSphere) CanSpawn(ctx context.Context, model *sdk.Model, jobID
return true
}

// Start hands this vSphere hatchery to hatchery.Create and returns the error
// from that call unchanged.
// NOTE(review): the original comment says this "inits client and routines for
// hatchery"; that work would happen inside hatchery.Create, which is not
// visible here — confirm against sdk/hatchery.
func (h *HatcheryVSphere) Start(ctx context.Context) error {
return hatchery.Create(ctx, h)
}

// Serve starts the hatchery server
func (h *HatcheryVSphere) Serve(ctx context.Context) error {
return h.CommonServe(ctx, h)
Expand Down
47 changes: 32 additions & 15 deletions engine/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package service

import (
"context"
"encoding/json"
"fmt"
"os"
"time"
Expand Down Expand Up @@ -59,12 +60,28 @@ func (c *Common) Name() string {
return c.ServiceName
}

func (c *Common) Start(ctx context.Context, cfg cdsclient.ServiceConfig) error {
// no register for api
// Start is the common start step shared by services. When the service type is
// "api" it does nothing. For every other type it tags the context with the
// service type and name, then registers the common metrics view using that
// tagged context. It always returns nil.
func (c *Common) Start(ctx context.Context) error {
// The api service type skips this step entirely.
if c.ServiceType == "api" {
return nil
}

// Attach the service type and name as telemetry tags before registering the view.
ctx = telemetry.ContextWithTag(ctx,
telemetry.TagServiceType, c.Type(),
telemetry.TagServiceName, c.Name(),
)
c.RegisterCommonMetricsView(ctx)

return nil
}

// Signin a new service on API
func (c *Common) Signin(ctx context.Context, cfg cdsclient.ServiceConfig) error {
if c.ServiceType == "api" {
return nil
}

log.Info(ctx, "Init CDS client for service %s(%T) %s", c.Type(), c, c.Name())

ctxTimeout, cancel := context.WithTimeout(ctx, 5*time.Minute)
defer cancel()
ticker := time.NewTicker(10 * time.Second)
Expand Down Expand Up @@ -98,32 +115,32 @@ func (c *Common) Start(ctx context.Context, cfg cdsclient.ServiceConfig) error {
if err != nil {
return sdk.WithStack(err)
}

ctx = telemetry.ContextWithTag(ctx,
telemetry.TagServiceType, c.Type(),
telemetry.TagServiceName, c.Name(),
)

c.RegisterCommonMetricsView(ctx)

return nil
}

// Register registers a new service on API
func (c *Common) Register(ctx context.Context, cfg sdk.ServiceConfig) error {
log.Info(ctx, "Registing service %s(%T) %s", c.Type(), c, c.Name())

// no register for api
func (c *Common) Register(ctx context.Context, cfg interface{}) error {
if c.ServiceType == "api" {
return nil
}

var sdkConfig sdk.ServiceConfig
b, err := json.Marshal(cfg)
if err != nil {
return sdk.WithStack(err)
}
if err := json.Unmarshal(b, &sdkConfig); err != nil {
return sdk.WithStack(err)
}

log.Info(ctx, "Registing service %s(%T) %s", c.Type(), c, c.Name())

var srv = sdk.Service{
CanonicalService: sdk.CanonicalService{
Name: c.ServiceName,
HTTPURL: c.HTTPURL,
Type: c.ServiceType,
Config: cfg,
Config: sdkConfig,
},
LastHeartbeat: time.Time{},
Version: sdk.VERSION,
Expand Down
6 changes: 4 additions & 2 deletions engine/service/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,13 +113,15 @@ type Common struct {
}

// Service is the interface for an engine service
// Lifecycle: ApplyConfiguration->?BeforeStart->Init->Signin->Register->Start->Serve->Heartbeat
type Service interface {
ApplyConfiguration(cfg interface{}) error
Serve(ctx context.Context) error
CheckConfiguration(cfg interface{}) error
Start(ctx context.Context, cfg cdsclient.ServiceConfig) error
Start(ctx context.Context) error
Init(cfg interface{}) (cdsclient.ServiceConfig, error)
Register(ctx context.Context, cfg sdk.ServiceConfig) error
Signin(ctx context.Context, cfg cdsclient.ServiceConfig) error
Register(ctx context.Context, cfg interface{}) error
Unregister(ctx context.Context) error
Heartbeat(ctx context.Context, status func(ctx context.Context) *sdk.MonitoringStatus) error
Status(ctx context.Context) *sdk.MonitoringStatus
Expand Down
Loading

0 comments on commit bd4f0ce

Please sign in to comment.