From 3bfac2b3e6e1e59feac45bc9db33c8fbf30bf499 Mon Sep 17 00:00:00 2001 From: francois samin Date: Fri, 7 May 2021 16:02:34 +0200 Subject: [PATCH 1/3] feat(hatchery): inject env variables into workers from hatchery config --- engine/config.go | 1 + engine/hatchery/hatchery_local_test.go | 4 ++++ engine/hatchery/kubernetes/kubernetes.go | 4 ++-- engine/hatchery/kubernetes/kubernetes_test.go | 3 +++ engine/hatchery/local/worker_spawn.go | 5 ++++- engine/hatchery/marathon/marathon.go | 4 ++-- engine/hatchery/openstack/spawn.go | 2 +- engine/hatchery/serve.go | 14 +++++++++++++- engine/hatchery/swarm/swarm.go | 4 ++-- engine/hatchery/vsphere/spawn.go | 6 +++++- engine/service/types.go | 15 ++++++++------- sdk/worker.go | 2 ++ 12 files changed, 47 insertions(+), 17 deletions(-) diff --git a/engine/config.go b/engine/config.go index 840db4e6c2..572256eac7 100644 --- a/engine/config.go +++ b/engine/config.go @@ -115,6 +115,7 @@ func configBootstrap(args []string) Configuration { conf.Hatchery.Openstack.HTTP.Port = 8086 case sdk.TypeHatchery + ":swarm": conf.Hatchery.Swarm = &swarm.HatcheryConfiguration{} + conf.Hatchery.Swarm.Provision.InjectEnvVars = []string{"CDS=1"} defaults.SetDefaults(conf.Hatchery.Swarm) conf.Hatchery.Swarm.DockerEngines = map[string]swarm.DockerEngineConfiguration{ "sample-docker-engine": { diff --git a/engine/hatchery/hatchery_local_test.go b/engine/hatchery/hatchery_local_test.go index d3cb01d90b..6a4af3b918 100644 --- a/engine/hatchery/hatchery_local_test.go +++ b/engine/hatchery/hatchery_local_test.go @@ -2,6 +2,7 @@ package hatchery_test import ( "context" + "fmt" "os" "os/exec" "strings" @@ -49,6 +50,7 @@ func TestHatcheryLocal(t *testing.T) { cfg.API.MaxHeartbeatFailures = 0 cfg.Provision.RegisterFrequency = 1 cfg.Provision.MaxWorker = 1 + cfg.Provision.InjectEnvVars = []string{"AAA=AAA"} privKey, _ := jws.NewRandomRSAKey() privKeyPEM, _ := jws.ExportPrivateKey(privKey) cfg.RSAPrivateKey = string(privKeyPEM) @@ -109,5 +111,7 @@ func TestHelperProcess(*testing.T) { if os.Getenv("GO_WANT_HELPER_PROCESS") != "1" { return } + + fmt.Println(os.Environ()) time.Sleep(30 * time.Second) } diff --git a/engine/hatchery/kubernetes/kubernetes.go b/engine/hatchery/kubernetes/kubernetes.go index 4b26c2c6d7..d452753ac7 100644 --- a/engine/hatchery/kubernetes/kubernetes.go +++ b/engine/hatchery/kubernetes/kubernetes.go @@ -193,7 +193,7 @@ func (h *HatcheryKubernetes) SpawnWorker(ctx context.Context, spawnArgs hatchery } } - udataParam := h.GenerateWorkerArgs(h, spawnArgs) + udataParam := h.GenerateWorkerArgs(ctx, h, spawnArgs) udataParam.TTL = h.Config.WorkerTTL udataParam.WorkflowJobID = spawnArgs.JobID @@ -216,7 +216,7 @@ func (h *HatcheryKubernetes) SpawnWorker(ctx context.Context, spawnArgs hatchery if spawnArgs.Model.ModelDocker.Envs == nil { spawnArgs.Model.ModelDocker.Envs = map[string]string{} } - envsWm := map[string]string{} + envsWm := udataParam.InjectEnvVars envsWm["CDS_MODEL_MEMORY"] = fmt.Sprintf("%d", memory) envsWm["CDS_API"] = udataParam.API envsWm["CDS_TOKEN"] = udataParam.Token diff --git a/engine/hatchery/kubernetes/kubernetes_test.go b/engine/hatchery/kubernetes/kubernetes_test.go index 673822481e..c522195ba1 100644 --- a/engine/hatchery/kubernetes/kubernetes_test.go +++ b/engine/hatchery/kubernetes/kubernetes_test.go @@ -65,6 +65,7 @@ func TestHatcheryKubernetes_Status(t *testing.T) { defer gock.Off() defer gock.Observe(nil) h := NewHatcheryKubernetesTest(t) + h.Config.HatcheryCommonConfiguration.Provision.InjectEnvVars = []string{"ZZZZ=ZZZZ"} m := &sdk.Model{ Name: "model1", @@ -96,6 +97,8 @@ func TestHatcheryKubernetes_Status(t *testing.T) { require.Equal(t, 2, len(podRequest.Spec.Containers)) require.Equal(t, "k8s-toto", podRequest.Spec.Containers[0].Name) require.Equal(t, int64(4096), podRequest.Spec.Containers[0].Resources.Requests.Memory().Value()) + require.Equal(t, "ZZZZ", podRequest.Spec.Containers[0].Env[1].Name) + require.Equal(t, "ZZZZ", podRequest.Spec.Containers[0].Env[1].Value) require.Equal(t, "service-0-pg", podRequest.Spec.Containers[1].Name) require.Equal(t, 1, len(podRequest.Spec.Containers[1].Env)) require.Equal(t, "PG_USERNAME", podRequest.Spec.Containers[1].Env[0].Name) diff --git a/engine/hatchery/local/worker_spawn.go b/engine/hatchery/local/worker_spawn.go index 41cd5a8294..78e0d22c1b 100644 --- a/engine/hatchery/local/worker_spawn.go +++ b/engine/hatchery/local/worker_spawn.go @@ -71,7 +71,7 @@ func (h *HatcheryLocal) SpawnWorker(ctx context.Context, spawnArgs hatchery.Spaw log.Info(ctx, "HatcheryLocal.SpawnWorker> basedir: %s", basedir) - udataParam := h.GenerateWorkerArgs(h, spawnArgs) + udataParam := h.GenerateWorkerArgs(ctx, h, spawnArgs) udataParam.BaseDir = basedir udataParam.WorkerBinary = path.Join(h.BasedirDedicated, h.getWorkerBinaryName()) udataParam.WorkflowJobID = spawnArgs.JobID @@ -109,6 +109,9 @@ func (h *HatcheryLocal) SpawnWorker(ctx context.Context, spawnArgs hatchery.Spaw cmd.Env = append(cmd.Env, e) } } + for k, v := range udataParam.InjectEnvVars { + cmd.Env = append(cmd.Env, k+"="+v) + } // Wait in a goroutine so that when process exits, Wait() update cmd.ProcessState go func() { diff --git a/engine/hatchery/marathon/marathon.go b/engine/hatchery/marathon/marathon.go index b7843c9d78..e506d117e3 100644 --- a/engine/hatchery/marathon/marathon.go +++ b/engine/hatchery/marathon/marathon.go @@ -237,7 +237,7 @@ func (h *HatcheryMarathon) SpawnWorker(ctx context.Context, spawnArgs hatchery.S instance := 1 forcePull := strings.HasSuffix(spawnArgs.Model.ModelDocker.Image, ":latest") - udataParam := h.GenerateWorkerArgs(h, spawnArgs) + udataParam := h.GenerateWorkerArgs(ctx, h, spawnArgs) udataParam.TTL = h.Config.WorkerTTL udataParam.WorkflowJobID = spawnArgs.JobID @@ -279,7 +279,7 @@ func (h *HatcheryMarathon) SpawnWorker(ctx context.Context, spawnArgs hatchery.S spawnArgs.Model.ModelDocker.Envs = map[string]string{} } - envsWm := map[string]string{} + envsWm := udataParam.InjectEnvVars envsWm["CDS_MODEL_MEMORY"] = fmt.Sprintf("%d", memory) envsWm["CDS_API"] = udataParam.API envsWm["CDS_TOKEN"] = udataParam.Token diff --git a/engine/hatchery/openstack/spawn.go b/engine/hatchery/openstack/spawn.go index 00853bf306..b0b28ab23d 100644 --- a/engine/hatchery/openstack/spawn.go +++ b/engine/hatchery/openstack/spawn.go @@ -77,7 +77,7 @@ func (h *HatcheryOpenstack) SpawnWorker(ctx context.Context, spawnArgs hatchery. return err } - udataParam := h.GenerateWorkerArgs(h, spawnArgs) + udataParam := h.GenerateWorkerArgs(ctx, h, spawnArgs) udataParam.TTL = h.Config.WorkerTTL udataParam.FromWorkerImage = withExistingImage udataParam.WorkflowJobID = spawnArgs.JobID diff --git a/engine/hatchery/serve.go b/engine/hatchery/serve.go index 86784fe567..689f7f529e 100644 --- a/engine/hatchery/serve.go +++ b/engine/hatchery/serve.go @@ -250,7 +250,7 @@ func getStatusHandler(h hatchery.Interface) service.HandlerFunc { } } -func (c *Common) GenerateWorkerArgs(h hatchery.Interface, spawnArgs hatchery.SpawnArguments) sdk.WorkerArgs { +func (c *Common) GenerateWorkerArgs(ctx context.Context, h hatchery.Interface, spawnArgs hatchery.SpawnArguments) sdk.WorkerArgs { apiURL := h.Configuration().Provision.WorkerAPIHTTP.URL httpInsecure := h.Configuration().Provision.WorkerAPIHTTP.Insecure if apiURL == "" { @@ -258,6 +258,17 @@ func (c *Common) GenerateWorkerArgs(h hatchery.Interface, spawnArgs hatchery.Spa httpInsecure = h.Configuration().API.HTTP.Insecure } + envvars := make(map[string]string, len(h.Configuration().Provision.InjectEnvVars)) + + for _, e := range h.Configuration().Provision.InjectEnvVars { + tuple := strings.SplitN(e, "=", 2) + if len(tuple) != 2 { + log.Error(ctx, "invalid env variable to inject: %q", e) + continue + } + envvars[tuple[0]] = tuple[1] + } + return sdk.WorkerArgs{ API: apiURL, HTTPInsecure: httpInsecure, @@ -265,6 +276,7 @@ func (c *Common) GenerateWorkerArgs(h hatchery.Interface, spawnArgs hatchery.Spa Name: spawnArgs.WorkerName, Model: spawnArgs.ModelName(), HatcheryName: h.Name(), + InjectEnvVars: envvars, GraylogHost: h.Configuration().Provision.WorkerLogsOptions.Graylog.Host, GraylogPort: h.Configuration().Provision.WorkerLogsOptions.Graylog.Port, GraylogExtraKey: h.Configuration().Provision.WorkerLogsOptions.Graylog.ExtraKey, diff --git a/engine/hatchery/swarm/swarm.go b/engine/hatchery/swarm/swarm.go index 89583f74b4..b455ccc0e8 100644 --- a/engine/hatchery/swarm/swarm.go +++ b/engine/hatchery/swarm/swarm.go @@ -374,7 +374,7 @@ func (h *HatcherySwarm) SpawnWorker(ctx context.Context, spawnArgs hatchery.Spaw return errDockerOpts } - udataParam := h.GenerateWorkerArgs(h, spawnArgs) + udataParam := h.GenerateWorkerArgs(ctx, h, spawnArgs) udataParam.TTL = h.Config.WorkerTTL udataParam.WorkflowJobID = spawnArgs.JobID @@ -395,7 +395,7 @@ func (h *HatcherySwarm) SpawnWorker(ctx context.Context, spawnArgs hatchery.Spaw modelEnvs[k] = v } - envsWm := map[string]string{} + envsWm := udataParam.InjectEnvVars envsWm["CDS_MODEL_MEMORY"] = fmt.Sprintf("%d", memory) envsWm["CDS_API"] = udataParam.API envsWm["CDS_TOKEN"] = udataParam.Token diff --git a/engine/hatchery/vsphere/spawn.go b/engine/hatchery/vsphere/spawn.go index 15084ea614..77c19c44a7 100644 --- a/engine/hatchery/vsphere/spawn.go +++ b/engine/hatchery/vsphere/spawn.go @@ -323,7 +323,7 @@ func (h *HatcheryVSphere) launchScriptWorker(ctx context.Context, name string, j return errt } - udataParam := h.GenerateWorkerArgs(h, hatchery.SpawnArguments{ + udataParam := h.GenerateWorkerArgs(ctx, h, hatchery.SpawnArguments{ WorkerToken: token, WorkerName: name, Model: &model, @@ -332,6 +332,10 @@ func (h *HatcheryVSphere) launchScriptWorker(ctx context.Context, name string, j udataParam.FromWorkerImage = true udataParam.WorkflowJobID = jobID + for k, v := range udataParam.InjectEnvVars { + env = append(env, k+"="+v) + } + var buffer bytes.Buffer if err := tmpl.Execute(&buffer, udataParam); err != nil { return err diff --git a/engine/service/types.go b/engine/service/types.go index 98e45d76d9..d2a21e966e 100644 --- a/engine/service/types.go +++ b/engine/service/types.go @@ -46,13 +46,14 @@ type HatcheryCommonConfiguration struct { MaxHeartbeatFailures int `toml:"maxHeartbeatFailures" default:"10" comment:"Maximum allowed consecutives failures on heatbeat routine" json:"maxHeartbeatFailures"` } `toml:"api" json:"api"` Provision struct { - RatioService *int `toml:"ratioService" default:"50" commented:"true" comment:"Percent reserved for spawning worker with service requirement" json:"ratioService,omitempty" mapstructure:"ratioService"` - MaxWorker int `toml:"maxWorker" default:"10" comment:"Maximum allowed simultaneous workers" json:"maxWorker"` - MaxConcurrentProvisioning int `toml:"maxConcurrentProvisioning" default:"10" comment:"Maximum allowed simultaneous workers provisioning" json:"maxConcurrentProvisioning"` - MaxConcurrentRegistering int `toml:"maxConcurrentRegistering" default:"2" comment:"Maximum allowed simultaneous workers registering. -1 to disable registering on this hatchery" json:"maxConcurrentRegistering"` - RegisterFrequency int `toml:"registerFrequency" default:"60" comment:"Check if some worker model have to be registered each n Seconds" json:"registerFrequency"` - Region string `toml:"region" default:"" comment:"region of this hatchery - optional. With a free text as 'myregion', user can set a prerequisite 'region' with value 'myregion' on CDS Job" json:"region"` - IgnoreJobWithNoRegion bool `toml:"ignoreJobWithNoRegion" default:"false" comment:"Ignore job without a region prerequisite if ignoreJobWithNoRegion=true"` + InjectEnvVars []string `toml:"injectEnvVars" commented:"true" comment:"Inject env variables in workers" json:"injectEnvVars,omitempty" mapstructure:"injectEnvVars"` + RatioService *int `toml:"ratioService" default:"50" commented:"true" comment:"Percent reserved for spawning worker with service requirement" json:"ratioService,omitempty" mapstructure:"ratioService"` + MaxWorker int `toml:"maxWorker" default:"10" comment:"Maximum allowed simultaneous workers" json:"maxWorker"` + MaxConcurrentProvisioning int `toml:"maxConcurrentProvisioning" default:"10" comment:"Maximum allowed simultaneous workers provisioning" json:"maxConcurrentProvisioning"` + MaxConcurrentRegistering int `toml:"maxConcurrentRegistering" default:"2" comment:"Maximum allowed simultaneous workers registering. -1 to disable registering on this hatchery" json:"maxConcurrentRegistering"` + RegisterFrequency int `toml:"registerFrequency" default:"60" comment:"Check if some worker model have to be registered each n Seconds" json:"registerFrequency"` + Region string `toml:"region" default:"" comment:"region of this hatchery - optional. With a free text as 'myregion', user can set a prerequisite 'region' with value 'myregion' on CDS Job" json:"region"` + IgnoreJobWithNoRegion bool `toml:"ignoreJobWithNoRegion" default:"false" comment:"Ignore job without a region prerequisite if ignoreJobWithNoRegion=true"` WorkerAPIHTTP struct { URL string `toml:"url" default:"http://localhost:8081" commented:"true" comment:"CDS API URL for worker, let empty or commented to use the same URL that is used by the Hatchery" json:"url"` Insecure bool `toml:"insecure" default:"false" commented:"true" comment:"sslInsecureSkipVerify, set to true if you use a self-signed SSL on CDS API" json:"insecure"` diff --git a/sdk/worker.go b/sdk/worker.go index 27b641cb96..805017b6a1 100644 --- a/sdk/worker.go +++ b/sdk/worker.go @@ -56,6 +56,8 @@ type WorkerArgs struct { GraylogExtraKey string `json:"graylog_extra_key"` GraylogExtraValue string `json:"graylog_extra_value"` WorkerBinary string + // Env variables + InjectEnvVars map[string]string `json:"inject_env_vars"` } // TemplateEnvs return envs interpolated with worker arguments From 32595c6cff774bcc390b236fbc01dfc15dd2c5fd Mon Sep 17 00:00:00 2001 From: francois samin Date: Fri, 7 May 2021 16:04:05 +0200 Subject: [PATCH 2/3] feat(hatchery): inject env variables into workers from hatchery config --- engine/hatchery/hatchery_local_test.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/engine/hatchery/hatchery_local_test.go b/engine/hatchery/hatchery_local_test.go index 6a4af3b918..a4687d13de 100644 --- a/engine/hatchery/hatchery_local_test.go +++ b/engine/hatchery/hatchery_local_test.go @@ -2,7 +2,6 @@ package hatchery_test import ( "context" - "fmt" "os" "os/exec" "strings" @@ -107,11 +106,11 @@ func TestHatcheryLocal(t *testing.T) { } } -func TestHelperProcess(*testing.T) { +func TestHelperProcess(t *testing.T) { if os.Getenv("GO_WANT_HELPER_PROCESS") != "1" { return } - fmt.Println(os.Environ()) + t.Log(os.Environ()) time.Sleep(30 * time.Second) } From 25d4fd05bbcc26e0e1e8a8e245251fa054fa460f Mon Sep 17 00:00:00 2001 From: francois samin Date: Fri, 7 May 2021 17:00:59 +0200 Subject: [PATCH 3/3] fix: cr Signed-off-by: francois samin --- engine/config.go | 1 - engine/service/types.go | 2 +- 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/engine/config.go b/engine/config.go index 572256eac7..840db4e6c2 100644 --- a/engine/config.go +++ b/engine/config.go @@ -115,7 +115,6 @@ func configBootstrap(args []string) Configuration { conf.Hatchery.Openstack.HTTP.Port = 8086 case sdk.TypeHatchery + ":swarm": conf.Hatchery.Swarm = &swarm.HatcheryConfiguration{} - conf.Hatchery.Swarm.Provision.InjectEnvVars = []string{"CDS=1"} defaults.SetDefaults(conf.Hatchery.Swarm) conf.Hatchery.Swarm.DockerEngines = map[string]swarm.DockerEngineConfiguration{ "sample-docker-engine": { diff --git a/engine/service/types.go b/engine/service/types.go index d2a21e966e..f031a69783 100644 --- a/engine/service/types.go +++ b/engine/service/types.go @@ -46,7 +46,7 @@ type HatcheryCommonConfiguration struct { MaxHeartbeatFailures int `toml:"maxHeartbeatFailures" default:"10" comment:"Maximum allowed consecutives failures on heatbeat routine" json:"maxHeartbeatFailures"` } `toml:"api" json:"api"` Provision struct { - InjectEnvVars []string `toml:"injectEnvVars" commented:"true" comment:"Inject env variables in workers" json:"injectEnvVars,omitempty" mapstructure:"injectEnvVars"` + InjectEnvVars []string `toml:"injectEnvVars" commented:"true" comment:"Inject env variables in workers" json:"-" mapstructure:"injectEnvVars"` RatioService *int `toml:"ratioService" default:"50" commented:"true" comment:"Percent reserved for spawning worker with service requirement" json:"ratioService,omitempty" mapstructure:"ratioService"` MaxWorker int `toml:"maxWorker" default:"10" comment:"Maximum allowed simultaneous workers" json:"maxWorker"` MaxConcurrentProvisioning int `toml:"maxConcurrentProvisioning" default:"10" comment:"Maximum allowed simultaneous workers provisioning" json:"maxConcurrentProvisioning"`