Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(worker): show running worker hooks in job spawn info #6174

Merged
merged 2 commits into from
Jun 2, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
104 changes: 67 additions & 37 deletions engine/worker/internal/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"context"
"crypto/md5"
"fmt"
"io/fs"
"os"
"os/exec"
"os/user"
Expand Down Expand Up @@ -738,64 +737,82 @@ func (w *CurrentWorker) setupHooks(ctx context.Context, jobInfo sdk.WorkflowNode
// The error contains 'Executable file not found', the capa is not on the worker
continue
}

hookFilename := fmt.Sprintf("%d-%s-%s", hookConfig.Priority, integrationName, slug.Convert(hookConfig.Label))
hookFilePath := path.Join(workingDir, "setup", hookFilename)
log.Info(ctx, "setting up hook %q", hookFilePath)

hookFile, err := fs.Create(hookFilePath)
w.hooks = append(w.hooks, workerHook{
Config: hookConfig,
SetupPath: path.Join(workingDir, "setup", hookFilename),
TeardownPath: path.Join(workingDir, "teardown", hookFilename),
})
}

for _, h := range w.hooks {
infos := []sdk.SpawnInfo{{
RemoteTime: time.Now(),
Message: sdk.SpawnMsg{ID: sdk.MsgSpawnInfoWorkerHookSetup.ID, Args: []interface{}{h.Config.Label}},
}}
if err := w.Client().QueueJobSendSpawnInfo(ctx, w.currentJob.wJob.ID, infos); err != nil {
return sdk.WrapError(err, "cannot record QueueJobSendSpawnInfo for job (err spawn): %d", w.currentJob.wJob.ID)
}

log.Info(ctx, "setting up hook at %q", h.SetupPath)

hookFile, err := fs.Create(h.SetupPath)
if err != nil {
return errors.Errorf("unable to open hook file %q in %q: %v", hookFilePath, w.basedir.Name(), err)
return errors.Errorf("unable to open hook file %q in %q: %v", h.SetupPath, w.basedir.Name(), err)
}
if _, err := hookFile.WriteString(hookConfig.Setup); err != nil {
if _, err := hookFile.WriteString(h.Config.Setup); err != nil {
_ = hookFile.Close
return errors.Errorf("unable to setup hook %q: %v", hookFilePath, err)
return errors.Errorf("unable to setup hook %q: %v", h.SetupPath, err)
}
if err := hookFile.Close(); err != nil {
return errors.Errorf("unable to setup hook %q: %v", hookFilePath, err)
return errors.Errorf("unable to setup hook %q: %v", h.SetupPath, err)
}

hookFilePath = path.Join(workingDir, "teardown", hookFilename)
hookFile, err = fs.Create(hookFilePath)
hookFile, err = fs.Create(h.TeardownPath)
if err != nil {
return errors.Errorf("unable to open hook file %q: %v", hookFilePath, err)
return errors.Errorf("unable to open hook file %q: %v", h.TeardownPath, err)
}
if _, err := hookFile.WriteString(hookConfig.Teardown); err != nil {
if _, err := hookFile.WriteString(h.Config.Teardown); err != nil {
_ = hookFile.Close
return errors.Errorf("unable to setup hook %q: %v", hookFilePath, err)
return errors.Errorf("unable to setup hook %q: %v", h.TeardownPath, err)
}
if err := hookFile.Close(); err != nil {
return errors.Errorf("unable to setup hook %q: %v", hookFilePath, err)
return errors.Errorf("unable to setup hook %q: %v", h.TeardownPath, err)
}
}
}
return nil
}

func (w *CurrentWorker) executeHooksSetup(ctx context.Context, basedir afero.Fs, workingDir string) error {
func (w *CurrentWorker) executeHooksSetup(ctx context.Context, fs afero.Fs, workingDir string) error {
if strings.EqualFold(runtime.GOOS, "windows") {
log.Warn(ctx, "hooks are not supported on windows")
return nil
}

var result = make(map[string]string)
var setupDir = path.Join(workingDir, "setup")

var absPath string
if x, ok := basedir.(*afero.BasePathFs); ok {
absPath, _ = x.RealPath(setupDir)
absPath, _ = filepath.Abs(path.Dir(absPath))
basedir, ok := fs.(*afero.BasePathFs)
if !ok {
return sdk.WithStack(fmt.Errorf("invalid given basedir"))
}

setupDir = filepath.Join(absPath, filepath.Base(setupDir))

workerEnv := w.Environ()

err := filepath.Walk(setupDir, func(filepath string, info os.FileInfo, err error) error {
for _, h := range w.hooks {
filepath, err := basedir.RealPath(h.SetupPath)
if err != nil {
return err
return sdk.WrapError(err, "cannot get real path for: %s", h.SetupPath)
}
if info.IsDir() {
return nil

infos := []sdk.SpawnInfo{{
RemoteTime: time.Now(),
Message: sdk.SpawnMsg{ID: sdk.MsgSpawnInfoWorkerHookRun.ID, Args: []interface{}{h.Config.Label}},
}}
if err := w.Client().QueueJobSendSpawnInfo(ctx, w.currentJob.wJob.ID, infos); err != nil {
return sdk.WrapError(err, "cannot record QueueJobSendSpawnInfo for job (err spawn): %d", w.currentJob.wJob.ID)
}

str := fmt.Sprintf("source %s ; echo '<<<ENVIRONMENT>>>' ; env", filepath)
Expand All @@ -821,22 +838,35 @@ func (w *CurrentWorker) executeHooksSetup(ctx context.Context, basedir afero.Fs,
}
}
}
return nil
})
}
w.currentJob.envFromHooks = result
return errors.WithStack(err)
return nil
}

func (w *CurrentWorker) executeHooksTeardown(_ context.Context, basedir afero.Fs, workingDir string) error {
err := afero.Walk(basedir, path.Join(workingDir, "setup"), func(path string, info fs.FileInfo, err error) error {
if info.IsDir() {
return nil
func (w *CurrentWorker) executeHooksTeardown(ctx context.Context, fs afero.Fs, workingDir string) error {
basedir, ok := fs.(*afero.BasePathFs)
if !ok {
return sdk.WithStack(fmt.Errorf("invalid given basedir"))
}

for _, h := range w.hooks {
filepath, err := basedir.RealPath(h.SetupPath)
if err != nil {
return sdk.WrapError(err, "cannot get real path for: %s", h.SetupPath)
}

infos := []sdk.SpawnInfo{{
RemoteTime: time.Now(),
Message: sdk.SpawnMsg{ID: sdk.MsgSpawnInfoWorkerHookRunTeardown.ID, Args: []interface{}{h.Config.Label}},
}}
if err := w.Client().QueueJobSendSpawnInfo(ctx, w.currentJob.wJob.ID, infos); err != nil {
return sdk.WrapError(err, "cannot record QueueJobSendSpawnInfo for job (err spawn): %d", w.currentJob.wJob.ID)
}
cmd := exec.Command("bash", "-c", path)

cmd := exec.Command("bash", "-c", filepath)
if output, err := cmd.CombinedOutput(); err != nil {
return errors.WithMessage(err, w.blur.String(string(output)))
}
return nil
})
return err
}
return nil
}
26 changes: 14 additions & 12 deletions engine/worker/internal/start_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,11 @@ func TestStartWorkerWithABookedJob(t *testing.T) {
Status: sdk.StatusBuilding,
})

gock.New("http://cds-api.local").Post("/queue/workflows/42/spawn/infos").Times(3).
HeaderPresent("Authorization").
Reply(200).
JSON(nil)

gock.New("http://cds-api.local").Get("project/proj_key/workflows/workflow_name/runs/0").Times(1).
HeaderPresent("Authorization").
Reply(200).
Expand Down Expand Up @@ -284,14 +289,14 @@ export FOO_FROM_HOOK=BAR`,

var checkRequest gock.ObserverFunc = func(request *http.Request, mock gock.Mock) {
bodyContent, err := io.ReadAll(request.Body)
assert.NoError(t, err)
require.NoError(t, err)
request.Body = io.NopCloser(bytes.NewReader(bodyContent))
if mock != nil {
switch mock.Request().URLStruct.String() {
case "http://cds-api.local/queue/workflows/42/step":
var result sdk.StepStatus
err := json.Unmarshal(bodyContent, &result)
assert.NoError(t, err)
require.NoError(t, err)

switch result.StepOrder {
case 0:
Expand Down Expand Up @@ -320,15 +325,14 @@ export FOO_FROM_HOOK=BAR`,
}
case "http://cds-api.local/queue/workflows/42/result":
var result sdk.Result
err := json.Unmarshal(bodyContent, &result)
assert.NoError(t, err)
assert.Equal(t, int64(42), result.BuildID)
assert.Equal(t, sdk.StatusFail, result.Status)
require.NoError(t, json.Unmarshal(bodyContent, &result))
require.Equal(t, int64(42), result.BuildID)
require.Equal(t, sdk.StatusFail, result.Status)
if len(result.NewVariables) > 0 {
assert.Equal(t, "cds.build.newvar", result.NewVariables[0].Name)
require.Equal(t, "cds.build.newvar", result.NewVariables[0].Name)
// assert.Equal(t, "cds.semver", result.NewVariables[0].Name)
// assert.Equal(t, "git.describe", result.NewVariables[0].Name)
assert.Equal(t, "newval", result.NewVariables[0].Value)
require.Equal(t, "newval", result.NewVariables[0].Value)
} else {
t.Error("missing new variables")
}
Expand Down Expand Up @@ -363,8 +367,7 @@ export FOO_FROM_HOOK=BAR`,
ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
defer cancel()

err = internal.StartWorker(ctx, w, 42)
assert.NoError(t, err)
require.NoError(t, internal.StartWorker(ctx, w, 42))

var isDone bool
if gock.IsDone() {
Expand All @@ -380,7 +383,7 @@ export FOO_FROM_HOOK=BAR`,
}
}
}
assert.True(t, isDone)
require.True(t, isDone)
if gock.HasUnmatchedRequest() {
reqs := gock.GetUnmatchedRequests()
for _, req := range reqs {
Expand Down Expand Up @@ -426,5 +429,4 @@ export FOO_FROM_HOOK=BAR`,
assert.Equal(t, 1, strings.Count(logBuffer.String(), "HATCHERY_REGION=local-test"))
assert.Equal(t, 1, strings.Count(logBuffer.String(), "BASEDIR="))
assert.Equal(t, 1, strings.Count(logBuffer.String(), "FOO_FROM_HOOK=BAR"))

}
7 changes: 7 additions & 0 deletions engine/worker/internal/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,13 @@ type CurrentWorker struct {
}
client cdsclient.WorkerInterface
blur *sdk.Blur
hooks []workerHook
}

type workerHook struct {
Config sdk.WorkerHookSetupTeardownScripts
SetupPath string
TeardownPath string
}

// BuiltInAction defines builtin action signature
Expand Down
6 changes: 6 additions & 0 deletions sdk/messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,9 @@ var (
MsgWorkflowV3Preview = &Message{"MsgWorkflowV3Preview", trad{FR: "Le workflow a été généré en version 3 à partir d'une ancienne version", EN: "The workflow was generated in version 3 from an old version"}, nil, RunInfoTypeWarning}
MsgSpawnInfoDisableSecretInjection = &Message{"MsgSpawnInfoDisableSecretInjection", trad{EN: "⚠ Project's secrets were not automatically injected for this job because of a region prerequisite: %s"}, nil, RunInfoTypInfo}
MsgSpawnInfoManualSecretInjection = &Message{"MsgSpawnInfoManualSecretInjection", trad{EN: "Prerequisites of type secret matched %s secret(s)"}, nil, RunInfoTypInfo}
MsgSpawnInfoWorkerHookSetup = &Message{"MsgSpawnInfoWorkerHookSetup", trad{EN: "Setting up worker hook %q"}, nil, RunInfoTypInfo}
MsgSpawnInfoWorkerHookRun = &Message{"MsgSpawnInfoWorkerHookRun", trad{EN: "Running worker hook %q"}, nil, RunInfoTypInfo}
MsgSpawnInfoWorkerHookRunTeardown = &Message{"MsgSpawnInfoWorkerHookRunTeardown", trad{EN: "Running worker hook %q teardown"}, nil, RunInfoTypInfo}
)

// Messages contains all sdk Messages
Expand Down Expand Up @@ -178,6 +181,9 @@ var Messages = map[string]*Message{
MsgWorkflowV3Preview.ID: MsgWorkflowV3Preview,
MsgSpawnInfoDisableSecretInjection.ID: MsgSpawnInfoDisableSecretInjection,
MsgSpawnInfoManualSecretInjection.ID: MsgSpawnInfoManualSecretInjection,
MsgSpawnInfoWorkerHookSetup.ID: MsgSpawnInfoWorkerHookSetup,
MsgSpawnInfoWorkerHookRun.ID: MsgSpawnInfoWorkerHookRun,
MsgSpawnInfoWorkerHookRunTeardown.ID: MsgSpawnInfoWorkerHookRunTeardown,
}

//Message represent a struc format translated messages
Expand Down