Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(hatchery): add CDN configuration to Worker spawn args #6254

Merged
merged 1 commit into from
Aug 23, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions engine/api/workflow_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,13 +107,11 @@ func (api *API) postTakeWorkflowJobHandler() service.Handler {
return sdk.WrapError(err, "cannot takeJob nodeJobRunID:%d", id)
}

// Get CDN TCP Addr
// Get CDN TCP Addr
// FIXME remove CDN info from payload, this information should be injected by the hatchery
pbji.GelfServiceAddr, pbji.GelfServiceAddrEnableTLS, err = services.GetCDNPublicTCPAdress(ctx, api.mustDB())
if err != nil {
return err
}

pbji.CDNHttpAddr, err = services.GetCDNPublicHTTPAdress(ctx, api.mustDB())
if err != nil {
return err
Expand Down
15 changes: 2 additions & 13 deletions engine/hatchery/kubernetes/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
jwt "github.com/golang-jwt/jwt"
"github.com/gorilla/mux"
"github.com/rockbears/log"
"github.com/sirupsen/logrus"

apiv1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
Expand All @@ -38,8 +37,8 @@ var _ hatchery.InterfaceWithModels = new(HatcheryKubernetes)

// InitHatchery register local hatchery with its worker model
func (h *HatcheryKubernetes) InitHatchery(ctx context.Context) error {
if err := h.Common.RefreshServiceLogger(ctx); err != nil {
log.Error(ctx, "hatchery> kubernetes> cannot get cdn configuration : %v", err)
if err := h.Common.Init(ctx, h); err != nil {
return err
}
h.GoRoutines.Run(context.Background(), "hatchery kubernetes routines", func(ctx context.Context) {
h.routines(ctx)
Expand Down Expand Up @@ -411,10 +410,6 @@ func (h *HatcheryKubernetes) SpawnWorker(ctx context.Context, spawnArgs hatchery
return sdk.WithStack(err)
}

func (h *HatcheryKubernetes) GetLogger() *logrus.Logger {
return h.ServiceLogger
}

// WorkersStarted returns the number of instances started but
// not necessarily register on CDS yet
func (h *HatcheryKubernetes) WorkersStarted(ctx context.Context) ([]string, error) {
Expand Down Expand Up @@ -446,12 +441,6 @@ func (h *HatcheryKubernetes) routines(ctx context.Context) {
for {
select {
case <-ticker.C:
h.GoRoutines.Exec(ctx, "getCDNConfiguration", func(ctx context.Context) {
if err := h.Common.RefreshServiceLogger(ctx); err != nil {
log.ErrorWithStackTrace(ctx, sdk.WrapError(err, "cannot get cdn configuration"))
}
})

h.GoRoutines.Exec(ctx, "getServicesLogs", func(ctx context.Context) {
if err := h.getServicesLogs(ctx); err != nil {
log.ErrorWithStackTrace(ctx, sdk.WrapError(err, "cannot get service logs"))
Expand Down
3 changes: 2 additions & 1 deletion engine/hatchery/kubernetes/services_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ func Test_serviceLogs(t *testing.T) {
gock.New("http://lolcat.api").Get("/worker").Reply(http.StatusOK).JSON([]sdk.Worker{{Name: "pod-name"}})

gock.New("http://lolcat.api").Get("/config/cdn").Reply(http.StatusOK).JSON(sdk.CDNConfig{TCPURL: "tcphost:8090"})
require.NoError(t, h.RefreshServiceLogger(context.TODO()))

require.NoError(t, h.Common.Init(context.TODO(), h))

podsList := v1.PodList{
Items: []v1.Pod{
Expand Down
15 changes: 2 additions & 13 deletions engine/hatchery/local/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (
jwt "github.com/golang-jwt/jwt"
"github.com/gorilla/mux"
"github.com/rockbears/log"
"github.com/sirupsen/logrus"

"github.com/ovh/cds/engine/api"
"github.com/ovh/cds/engine/service"
Expand Down Expand Up @@ -258,20 +257,15 @@ func (h *HatcheryLocal) WorkersStarted(ctx context.Context) ([]string, error) {
// InitHatchery register local hatchery with its worker model
func (h *HatcheryLocal) InitHatchery(ctx context.Context) error {
h.workers = make(map[string]workerCmd)
if err := h.RefreshServiceLogger(ctx); err != nil {
log.Error(ctx, "Hatchery> local> Cannot get cdn configuration : %v", err)
if err := h.Common.Init(ctx, h); err != nil {
return nil
}
h.GoRoutines.Run(ctx, "hatchery locale routines", func(ctx context.Context) {
h.routines(ctx)
})
return nil
}

// GetLogger retuns the hatchery local logger
func (h *HatcheryLocal) GetLogger() *logrus.Logger {
return h.ServiceLogger
}

func (h *HatcheryLocal) routines(ctx context.Context) {
ticker := time.NewTicker(10 * time.Second)
defer ticker.Stop()
Expand All @@ -284,11 +278,6 @@ func (h *HatcheryLocal) routines(ctx context.Context) {
log.Warn(ctx, "Cannot kill awol workers: %s", err)
}
})
h.GoRoutines.Exec(ctx, "local-refreshCDNConfiguration", func(ctx context.Context) {
if err := h.RefreshServiceLogger(ctx); err != nil {
log.Error(ctx, "Hatchery> local> Cannot get cdn configuration : %v", err)
}
})
case <-ctx.Done():
if ctx.Err() != nil {
log.Error(ctx, "Hatchery> local> Exiting routines")
Expand Down
15 changes: 2 additions & 13 deletions engine/hatchery/marathon/marathon.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (
jwt "github.com/golang-jwt/jwt"
"github.com/gorilla/mux"
"github.com/rockbears/log"
"github.com/sirupsen/logrus"

"github.com/ovh/cds/engine/api"
"github.com/ovh/cds/engine/service"
Expand All @@ -35,11 +34,6 @@ func New() *HatcheryMarathon {

var _ hatchery.InterfaceWithModels = new(HatcheryMarathon)

// GetLogger return the logger
func (h *HatcheryMarathon) GetLogger() *logrus.Logger {
return h.ServiceLogger
}

// Init cdsclient config.
func (h *HatcheryMarathon) Init(config interface{}) (cdsclient.ServiceConfig, error) {
var cfg cdsclient.ServiceConfig
Expand Down Expand Up @@ -441,8 +435,8 @@ func (h *HatcheryMarathon) WorkersStarted(ctx context.Context) ([]string, error)

// InitHatchery only starts killing routine of worker not registered
func (h *HatcheryMarathon) InitHatchery(ctx context.Context) error {
if err := h.RefreshServiceLogger(ctx); err != nil {
log.Error(ctx, "Hatchery> marathon> Cannot get cdn configuration : %v", err)
if err := h.Common.Init(ctx, h); err != nil {
return err
}
h.GoRoutines.Run(ctx, "marathon-routines", func(ctx context.Context) {
h.routines(ctx)
Expand All @@ -467,11 +461,6 @@ func (h *HatcheryMarathon) routines(ctx context.Context) {
log.Warn(context.Background(), "Cannot kill awol workers: %s", err)
}
})
h.GoRoutines.Exec(ctx, "marathon-refreshCDNConfiguration", func(ctx context.Context) {
if err := h.RefreshServiceLogger(ctx); err != nil {
log.Error(ctx, "Hatchery> marathon> Cannot get cdn configuration : %v", err)
}
})
case <-ctx.Done():
if ctx.Err() != nil {
log.Error(ctx, "Hatchery> marathon> Exiting routines")
Expand Down
7 changes: 4 additions & 3 deletions engine/hatchery/openstack/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@ import (
// then list available models
// then list available images
func (h *HatcheryOpenstack) InitHatchery(ctx context.Context) error {
if err := h.Common.Init(ctx, h); err != nil {
return err
}

workersAlive = map[string]int64{}

authOpts := gophercloud.AuthOptions{
Expand Down Expand Up @@ -51,9 +55,6 @@ func (h *HatcheryOpenstack) InitHatchery(ctx context.Context) error {
log.Warn(ctx, "Error on initIPStatus(): %v", err)
}

if err := h.RefreshServiceLogger(ctx); err != nil {
log.Error(ctx, "Hatchery> openstack> Cannot get cdn configuration : %v", err)
}
h.GoRoutines.Run(ctx, "hatchery openstack routines", func(ctx context.Context) {
h.main(ctx)
})
Expand Down
10 changes: 0 additions & 10 deletions engine/hatchery/openstack/openstack.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
"github.com/gophercloud/gophercloud/openstack/compute/v2/servers"
"github.com/gorilla/mux"
"github.com/rockbears/log"
"github.com/sirupsen/logrus"

"github.com/ovh/cds/engine/api"
"github.com/ovh/cds/engine/service"
Expand Down Expand Up @@ -214,13 +213,8 @@ func (h *HatcheryOpenstack) CanSpawn(ctx context.Context, model *sdk.Model, jobI
return true
}

func (h *HatcheryOpenstack) GetLogger() *logrus.Logger {
return h.ServiceLogger
}

func (h *HatcheryOpenstack) main(ctx context.Context) {
serverListTick := time.NewTicker(10 * time.Second).C
cdnConfTick := time.NewTicker(10 * time.Second).C
killAwolServersTick := time.NewTicker(30 * time.Second).C
killErrorServersTick := time.NewTicker(60 * time.Second).C
killDisabledWorkersTick := time.NewTicker(60 * time.Second).C
Expand All @@ -235,10 +229,6 @@ func (h *HatcheryOpenstack) main(ctx context.Context) {
h.killErrorServers(ctx)
case <-killDisabledWorkersTick:
h.killDisabledWorkers()
case <-cdnConfTick:
if err := h.RefreshServiceLogger(ctx); err != nil {
log.Error(ctx, "Hatchery> openstack> Cannot get cdn configuration : %v", err)
}
case <-ctx.Done():
if ctx.Err() != nil {
log.Error(ctx, "Hatchery> openstack> Exiting routines")
Expand Down
84 changes: 57 additions & 27 deletions engine/hatchery/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
"time"

"github.com/rockbears/log"
"gopkg.in/square/go-jose.v2"

"github.com/ovh/cds/engine/api"
"github.com/ovh/cds/engine/service"
Expand Down Expand Up @@ -116,36 +115,53 @@ func (c *Common) GetPrivateKey() *rsa.PrivateKey {
return c.Common.PrivateKey
}

func (c *Common) RefreshServiceLogger(ctx context.Context) error {
cdnConfig, err := c.Client.ConfigCDN()
if err != nil {
if sdk.ErrorIs(err, sdk.ErrNotFound) {
c.CDNLogsURL = ""
c.ServiceLogger = nil
func (c *Common) Init(ctx context.Context, h hatchery.Interface) error {
c.CDNConfig.HTTPURL = h.Configuration().CDN.URL
c.CDNConfig.TCPURL = h.Configuration().CDN.TCP.URL
c.CDNConfig.TCPURLEnableTLS = h.Configuration().CDN.TCP.EnableTLS

// Init CDN config from Hatchery config or public CDN information
if c.CDNConfig.HTTPURL == "" || c.CDNConfig.TCPURL == "" {
// Load CDN information from CDS API
var cfg sdk.CDNConfig
var err error
for {
cfg, err = c.Client.ConfigCDN()
if err == nil {
break
}
err = sdk.NewErrorFrom(err, "cannot get CDN config from CDS API, retrying...")
log.ErrorWithStackTrace(ctx, err)
time.Sleep(2 * time.Second)
}
if c.CDNConfig.HTTPURL == "" {
c.CDNConfig.HTTPURL = cfg.HTTPURL
}
if c.CDNConfig.TCPURL == "" {
c.CDNConfig.TCPURL = cfg.TCPURL
c.CDNConfig.TCPURLEnableTLS = cfg.TCPURLEnableTLS
}
return err
}
if cdnConfig.TCPURL == c.CDNLogsURL {
return nil
}
c.CDNLogsURL = cdnConfig.TCPURL

return c.initServiceLogger(ctx)
}

func (c *Common) initServiceLogger(ctx context.Context) error {
if c.Signer == nil {
var signer jose.Signer
signer, err = jws.NewSigner(c.Common.PrivateKey)
signer, err := jws.NewSigner(c.Common.PrivateKey)
if err != nil {
return sdk.WithStack(err)
}
c.Signer = signer
}

var graylogCfg = &hook.Config{
Addr: c.CDNLogsURL,
Addr: c.CDNConfig.TCPURL,
Protocol: "tcp",
}

if cdnConfig.TCPURLEnableTLS {
tcpCDNUrl := c.CDNLogsURL
if c.CDNConfig.TCPURLEnableTLS {
tcpCDNUrl := c.CDNConfig.TCPURL
// Check if the url has a scheme
// We have to remove if to retrieve the hostname
if i := strings.Index(tcpCDNUrl, "://"); i > -1 {
Expand Down Expand Up @@ -260,6 +276,17 @@ func (c *Common) GenerateWorkerConfig(ctx context.Context, h hatchery.Interface,
httpInsecure = h.Configuration().API.HTTP.Insecure
}

cdnURL := h.Configuration().Provision.WorkerCDN.URL
if cdnURL == "" {
cdnURL = c.CDNConfig.HTTPURL
}
cdnTCP := h.Configuration().Provision.WorkerCDN.TCP.URL
cdnTCPEnableTLS := h.Configuration().Provision.WorkerCDN.TCP.EnableTLS
if cdnTCP == "" {
cdnTCP = c.CDNConfig.TCPURL
cdnTCPEnableTLS = c.CDNConfig.TCPURLEnableTLS
}

envvars := make(map[string]string, len(h.Configuration().Provision.InjectEnvVars))

for _, e := range h.Configuration().Provision.InjectEnvVars {
Expand All @@ -272,16 +299,19 @@ func (c *Common) GenerateWorkerConfig(ctx context.Context, h hatchery.Interface,
}

cfg := workerruntime.WorkerConfig{
Name: spawnArgs.WorkerName,
BookedJobID: spawnArgs.JobID,
HatcheryName: h.Name(),
Model: spawnArgs.ModelName(),
APIToken: spawnArgs.WorkerToken,
APIEndpoint: apiURL,
APIEndpointInsecure: httpInsecure,
InjectEnvVars: envvars,
Region: h.Configuration().Provision.Region,
Basedir: h.Configuration().Provision.WorkerBasedir,
Name: spawnArgs.WorkerName,
BookedJobID: spawnArgs.JobID,
HatcheryName: h.Name(),
Model: spawnArgs.ModelName(),
APIToken: spawnArgs.WorkerToken,
APIEndpoint: apiURL,
APIEndpointInsecure: httpInsecure,
CDNEndpoint: cdnURL,
GelfServiceAddr: cdnTCP,
GelfServiceAddrEnableTLS: cdnTCPEnableTLS,
InjectEnvVars: envvars,
Region: h.Configuration().Provision.Region,
Basedir: h.Configuration().Provision.WorkerBasedir,
Log: cdslog.Conf{
GraylogHost: h.Configuration().Provision.WorkerLogsOptions.Graylog.Host,
GraylogPort: strconv.Itoa(h.Configuration().Provision.WorkerLogsOptions.Graylog.Port),
Expand Down
17 changes: 4 additions & 13 deletions engine/hatchery/swarm/swarm.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ var _ hatchery.InterfaceWithModels = new(HatcherySwarm)

// InitHatchery connect the hatchery to the docker api
func (h *HatcherySwarm) InitHatchery(ctx context.Context) error {
if err := h.Common.Init(ctx, h); err != nil {
return err
}

h.dockerClients = map[string]*dockerClient{}

if len(h.Config.DockerEngines) == 0 {
Expand Down Expand Up @@ -173,9 +177,6 @@ func (h *HatcherySwarm) InitHatchery(ctx context.Context) error {
}
}

if err := h.RefreshServiceLogger(ctx); err != nil {
log.Error(ctx, "Hatchery> swarm> Cannot get cdn configuration : %v", err)
}
h.GoRoutines.Run(ctx, "swarm", func(ctx context.Context) {
h.routines(ctx)
})
Expand Down Expand Up @@ -506,10 +507,6 @@ func (h *HatcherySwarm) WorkersStarted(ctx context.Context) ([]string, error) {
return res, nil
}

func (h *HatcherySwarm) GetLogger() *logrus.Logger {
return h.ServiceLogger
}

// Start inits client and routines for hatchery
func (h *HatcherySwarm) Start(ctx context.Context) error {
return hatchery.Create(ctx, h)
Expand Down Expand Up @@ -551,12 +548,6 @@ func (h *HatcherySwarm) routines(ctx context.Context) {
h.GoRoutines.Exec(ctx, "killAwolWorker", func(ctx context.Context) {
_ = h.killAwolWorker(ctx)
})

h.GoRoutines.Exec(ctx, "refreshCDNConfiguration", func(ctx context.Context) {
if err := h.RefreshServiceLogger(ctx); err != nil {
log.Error(ctx, "Hatchery> swarm> Cannot get cdn configuration : %v", err)
}
})
case <-ctx.Done():
if ctx.Err() != nil {
log.Error(ctx, "Hatchery> Swarm> Exiting routines")
Expand Down
3 changes: 2 additions & 1 deletion engine/hatchery/swarm/swarm_util_logs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ func Test_serviceLogs(t *testing.T) {
gock.New("https://cds-api.local").Get("/worker").Reply(http.StatusOK).JSON([]sdk.Worker{{Name: "swarmy-model1-w1"}})

gock.New("https://cds-api.local").Get("/config/cdn").Reply(http.StatusOK).JSON(sdk.CDNConfig{TCPURL: "tcphost:8090"})
require.NoError(t, h.RefreshServiceLogger(context.TODO()))

require.NoError(t, h.Common.Init(context.TODO(), h))

containers := []types.Container{
{
Expand Down
Loading