diff --git a/docs/content/docs/tutorials/worker_model-vsphere.md b/docs/content/docs/tutorials/worker_model-vsphere.md index 31eba93fea..0aa3d52739 100644 --- a/docs/content/docs/tutorials/worker_model-vsphere.md +++ b/docs/content/docs/tutorials/worker_model-vsphere.md @@ -51,16 +51,13 @@ We will create a worker model called debian8-docker: #!/bin/bash set +e export CDS_FROM_WORKER_IMAGE={{.FromWorkerImage}} - export CDS_SINGLE_USE=1 export CDS_API={{.API}} export CDS_TOKEN={{.Token}} export CDS_NAME={{.Name}} export CDS_MODEL={{.Model}} export CDS_HATCHERY={{.Hatchery}} export CDS_HATCHERY_NAME={{.HatcheryName}} - export CDS_BOOKED_PB_JOB_ID={{.PipelineBuildJobID}} export CDS_BOOKED_WORKFLOW_JOB_ID={{.WorkflowJobID}} - export CDS_TTL={{.TTL}} export CDS_INSECURE={{.HTTPInsecure}} # Basic build binaries diff --git a/engine/api/project_integration_worker_hook_test.go b/engine/api/project_integration_worker_hook_test.go index e3830b56ca..d1119a3dfa 100644 --- a/engine/api/project_integration_worker_hook_test.go +++ b/engine/api/project_integration_worker_hook_test.go @@ -65,7 +65,6 @@ func TestAPI_post_getProjectIntegrationWorkerHookHandler(t *testing.T) { btes := w.Body.Bytes() var wh2 sdk.WorkerHookProjectIntegrationModel require.NoError(t, json.Unmarshal(btes, &wh2)) - t.Logf(">> wh2=%+v", wh2) uri = router.GetRoute("GET", api.getProjectIntegrationWorkerHookHandler, vars) req = assets.NewAuthentifiedRequest(t, u, pass, "GET", uri, nil) @@ -77,8 +76,6 @@ func TestAPI_post_getProjectIntegrationWorkerHookHandler(t *testing.T) { var wh3 sdk.WorkerHookProjectIntegrationModel require.NoError(t, json.Unmarshal(btes, &wh3)) - t.Logf(">> wh3=%+v", wh3) - wh = wh3 wh.Disable = true @@ -104,6 +101,4 @@ func TestAPI_post_getProjectIntegrationWorkerHookHandler(t *testing.T) { var wh5 sdk.WorkerHookProjectIntegrationModel require.NoError(t, json.Unmarshal(btes, &wh5)) require.True(t, wh5.Disable) - t.Logf(">> wh5=%+v", wh5) - } diff --git a/engine/api/workermodel/dao.go b/engine/api/workermodel/dao.go index 3172224b97..72551aad74 100644 --- a/engine/api/workermodel/dao.go +++ b/engine/api/workermodel/dao.go @@ -200,8 +200,6 @@ func Insert(ctx context.Context, db gorpmapper.SqlExecutorWithTx, model *sdk.Mod dbmodel.UserLastModified = time.Now() dbmodel.NeedRegistration = true - mergeModelEnvsWithDefaultEnvs(&dbmodel) - needSaveRegistryPassword, dockerRegistryPassword, err := replaceDockerRegistryPassword(db, &dbmodel) if err != nil { return err @@ -246,8 +244,6 @@ func UpdateDB(ctx context.Context, db gorpmapper.SqlExecutorWithTx, model *sdk.M return err } - mergeModelEnvsWithDefaultEnvs(&dbmodel) - needSaveRegistryPassword, dockerRegistryPassword, err := replaceDockerRegistryPassword(db, &dbmodel) if err != nil { return err diff --git a/engine/api/workermodel/dao_test.go b/engine/api/workermodel/dao_test.go index 70104ca2f7..3fc1d992e6 100644 --- a/engine/api/workermodel/dao_test.go +++ b/engine/api/workermodel/dao_test.go @@ -135,29 +135,6 @@ func TestInsert(t *testing.T) { assert.EqualValues(t, *src, *res) } -func TestMergeModelEnvsWithDefaultEnvs(t *testing.T) { - db, _ := test.SetupPG(t, bootstrap.InitiliazeDB) - - g := assets.InsertGroup(t, db) - - m := sdk.Model{ - Name: sdk.RandomString(10), - Type: sdk.Docker, - ModelDocker: sdk.ModelDocker{ - Image: "foo/bar:3.4", - }, - GroupID: g.ID, - } - require.NoError(t, workermodel.Insert(context.TODO(), db, &m)) - require.Len(t, m.ModelDocker.Envs, 6, "all default vars should be added by insert") - - m.ModelDocker.Envs = map[string]string{ - "myvar": "myvalue", - } - require.NoError(t, workermodel.UpdateDB(context.TODO(), db, &m)) - require.Len(t, m.ModelDocker.Envs, 7, "all default vars should be merged to given vars by update") -} - func TestLoadByNameAndGroupID(t *testing.T) { db, _ := test.SetupPG(t, bootstrap.InitiliazeDB) diff --git a/engine/api/workermodel/init.go b/engine/api/workermodel/init.go index 27df11d2eb..0cd7d587f1 100644 --- a/engine/api/workermodel/init.go +++ b/engine/api/workermodel/init.go @@ -20,25 +20,12 @@ func insertFirstPatterns(db gorp.SqlExecutor) error { preCmdOs := `#!/bin/bash set +e export CDS_FROM_WORKER_IMAGE={{.FromWorkerImage}} -export CDS_SINGLE_USE=1 export CDS_API={{.API}} -export CDS_TOKEN={{.Token}} -export CDS_NAME={{.Name}} -export CDS_MODEL={{.Model}} -export CDS_HATCHERY_NAME={{.HatcheryName}} -export CDS_BOOKED_WORKFLOW_JOB_ID={{.WorkflowJobID}} -export CDS_TTL={{.TTL}} -export CDS_INSECURE={{.HTTPInsecure}} -export CDS_GRAYLOG_HOST={{.GraylogHost}} -export CDS_GRAYLOG_PORT={{.GraylogPort}} -export CDS_GRAYLOG_EXTRA_KEY={{.GraylogExtraKey}} -export CDS_GRAYLOG_EXTRA_VALUE={{.GraylogExtraValue}} # Basic build binaries cd $HOME apt-get -y --force-yes update >> /tmp/user_data 2>&1 -apt-get -y --force-yes install curl git >> /tmp/user_data 2>&1 -apt-get -y --force-yes install binutils >> /tmp/user_data 2>&1 +apt-get -y --force-yes install curl git binutils >> /tmp/user_data 2>&1 # Docker installation (FOR DEBIAN) if [[ "x{{.FromWorkerImage}}" = "xtrue" ]]; then @@ -94,7 +81,7 @@ chmod +x worker Type: sdk.HostProcess, Name: "basic_unix", Model: sdk.ModelCmds{ - Cmd: "worker --api={{.API}} --token={{.Token}} --basedir={{.BaseDir}} --model={{.Model}} --name={{.Name}} --hatchery-name={{.HatcheryName}} --insecure={{.HTTPInsecure}} --graylog-extra-key={{.GraylogExtraKey}} --graylog-extra-value={{.GraylogExtraValue}} --graylog-host={{.GraylogHost}} --graylog-port={{.GraylogPort}} --booked-workflow-job-id={{.WorkflowJobID}} --single-use --force-exit", + Cmd: "worker --config={{.Config}}", }, }, } @@ -108,6 +95,9 @@ chmod +x worker return sdk.WrapError(err, "cannot load worker_model_pattern for type %s", pattern.Type) } if numPattern > 0 { + if err := UpdatePattern(db, &pattern); err != nil { + return sdk.WrapError(err, "cannot update basic model %s", pattern.Type) + } continue } if err := InsertPattern(db, &pattern); err != nil { diff --git a/engine/api/workermodel/utils.go b/engine/api/workermodel/utils.go index 85f3b89c55..f2d0b858c0 100644 --- a/engine/api/workermodel/utils.go +++ b/engine/api/workermodel/utils.go @@ -9,30 +9,6 @@ import ( "github.com/ovh/cds/sdk" ) -var defaultEnvs = map[string]string{ - "CDS_SINGLE_USE": "1", - "CDS_TTL": "{{.TTL}}", - "CDS_GRAYLOG_HOST": "{{.GraylogHost}}", - "CDS_GRAYLOG_PORT": "{{.GraylogPort}}", - "CDS_GRAYLOG_EXTRA_KEY": "{{.GraylogExtraKey}}", - "CDS_GRAYLOG_EXTRA_VALUE": "{{.GraylogExtraValue}}", -} - -func mergeModelEnvsWithDefaultEnvs(m *workerModel) { - if m.Type != sdk.Docker { - return - } - - if m.ModelDocker.Envs == nil { - m.ModelDocker.Envs = make(map[string]string) - } - for envName := range defaultEnvs { - if _, ok := m.ModelDocker.Envs[envName]; !ok { - m.ModelDocker.Envs[envName] = defaultEnvs[envName] - } - } -} - const registryPasswordSecretName = "secrets.registry_password" const vpsherePasswordSecretName = "secrets.vsphere_password" diff --git a/engine/hatchery/kubernetes/kubernetes.go b/engine/hatchery/kubernetes/kubernetes.go index 326ca09c7b..6e241ed1f8 100644 --- a/engine/hatchery/kubernetes/kubernetes.go +++ b/engine/hatchery/kubernetes/kubernetes.go @@ -193,10 +193,12 @@ func (h *HatcheryKubernetes) SpawnWorker(ctx context.Context, spawnArgs hatchery } } - udataParam := h.GenerateWorkerArgs(ctx, h, spawnArgs) - udataParam.TTL = h.Config.WorkerTTL - - udataParam.WorkflowJobID = spawnArgs.JobID + workerConfig := h.GenerateWorkerConfig(ctx, h, spawnArgs) + udataParam := struct { + API string + }{ + API: workerConfig.APIEndpoint, + } tmpl, errt := template.New("cmd").Parse(spawnArgs.Model.ModelDocker.Cmd) if errt != nil { @@ -216,26 +218,12 @@ func (h *HatcheryKubernetes) SpawnWorker(ctx context.Context, spawnArgs hatchery if spawnArgs.Model.ModelDocker.Envs == nil { spawnArgs.Model.ModelDocker.Envs = map[string]string{} } - envsWm := udataParam.InjectEnvVars + envsWm := workerConfig.InjectEnvVars envsWm["CDS_MODEL_MEMORY"] = fmt.Sprintf("%d", memory) - envsWm["CDS_API"] = udataParam.API - envsWm["CDS_TOKEN"] = udataParam.Token - envsWm["CDS_NAME"] = udataParam.Name - envsWm["CDS_MODEL_PATH"] = udataParam.Model - envsWm["CDS_HATCHERY_NAME"] = udataParam.HatcheryName - envsWm["CDS_FROM_WORKER_IMAGE"] = fmt.Sprintf("%v", udataParam.FromWorkerImage) - envsWm["CDS_INSECURE"] = fmt.Sprintf("%v", udataParam.HTTPInsecure) - - if spawnArgs.JobID > 0 { - envsWm["CDS_BOOKED_WORKFLOW_JOB_ID"] = fmt.Sprintf("%d", spawnArgs.JobID) - } - - envTemplated, errEnv := sdk.TemplateEnvs(udataParam, spawnArgs.Model.ModelDocker.Envs) - if errEnv != nil { - return errEnv - } + envsWm["CDS_FROM_WORKER_IMAGE"] = "true" + envsWm["CDS_CONFIG"] = workerConfig.EncodeBase64() - for envName, envValue := range envTemplated { + for envName, envValue := range spawnArgs.Model.ModelDocker.Envs { envsWm[envName] = envValue } @@ -393,7 +381,7 @@ func (h *HatcheryKubernetes) NeedRegistration(_ context.Context, m *sdk.Model) b } func (h *HatcheryKubernetes) routines(ctx context.Context) { - ticker := time.NewTicker(10 * time.Second) + ticker := time.NewTicker(10 * time.Minute) defer ticker.Stop() for { diff --git a/engine/hatchery/kubernetes/types.go b/engine/hatchery/kubernetes/types.go index ee9c92b47d..9bc1cff170 100644 --- a/engine/hatchery/kubernetes/types.go +++ b/engine/hatchery/kubernetes/types.go @@ -20,8 +20,6 @@ var containerServiceNameRegexp = regexp.MustCompile(`service-([0-9]+)-(.*)`) // HatcheryConfiguration is the configuration for local hatchery type HatcheryConfiguration struct { service.HatcheryCommonConfiguration `mapstructure:"commonConfiguration" toml:"commonConfiguration" json:"commonConfiguration"` - // WorkerTTL Worker TTL (minutes) - WorkerTTL int `mapstructure:"workerTTL" toml:"workerTTL" default:"10" commented:"false" comment:"Worker TTL (minutes)" json:"workerTTL"` // DefaultMemory Worker default memory DefaultMemory int `mapstructure:"defaultMemory" toml:"defaultMemory" default:"1024" commented:"false" comment:"Worker default memory in Mo" json:"defaultMemory"` // Namespace is the kubernetes namespace in which workers are spawned" diff --git a/engine/hatchery/local/logger.go b/engine/hatchery/local/logger.go new file mode 100644 index 0000000000..c2cf2f6247 --- /dev/null +++ b/engine/hatchery/local/logger.go @@ -0,0 +1,98 @@ +package local + +import ( + "bufio" + "context" + "fmt" + "os/exec" + "strings" + "time" + + "github.com/rockbears/log" +) + +type Logger interface { + Logf(fmt string, values ...interface{}) + Errorf(fmt string, values ...interface{}) + Fatalf(fmt string, values ...interface{}) +} + +func (h *HatcheryLocal) startCmd(name string, cmd *exec.Cmd, logger Logger) error { + stdout, err := cmd.StdoutPipe() + if err != nil { + return fmt.Errorf("Failure due to internal error: unable to capture stdout: %v", err) + } + + stderr, err := cmd.StderrPipe() + if err != nil { + return fmt.Errorf("Failure due to internal error: unable to capture stderr: %v", err) + } + + stdoutreader := bufio.NewReader(stdout) + stderrreader := bufio.NewReader(stderr) + + outchan := make(chan bool) + go func() { + for { + line, err := stdoutreader.ReadString('\n') + if line != "" { + logger.Logf(line) + } + if err != nil { + stdout.Close() + close(outchan) + return + } + } + }() + + errchan := make(chan bool) + go func() { + for { + line, err := stderrreader.ReadString('\n') + if line != "" { + logger.Logf(line) + } + if err != nil { + stderr.Close() + close(errchan) + return + } + } + }() + + if err := cmd.Start(); err != nil { + return fmt.Errorf("unable to start command: %v", err) + } + + h.Lock() + h.workers[name] = workerCmd{cmd: cmd, created: time.Now()} + h.Unlock() + + <-outchan + <-errchan + if err := cmd.Wait(); err != nil { + return fmt.Errorf("command failure: %v", err) + } + + return nil +} + +type localWorkerLogger struct { + name string +} + +func (l localWorkerLogger) Logf(fmt string, values ...interface{}) { + fmt = strings.TrimSuffix(fmt, "\n") + log.Info(context.Background(), "hatchery> local> worker> %s> "+fmt, l.name) +} + +func (l localWorkerLogger) Errorf(fmt string, values ...interface{}) { + fmt = strings.TrimSuffix(fmt, "\n") + log.Error(context.Background(), "hatchery> local> worker> %s> "+fmt, l.name) +} + +func (l localWorkerLogger) Fatalf(fmt string, values ...interface{}) { + fmt = strings.TrimSuffix(fmt, "\n") + log.Fatal(context.TODO(), "hatchery> local> worker> %s> "+fmt, l.name) +} diff --git a/engine/hatchery/local/worker_spawn.go b/engine/hatchery/local/worker_spawn.go index 97f7b865e4..3905ae846e 100644 --- a/engine/hatchery/local/worker_spawn.go +++ b/engine/hatchery/local/worker_spawn.go @@ -1,8 +1,6 @@ package local import ( - "bufio" - "bytes" "context" "crypto/rand" "encoding/hex" @@ -11,8 +9,6 @@ import ( "os/exec" "path" "strings" - "text/template" - "time" "github.com/rockbears/log" @@ -28,27 +24,6 @@ func (localWorkerRunner) NewCmd(ctx context.Context, command string, args ...str return cmd } -type localWorkerLogger struct { - name string -} - -func (l localWorkerLogger) Logf(fmt string, values ...interface{}) { - fmt = strings.TrimSuffix(fmt, "\n") - log.Info(context.Background(), "hatchery> local> worker> %s> "+fmt, l.name) -} - -func (l localWorkerLogger) Errorf(fmt string, values ...interface{}) { - fmt = strings.TrimSuffix(fmt, "\n") - log.Error(context.Background(), "hatchery> local> worker> %s> "+fmt, l.name) -} - -func (l localWorkerLogger) Fatalf(fmt string, values ...interface{}) { - fmt = strings.TrimSuffix(fmt, "\n") - log.Fatal(context.TODO(), "hatchery> local> worker> %s> "+fmt, l.name) -} - -const workerCmdTmpl = "{{.WorkerBinary}} --api={{.API}} --token={{.Token}} --log-level=debug --basedir={{.BaseDir}} --name={{.Name}} --hatchery-name={{.HatcheryName}} --insecure={{.HTTPInsecure}} --graylog-extra-key={{.GraylogExtraKey}} --graylog-extra-value={{.GraylogExtraValue}} --graylog-host={{.GraylogHost}} --graylog-port={{.GraylogPort}} --booked-workflow-job-id={{.WorkflowJobID}}" - // SpawnWorker starts a new worker process func (h *HatcheryLocal) SpawnWorker(ctx context.Context, spawnArgs hatchery.SpawnArguments) error { log.Debug(ctx, "HatcheryLocal.SpawnWorker> %s want to spawn a worker named %s (jobID = %d)", spawnArgs.HatcheryName, spawnArgs.WorkerName, spawnArgs.JobID) @@ -71,36 +46,19 @@ func (h *HatcheryLocal) SpawnWorker(ctx context.Context, spawnArgs hatchery.Spaw log.Info(ctx, "HatcheryLocal.SpawnWorker> basedir: %s", basedir) - udataParam := h.GenerateWorkerArgs(ctx, h, spawnArgs) - udataParam.BaseDir = basedir - udataParam.WorkerBinary = path.Join(h.BasedirDedicated, h.getWorkerBinaryName()) - udataParam.WorkflowJobID = spawnArgs.JobID - - tmpl, errt := template.New("cmd").Parse(workerCmdTmpl) - if errt != nil { - return errt - } - var buffer bytes.Buffer - if errTmpl := tmpl.Execute(&buffer, udataParam); errTmpl != nil { - return errTmpl - } - - cmdSplitted := strings.Split(buffer.String(), " -") - for i := range cmdSplitted[1:] { - cmdSplitted[i+1] = "-" + strings.Trim(cmdSplitted[i+1], " ") - } + workerBinary := path.Join(h.BasedirDedicated, h.getWorkerBinaryName()) + workerConfig := h.GenerateWorkerConfig(ctx, h, spawnArgs) + workerConfig.Basedir = basedir // Prefix the command with the directory where the worker binary has been downloaded - log.Debug(ctx, "Command exec: %v", cmdSplitted) + log.Debug(ctx, "Command exec: %v", workerBinary) var cmd *exec.Cmd if spawnArgs.RegisterOnly { - cmdSplitted[0] = "register" - cmd = h.LocalWorkerRunner.NewCmd(context.Background(), cmdSplitted[0], cmdSplitted...) + cmd = h.LocalWorkerRunner.NewCmd(context.Background(), workerBinary, "register", "--config", workerConfig.EncodeBase64()) } else { - cmd = h.LocalWorkerRunner.NewCmd(context.Background(), cmdSplitted[0], cmdSplitted[1:]...) + cmd = h.LocalWorkerRunner.NewCmd(context.Background(), workerBinary, "--config", workerConfig.EncodeBase64()) } - - cmd.Dir = udataParam.BaseDir + cmd.Dir = basedir // Clearenv env := os.Environ() @@ -109,9 +67,6 @@ func (h *HatcheryLocal) SpawnWorker(ctx context.Context, spawnArgs hatchery.Spaw cmd.Env = append(cmd.Env, e) } } - for k, v := range udataParam.InjectEnvVars { - cmd.Env = append(cmd.Env, k+"="+v) - } // Wait in a goroutine so that when process exits, Wait() update cmd.ProcessState go func() { @@ -123,70 +78,3 @@ func (h *HatcheryLocal) SpawnWorker(ctx context.Context, spawnArgs hatchery.Spaw return nil } - -type Logger interface { - Logf(fmt string, values ...interface{}) - Errorf(fmt string, values ...interface{}) - Fatalf(fmt string, values ...interface{}) -} - -func (h *HatcheryLocal) startCmd(name string, cmd *exec.Cmd, logger Logger) error { - stdout, err := cmd.StdoutPipe() - if err != nil { - return fmt.Errorf("Failure due to internal error: unable to capture stdout: %v", err) - } - - stderr, err := cmd.StderrPipe() - if err != nil { - return fmt.Errorf("Failure due to internal error: unable to capture stderr: %v", err) - } - - stdoutreader := bufio.NewReader(stdout) - stderrreader := bufio.NewReader(stderr) - - outchan := make(chan bool) - go func() { - for { - line, err := stdoutreader.ReadString('\n') - if line != "" { - logger.Logf(line) - } - if err != nil { - stdout.Close() - close(outchan) - return - } - } - }() - - errchan := make(chan bool) - go func() { - for { - line, err := stderrreader.ReadString('\n') - if line != "" { - logger.Logf(line) - } - if err != nil { - stderr.Close() - close(errchan) - return - } - } - }() - - if err := cmd.Start(); err != nil { - return fmt.Errorf("unable to start command: %v", err) - } - - h.Lock() - h.workers[name] = workerCmd{cmd: cmd, created: time.Now()} - h.Unlock() - - <-outchan - <-errchan - if err := cmd.Wait(); err != nil { - return fmt.Errorf("command failure: %v", err) - } - - return nil -} diff --git a/engine/hatchery/marathon/marathon.go b/engine/hatchery/marathon/marathon.go index b309c081ca..5bbc1e8a5e 100644 --- a/engine/hatchery/marathon/marathon.go +++ b/engine/hatchery/marathon/marathon.go @@ -236,15 +236,18 @@ func (h *HatcheryMarathon) SpawnWorker(ctx context.Context, spawnArgs hatchery.S instance := 1 forcePull := strings.HasSuffix(spawnArgs.Model.ModelDocker.Image, ":latest") + workerConfig := h.GenerateWorkerConfig(ctx, h, spawnArgs) - udataParam := h.GenerateWorkerArgs(ctx, h, spawnArgs) - udataParam.TTL = h.Config.WorkerTTL - udataParam.WorkflowJobID = spawnArgs.JobID - + // Prepare worker startup command tmpl, errt := template.New("cmd").Parse(spawnArgs.Model.ModelDocker.Cmd) if errt != nil { return errt } + udataParam := struct { + API string + }{ + API: workerConfig.APIEndpoint, + } var buffer bytes.Buffer if errTmpl := tmpl.Execute(&buffer, udataParam); errTmpl != nil { return errTmpl @@ -279,26 +282,11 @@ func (h *HatcheryMarathon) SpawnWorker(ctx context.Context, spawnArgs hatchery.S spawnArgs.Model.ModelDocker.Envs = map[string]string{} } - envsWm := udataParam.InjectEnvVars + envsWm := workerConfig.InjectEnvVars envsWm["CDS_MODEL_MEMORY"] = fmt.Sprintf("%d", memory) - envsWm["CDS_API"] = udataParam.API - envsWm["CDS_TOKEN"] = udataParam.Token - envsWm["CDS_NAME"] = udataParam.Name - envsWm["CDS_MODEL_PATH"] = udataParam.Model - envsWm["CDS_HATCHERY_NAME"] = udataParam.HatcheryName - envsWm["CDS_FROM_WORKER_IMAGE"] = fmt.Sprintf("%v", udataParam.FromWorkerImage) - envsWm["CDS_INSECURE"] = fmt.Sprintf("%v", udataParam.HTTPInsecure) - - if spawnArgs.JobID > 0 { - envsWm["CDS_BOOKED_WORKFLOW_JOB_ID"] = fmt.Sprintf("%d", spawnArgs.JobID) - } - - envTemplated, errEnv := sdk.TemplateEnvs(udataParam, spawnArgs.Model.ModelDocker.Envs) - if errEnv != nil { - return errEnv - } + envsWm["CDS_CONFIG"] = workerConfig.EncodeBase64() - for envName, envValue := range envTemplated { + for envName, envValue := range spawnArgs.Model.ModelDocker.Envs { envsWm[envName] = envValue } diff --git a/engine/hatchery/marathon/marathon_test.go b/engine/hatchery/marathon/marathon_test.go index f287deb386..09b0a98f7d 100644 --- a/engine/hatchery/marathon/marathon_test.go +++ b/engine/hatchery/marathon/marathon_test.go @@ -300,8 +300,7 @@ func TestSpawnWorkerTimeout(t *testing.T) { assert.Equal(t, "BRIDGE", a.Container.Docker.Network) assert.Equal(t, float64(1), a.CPUs) assert.Equal(t, 1, *a.Instances) - assert.Equal(t, "1", (*a.Env)["CDS_BOOKED_WORKFLOW_JOB_ID"]) - assert.Equal(t, "GroupModel/fake", (*a.Env)["CDS_MODEL_PATH"]) + assert.NotEmpty(t, (*a.Env)["CDS_CONFIG"]) createAppResult.ID = a.ID createAppResult.Env = a.Env diff --git a/engine/hatchery/marathon/types.go b/engine/hatchery/marathon/types.go index 52ec1f5b2f..18cea35777 100644 --- a/engine/hatchery/marathon/types.go +++ b/engine/hatchery/marathon/types.go @@ -32,9 +32,6 @@ type HatcheryConfiguration struct { // DefaultMemory Worker default memory DefaultMemory int `mapstructure:"defaultMemory" toml:"defaultMemory" default:"1024" commented:"false" comment:"Worker default memory in Mo" json:"defaultMemory"` - // WorkerTTL Worker TTL (minutes) - WorkerTTL int `mapstructure:"workerTTL" toml:"workerTTL" default:"10" commented:"false" comment:"Worker TTL (minutes)" json:"workerTTL"` - // WorkerSpawnTimeout Worker Timeout Spawning (seconds) WorkerSpawnTimeout int `mapstructure:"workerSpawnTimeout" toml:"workerSpawnTimeout" default:"120" commented:"false" comment:"Worker Timeout Spawning (seconds)" json:"workerSpawnTimeout"` diff --git a/engine/hatchery/openstack/spawn.go b/engine/hatchery/openstack/spawn.go index b0b28ab23d..c7bf21386c 100644 --- a/engine/hatchery/openstack/spawn.go +++ b/engine/hatchery/openstack/spawn.go @@ -65,22 +65,30 @@ func (h *HatcheryOpenstack) SpawnWorker(ctx context.Context, spawnArgs hatchery. } } } + workerConfig := h.GenerateWorkerConfig(ctx, h, spawnArgs) if spawnArgs.RegisterOnly { - spawnArgs.Model.ModelVirtualMachine.Cmd += " register" + spawnArgs.Model.ModelVirtualMachine.Cmd += fmt.Sprintf(" --config %s register", workerConfig.EncodeBase64()) + } else { + spawnArgs.Model.ModelVirtualMachine.Cmd += fmt.Sprintf(" --config %s", workerConfig.EncodeBase64()) } udata := spawnArgs.Model.ModelVirtualMachine.PreCmd + "\n" + spawnArgs.Model.ModelVirtualMachine.Cmd + "\n" + spawnArgs.Model.ModelVirtualMachine.PostCmd - tmpl, err := template.New("udata").Parse(udata) if err != nil { return err } - udataParam := h.GenerateWorkerArgs(ctx, h, spawnArgs) - udataParam.TTL = h.Config.WorkerTTL - udataParam.FromWorkerImage = withExistingImage - udataParam.WorkflowJobID = spawnArgs.JobID + //workerConfig.Basedir = + udataParam := struct { + API string + FromWorkerImage bool + Config string + }{ + API: workerConfig.APIEndpoint, + FromWorkerImage: withExistingImage, + Config: workerConfig.EncodeBase64(), + } var buffer bytes.Buffer if err := tmpl.Execute(&buffer, udataParam); err != nil { diff --git a/engine/hatchery/openstack/types.go b/engine/hatchery/openstack/types.go index c64d3e3d28..2d5a2039e9 100644 --- a/engine/hatchery/openstack/types.go +++ b/engine/hatchery/openstack/types.go @@ -39,9 +39,6 @@ type HatcheryConfiguration struct { // IPRange IP Range IPRange string `mapstructure:"iprange" toml:"iprange" default:"" commented:"false" comment:"Facultative. IP Range for spawned workers. \n Format: a.a.a.a/b,c.c.c.c/e \n Hatchery will use an IP from this range to create Virtual Machine (Fixed IP Attribute).\nIf not set, it will get an address from the neutron service" json:"iprange,omitempty"` - // WorkerTTL Worker TTL (minutes) - WorkerTTL int `mapstructure:"workerTTL" toml:"workerTTL" default:"30" commented:"false" comment:"Worker TTL (minutes)" json:"workerTTL"` - // DisableCreateImage if true: hatchery does not create openstack image when a worker model is updated DisableCreateImage bool `mapstructure:"disableCreateImage" toml:"disableCreateImage" default:"false" commented:"false" comment:"if true: hatchery does not create openstack image when a worker model is updated" json:"disableCreateImage"` diff --git a/engine/hatchery/serve.go b/engine/hatchery/serve.go index 689f7f529e..50c7b4e34b 100644 --- a/engine/hatchery/serve.go +++ b/engine/hatchery/serve.go @@ -8,6 +8,7 @@ import ( "net" "net/http" "net/http/pprof" + "strconv" "strings" "sync" "time" @@ -17,6 +18,7 @@ import ( "github.com/ovh/cds/engine/api" "github.com/ovh/cds/engine/service" + "github.com/ovh/cds/engine/worker/pkg/workerruntime" "github.com/ovh/cds/sdk" "github.com/ovh/cds/sdk/cdsclient" "github.com/ovh/cds/sdk/hatchery" @@ -250,7 +252,7 @@ func getStatusHandler(h hatchery.Interface) service.HandlerFunc { } } -func (c *Common) GenerateWorkerArgs(ctx context.Context, h hatchery.Interface, spawnArgs hatchery.SpawnArguments) sdk.WorkerArgs { +func (c *Common) GenerateWorkerConfig(ctx context.Context, h hatchery.Interface, spawnArgs hatchery.SpawnArguments) workerruntime.WorkerConfig { apiURL := h.Configuration().Provision.WorkerAPIHTTP.URL httpInsecure := h.Configuration().Provision.WorkerAPIHTTP.Insecure if apiURL == "" { @@ -269,17 +271,25 @@ func (c *Common) GenerateWorkerArgs(ctx context.Context, h hatchery.Interface, s envvars[tuple[0]] = tuple[1] } - return sdk.WorkerArgs{ - API: apiURL, - HTTPInsecure: httpInsecure, - Token: spawnArgs.WorkerToken, - Name: spawnArgs.WorkerName, - Model: spawnArgs.ModelName(), - HatcheryName: h.Name(), - InjectEnvVars: envvars, - GraylogHost: h.Configuration().Provision.WorkerLogsOptions.Graylog.Host, - GraylogPort: h.Configuration().Provision.WorkerLogsOptions.Graylog.Port, - GraylogExtraKey: h.Configuration().Provision.WorkerLogsOptions.Graylog.ExtraKey, - GraylogExtraValue: h.Configuration().Provision.WorkerLogsOptions.Graylog.ExtraValue, + cfg := workerruntime.WorkerConfig{ + Name: spawnArgs.WorkerName, + BookedJobID: spawnArgs.JobID, + HatcheryName: h.Name(), + Model: spawnArgs.ModelName(), + APIToken: spawnArgs.WorkerToken, + APIEndpoint: apiURL, + APIEndpointInsecure: httpInsecure, + InjectEnvVars: envvars, + Region: h.Configuration().Provision.Region, + Log: cdslog.Conf{ + GraylogHost: h.Configuration().Provision.WorkerLogsOptions.Graylog.Host, + GraylogPort: strconv.Itoa(h.Configuration().Provision.WorkerLogsOptions.Graylog.Port), + GraylogExtraKey: h.Configuration().Provision.WorkerLogsOptions.Graylog.ExtraKey, + GraylogExtraValue: h.Configuration().Provision.WorkerLogsOptions.Graylog.ExtraValue, + Level: h.Configuration().Provision.WorkerLogsOptions.Level, + }, } + + log.Debug(ctx, "worker config: %v", cfg.EncodeBase64()) + return cfg } diff --git a/engine/hatchery/swarm/swarm.go b/engine/hatchery/swarm/swarm.go index f2d597c4ab..61f6dd37e7 100644 --- a/engine/hatchery/swarm/swarm.go +++ b/engine/hatchery/swarm/swarm.go @@ -375,9 +375,12 @@ func (h *HatcherySwarm) SpawnWorker(ctx context.Context, spawnArgs hatchery.Spaw return errDockerOpts } - udataParam := h.GenerateWorkerArgs(ctx, h, spawnArgs) - udataParam.TTL = h.Config.WorkerTTL - udataParam.WorkflowJobID = spawnArgs.JobID + workerConfig := h.GenerateWorkerConfig(ctx, h, spawnArgs) + udataParam := struct { + API string + }{ + API: workerConfig.APIEndpoint, + } tmpl, errt := template.New("cmd").Parse(spawnArgs.Model.ModelDocker.Cmd) if errt != nil { @@ -396,26 +399,11 @@ func (h *HatcherySwarm) SpawnWorker(ctx context.Context, spawnArgs hatchery.Spaw modelEnvs[k] = v } - envsWm := udataParam.InjectEnvVars + envsWm := workerConfig.InjectEnvVars envsWm["CDS_MODEL_MEMORY"] = fmt.Sprintf("%d", memory) - envsWm["CDS_API"] = udataParam.API - envsWm["CDS_TOKEN"] = udataParam.Token - envsWm["CDS_NAME"] = udataParam.Name - envsWm["CDS_MODEL_PATH"] = udataParam.Model - envsWm["CDS_HATCHERY_NAME"] = udataParam.HatcheryName - envsWm["CDS_FROM_WORKER_IMAGE"] = fmt.Sprintf("%v", udataParam.FromWorkerImage) - envsWm["CDS_INSECURE"] = fmt.Sprintf("%v", udataParam.HTTPInsecure) - - if spawnArgs.JobID > 0 { - envsWm["CDS_BOOKED_WORKFLOW_JOB_ID"] = fmt.Sprintf("%d", spawnArgs.JobID) - } - - envTemplated, errEnv := sdk.TemplateEnvs(udataParam, modelEnvs) - if errEnv != nil { - return errEnv - } + envsWm["CDS_CONFIG"] = workerConfig.EncodeBase64() - for envName, envValue := range envTemplated { + for envName, envValue := range modelEnvs { envsWm[envName] = envValue } diff --git a/engine/hatchery/swarm/swarm_conf.go b/engine/hatchery/swarm/swarm_conf.go index aee48d6c88..ec194907cd 100644 --- a/engine/hatchery/swarm/swarm_conf.go +++ b/engine/hatchery/swarm/swarm_conf.go @@ -112,9 +112,6 @@ func (h *HatcherySwarm) CheckConfiguration(cfg interface{}) error { return fmt.Errorf("Invalid hatchery swarm configuration: %v", err) } - if hconfig.WorkerTTL <= 0 { - return fmt.Errorf("worker-ttl must be > 0") - } if hconfig.DefaultMemory <= 1 { return fmt.Errorf("worker-memory must be > 1") } diff --git a/engine/hatchery/swarm/types.go b/engine/hatchery/swarm/types.go index a69274d0e3..7b3e270a2e 100644 --- a/engine/hatchery/swarm/types.go +++ b/engine/hatchery/swarm/types.go @@ -18,9 +18,6 @@ type HatcheryConfiguration struct { DefaultMemory int `mapstructure:"defaultMemory" toml:"defaultMemory" default:"1024" commented:"false" comment:"Worker default memory in Mo" json:"defaultMemory"` DisableMemorySwap bool `mapstructure:"disableMemorySwap" toml:"disableMemorySwap" default:"false" commented:"true" comment:"Set to true to disable memory swap" json:"disableMemorySwap"` - // WorkerTTL Worker TTL (minutes) - WorkerTTL int `mapstructure:"workerTTL" toml:"workerTTL" default:"10" commented:"false" comment:"Worker TTL (minutes)" json:"workerTTL"` - // DockerOpts Docker options DockerOpts string `mapstructure:"dockerOpts" toml:"dockerOpts" default:"" commented:"true" comment:"Docker Options. --add-host and --privileged supported. Example: dockerOpts=\"--add-host=myhost:x.x.x.x,myhost2:y.y.y.y --privileged\"" json:"dockerOpts,omitempty"` diff --git a/engine/hatchery/vsphere/spawn.go b/engine/hatchery/vsphere/spawn.go index 77c19c44a7..e33ae2cc1e 100644 --- a/engine/hatchery/vsphere/spawn.go +++ b/engine/hatchery/vsphere/spawn.go @@ -306,12 +306,16 @@ func (h *HatcheryVSphere) launchScriptWorker(ctx context.Context, name string, j return err } - env := []string{ - "CDS_FROM_WORKER_IMAGE=true", - } + workerConfig := h.GenerateWorkerConfig(ctx, h, hatchery.SpawnArguments{ + WorkerToken: token, + WorkerName: name, + Model: &model, + }) + + env := []string{} env = append(env, h.getGraylogEnv(model)...) - udata := model.ModelVirtualMachine.PreCmd + "\n" + model.ModelVirtualMachine.Cmd + udata := model.ModelVirtualMachine.PreCmd + "\n" + "CDS_CONFIG=" + workerConfig.EncodeBase64() + " " + model.ModelVirtualMachine.Cmd if registerOnly { udata += " register" @@ -323,19 +327,17 @@ func (h *HatcheryVSphere) launchScriptWorker(ctx context.Context, name string, j return errt } - udataParam := h.GenerateWorkerArgs(ctx, h, hatchery.SpawnArguments{ - WorkerToken: token, - WorkerName: name, - Model: &model, - }) - udataParam.TTL = h.Config.WorkerTTL - udataParam.FromWorkerImage = true - udataParam.WorkflowJobID = jobID - - for k, v := range udataParam.InjectEnvVars { + for k, v := range workerConfig.InjectEnvVars { env = append(env, k+"="+v) } + udataParam := struct { + API string + FromWorkerImage bool + }{ + API: workerConfig.APIEndpoint, + FromWorkerImage: true, + } var buffer bytes.Buffer if err := tmpl.Execute(&buffer, udataParam); err != nil { return err diff --git a/engine/hatchery/vsphere/spawn_test.go b/engine/hatchery/vsphere/spawn_test.go index 8eaf29acc7..2dedd346c1 100644 --- a/engine/hatchery/vsphere/spawn_test.go +++ b/engine/hatchery/vsphere/spawn_test.go @@ -242,7 +242,9 @@ func TestHatcheryVSphere_launchScriptWorker(t *testing.T) { c.EXPECT().StartProgramInGuest(gomock.Any(), &procman, gomock.Any()).DoAndReturn( func(ctx context.Context, procman *guest.ProcessManager, req *types.StartProgramInGuest) (int64, error) { assert.Equal(t, "/bin/echo", req.Spec.GetGuestProgramSpec().ProgramPath) - assert.Equal(t, "-n ;\n./worker register\nshutdown -h now", req.Spec.GetGuestProgramSpec().Arguments) + assert.Contains(t, req.Spec.GetGuestProgramSpec().Arguments, "-n ;\n") + assert.Contains(t, req.Spec.GetGuestProgramSpec().Arguments, "./worker register\nshutdown -h now") + assert.Contains(t, req.Spec.GetGuestProgramSpec().Arguments, "CDS_CONFIG=") return 1, nil }, ) @@ -400,7 +402,9 @@ func TestHatcheryVSphere_SpawnWorker(t *testing.T) { c.EXPECT().StartProgramInGuest(gomock.Any(), &procman, gomock.Any()).DoAndReturn( func(ctx context.Context, procman *guest.ProcessManager, req *types.StartProgramInGuest) (int64, error) { assert.Equal(t, "/bin/echo", req.Spec.GetGuestProgramSpec().ProgramPath) - assert.Equal(t, "-n ;\n./worker\nshutdown -h now", req.Spec.GetGuestProgramSpec().Arguments) + assert.Contains(t, req.Spec.GetGuestProgramSpec().Arguments, "-n ;\n") + assert.Contains(t, req.Spec.GetGuestProgramSpec().Arguments, "./worker\nshutdown -h now") + assert.Contains(t, req.Spec.GetGuestProgramSpec().Arguments, "CDS_CONFIG=") return 1, nil }, ) @@ -577,8 +581,9 @@ func TestHatcheryVSphere_SpawnWorkerFromProvisioning(t *testing.T) { c.EXPECT().StartProgramInGuest(gomock.Any(), &procman, gomock.Any()).DoAndReturn( func(ctx context.Context, procman *guest.ProcessManager, req *types.StartProgramInGuest) (int64, error) { - assert.Equal(t, "/bin/echo", req.Spec.GetGuestProgramSpec().ProgramPath) - assert.Equal(t, "-n ;\n./worker\nshutdown -h now", req.Spec.GetGuestProgramSpec().Arguments) + assert.Contains(t, req.Spec.GetGuestProgramSpec().Arguments, "-n ;\n") + assert.Contains(t, req.Spec.GetGuestProgramSpec().Arguments, "./worker\nshutdown -h now") + assert.Contains(t, req.Spec.GetGuestProgramSpec().Arguments, "CDS_CONFIG=") return 1, nil }, ) diff --git a/engine/service/types.go b/engine/service/types.go index d554933bf3..d84aeaf4db 100644 --- a/engine/service/types.go +++ b/engine/service/types.go @@ -59,6 +59,7 @@ type HatcheryCommonConfiguration struct { Insecure bool `toml:"insecure" default:"false" commented:"true" comment:"sslInsecureSkipVerify, set to true if you use a self-signed SSL on CDS API" json:"insecure"` } `toml:"workerApiHttp" json:"workerApiHttp"` WorkerLogsOptions struct { + Level string `toml:"level" comment:"Worker log level" json:"level"` Graylog struct { Host string `toml:"host" comment:"Example: thot.ovh.com" json:"host"` Port int `toml:"port" comment:"Example: 12202" json:"port"` diff --git a/engine/worker/Makefile b/engine/worker/Makefile index f5b2723697..3d1c1de4bf 100644 --- a/engine/worker/Makefile +++ b/engine/worker/Makefile @@ -41,6 +41,9 @@ clean: @for profile in `find . -name "*.coverprofile"`; do \ rm $$profile; \ done; + @rm -rf internal/action/test-Test* + @rm -rf internal/test-Test* + define get_os_from_binary_file $(strip $(shell echo $(1) | awk '{n=split($$1,a,"-");print a[n-1]}')) diff --git a/engine/worker/cmd.go b/engine/worker/cmd.go index 27fc95ef13..6bbfc5d8f1 100644 --- a/engine/worker/cmd.go +++ b/engine/worker/cmd.go @@ -2,23 +2,29 @@ package main import ( "context" + "encoding/base64" "fmt" "os" "path/filepath" "strconv" "strings" + "github.com/pkg/errors" "github.com/rockbears/log" "github.com/spf13/afero" "github.com/spf13/cobra" "github.com/ovh/cds/engine/worker/internal" + "github.com/ovh/cds/engine/worker/pkg/workerruntime" "github.com/ovh/cds/sdk" cdslog "github.com/ovh/cds/sdk/log" ) const ( - envFlagPrefix = "cds_" + envFlagPrefix = "cds_" + flagConfig = "config" + + // TODO: the flag below will be removed flagBaseDir = "basedir" flagBookedWorkflowJobID = "booked-workflow-job-id" flagGraylogProtocol = "graylog-protocol" @@ -37,6 +43,9 @@ const ( func initFlagsRun(cmd *cobra.Command) { flags := cmd.Flags() + flags.String(flagConfig, "", "base64 encoded json configuration") + + // TODO: the flag below will be removed flags.String(flagBaseDir, "", "This directory (default TMPDIR os environment var) will contains worker working directory and temporary files") flags.Int64(flagBookedWorkflowJobID, 0, "Booked Workflow job id") flags.String(flagGraylogProtocol, "", "Ex: --graylog-protocol=xxxx-yyyy") @@ -98,7 +107,54 @@ func FlagInt64(cmd *cobra.Command, key string) int64 { return i } -func initFromFlags(cmd *cobra.Command, w *internal.CurrentWorker) { +func initFromConfig(ctx context.Context, cfg *workerruntime.WorkerConfig, w *internal.CurrentWorker) error { + cfg.Log.GraylogFieldCDSVersion = sdk.VERSION + cfg.Log.GraylogFieldCDSOS = sdk.GOOS + cfg.Log.GraylogFieldCDSArch = sdk.GOARCH + cfg.Log.GraylogFieldCDSServiceType = "worker" + + cdslog.Initialize(ctx, &cfg.Log) + + fs := afero.NewOsFs() + if cfg.Basedir == "" { + cfg.Basedir = os.TempDir() + } + log.Debug(ctx, "creating basedir %s", cfg.Basedir) + if err := fs.MkdirAll(cfg.Basedir, os.FileMode(0755)); err != nil { + return errors.Errorf("unable to setup worker basedir %q: %+v", cfg.Basedir, err) + } + os.Setenv("BASEDIR", cfg.Basedir) + os.Setenv("HATCHERY_NAME", cfg.HatcheryName) + os.Setenv("HATCHERY_WORKER", cfg.Name) + if cfg.Region != "" { + os.Setenv("HATCHERY_REGION", cfg.Region) + } + if cfg.Model != "" { + os.Setenv("HATCHERY_MODEL", cfg.Model) + } + for k, v := range cfg.InjectEnvVars { + if v == "" { + continue + } + os.Setenv(k, v) + } + + return w.Init(cfg, afero.NewBasePathFs(fs, cfg.Basedir)) +} + +func initFromFlags(cmd *cobra.Command) (*workerruntime.WorkerConfig, error) { + if base64config := FlagString(cmd, flagConfig); base64config != "" { + btes, err := base64.StdEncoding.DecodeString(base64config) + if err != nil { + return nil, errors.Errorf("unable to decode config: %v", err) + } + var cfg workerruntime.WorkerConfig + if err := sdk.JSONUnmarshal(btes, &cfg); err != nil { + return nil, errors.Errorf("unable to parse config: %v", err) + } + return &cfg, nil + } + var errN error var hostname string hostname, errN = os.Hostname() @@ -118,54 +174,37 @@ func initFromFlags(cmd *cobra.Command, w *internal.CurrentWorker) { basedir = os.TempDir() } - cdslog.Initialize(context.Background(), &cdslog.Conf{ - Level: FlagString(cmd, flagLogLevel), - GraylogProtocol: FlagString(cmd, flagGraylogProtocol), - GraylogHost: FlagString(cmd, flagGraylogHost), - GraylogPort: FlagString(cmd, flagGraylogPort), - GraylogExtraKey: FlagString(cmd, flagGraylogExtraKey), - GraylogExtraValue: FlagString(cmd, flagGraylogExtraValue), - GraylogFieldCDSVersion: sdk.VERSION, - GraylogFieldCDSOS: sdk.GOOS, - GraylogFieldCDSArch: sdk.GOARCH, - GraylogFieldCDSServiceName: givenName, - GraylogFieldCDSServiceType: "worker", - }) - hatcheryName := FlagString(cmd, flagHatcheryName) apiEndpoint := FlagString(cmd, flagAPI) if apiEndpoint == "" { - log.Error(context.TODO(), "--api not provided, aborting.") - os.Exit(3) + return nil, errors.New("--api not provided, aborting.") } token := FlagString(cmd, flagToken) if token == "" { - log.Error(context.TODO(), "--token not provided, aborting.") - os.Exit(4) + return nil, errors.New("--token not provided, aborting.") } basedir, err := filepath.EvalSymlinks(basedir) if err != nil { - log.Error(context.Background(), "symlink error: %v", err) - os.Exit(6) + return nil, errors.Errorf("symlink error: %v", err) } - fs := afero.NewOsFs() - log.Debug(context.TODO(), "creating basedir %s", basedir) - if err := fs.MkdirAll(basedir, os.FileMode(0755)); err != nil { - log.Error(context.TODO(), "basedir error: %v", err) - os.Exit(5) - } - - if err := w.Init(givenName, - hatcheryName, - apiEndpoint, - token, - FlagString(cmd, flagModel), - FlagBool(cmd, flagInsecure), - afero.NewBasePathFs(fs, basedir)); err != nil { - log.Error(context.TODO(), "Cannot init worker: %v", err) - os.Exit(1) - } + return &workerruntime.WorkerConfig{ + Name: givenName, + Basedir: basedir, + HatcheryName: hatcheryName, + APIToken: token, + APIEndpoint: apiEndpoint, + APIEndpointInsecure: FlagBool(cmd, flagInsecure), + Model: FlagString(cmd, flagModel), + Log: cdslog.Conf{ + Level: FlagString(cmd, flagLogLevel), + GraylogProtocol: FlagString(cmd, flagGraylogProtocol), + GraylogHost: FlagString(cmd, flagGraylogHost), + GraylogPort: FlagString(cmd, flagGraylogPort), + GraylogExtraKey: FlagString(cmd, flagGraylogExtraKey), + GraylogExtraValue: FlagString(cmd, flagGraylogExtraValue), + }, + }, nil } diff --git a/engine/worker/cmd_register.go b/engine/worker/cmd_register.go index cab7d66df7..fcfdb847aa 100644 --- a/engine/worker/cmd_register.go +++ b/engine/worker/cmd_register.go @@ -28,7 +28,15 @@ func cmdRegisterRun() func(cmd *cobra.Command, args []string) { ctx := context.Background() - initFromFlags(cmd, w) + cfg, err := initFromFlags(cmd) + if err != nil { + log.Fatal(ctx, "%v", err) + } + + if err := initFromConfig(ctx, cfg, w); err != nil { + log.Fatal(ctx, "%v", err) + } + defer cdslog.Flush(ctx, logrus.StandardLogger()) if err := w.Register(ctx); err != nil { diff --git a/engine/worker/cmd_run.go b/engine/worker/cmd_run.go index fac3b721cf..ea5d1fee1f 100644 --- a/engine/worker/cmd_run.go +++ b/engine/worker/cmd_run.go @@ -35,14 +35,24 @@ func runCmd() func(cmd *cobra.Command, args []string) { ctx, cancel := context.WithCancel(context.Background()) // Setup workerfrom commandline flags or env variables - initFromFlags(cmd, w) + cfg, err := initFromFlags(cmd) + if err != nil { + log.Fatal(ctx, "%v", err) + } + if err := initFromConfig(ctx, cfg, w); err != nil { + log.Fatal(ctx, "%v", err) + } + defer cdslog.Flush(ctx, logrus.StandardLogger()) + // TODO: remove this code with all the flags replaces by config // Get the booked job ID - bookedWJobID := FlagInt64(cmd, flagBookedWorkflowJobID) - - if bookedWJobID == 0 { - sdk.Exit("flag --booked-workflow-job-id is mandatory") + if cfg.BookedJobID == 0 { + bookedWJobID := FlagInt64(cmd, flagBookedWorkflowJobID) + if bookedWJobID == 0 { + sdk.Exit("flag --booked-workflow-job-id is mandatory") + } + cfg.BookedJobID = bookedWJobID } // Gracefully shutdown connections @@ -64,7 +74,7 @@ func runCmd() func(cmd *cobra.Command, args []string) { } }() // Start the worker - if err := internal.StartWorker(ctx, w, bookedWJobID); err != nil { + if err := internal.StartWorker(ctx, w, cfg.BookedJobID); err != nil { ctx := sdk.ContextWithStacktrace(ctx, err) ctx = context.WithValue(ctx, cdslog.RequestID, sdk.ExtractHTTPError(err).RequestID) log.Error(ctx, err.Error()) diff --git a/engine/worker/internal/action/builtin_key_install_test.go b/engine/worker/internal/action/builtin_key_install_test.go index e8fccaa1a8..1dc7588b75 100644 --- a/engine/worker/internal/action/builtin_key_install_test.go +++ b/engine/worker/internal/action/builtin_key_install_test.go @@ -26,8 +26,15 @@ func TestRunInstallKeyAction_Relative(t *testing.T) { fs := afero.NewOsFs() basedir := "test-" + test.GetTestName(t) + "-" + sdk.RandomString(10) + "-" + fmt.Sprintf("%d", time.Now().Unix()) require.NoError(t, fs.MkdirAll(basedir, os.FileMode(0755))) - - if err := w.Init("test-worker", "test-hatchery", "http://lolcat.host", "xxx-my-token", "", true, afero.NewBasePathFs(fs, basedir)); err != nil { + cfg := &workerruntime.WorkerConfig{ + Name: "test-worker", + HatcheryName: "test-hatchery", + APIEndpoint: "http://lolcat.host", + APIToken: "xxx-my-token", + APIEndpointInsecure: true, + Basedir: basedir, + } + if err := w.Init(cfg, afero.NewBasePathFs(fs, basedir)); err != nil { t.Fatalf("worker init failed: %v", err) } require.NoError(t, w.BaseDir().Mkdir("keys", os.FileMode(0700))) @@ -84,8 +91,15 @@ func TestRunInstallKeyAction_Absolute(t *testing.T) { fs := afero.NewOsFs() basedir := "test-" + test.GetTestName(t) + "-" + sdk.RandomString(10) + "-" + fmt.Sprintf("%d", time.Now().Unix()) require.NoError(t, fs.MkdirAll(basedir, os.FileMode(0755))) - - if err := w.Init("test-worker", "test-hatchery", "http://lolcat.host", "xxx-my-token", "", true, afero.NewBasePathFs(fs, basedir)); err != nil { + cfg := &workerruntime.WorkerConfig{ + Name: "test-worker", + HatcheryName: "test-hatchery", + APIEndpoint: "http://lolcat.host", + APIToken: "xxx-my-token", + APIEndpointInsecure: true, + Basedir: basedir, + } + if err := w.Init(cfg, afero.NewBasePathFs(fs, basedir)); err != nil { t.Fatalf("worker init failed: %v", err) } require.NoError(t, w.BaseDir().Mkdir("keys", os.FileMode(0700))) diff --git a/engine/worker/internal/handler_key_test.go b/engine/worker/internal/handler_key_test.go index e933b75c84..b5b548c4eb 100644 --- a/engine/worker/internal/handler_key_test.go +++ b/engine/worker/internal/handler_key_test.go @@ -22,8 +22,15 @@ func Test_keyInstall(t *testing.T) { fs := afero.NewOsFs() basedir := "test-" + test.GetTestName(t) + "-" + sdk.RandomString(10) + "-" + fmt.Sprintf("%d", time.Now().Unix()) require.NoError(t, fs.MkdirAll(basedir, os.FileMode(0755))) - - if err := w.Init("test-worker", "test-hatchery", "http://lolcat.host", "xxx-my-token", "", true, afero.NewBasePathFs(fs, basedir)); err != nil { + cfg := &workerruntime.WorkerConfig{ + Name: "test-worker", + HatcheryName: "test-hatchery", + APIEndpoint: "http://lolcat.host", + APIToken: "xxx-my-token", + APIEndpointInsecure: true, + Basedir: basedir, + } + if err := w.Init(cfg, afero.NewBasePathFs(fs, basedir)); err != nil { t.Fatalf("worker init failed: %v", err) } require.NoError(t, w.BaseDir().Mkdir("keys", os.FileMode(0700))) diff --git a/engine/worker/internal/handler_tmpl_test.go b/engine/worker/internal/handler_tmpl_test.go index f52fbc63c0..021222b3f8 100644 --- a/engine/worker/internal/handler_tmpl_test.go +++ b/engine/worker/internal/handler_tmpl_test.go @@ -25,8 +25,15 @@ func Test_tmplHandler(t *testing.T) { fs := afero.NewOsFs() basedir := "test-" + test.GetTestName(t) + "-" + sdk.RandomString(10) + "-" + fmt.Sprintf("%d", time.Now().Unix()) require.NoError(t, fs.MkdirAll(basedir, os.FileMode(0755))) - - if err := wk.Init("test-worker", "test-hatchery", "http://lolcat.host", "xxx-my-token", "", true, afero.NewBasePathFs(fs, basedir)); err != nil { + cfg := &workerruntime.WorkerConfig{ + Name: "test-worker", + HatcheryName: "test-hatchery", + APIEndpoint: "http://lolcat.host", + APIToken: "xxx-my-token", + APIEndpointInsecure: true, + Basedir: basedir, + } + if err := wk.Init(cfg, afero.NewBasePathFs(fs, basedir)); err != nil { t.Fatalf("worker init failed: %v", err) } wk.currentJob.wJob = &sdk.WorkflowNodeJobRun{ @@ -83,8 +90,15 @@ func Test_tmplHandlerInWrongDir(t *testing.T) { fs := afero.NewOsFs() basedir := "test-" + test.GetTestName(t) + "-" + sdk.RandomString(10) + "-" + fmt.Sprintf("%d", time.Now().Unix()) require.NoError(t, fs.MkdirAll(basedir, os.FileMode(0755))) - - if err := wk.Init("test-worker", "test-hatchery", "http://lolcat.host", "xxx-my-token", "", true, afero.NewBasePathFs(fs, basedir)); err != nil { + cfg := &workerruntime.WorkerConfig{ + Name: "test-worker", + HatcheryName: "test-hatchery", + APIEndpoint: "http://lolcat.host", + APIToken: "xxx-my-token", + APIEndpointInsecure: true, + Basedir: basedir, + } + if err := wk.Init(cfg, afero.NewBasePathFs(fs, basedir)); err != nil { t.Fatalf("worker init failed: %v", err) } wk.currentJob.wJob = &sdk.WorkflowNodeJobRun{ID: 1} diff --git a/engine/worker/internal/keys_test.go b/engine/worker/internal/keys_test.go index bbf5e2c2fa..f4d7097566 100644 --- a/engine/worker/internal/keys_test.go +++ b/engine/worker/internal/keys_test.go @@ -58,7 +58,15 @@ func TestInstallKey_SSHKeyWithoutDestination(t *testing.T) { basedir := "test-" + test.GetTestName(t) + "-" + sdk.RandomString(10) + "-" + fmt.Sprintf("%d", time.Now().Unix()) require.NoError(t, fs.MkdirAll(basedir, os.FileMode(0755))) - if err := w.Init("test-worker", "test-hatchery", "http://lolcat.host", "xxx-my-token", "", true, afero.NewBasePathFs(fs, basedir)); err != nil { + cfg := workerruntime.WorkerConfig{ + Name: "test-worker", + HatcheryName: "test-hatchery", + APIEndpoint: "http://lolcat.host", + APIToken: "xxx-my-token", + APIEndpointInsecure: true, + Basedir: basedir, + } + if err := w.Init(&cfg, afero.NewBasePathFs(fs, basedir)); err != nil { t.Fatalf("worker init failed: %v", err) } @@ -98,8 +106,15 @@ func TestInstallKey_SSHKeyWithRelativeDestination(t *testing.T) { fs := afero.NewOsFs() basedir := "test-" + test.GetTestName(t) + "-" + sdk.RandomString(10) + "-" + fmt.Sprintf("%d", time.Now().Unix()) require.NoError(t, fs.MkdirAll(basedir, os.FileMode(0755))) - - if err := w.Init("test-worker", "test-hatchery", "http://lolcat.host", "xxx-my-token", "", true, afero.NewBasePathFs(fs, basedir)); err != nil { + cfg := workerruntime.WorkerConfig{ + Name: "test-worker", + HatcheryName: "test-hatchery", + APIEndpoint: "http://lolcat.host", + APIToken: "xxx-my-token", + APIEndpointInsecure: true, + Basedir: basedir, + } + if err := w.Init(&cfg, afero.NewBasePathFs(fs, basedir)); err != nil { t.Fatalf("worker init failed: %v", err) } @@ -137,8 +152,14 @@ func TestInstallKey_SSHKeyWithRelativeDestination(t *testing.T) { func TestInstallKey_SSHKeyWithAbsoluteDestination(t *testing.T) { var w = new(CurrentWorker) - - if err := w.Init("test-worker", "test-hatchery", "http://lolcat.host", "xxx-my-token", "", true, nil); err != nil { + cfg := &workerruntime.WorkerConfig{ + Name: "test-worker", + HatcheryName: "test-hatchery", + APIEndpoint: "http://lolcat.host", + APIToken: "xxx-my-token", + APIEndpointInsecure: true, + } + if err := w.Init(cfg, nil); err != nil { t.Fatalf("worker init failed: %v", err) } diff --git a/engine/worker/internal/register.go b/engine/worker/internal/register.go index 9acf00661b..c860817057 100644 --- a/engine/worker/internal/register.go +++ b/engine/worker/internal/register.go @@ -13,7 +13,7 @@ import ( // Workers need to register to main api so they can run actions func (w *CurrentWorker) Register(ctx context.Context) error { var form sdk.WorkerRegistrationForm - log.Info(ctx, "Registering with Token %s on %s", sdk.StringFirstN(w.register.token, 12), w.register.apiEndpoint) + log.Info(ctx, "Registering on %s", w.cfg.APIEndpoint) requirements, errR := w.client.Requirements() if errR != nil { @@ -27,7 +27,7 @@ func (w *CurrentWorker) Register(ctx context.Context) error { form.OS = sdk.GOOS form.Arch = sdk.GOARCH - worker, uptodate, err := w.client.WorkerRegister(context.Background(), w.register.token, form) + worker, uptodate, err := w.client.WorkerRegister(context.Background(), w.cfg.APIToken, form) if err != nil { return sdk.WithStack(err) } diff --git a/engine/worker/internal/start.go b/engine/worker/internal/start.go index 8d7d2441d4..f77989539d 100644 --- a/engine/worker/internal/start.go +++ b/engine/worker/internal/start.go @@ -6,15 +6,19 @@ import ( "strings" "time" - "github.com/ovh/cds/sdk" + "github.com/pkg/errors" "github.com/rockbears/log" + + "github.com/ovh/cds/sdk" ) func StartWorker(ctx context.Context, w *CurrentWorker, bookedJobID int64) (mainError error) { + ctx = context.WithValue(ctx, log.Field("permJobID"), bookedJobID) + log.Info(ctx, "Starting worker %s on job %d", w.Name(), bookedJobID) if bookedJobID == 0 { - return fmt.Errorf("startWorker: bookedJobID is mandatory. val:%d", bookedJobID) + return errors.Errorf("startWorker: bookedJobID is mandatory. val:%d", bookedJobID) } ctx, cancel := context.WithCancel(ctx) diff --git a/engine/worker/internal/start_test.go b/engine/worker/internal/start_test.go index ca0b158249..9524e63070 100644 --- a/engine/worker/internal/start_test.go +++ b/engine/worker/internal/start_test.go @@ -24,6 +24,7 @@ import ( "github.com/ovh/cds/engine/test" "github.com/ovh/cds/engine/worker/internal" + "github.com/ovh/cds/engine/worker/pkg/workerruntime" "github.com/ovh/cds/sdk" "github.com/ovh/cds/sdk/cdsclient" cdslog "github.com/ovh/cds/sdk/log" @@ -253,8 +254,6 @@ func TestStartWorkerWithABookedJob(t *testing.T) { assert.NoError(t, err) request.Body = ioutil.NopCloser(bytes.NewReader(bodyContent)) if mock != nil { - t.Logf("%s %s - Body: %s", mock.Request().Method, mock.Request().URLStruct.String(), string(bodyContent)) - switch mock.Request().URLStruct.String() { case "http://lolcat.host/queue/workflows/42/step": var result sdk.StepStatus @@ -312,7 +311,17 @@ func TestStartWorkerWithABookedJob(t *testing.T) { log.Debug(context.TODO(), "creating basedir %s", basedir) require.NoError(t, fs.MkdirAll(basedir, os.FileMode(0755))) - if err := w.Init("test-worker", "test-hatchery", "http://lolcat.host", "xxx-my-token", "", true, afero.NewBasePathFs(fs, basedir)); err != nil { + cfg := &workerruntime.WorkerConfig{ + Name: "test-worker", + HatcheryName: "test-hatchery", + APIEndpoint: "http://lolcat.host", + APIToken: "xxx-my-token", + APIEndpointInsecure: true, + Model: "my-model", + Region: "local-test", + Basedir: basedir, + } + if err := w.Init(cfg, afero.NewBasePathFs(fs, basedir)); err != nil { t.Fatalf("worker init failed: %v", err) } gock.InterceptClient(w.Client().(cdsclient.Raw).HTTPClient()) @@ -378,4 +387,10 @@ func TestStartWorkerWithABookedJob(t *testing.T) { assert.Equal(t, 1, strings.Count(logBuffer.String(), "CDS_SEMVER=0.1.0+cds.1")) assert.Equal(t, 1, strings.Count(logBuffer.String(), "GIT_DESCRIBE=0.1.0")) assert.Equal(t, 0, strings.Count(logBuffer.String(), "CDS_BUILD_CDS_BUILD")) + assert.Equal(t, 1, strings.Count(logBuffer.String(), "HATCHERY_MODEL=my-model")) + assert.Equal(t, 1, strings.Count(logBuffer.String(), "HATCHERY_NAME=test-hatchery")) + assert.Equal(t, 1, strings.Count(logBuffer.String(), "HATCHERY_WORKER=test-worker")) + assert.Equal(t, 1, strings.Count(logBuffer.String(), "HATCHERY_REGION=local-test")) + assert.Equal(t, 1, strings.Count(logBuffer.String(), "BASEDIR=")) + } diff --git a/engine/worker/internal/types.go b/engine/worker/internal/types.go index df6d0b4336..ed4e7f152b 100644 --- a/engine/worker/internal/types.go +++ b/engine/worker/internal/types.go @@ -36,6 +36,7 @@ type logger struct { } type CurrentWorker struct { + cfg *workerruntime.WorkerConfig id string model sdk.Model basedir afero.Fs @@ -44,12 +45,7 @@ type CurrentWorker struct { cdnHttpAddr string stepLogLine int64 httpPort int32 - register struct { - apiEndpoint string - token string - model string - } - currentJob struct { + currentJob struct { wJob *sdk.WorkflowNodeJobRun newVariables []sdk.Variable params []sdk.Parameter @@ -76,13 +72,11 @@ type CurrentWorker struct { // BuiltInAction defines builtin action signature type BuiltInAction func(context.Context, workerruntime.Runtime, sdk.Action, []sdk.Variable) (sdk.Result, error) -func (wk *CurrentWorker) Init(name, hatcheryName, apiEndpoint, token string, model string, insecure bool, workspace afero.Fs) error { - wk.status.Name = name +func (wk *CurrentWorker) Init(cfg *workerruntime.WorkerConfig, workspace afero.Fs) error { + wk.cfg = cfg + wk.status.Name = cfg.Name wk.basedir = workspace - wk.register.model = model - wk.register.token = token - wk.register.apiEndpoint = apiEndpoint - wk.client = cdsclient.NewWorker(apiEndpoint, name, cdsclient.NewHTTPClient(time.Second*10, insecure)) + wk.client = cdsclient.NewWorker(cfg.APIEndpoint, cfg.Name, cdsclient.NewHTTPClient(time.Second*10, cfg.APIEndpointInsecure)) return nil } @@ -265,20 +259,33 @@ func (wk *CurrentWorker) Environ() []string { newEnv := []string{"CI=1"} // filter technical env variables for _, e := range env { + if e == "" { + continue + } if strings.HasPrefix(e, "CDS_") { continue } newEnv = append(newEnv, e) } - //We have to let it here for some legacy reason - newEnv = append(newEnv, "CDS_KEY=********") - - // worker export http port + newEnv = append(newEnv, "CDS_KEY=********") //We have to let it here for some legacy reason newEnv = append(newEnv, fmt.Sprintf("%s=%d", WorkerServerPort, wk.HTTPPort())) - - // Api Endpoint in CDS_API_URL var - newEnv = append(newEnv, fmt.Sprintf("%s=%s", CDSApiUrl, wk.register.apiEndpoint)) + newEnv = append(newEnv, fmt.Sprintf("%s=%s", CDSApiUrl, wk.cfg.APIEndpoint)) + newEnv = append(newEnv, "BASEDIR="+wk.cfg.Basedir) + newEnv = append(newEnv, "HATCHERY_NAME="+wk.cfg.HatcheryName) + newEnv = append(newEnv, "HATCHERY_WORKER="+wk.cfg.Name) + if wk.cfg.Region != "" { + newEnv = append(newEnv, "HATCHERY_REGION="+wk.cfg.Region) + } + if wk.cfg.Model != "" { + newEnv = append(newEnv, "HATCHERY_MODEL="+wk.cfg.Model) + } + for k, v := range wk.cfg.InjectEnvVars { + if v == "" { + continue + } + newEnv = append(newEnv, k+"="+v) + } //set up environment variables from pipeline build job parameters for _, p := range wk.currentJob.params { diff --git a/engine/worker/pkg/workerruntime/types.go b/engine/worker/pkg/workerruntime/types.go index 41e27196c4..2293afddea 100644 --- a/engine/worker/pkg/workerruntime/types.go +++ b/engine/worker/pkg/workerruntime/types.go @@ -2,16 +2,38 @@ package workerruntime import ( "context" + "encoding/base64" + "encoding/json" "errors" "fmt" "github.com/ovh/cds/sdk/cdsclient" + cdslog "github.com/ovh/cds/sdk/log" "github.com/rockbears/log" "github.com/ovh/cds/sdk" "github.com/spf13/afero" ) +type WorkerConfig struct { + Name string `json:"name"` + Basedir string `json:"basedir"` + Log cdslog.Conf `json:"log"` + HatcheryName string `json:"hatchery_name"` + APIEndpoint string `json:"api_endpoint"` + APIEndpointInsecure bool `json:"api_endpoint_insecure,omitempty"` + APIToken string `json:"api_token"` + Model string `json:"model"` + BookedJobID int64 `json:"booked_job_id,omitempty"` + Region string `json:"region,omitempty"` + InjectEnvVars map[string]string `json:"inject_env_vars,omitempty"` +} + +func (cfg WorkerConfig) EncodeBase64() string { + btes, _ := json.Marshal(cfg) + return base64.StdEncoding.EncodeToString(btes) +} + type DownloadArtifact struct { Workflow string `json:"workflow"` Number int64 `json:"number"` diff --git a/sdk/log/hook/throttle.go b/sdk/log/hook/throttle.go index 9b58c5de67..942fe5c4c6 100644 --- a/sdk/log/hook/throttle.go +++ b/sdk/log/hook/throttle.go @@ -64,7 +64,6 @@ func (d *DefaultThrottlePolicy) Flush() { d.mutex.Lock() defer d.mutex.Unlock() - fmt.Fprintf(os.Stderr, "[graylog] flush\n") for len(d.buffer) != 0 { time.Sleep(time.Second) } diff --git a/sdk/worker.go b/sdk/worker.go index 805017b6a1..bb84ce9ce1 100644 --- a/sdk/worker.go +++ b/sdk/worker.go @@ -1,8 +1,6 @@ package sdk import ( - "bytes" - "html/template" "time" ) @@ -38,45 +36,6 @@ type SpawnErrorForm struct { Logs []byte } -// WorkerArgs is all the args needed to run a worker -type WorkerArgs struct { - API string `json:"api"` - Token string `json:"token"` - Name string `json:"name"` - BaseDir string `json:"base_dir"` - HTTPInsecure bool `json:"http_insecure"` - Model string `json:"model"` - HatcheryName string `json:"hatchery_name"` - WorkflowJobID int64 `json:"workflow_job_id"` - TTL int `json:"ttl"` - FromWorkerImage bool `json:"from_worker_image"` - //Graylog params - GraylogHost string `json:"graylog_host"` - GraylogPort int `json:"graylog_port"` - GraylogExtraKey string `json:"graylog_extra_key"` - GraylogExtraValue string `json:"graylog_extra_value"` - WorkerBinary string - // Env variables - InjectEnvVars map[string]string `json:"inject_env_vars"` -} - -// TemplateEnvs return envs interpolated with worker arguments -func TemplateEnvs(args WorkerArgs, envs map[string]string) (map[string]string, error) { - for name, value := range envs { - tmpl, errt := template.New("env").Parse(value) - if errt != nil { - return envs, errt - } - var buffer bytes.Buffer - if errTmpl := tmpl.Execute(&buffer, args); errTmpl != nil { - return envs, errTmpl - } - envs[name] = buffer.String() - } - - return envs, nil -} - // WorkflowNodeJobRunData is returned to worker in answer to postTakeWorkflowJobHandler type WorkflowNodeJobRunData struct { NodeJobRun WorkflowNodeJobRun diff --git a/tests/fixtures/my_worker_model_updated.yml b/tests/fixtures/my_worker_model_updated.yml index 14147b09ca..ad18fe0fff 100644 --- a/tests/fixtures/my_worker_model_updated.yml +++ b/tests/fixtures/my_worker_model_updated.yml @@ -8,8 +8,6 @@ envs: CDS_GRAYLOG_EXTRA_VALUE: '{{.GraylogExtraValue}}' CDS_GRAYLOG_HOST: '{{.GraylogHost}}' CDS_GRAYLOG_PORT: '{{.GraylogPort}}' - CDS_SINGLE_USE: "1" - CDS_TTL: '{{.TTL}}' atest: this is an env test shell: sh -c cmd: worker --api={{.API}} --token={{.Token}} --basedir={{.BaseDir}} --model={{.Model}} --name={{.Name}} --hatchery={{.Hatchery}} --hatchery-name={{.HatcheryName}} --insecure={{.HTTPInsecure}} --single-use