Skip to content

Commit

Permalink
refactor(hatchery): add CDN configuration to Worker spawn args (#6254)
Browse files Browse the repository at this point in the history
  • Loading branch information
richardlt authored Aug 23, 2022
1 parent 88cc9f1 commit a35012a
Show file tree
Hide file tree
Showing 18 changed files with 113 additions and 147 deletions.
4 changes: 1 addition & 3 deletions engine/api/workflow_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,13 +107,11 @@ func (api *API) postTakeWorkflowJobHandler() service.Handler {
return sdk.WrapError(err, "cannot takeJob nodeJobRunID:%d", id)
}

// Get CDN TCP Addr
// Get CDN TCP Addr
// FIXME remove CDN info from payload, this information should be injected by the hatchery
pbji.GelfServiceAddr, pbji.GelfServiceAddrEnableTLS, err = services.GetCDNPublicTCPAdress(ctx, api.mustDB())
if err != nil {
return err
}

pbji.CDNHttpAddr, err = services.GetCDNPublicHTTPAdress(ctx, api.mustDB())
if err != nil {
return err
Expand Down
15 changes: 2 additions & 13 deletions engine/hatchery/kubernetes/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
jwt "github.com/golang-jwt/jwt"
"github.com/gorilla/mux"
"github.com/rockbears/log"
"github.com/sirupsen/logrus"

apiv1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
Expand All @@ -38,8 +37,8 @@ var _ hatchery.InterfaceWithModels = new(HatcheryKubernetes)

// InitHatchery register local hatchery with its worker model
func (h *HatcheryKubernetes) InitHatchery(ctx context.Context) error {
if err := h.Common.RefreshServiceLogger(ctx); err != nil {
log.Error(ctx, "hatchery> kubernetes> cannot get cdn configuration : %v", err)
if err := h.Common.Init(ctx, h); err != nil {
return err
}
h.GoRoutines.Run(context.Background(), "hatchery kubernetes routines", func(ctx context.Context) {
h.routines(ctx)
Expand Down Expand Up @@ -411,10 +410,6 @@ func (h *HatcheryKubernetes) SpawnWorker(ctx context.Context, spawnArgs hatchery
return sdk.WithStack(err)
}

func (h *HatcheryKubernetes) GetLogger() *logrus.Logger {
return h.ServiceLogger
}

// WorkersStarted returns the number of instances started but
// not necessarily register on CDS yet
func (h *HatcheryKubernetes) WorkersStarted(ctx context.Context) ([]string, error) {
Expand Down Expand Up @@ -446,12 +441,6 @@ func (h *HatcheryKubernetes) routines(ctx context.Context) {
for {
select {
case <-ticker.C:
h.GoRoutines.Exec(ctx, "getCDNConfiguration", func(ctx context.Context) {
if err := h.Common.RefreshServiceLogger(ctx); err != nil {
log.ErrorWithStackTrace(ctx, sdk.WrapError(err, "cannot get cdn configuration"))
}
})

h.GoRoutines.Exec(ctx, "getServicesLogs", func(ctx context.Context) {
if err := h.getServicesLogs(ctx); err != nil {
log.ErrorWithStackTrace(ctx, sdk.WrapError(err, "cannot get service logs"))
Expand Down
3 changes: 2 additions & 1 deletion engine/hatchery/kubernetes/services_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ func Test_serviceLogs(t *testing.T) {
gock.New("http://lolcat.api").Get("/worker").Reply(http.StatusOK).JSON([]sdk.Worker{{Name: "pod-name"}})

gock.New("http://lolcat.api").Get("/config/cdn").Reply(http.StatusOK).JSON(sdk.CDNConfig{TCPURL: "tcphost:8090"})
require.NoError(t, h.RefreshServiceLogger(context.TODO()))

require.NoError(t, h.Common.Init(context.TODO(), h))

podsList := v1.PodList{
Items: []v1.Pod{
Expand Down
15 changes: 2 additions & 13 deletions engine/hatchery/local/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (
jwt "github.com/golang-jwt/jwt"
"github.com/gorilla/mux"
"github.com/rockbears/log"
"github.com/sirupsen/logrus"

"github.com/ovh/cds/engine/api"
"github.com/ovh/cds/engine/service"
Expand Down Expand Up @@ -258,20 +257,15 @@ func (h *HatcheryLocal) WorkersStarted(ctx context.Context) ([]string, error) {
// InitHatchery register local hatchery with its worker model
func (h *HatcheryLocal) InitHatchery(ctx context.Context) error {
h.workers = make(map[string]workerCmd)
if err := h.RefreshServiceLogger(ctx); err != nil {
log.Error(ctx, "Hatchery> local> Cannot get cdn configuration : %v", err)
if err := h.Common.Init(ctx, h); err != nil {
return nil
}
h.GoRoutines.Run(ctx, "hatchery locale routines", func(ctx context.Context) {
h.routines(ctx)
})
return nil
}

// GetLogger retuns the hatchery local logger
func (h *HatcheryLocal) GetLogger() *logrus.Logger {
return h.ServiceLogger
}

func (h *HatcheryLocal) routines(ctx context.Context) {
ticker := time.NewTicker(10 * time.Second)
defer ticker.Stop()
Expand All @@ -284,11 +278,6 @@ func (h *HatcheryLocal) routines(ctx context.Context) {
log.Warn(ctx, "Cannot kill awol workers: %s", err)
}
})
h.GoRoutines.Exec(ctx, "local-refreshCDNConfiguration", func(ctx context.Context) {
if err := h.RefreshServiceLogger(ctx); err != nil {
log.Error(ctx, "Hatchery> local> Cannot get cdn configuration : %v", err)
}
})
case <-ctx.Done():
if ctx.Err() != nil {
log.Error(ctx, "Hatchery> local> Exiting routines")
Expand Down
15 changes: 2 additions & 13 deletions engine/hatchery/marathon/marathon.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (
jwt "github.com/golang-jwt/jwt"
"github.com/gorilla/mux"
"github.com/rockbears/log"
"github.com/sirupsen/logrus"

"github.com/ovh/cds/engine/api"
"github.com/ovh/cds/engine/service"
Expand All @@ -35,11 +34,6 @@ func New() *HatcheryMarathon {

var _ hatchery.InterfaceWithModels = new(HatcheryMarathon)

// GetLogger return the logger
func (h *HatcheryMarathon) GetLogger() *logrus.Logger {
return h.ServiceLogger
}

// Init cdsclient config.
func (h *HatcheryMarathon) Init(config interface{}) (cdsclient.ServiceConfig, error) {
var cfg cdsclient.ServiceConfig
Expand Down Expand Up @@ -441,8 +435,8 @@ func (h *HatcheryMarathon) WorkersStarted(ctx context.Context) ([]string, error)

// InitHatchery only starts killing routine of worker not registered
func (h *HatcheryMarathon) InitHatchery(ctx context.Context) error {
if err := h.RefreshServiceLogger(ctx); err != nil {
log.Error(ctx, "Hatchery> marathon> Cannot get cdn configuration : %v", err)
if err := h.Common.Init(ctx, h); err != nil {
return err
}
h.GoRoutines.Run(ctx, "marathon-routines", func(ctx context.Context) {
h.routines(ctx)
Expand All @@ -467,11 +461,6 @@ func (h *HatcheryMarathon) routines(ctx context.Context) {
log.Warn(context.Background(), "Cannot kill awol workers: %s", err)
}
})
h.GoRoutines.Exec(ctx, "marathon-refreshCDNConfiguration", func(ctx context.Context) {
if err := h.RefreshServiceLogger(ctx); err != nil {
log.Error(ctx, "Hatchery> marathon> Cannot get cdn configuration : %v", err)
}
})
case <-ctx.Done():
if ctx.Err() != nil {
log.Error(ctx, "Hatchery> marathon> Exiting routines")
Expand Down
7 changes: 4 additions & 3 deletions engine/hatchery/openstack/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@ import (
// then list available models
// then list available images
func (h *HatcheryOpenstack) InitHatchery(ctx context.Context) error {
if err := h.Common.Init(ctx, h); err != nil {
return err
}

workersAlive = map[string]int64{}

authOpts := gophercloud.AuthOptions{
Expand Down Expand Up @@ -51,9 +55,6 @@ func (h *HatcheryOpenstack) InitHatchery(ctx context.Context) error {
log.Warn(ctx, "Error on initIPStatus(): %v", err)
}

if err := h.RefreshServiceLogger(ctx); err != nil {
log.Error(ctx, "Hatchery> openstack> Cannot get cdn configuration : %v", err)
}
h.GoRoutines.Run(ctx, "hatchery openstack routines", func(ctx context.Context) {
h.main(ctx)
})
Expand Down
10 changes: 0 additions & 10 deletions engine/hatchery/openstack/openstack.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
"github.com/gophercloud/gophercloud/openstack/compute/v2/servers"
"github.com/gorilla/mux"
"github.com/rockbears/log"
"github.com/sirupsen/logrus"

"github.com/ovh/cds/engine/api"
"github.com/ovh/cds/engine/service"
Expand Down Expand Up @@ -214,13 +213,8 @@ func (h *HatcheryOpenstack) CanSpawn(ctx context.Context, model *sdk.Model, jobI
return true
}

func (h *HatcheryOpenstack) GetLogger() *logrus.Logger {
return h.ServiceLogger
}

func (h *HatcheryOpenstack) main(ctx context.Context) {
serverListTick := time.NewTicker(10 * time.Second).C
cdnConfTick := time.NewTicker(10 * time.Second).C
killAwolServersTick := time.NewTicker(30 * time.Second).C
killErrorServersTick := time.NewTicker(60 * time.Second).C
killDisabledWorkersTick := time.NewTicker(60 * time.Second).C
Expand All @@ -235,10 +229,6 @@ func (h *HatcheryOpenstack) main(ctx context.Context) {
h.killErrorServers(ctx)
case <-killDisabledWorkersTick:
h.killDisabledWorkers()
case <-cdnConfTick:
if err := h.RefreshServiceLogger(ctx); err != nil {
log.Error(ctx, "Hatchery> openstack> Cannot get cdn configuration : %v", err)
}
case <-ctx.Done():
if ctx.Err() != nil {
log.Error(ctx, "Hatchery> openstack> Exiting routines")
Expand Down
84 changes: 57 additions & 27 deletions engine/hatchery/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
"time"

"github.com/rockbears/log"
"gopkg.in/square/go-jose.v2"

"github.com/ovh/cds/engine/api"
"github.com/ovh/cds/engine/service"
Expand Down Expand Up @@ -116,36 +115,53 @@ func (c *Common) GetPrivateKey() *rsa.PrivateKey {
return c.Common.PrivateKey
}

func (c *Common) RefreshServiceLogger(ctx context.Context) error {
cdnConfig, err := c.Client.ConfigCDN()
if err != nil {
if sdk.ErrorIs(err, sdk.ErrNotFound) {
c.CDNLogsURL = ""
c.ServiceLogger = nil
func (c *Common) Init(ctx context.Context, h hatchery.Interface) error {
c.CDNConfig.HTTPURL = h.Configuration().CDN.URL
c.CDNConfig.TCPURL = h.Configuration().CDN.TCP.URL
c.CDNConfig.TCPURLEnableTLS = h.Configuration().CDN.TCP.EnableTLS

// Init CDN config from Hatchery config or public CDN information
if c.CDNConfig.HTTPURL == "" || c.CDNConfig.TCPURL == "" {
// Load CDN information from CDS API
var cfg sdk.CDNConfig
var err error
for {
cfg, err = c.Client.ConfigCDN()
if err == nil {
break
}
err = sdk.NewErrorFrom(err, "cannot get CDN config from CDS API, retrying...")
log.ErrorWithStackTrace(ctx, err)
time.Sleep(2 * time.Second)
}
if c.CDNConfig.HTTPURL == "" {
c.CDNConfig.HTTPURL = cfg.HTTPURL
}
if c.CDNConfig.TCPURL == "" {
c.CDNConfig.TCPURL = cfg.TCPURL
c.CDNConfig.TCPURLEnableTLS = cfg.TCPURLEnableTLS
}
return err
}
if cdnConfig.TCPURL == c.CDNLogsURL {
return nil
}
c.CDNLogsURL = cdnConfig.TCPURL

return c.initServiceLogger(ctx)
}

func (c *Common) initServiceLogger(ctx context.Context) error {
if c.Signer == nil {
var signer jose.Signer
signer, err = jws.NewSigner(c.Common.PrivateKey)
signer, err := jws.NewSigner(c.Common.PrivateKey)
if err != nil {
return sdk.WithStack(err)
}
c.Signer = signer
}

var graylogCfg = &hook.Config{
Addr: c.CDNLogsURL,
Addr: c.CDNConfig.TCPURL,
Protocol: "tcp",
}

if cdnConfig.TCPURLEnableTLS {
tcpCDNUrl := c.CDNLogsURL
if c.CDNConfig.TCPURLEnableTLS {
tcpCDNUrl := c.CDNConfig.TCPURL
// Check if the url has a scheme
// We have to remove if to retrieve the hostname
if i := strings.Index(tcpCDNUrl, "://"); i > -1 {
Expand Down Expand Up @@ -260,6 +276,17 @@ func (c *Common) GenerateWorkerConfig(ctx context.Context, h hatchery.Interface,
httpInsecure = h.Configuration().API.HTTP.Insecure
}

cdnURL := h.Configuration().Provision.WorkerCDN.URL
if cdnURL == "" {
cdnURL = c.CDNConfig.HTTPURL
}
cdnTCP := h.Configuration().Provision.WorkerCDN.TCP.URL
cdnTCPEnableTLS := h.Configuration().Provision.WorkerCDN.TCP.EnableTLS
if cdnTCP == "" {
cdnTCP = c.CDNConfig.TCPURL
cdnTCPEnableTLS = c.CDNConfig.TCPURLEnableTLS
}

envvars := make(map[string]string, len(h.Configuration().Provision.InjectEnvVars))

for _, e := range h.Configuration().Provision.InjectEnvVars {
Expand All @@ -272,16 +299,19 @@ func (c *Common) GenerateWorkerConfig(ctx context.Context, h hatchery.Interface,
}

cfg := workerruntime.WorkerConfig{
Name: spawnArgs.WorkerName,
BookedJobID: spawnArgs.JobID,
HatcheryName: h.Name(),
Model: spawnArgs.ModelName(),
APIToken: spawnArgs.WorkerToken,
APIEndpoint: apiURL,
APIEndpointInsecure: httpInsecure,
InjectEnvVars: envvars,
Region: h.Configuration().Provision.Region,
Basedir: h.Configuration().Provision.WorkerBasedir,
Name: spawnArgs.WorkerName,
BookedJobID: spawnArgs.JobID,
HatcheryName: h.Name(),
Model: spawnArgs.ModelName(),
APIToken: spawnArgs.WorkerToken,
APIEndpoint: apiURL,
APIEndpointInsecure: httpInsecure,
CDNEndpoint: cdnURL,
GelfServiceAddr: cdnTCP,
GelfServiceAddrEnableTLS: cdnTCPEnableTLS,
InjectEnvVars: envvars,
Region: h.Configuration().Provision.Region,
Basedir: h.Configuration().Provision.WorkerBasedir,
Log: cdslog.Conf{
GraylogHost: h.Configuration().Provision.WorkerLogsOptions.Graylog.Host,
GraylogPort: strconv.Itoa(h.Configuration().Provision.WorkerLogsOptions.Graylog.Port),
Expand Down
17 changes: 4 additions & 13 deletions engine/hatchery/swarm/swarm.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ var _ hatchery.InterfaceWithModels = new(HatcherySwarm)

// InitHatchery connect the hatchery to the docker api
func (h *HatcherySwarm) InitHatchery(ctx context.Context) error {
if err := h.Common.Init(ctx, h); err != nil {
return err
}

h.dockerClients = map[string]*dockerClient{}

if len(h.Config.DockerEngines) == 0 {
Expand Down Expand Up @@ -173,9 +177,6 @@ func (h *HatcherySwarm) InitHatchery(ctx context.Context) error {
}
}

if err := h.RefreshServiceLogger(ctx); err != nil {
log.Error(ctx, "Hatchery> swarm> Cannot get cdn configuration : %v", err)
}
h.GoRoutines.Run(ctx, "swarm", func(ctx context.Context) {
h.routines(ctx)
})
Expand Down Expand Up @@ -506,10 +507,6 @@ func (h *HatcherySwarm) WorkersStarted(ctx context.Context) ([]string, error) {
return res, nil
}

func (h *HatcherySwarm) GetLogger() *logrus.Logger {
return h.ServiceLogger
}

// Start inits client and routines for hatchery
func (h *HatcherySwarm) Start(ctx context.Context) error {
return hatchery.Create(ctx, h)
Expand Down Expand Up @@ -551,12 +548,6 @@ func (h *HatcherySwarm) routines(ctx context.Context) {
h.GoRoutines.Exec(ctx, "killAwolWorker", func(ctx context.Context) {
_ = h.killAwolWorker(ctx)
})

h.GoRoutines.Exec(ctx, "refreshCDNConfiguration", func(ctx context.Context) {
if err := h.RefreshServiceLogger(ctx); err != nil {
log.Error(ctx, "Hatchery> swarm> Cannot get cdn configuration : %v", err)
}
})
case <-ctx.Done():
if ctx.Err() != nil {
log.Error(ctx, "Hatchery> Swarm> Exiting routines")
Expand Down
3 changes: 2 additions & 1 deletion engine/hatchery/swarm/swarm_util_logs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ func Test_serviceLogs(t *testing.T) {
gock.New("https://cds-api.local").Get("/worker").Reply(http.StatusOK).JSON([]sdk.Worker{{Name: "swarmy-model1-w1"}})

gock.New("https://cds-api.local").Get("/config/cdn").Reply(http.StatusOK).JSON(sdk.CDNConfig{TCPURL: "tcphost:8090"})
require.NoError(t, h.RefreshServiceLogger(context.TODO()))

require.NoError(t, h.Common.Init(context.TODO(), h))

containers := []types.Container{
{
Expand Down
Loading

0 comments on commit a35012a

Please sign in to comment.