Skip to content

Commit

Permalink
feat(hatchery): inject env variables into workers from hatchery config (
Browse files Browse the repository at this point in the history
  • Loading branch information
fsamin authored May 18, 2021
1 parent 521a739 commit 70909f2
Show file tree
Hide file tree
Showing 11 changed files with 46 additions and 18 deletions.
5 changes: 4 additions & 1 deletion engine/hatchery/hatchery_local_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ func TestHatcheryLocal(t *testing.T) {
cfg.API.MaxHeartbeatFailures = 0
cfg.Provision.RegisterFrequency = 1
cfg.Provision.MaxWorker = 1
cfg.Provision.InjectEnvVars = []string{"AAA=AAA"}
privKey, _ := jws.NewRandomRSAKey()
privKeyPEM, _ := jws.ExportPrivateKey(privKey)
cfg.RSAPrivateKey = string(privKeyPEM)
Expand Down Expand Up @@ -105,9 +106,11 @@ func TestHatcheryLocal(t *testing.T) {
}
}

func TestHelperProcess(*testing.T) {
func TestHelperProcess(t *testing.T) {
if os.Getenv("GO_WANT_HELPER_PROCESS") != "1" {
return
}

t.Log(os.Environ())
time.Sleep(30 * time.Second)
}
4 changes: 2 additions & 2 deletions engine/hatchery/kubernetes/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ func (h *HatcheryKubernetes) SpawnWorker(ctx context.Context, spawnArgs hatchery
}
}

udataParam := h.GenerateWorkerArgs(h, spawnArgs)
udataParam := h.GenerateWorkerArgs(ctx, h, spawnArgs)
udataParam.TTL = h.Config.WorkerTTL

udataParam.WorkflowJobID = spawnArgs.JobID
Expand All @@ -216,7 +216,7 @@ func (h *HatcheryKubernetes) SpawnWorker(ctx context.Context, spawnArgs hatchery
if spawnArgs.Model.ModelDocker.Envs == nil {
spawnArgs.Model.ModelDocker.Envs = map[string]string{}
}
envsWm := map[string]string{}
envsWm := udataParam.InjectEnvVars
envsWm["CDS_MODEL_MEMORY"] = fmt.Sprintf("%d", memory)
envsWm["CDS_API"] = udataParam.API
envsWm["CDS_TOKEN"] = udataParam.Token
Expand Down
3 changes: 3 additions & 0 deletions engine/hatchery/kubernetes/kubernetes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ func TestHatcheryKubernetes_Status(t *testing.T) {
defer gock.Off()
defer gock.Observe(nil)
h := NewHatcheryKubernetesTest(t)
h.Config.HatcheryCommonConfiguration.Provision.InjectEnvVars = []string{"ZZZZ=ZZZZ"}

m := &sdk.Model{
Name: "model1",
Expand Down Expand Up @@ -96,6 +97,8 @@ func TestHatcheryKubernetes_Status(t *testing.T) {
require.Equal(t, 2, len(podRequest.Spec.Containers))
require.Equal(t, "k8s-toto", podRequest.Spec.Containers[0].Name)
require.Equal(t, int64(4096), podRequest.Spec.Containers[0].Resources.Requests.Memory().Value())
require.Equal(t, "ZZZZ", podRequest.Spec.Containers[0].Env[1].Name)
require.Equal(t, "ZZZZ", podRequest.Spec.Containers[0].Env[1].Value)
require.Equal(t, "service-0-pg", podRequest.Spec.Containers[1].Name)
require.Equal(t, 1, len(podRequest.Spec.Containers[1].Env))
require.Equal(t, "PG_USERNAME", podRequest.Spec.Containers[1].Env[0].Name)
Expand Down
5 changes: 4 additions & 1 deletion engine/hatchery/local/worker_spawn.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func (h *HatcheryLocal) SpawnWorker(ctx context.Context, spawnArgs hatchery.Spaw

log.Info(ctx, "HatcheryLocal.SpawnWorker> basedir: %s", basedir)

udataParam := h.GenerateWorkerArgs(h, spawnArgs)
udataParam := h.GenerateWorkerArgs(ctx, h, spawnArgs)
udataParam.BaseDir = basedir
udataParam.WorkerBinary = path.Join(h.BasedirDedicated, h.getWorkerBinaryName())
udataParam.WorkflowJobID = spawnArgs.JobID
Expand Down Expand Up @@ -109,6 +109,9 @@ func (h *HatcheryLocal) SpawnWorker(ctx context.Context, spawnArgs hatchery.Spaw
cmd.Env = append(cmd.Env, e)
}
}
for k, v := range udataParam.InjectEnvVars {
cmd.Env = append(cmd.Env, k+"="+v)
}

// Wait in a goroutine so that when process exits, Wait() update cmd.ProcessState
go func() {
Expand Down
4 changes: 2 additions & 2 deletions engine/hatchery/marathon/marathon.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ func (h *HatcheryMarathon) SpawnWorker(ctx context.Context, spawnArgs hatchery.S
instance := 1
forcePull := strings.HasSuffix(spawnArgs.Model.ModelDocker.Image, ":latest")

udataParam := h.GenerateWorkerArgs(h, spawnArgs)
udataParam := h.GenerateWorkerArgs(ctx, h, spawnArgs)
udataParam.TTL = h.Config.WorkerTTL
udataParam.WorkflowJobID = spawnArgs.JobID

Expand Down Expand Up @@ -279,7 +279,7 @@ func (h *HatcheryMarathon) SpawnWorker(ctx context.Context, spawnArgs hatchery.S
spawnArgs.Model.ModelDocker.Envs = map[string]string{}
}

envsWm := map[string]string{}
envsWm := udataParam.InjectEnvVars
envsWm["CDS_MODEL_MEMORY"] = fmt.Sprintf("%d", memory)
envsWm["CDS_API"] = udataParam.API
envsWm["CDS_TOKEN"] = udataParam.Token
Expand Down
2 changes: 1 addition & 1 deletion engine/hatchery/openstack/spawn.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func (h *HatcheryOpenstack) SpawnWorker(ctx context.Context, spawnArgs hatchery.
return err
}

udataParam := h.GenerateWorkerArgs(h, spawnArgs)
udataParam := h.GenerateWorkerArgs(ctx, h, spawnArgs)
udataParam.TTL = h.Config.WorkerTTL
udataParam.FromWorkerImage = withExistingImage
udataParam.WorkflowJobID = spawnArgs.JobID
Expand Down
14 changes: 13 additions & 1 deletion engine/hatchery/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,21 +250,33 @@ func getStatusHandler(h hatchery.Interface) service.HandlerFunc {
}
}

func (c *Common) GenerateWorkerArgs(h hatchery.Interface, spawnArgs hatchery.SpawnArguments) sdk.WorkerArgs {
func (c *Common) GenerateWorkerArgs(ctx context.Context, h hatchery.Interface, spawnArgs hatchery.SpawnArguments) sdk.WorkerArgs {
apiURL := h.Configuration().Provision.WorkerAPIHTTP.URL
httpInsecure := h.Configuration().Provision.WorkerAPIHTTP.Insecure
if apiURL == "" {
apiURL = h.Configuration().API.HTTP.URL
httpInsecure = h.Configuration().API.HTTP.Insecure
}

envvars := make(map[string]string, len(h.Configuration().Provision.InjectEnvVars))

for _, e := range h.Configuration().Provision.InjectEnvVars {
tuple := strings.SplitN(e, "=", 2)
if len(tuple) != 2 {
log.Error(ctx, "invalid env variable to inject: %q", e)
continue
}
envvars[tuple[0]] = tuple[1]
}

return sdk.WorkerArgs{
API: apiURL,
HTTPInsecure: httpInsecure,
Token: spawnArgs.WorkerToken,
Name: spawnArgs.WorkerName,
Model: spawnArgs.ModelName(),
HatcheryName: h.Name(),
InjectEnvVars: envvars,
GraylogHost: h.Configuration().Provision.WorkerLogsOptions.Graylog.Host,
GraylogPort: h.Configuration().Provision.WorkerLogsOptions.Graylog.Port,
GraylogExtraKey: h.Configuration().Provision.WorkerLogsOptions.Graylog.ExtraKey,
Expand Down
4 changes: 2 additions & 2 deletions engine/hatchery/swarm/swarm.go
Original file line number Diff line number Diff line change
Expand Up @@ -374,7 +374,7 @@ func (h *HatcherySwarm) SpawnWorker(ctx context.Context, spawnArgs hatchery.Spaw
return errDockerOpts
}

udataParam := h.GenerateWorkerArgs(h, spawnArgs)
udataParam := h.GenerateWorkerArgs(ctx, h, spawnArgs)
udataParam.TTL = h.Config.WorkerTTL
udataParam.WorkflowJobID = spawnArgs.JobID

Expand All @@ -395,7 +395,7 @@ func (h *HatcherySwarm) SpawnWorker(ctx context.Context, spawnArgs hatchery.Spaw
modelEnvs[k] = v
}

envsWm := map[string]string{}
envsWm := udataParam.InjectEnvVars
envsWm["CDS_MODEL_MEMORY"] = fmt.Sprintf("%d", memory)
envsWm["CDS_API"] = udataParam.API
envsWm["CDS_TOKEN"] = udataParam.Token
Expand Down
6 changes: 5 additions & 1 deletion engine/hatchery/vsphere/spawn.go
Original file line number Diff line number Diff line change
Expand Up @@ -323,7 +323,7 @@ func (h *HatcheryVSphere) launchScriptWorker(ctx context.Context, name string, j
return errt
}

udataParam := h.GenerateWorkerArgs(h, hatchery.SpawnArguments{
udataParam := h.GenerateWorkerArgs(ctx, h, hatchery.SpawnArguments{
WorkerToken: token,
WorkerName: name,
Model: &model,
Expand All @@ -332,6 +332,10 @@ func (h *HatcheryVSphere) launchScriptWorker(ctx context.Context, name string, j
udataParam.FromWorkerImage = true
udataParam.WorkflowJobID = jobID

for k, v := range udataParam.InjectEnvVars {
env = append(env, k+"="+v)
}

var buffer bytes.Buffer
if err := tmpl.Execute(&buffer, udataParam); err != nil {
return err
Expand Down
15 changes: 8 additions & 7 deletions engine/service/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,14 @@ type HatcheryCommonConfiguration struct {
MaxHeartbeatFailures int `toml:"maxHeartbeatFailures" default:"10" comment:"Maximum allowed consecutives failures on heatbeat routine" json:"maxHeartbeatFailures"`
} `toml:"api" json:"api"`
Provision struct {
RatioService *int `toml:"ratioService" default:"50" commented:"true" comment:"Percent reserved for spawning worker with service requirement" json:"ratioService,omitempty" mapstructure:"ratioService"`
MaxWorker int `toml:"maxWorker" default:"10" comment:"Maximum allowed simultaneous workers" json:"maxWorker"`
MaxConcurrentProvisioning int `toml:"maxConcurrentProvisioning" default:"10" comment:"Maximum allowed simultaneous workers provisioning" json:"maxConcurrentProvisioning"`
MaxConcurrentRegistering int `toml:"maxConcurrentRegistering" default:"2" comment:"Maximum allowed simultaneous workers registering. -1 to disable registering on this hatchery" json:"maxConcurrentRegistering"`
RegisterFrequency int `toml:"registerFrequency" default:"60" comment:"Check if some worker model have to be registered each n Seconds" json:"registerFrequency"`
Region string `toml:"region" default:"" comment:"region of this hatchery - optional. With a free text as 'myregion', user can set a prerequisite 'region' with value 'myregion' on CDS Job" json:"region"`
IgnoreJobWithNoRegion bool `toml:"ignoreJobWithNoRegion" default:"false" comment:"Ignore job without a region prerequisite if ignoreJobWithNoRegion=true"`
InjectEnvVars []string `toml:"injectEnvVars" commented:"true" comment:"Inject env variables in workers" json:"-" mapstructure:"injectEnvVars"`
RatioService *int `toml:"ratioService" default:"50" commented:"true" comment:"Percent reserved for spawning worker with service requirement" json:"ratioService,omitempty" mapstructure:"ratioService"`
MaxWorker int `toml:"maxWorker" default:"10" comment:"Maximum allowed simultaneous workers" json:"maxWorker"`
MaxConcurrentProvisioning int `toml:"maxConcurrentProvisioning" default:"10" comment:"Maximum allowed simultaneous workers provisioning" json:"maxConcurrentProvisioning"`
MaxConcurrentRegistering int `toml:"maxConcurrentRegistering" default:"2" comment:"Maximum allowed simultaneous workers registering. -1 to disable registering on this hatchery" json:"maxConcurrentRegistering"`
RegisterFrequency int `toml:"registerFrequency" default:"60" comment:"Check if some worker model have to be registered each n Seconds" json:"registerFrequency"`
Region string `toml:"region" default:"" comment:"region of this hatchery - optional. With a free text as 'myregion', user can set a prerequisite 'region' with value 'myregion' on CDS Job" json:"region"`
IgnoreJobWithNoRegion bool `toml:"ignoreJobWithNoRegion" default:"false" comment:"Ignore job without a region prerequisite if ignoreJobWithNoRegion=true"`
WorkerAPIHTTP struct {
URL string `toml:"url" default:"http://localhost:8081" commented:"true" comment:"CDS API URL for worker, let empty or commented to use the same URL that is used by the Hatchery" json:"url"`
Insecure bool `toml:"insecure" default:"false" commented:"true" comment:"sslInsecureSkipVerify, set to true if you use a self-signed SSL on CDS API" json:"insecure"`
Expand Down
2 changes: 2 additions & 0 deletions sdk/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ type WorkerArgs struct {
GraylogExtraKey string `json:"graylog_extra_key"`
GraylogExtraValue string `json:"graylog_extra_value"`
WorkerBinary string
// Env variables
InjectEnvVars map[string]string `json:"inject_env_vars"`
}

// TemplateEnvs return envs interpolated with worker arguments
Expand Down

0 comments on commit 70909f2

Please sign in to comment.