Skip to content

Commit

Permalink
feat(worker): show running worker hooks in job spawn info (#6174)
Browse files Browse the repository at this point in the history
  • Loading branch information
richardlt authored Jun 2, 2022
1 parent 70e3ef9 commit eb4c592
Show file tree
Hide file tree
Showing 4 changed files with 94 additions and 49 deletions.
104 changes: 67 additions & 37 deletions engine/worker/internal/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"context"
"crypto/md5"
"fmt"
"io/fs"
"os"
"os/exec"
"os/user"
Expand Down Expand Up @@ -738,64 +737,82 @@ func (w *CurrentWorker) setupHooks(ctx context.Context, jobInfo sdk.WorkflowNode
// The error contains 'Executable file not found', the capa is not on the worker
continue
}

hookFilename := fmt.Sprintf("%d-%s-%s", hookConfig.Priority, integrationName, slug.Convert(hookConfig.Label))
hookFilePath := path.Join(workingDir, "setup", hookFilename)
log.Info(ctx, "setting up hook %q", hookFilePath)

hookFile, err := fs.Create(hookFilePath)
w.hooks = append(w.hooks, workerHook{
Config: hookConfig,
SetupPath: path.Join(workingDir, "setup", hookFilename),
TeardownPath: path.Join(workingDir, "teardown", hookFilename),
})
}

for _, h := range w.hooks {
infos := []sdk.SpawnInfo{{
RemoteTime: time.Now(),
Message: sdk.SpawnMsg{ID: sdk.MsgSpawnInfoWorkerHookSetup.ID, Args: []interface{}{h.Config.Label}},
}}
if err := w.Client().QueueJobSendSpawnInfo(ctx, w.currentJob.wJob.ID, infos); err != nil {
return sdk.WrapError(err, "cannot record QueueJobSendSpawnInfo for job (err spawn): %d", w.currentJob.wJob.ID)
}

log.Info(ctx, "setting up hook at %q", h.SetupPath)

hookFile, err := fs.Create(h.SetupPath)
if err != nil {
return errors.Errorf("unable to open hook file %q in %q: %v", hookFilePath, w.basedir.Name(), err)
return errors.Errorf("unable to open hook file %q in %q: %v", h.SetupPath, w.basedir.Name(), err)
}
if _, err := hookFile.WriteString(hookConfig.Setup); err != nil {
if _, err := hookFile.WriteString(h.Config.Setup); err != nil {
_ = hookFile.Close
return errors.Errorf("unable to setup hook %q: %v", hookFilePath, err)
return errors.Errorf("unable to setup hook %q: %v", h.SetupPath, err)
}
if err := hookFile.Close(); err != nil {
return errors.Errorf("unable to setup hook %q: %v", hookFilePath, err)
return errors.Errorf("unable to setup hook %q: %v", h.SetupPath, err)
}

hookFilePath = path.Join(workingDir, "teardown", hookFilename)
hookFile, err = fs.Create(hookFilePath)
hookFile, err = fs.Create(h.TeardownPath)
if err != nil {
return errors.Errorf("unable to open hook file %q: %v", hookFilePath, err)
return errors.Errorf("unable to open hook file %q: %v", h.TeardownPath, err)
}
if _, err := hookFile.WriteString(hookConfig.Teardown); err != nil {
if _, err := hookFile.WriteString(h.Config.Teardown); err != nil {
_ = hookFile.Close
return errors.Errorf("unable to setup hook %q: %v", hookFilePath, err)
return errors.Errorf("unable to setup hook %q: %v", h.TeardownPath, err)
}
if err := hookFile.Close(); err != nil {
return errors.Errorf("unable to setup hook %q: %v", hookFilePath, err)
return errors.Errorf("unable to setup hook %q: %v", h.TeardownPath, err)
}
}
}
return nil
}

func (w *CurrentWorker) executeHooksSetup(ctx context.Context, basedir afero.Fs, workingDir string) error {
func (w *CurrentWorker) executeHooksSetup(ctx context.Context, fs afero.Fs, workingDir string) error {
if strings.EqualFold(runtime.GOOS, "windows") {
log.Warn(ctx, "hooks are not supported on windows")
return nil
}

var result = make(map[string]string)
var setupDir = path.Join(workingDir, "setup")

var absPath string
if x, ok := basedir.(*afero.BasePathFs); ok {
absPath, _ = x.RealPath(setupDir)
absPath, _ = filepath.Abs(path.Dir(absPath))
basedir, ok := fs.(*afero.BasePathFs)
if !ok {
return sdk.WithStack(fmt.Errorf("invalid given basedir"))
}

setupDir = filepath.Join(absPath, filepath.Base(setupDir))

workerEnv := w.Environ()

err := filepath.Walk(setupDir, func(filepath string, info os.FileInfo, err error) error {
for _, h := range w.hooks {
filepath, err := basedir.RealPath(h.SetupPath)
if err != nil {
return err
return sdk.WrapError(err, "cannot get real path for: %s", h.SetupPath)
}
if info.IsDir() {
return nil

infos := []sdk.SpawnInfo{{
RemoteTime: time.Now(),
Message: sdk.SpawnMsg{ID: sdk.MsgSpawnInfoWorkerHookRun.ID, Args: []interface{}{h.Config.Label}},
}}
if err := w.Client().QueueJobSendSpawnInfo(ctx, w.currentJob.wJob.ID, infos); err != nil {
return sdk.WrapError(err, "cannot record QueueJobSendSpawnInfo for job (err spawn): %d", w.currentJob.wJob.ID)
}

str := fmt.Sprintf("source %s ; echo '<<<ENVIRONMENT>>>' ; env", filepath)
Expand All @@ -821,22 +838,35 @@ func (w *CurrentWorker) executeHooksSetup(ctx context.Context, basedir afero.Fs,
}
}
}
return nil
})
}
w.currentJob.envFromHooks = result
return errors.WithStack(err)
return nil
}

func (w *CurrentWorker) executeHooksTeardown(_ context.Context, basedir afero.Fs, workingDir string) error {
err := afero.Walk(basedir, path.Join(workingDir, "setup"), func(path string, info fs.FileInfo, err error) error {
if info.IsDir() {
return nil
func (w *CurrentWorker) executeHooksTeardown(ctx context.Context, fs afero.Fs, workingDir string) error {
basedir, ok := fs.(*afero.BasePathFs)
if !ok {
return sdk.WithStack(fmt.Errorf("invalid given basedir"))
}

for _, h := range w.hooks {
filepath, err := basedir.RealPath(h.SetupPath)
if err != nil {
return sdk.WrapError(err, "cannot get real path for: %s", h.SetupPath)
}

infos := []sdk.SpawnInfo{{
RemoteTime: time.Now(),
Message: sdk.SpawnMsg{ID: sdk.MsgSpawnInfoWorkerHookRunTeardown.ID, Args: []interface{}{h.Config.Label}},
}}
if err := w.Client().QueueJobSendSpawnInfo(ctx, w.currentJob.wJob.ID, infos); err != nil {
return sdk.WrapError(err, "cannot record QueueJobSendSpawnInfo for job (err spawn): %d", w.currentJob.wJob.ID)
}
cmd := exec.Command("bash", "-c", path)

cmd := exec.Command("bash", "-c", filepath)
if output, err := cmd.CombinedOutput(); err != nil {
return errors.WithMessage(err, w.blur.String(string(output)))
}
return nil
})
return err
}
return nil
}
26 changes: 14 additions & 12 deletions engine/worker/internal/start_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,11 @@ func TestStartWorkerWithABookedJob(t *testing.T) {
Status: sdk.StatusBuilding,
})

gock.New("http://cds-api.local").Post("/queue/workflows/42/spawn/infos").Times(3).
HeaderPresent("Authorization").
Reply(200).
JSON(nil)

gock.New("http://cds-api.local").Get("project/proj_key/workflows/workflow_name/runs/0").Times(1).
HeaderPresent("Authorization").
Reply(200).
Expand Down Expand Up @@ -284,14 +289,14 @@ export FOO_FROM_HOOK=BAR`,

var checkRequest gock.ObserverFunc = func(request *http.Request, mock gock.Mock) {
bodyContent, err := io.ReadAll(request.Body)
assert.NoError(t, err)
require.NoError(t, err)
request.Body = io.NopCloser(bytes.NewReader(bodyContent))
if mock != nil {
switch mock.Request().URLStruct.String() {
case "http://cds-api.local/queue/workflows/42/step":
var result sdk.StepStatus
err := json.Unmarshal(bodyContent, &result)
assert.NoError(t, err)
require.NoError(t, err)

switch result.StepOrder {
case 0:
Expand Down Expand Up @@ -320,15 +325,14 @@ export FOO_FROM_HOOK=BAR`,
}
case "http://cds-api.local/queue/workflows/42/result":
var result sdk.Result
err := json.Unmarshal(bodyContent, &result)
assert.NoError(t, err)
assert.Equal(t, int64(42), result.BuildID)
assert.Equal(t, sdk.StatusFail, result.Status)
require.NoError(t, json.Unmarshal(bodyContent, &result))
require.Equal(t, int64(42), result.BuildID)
require.Equal(t, sdk.StatusFail, result.Status)
if len(result.NewVariables) > 0 {
assert.Equal(t, "cds.build.newvar", result.NewVariables[0].Name)
require.Equal(t, "cds.build.newvar", result.NewVariables[0].Name)
// assert.Equal(t, "cds.semver", result.NewVariables[0].Name)
// assert.Equal(t, "git.describe", result.NewVariables[0].Name)
assert.Equal(t, "newval", result.NewVariables[0].Value)
require.Equal(t, "newval", result.NewVariables[0].Value)
} else {
t.Error("missing new variables")
}
Expand Down Expand Up @@ -363,8 +367,7 @@ export FOO_FROM_HOOK=BAR`,
ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
defer cancel()

err = internal.StartWorker(ctx, w, 42)
assert.NoError(t, err)
require.NoError(t, internal.StartWorker(ctx, w, 42))

var isDone bool
if gock.IsDone() {
Expand All @@ -380,7 +383,7 @@ export FOO_FROM_HOOK=BAR`,
}
}
}
assert.True(t, isDone)
require.True(t, isDone)
if gock.HasUnmatchedRequest() {
reqs := gock.GetUnmatchedRequests()
for _, req := range reqs {
Expand Down Expand Up @@ -426,5 +429,4 @@ export FOO_FROM_HOOK=BAR`,
assert.Equal(t, 1, strings.Count(logBuffer.String(), "HATCHERY_REGION=local-test"))
assert.Equal(t, 1, strings.Count(logBuffer.String(), "BASEDIR="))
assert.Equal(t, 1, strings.Count(logBuffer.String(), "FOO_FROM_HOOK=BAR"))

}
7 changes: 7 additions & 0 deletions engine/worker/internal/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,13 @@ type CurrentWorker struct {
}
client cdsclient.WorkerInterface
blur *sdk.Blur
hooks []workerHook
}

type workerHook struct {
Config sdk.WorkerHookSetupTeardownScripts
SetupPath string
TeardownPath string
}

// BuiltInAction defines builtin action signature
Expand Down
6 changes: 6 additions & 0 deletions sdk/messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,9 @@ var (
MsgWorkflowV3Preview = &Message{"MsgWorkflowV3Preview", trad{FR: "Le workflow a été généré en version 3 à partir d'une ancienne version", EN: "The workflow was generated in version 3 from an old version"}, nil, RunInfoTypeWarning}
MsgSpawnInfoDisableSecretInjection = &Message{"MsgSpawnInfoDisableSecretInjection", trad{EN: "⚠ Project's secrets were not automatically injected for this job because of a region prerequisite: %s"}, nil, RunInfoTypInfo}
MsgSpawnInfoManualSecretInjection = &Message{"MsgSpawnInfoManualSecretInjection", trad{EN: "Prerequisites of type secret matched %s secret(s)"}, nil, RunInfoTypInfo}
MsgSpawnInfoWorkerHookSetup = &Message{"MsgSpawnInfoWorkerHookSetup", trad{EN: "Setting up worker hook %q"}, nil, RunInfoTypInfo}
MsgSpawnInfoWorkerHookRun = &Message{"MsgSpawnInfoWorkerHookRun", trad{EN: "Running worker hook %q"}, nil, RunInfoTypInfo}
MsgSpawnInfoWorkerHookRunTeardown = &Message{"MsgSpawnInfoWorkerHookRunTeardown", trad{EN: "Running worker hook %q teardown"}, nil, RunInfoTypInfo}
)

// Messages contains all sdk Messages
Expand Down Expand Up @@ -178,6 +181,9 @@ var Messages = map[string]*Message{
MsgWorkflowV3Preview.ID: MsgWorkflowV3Preview,
MsgSpawnInfoDisableSecretInjection.ID: MsgSpawnInfoDisableSecretInjection,
MsgSpawnInfoManualSecretInjection.ID: MsgSpawnInfoManualSecretInjection,
MsgSpawnInfoWorkerHookSetup.ID: MsgSpawnInfoWorkerHookSetup,
MsgSpawnInfoWorkerHookRun.ID: MsgSpawnInfoWorkerHookRun,
MsgSpawnInfoWorkerHookRunTeardown.ID: MsgSpawnInfoWorkerHookRunTeardown,
}

//Message represent a struc format translated messages
Expand Down

0 comments on commit eb4c592

Please sign in to comment.