Skip to content

Commit

Permalink
feat(hatchery,worker): worker start command (#5986)
Browse files Browse the repository at this point in the history
  • Loading branch information
fsamin authored Nov 3, 2021
1 parent fd7b0a4 commit d9eec0f
Show file tree
Hide file tree
Showing 38 changed files with 455 additions and 443 deletions.
3 changes: 0 additions & 3 deletions docs/content/docs/tutorials/worker_model-vsphere.md
Original file line number Diff line number Diff line change
Expand Up @@ -51,16 +51,13 @@ We will create a worker model called debian8-docker:
#!/bin/bash
set +e
export CDS_FROM_WORKER_IMAGE={{.FromWorkerImage}}
export CDS_SINGLE_USE=1
export CDS_API={{.API}}
export CDS_TOKEN={{.Token}}
export CDS_NAME={{.Name}}
export CDS_MODEL={{.Model}}
export CDS_HATCHERY={{.Hatchery}}
export CDS_HATCHERY_NAME={{.HatcheryName}}
export CDS_BOOKED_PB_JOB_ID={{.PipelineBuildJobID}}
export CDS_BOOKED_WORKFLOW_JOB_ID={{.WorkflowJobID}}
export CDS_TTL={{.TTL}}
export CDS_INSECURE={{.HTTPInsecure}}
# Basic build binaries
Expand Down
5 changes: 0 additions & 5 deletions engine/api/project_integration_worker_hook_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@ func TestAPI_post_getProjectIntegrationWorkerHookHandler(t *testing.T) {
btes := w.Body.Bytes()
var wh2 sdk.WorkerHookProjectIntegrationModel
require.NoError(t, json.Unmarshal(btes, &wh2))
t.Logf(">> wh2=%+v", wh2)

uri = router.GetRoute("GET", api.getProjectIntegrationWorkerHookHandler, vars)
req = assets.NewAuthentifiedRequest(t, u, pass, "GET", uri, nil)
Expand All @@ -77,8 +76,6 @@ func TestAPI_post_getProjectIntegrationWorkerHookHandler(t *testing.T) {
var wh3 sdk.WorkerHookProjectIntegrationModel
require.NoError(t, json.Unmarshal(btes, &wh3))

t.Logf(">> wh3=%+v", wh3)

wh = wh3
wh.Disable = true

Expand All @@ -104,6 +101,4 @@ func TestAPI_post_getProjectIntegrationWorkerHookHandler(t *testing.T) {
var wh5 sdk.WorkerHookProjectIntegrationModel
require.NoError(t, json.Unmarshal(btes, &wh5))
require.True(t, wh5.Disable)
t.Logf(">> wh5=%+v", wh5)

}
4 changes: 0 additions & 4 deletions engine/api/workermodel/dao.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,8 +200,6 @@ func Insert(ctx context.Context, db gorpmapper.SqlExecutorWithTx, model *sdk.Mod
dbmodel.UserLastModified = time.Now()
dbmodel.NeedRegistration = true

mergeModelEnvsWithDefaultEnvs(&dbmodel)

needSaveRegistryPassword, dockerRegistryPassword, err := replaceDockerRegistryPassword(db, &dbmodel)
if err != nil {
return err
Expand Down Expand Up @@ -246,8 +244,6 @@ func UpdateDB(ctx context.Context, db gorpmapper.SqlExecutorWithTx, model *sdk.M
return err
}

mergeModelEnvsWithDefaultEnvs(&dbmodel)

needSaveRegistryPassword, dockerRegistryPassword, err := replaceDockerRegistryPassword(db, &dbmodel)
if err != nil {
return err
Expand Down
23 changes: 0 additions & 23 deletions engine/api/workermodel/dao_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,29 +135,6 @@ func TestInsert(t *testing.T) {
assert.EqualValues(t, *src, *res)
}

func TestMergeModelEnvsWithDefaultEnvs(t *testing.T) {
db, _ := test.SetupPG(t, bootstrap.InitiliazeDB)

g := assets.InsertGroup(t, db)

m := sdk.Model{
Name: sdk.RandomString(10),
Type: sdk.Docker,
ModelDocker: sdk.ModelDocker{
Image: "foo/bar:3.4",
},
GroupID: g.ID,
}
require.NoError(t, workermodel.Insert(context.TODO(), db, &m))
require.Len(t, m.ModelDocker.Envs, 6, "all default vars should be added by insert")

m.ModelDocker.Envs = map[string]string{
"myvar": "myvalue",
}
require.NoError(t, workermodel.UpdateDB(context.TODO(), db, &m))
require.Len(t, m.ModelDocker.Envs, 7, "all default vars should be merged to given vars by update")
}

func TestLoadByNameAndGroupID(t *testing.T) {
db, _ := test.SetupPG(t, bootstrap.InitiliazeDB)

Expand Down
20 changes: 5 additions & 15 deletions engine/api/workermodel/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,25 +20,12 @@ func insertFirstPatterns(db gorp.SqlExecutor) error {
preCmdOs := `#!/bin/bash
set +e
export CDS_FROM_WORKER_IMAGE={{.FromWorkerImage}}
export CDS_SINGLE_USE=1
export CDS_API={{.API}}
export CDS_TOKEN={{.Token}}
export CDS_NAME={{.Name}}
export CDS_MODEL={{.Model}}
export CDS_HATCHERY_NAME={{.HatcheryName}}
export CDS_BOOKED_WORKFLOW_JOB_ID={{.WorkflowJobID}}
export CDS_TTL={{.TTL}}
export CDS_INSECURE={{.HTTPInsecure}}
export CDS_GRAYLOG_HOST={{.GraylogHost}}
export CDS_GRAYLOG_PORT={{.GraylogPort}}
export CDS_GRAYLOG_EXTRA_KEY={{.GraylogExtraKey}}
export CDS_GRAYLOG_EXTRA_VALUE={{.GraylogExtraValue}}
# Basic build binaries
cd $HOME
apt-get -y --force-yes update >> /tmp/user_data 2>&1
apt-get -y --force-yes install curl git >> /tmp/user_data 2>&1
apt-get -y --force-yes install binutils >> /tmp/user_data 2>&1
apt-get -y --force-yes install curl git binutils >> /tmp/user_data 2>&1
# Docker installation (FOR DEBIAN)
if [[ "x{{.FromWorkerImage}}" = "xtrue" ]]; then
Expand Down Expand Up @@ -94,7 +81,7 @@ chmod +x worker
Type: sdk.HostProcess,
Name: "basic_unix",
Model: sdk.ModelCmds{
Cmd: "worker --api={{.API}} --token={{.Token}} --basedir={{.BaseDir}} --model={{.Model}} --name={{.Name}} --hatchery-name={{.HatcheryName}} --insecure={{.HTTPInsecure}} --graylog-extra-key={{.GraylogExtraKey}} --graylog-extra-value={{.GraylogExtraValue}} --graylog-host={{.GraylogHost}} --graylog-port={{.GraylogPort}} --booked-workflow-job-id={{.WorkflowJobID}} --single-use --force-exit",
Cmd: "worker --config={{.Config}}",
},
},
}
Expand All @@ -108,6 +95,9 @@ chmod +x worker
return sdk.WrapError(err, "cannot load worker_model_pattern for type %s", pattern.Type)
}
if numPattern > 0 {
if err := UpdatePattern(db, &pattern); err != nil {
return sdk.WrapError(err, "cannot update basic model %s", pattern.Type)
}
continue
}
if err := InsertPattern(db, &pattern); err != nil {
Expand Down
24 changes: 0 additions & 24 deletions engine/api/workermodel/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,30 +9,6 @@ import (
"github.com/ovh/cds/sdk"
)

var defaultEnvs = map[string]string{
"CDS_SINGLE_USE": "1",
"CDS_TTL": "{{.TTL}}",
"CDS_GRAYLOG_HOST": "{{.GraylogHost}}",
"CDS_GRAYLOG_PORT": "{{.GraylogPort}}",
"CDS_GRAYLOG_EXTRA_KEY": "{{.GraylogExtraKey}}",
"CDS_GRAYLOG_EXTRA_VALUE": "{{.GraylogExtraValue}}",
}

func mergeModelEnvsWithDefaultEnvs(m *workerModel) {
if m.Type != sdk.Docker {
return
}

if m.ModelDocker.Envs == nil {
m.ModelDocker.Envs = make(map[string]string)
}
for envName := range defaultEnvs {
if _, ok := m.ModelDocker.Envs[envName]; !ok {
m.ModelDocker.Envs[envName] = defaultEnvs[envName]
}
}
}

const registryPasswordSecretName = "secrets.registry_password"
const vpsherePasswordSecretName = "secrets.vsphere_password"

Expand Down
34 changes: 11 additions & 23 deletions engine/hatchery/kubernetes/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,10 +193,12 @@ func (h *HatcheryKubernetes) SpawnWorker(ctx context.Context, spawnArgs hatchery
}
}

udataParam := h.GenerateWorkerArgs(ctx, h, spawnArgs)
udataParam.TTL = h.Config.WorkerTTL

udataParam.WorkflowJobID = spawnArgs.JobID
workerConfig := h.GenerateWorkerConfig(ctx, h, spawnArgs)
udataParam := struct {
API string
}{
API: workerConfig.APIEndpoint,
}

tmpl, errt := template.New("cmd").Parse(spawnArgs.Model.ModelDocker.Cmd)
if errt != nil {
Expand All @@ -216,26 +218,12 @@ func (h *HatcheryKubernetes) SpawnWorker(ctx context.Context, spawnArgs hatchery
if spawnArgs.Model.ModelDocker.Envs == nil {
spawnArgs.Model.ModelDocker.Envs = map[string]string{}
}
envsWm := udataParam.InjectEnvVars
envsWm := workerConfig.InjectEnvVars
envsWm["CDS_MODEL_MEMORY"] = fmt.Sprintf("%d", memory)
envsWm["CDS_API"] = udataParam.API
envsWm["CDS_TOKEN"] = udataParam.Token
envsWm["CDS_NAME"] = udataParam.Name
envsWm["CDS_MODEL_PATH"] = udataParam.Model
envsWm["CDS_HATCHERY_NAME"] = udataParam.HatcheryName
envsWm["CDS_FROM_WORKER_IMAGE"] = fmt.Sprintf("%v", udataParam.FromWorkerImage)
envsWm["CDS_INSECURE"] = fmt.Sprintf("%v", udataParam.HTTPInsecure)

if spawnArgs.JobID > 0 {
envsWm["CDS_BOOKED_WORKFLOW_JOB_ID"] = fmt.Sprintf("%d", spawnArgs.JobID)
}

envTemplated, errEnv := sdk.TemplateEnvs(udataParam, spawnArgs.Model.ModelDocker.Envs)
if errEnv != nil {
return errEnv
}
envsWm["CDS_FROM_WORKER_IMAGE"] = "true"
envsWm["CDS_CONFIG"] = workerConfig.EncodeBase64()

for envName, envValue := range envTemplated {
for envName, envValue := range spawnArgs.Model.ModelDocker.Envs {
envsWm[envName] = envValue
}

Expand Down Expand Up @@ -393,7 +381,7 @@ func (h *HatcheryKubernetes) NeedRegistration(_ context.Context, m *sdk.Model) b
}

func (h *HatcheryKubernetes) routines(ctx context.Context) {
ticker := time.NewTicker(10 * time.Second)
ticker := time.NewTicker(10 * time.Minute)
defer ticker.Stop()

for {
Expand Down
2 changes: 0 additions & 2 deletions engine/hatchery/kubernetes/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@ var containerServiceNameRegexp = regexp.MustCompile(`service-([0-9]+)-(.*)`)
// HatcheryConfiguration is the configuration for local hatchery
type HatcheryConfiguration struct {
service.HatcheryCommonConfiguration `mapstructure:"commonConfiguration" toml:"commonConfiguration" json:"commonConfiguration"`
// WorkerTTL Worker TTL (minutes)
WorkerTTL int `mapstructure:"workerTTL" toml:"workerTTL" default:"10" commented:"false" comment:"Worker TTL (minutes)" json:"workerTTL"`
// DefaultMemory Worker default memory
DefaultMemory int `mapstructure:"defaultMemory" toml:"defaultMemory" default:"1024" commented:"false" comment:"Worker default memory in Mo" json:"defaultMemory"`
// Namespace is the kubernetes namespace in which workers are spawned"
Expand Down
98 changes: 98 additions & 0 deletions engine/hatchery/local/logger.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
package local

import (
"bufio"
"context"
"fmt"
"os/exec"
"strings"
"time"

"github.com/rockbears/log"
)

type Logger interface {
Logf(fmt string, values ...interface{})
Errorf(fmt string, values ...interface{})
Fatalf(fmt string, values ...interface{})
}

func (h *HatcheryLocal) startCmd(name string, cmd *exec.Cmd, logger Logger) error {
stdout, err := cmd.StdoutPipe()
if err != nil {
return fmt.Errorf("Failure due to internal error: unable to capture stdout: %v", err)
}

stderr, err := cmd.StderrPipe()
if err != nil {
return fmt.Errorf("Failure due to internal error: unable to capture stderr: %v", err)
}

stdoutreader := bufio.NewReader(stdout)
stderrreader := bufio.NewReader(stderr)

outchan := make(chan bool)
go func() {
for {
line, err := stdoutreader.ReadString('\n')
if line != "" {
logger.Logf(line)
}
if err != nil {
stdout.Close()
close(outchan)
return
}
}
}()

errchan := make(chan bool)
go func() {
for {
line, err := stderrreader.ReadString('\n')
if line != "" {
logger.Logf(line)
}
if err != nil {
stderr.Close()
close(errchan)
return
}
}
}()

if err := cmd.Start(); err != nil {
return fmt.Errorf("unable to start command: %v", err)
}

h.Lock()
h.workers[name] = workerCmd{cmd: cmd, created: time.Now()}
h.Unlock()

<-outchan
<-errchan
if err := cmd.Wait(); err != nil {
return fmt.Errorf("command failure: %v", err)
}

return nil
}

type localWorkerLogger struct {
name string
}

func (l localWorkerLogger) Logf(fmt string, values ...interface{}) {
fmt = strings.TrimSuffix(fmt, "\n")
log.Info(context.Background(), "hatchery> local> worker> %s> "+fmt, l.name)
}

func (l localWorkerLogger) Errorf(fmt string, values ...interface{}) {
fmt = strings.TrimSuffix(fmt, "\n")
log.Error(context.Background(), "hatchery> local> worker> %s> "+fmt, l.name)
}

func (l localWorkerLogger) Fatalf(fmt string, values ...interface{}) {
fmt = strings.TrimSuffix(fmt, "\n")
log.Fatal(context.TODO(), "hatchery> local> worker> %s> "+fmt, l.name)
}
Loading

0 comments on commit d9eec0f

Please sign in to comment.