From eb4c592f524612c269fd72b63797bb1b0e60435a Mon Sep 17 00:00:00 2001 From: Richard LT Date: Thu, 2 Jun 2022 16:16:09 +0200 Subject: [PATCH] feat(worker): show running worker hooks in job spawn info (#6174) --- engine/worker/internal/run.go | 104 +++++++++++++++++---------- engine/worker/internal/start_test.go | 26 +++---- engine/worker/internal/types.go | 7 ++ sdk/messages.go | 6 ++ 4 files changed, 94 insertions(+), 49 deletions(-) diff --git a/engine/worker/internal/run.go b/engine/worker/internal/run.go index 196c865000..ef0a9540c1 100644 --- a/engine/worker/internal/run.go +++ b/engine/worker/internal/run.go @@ -6,7 +6,6 @@ import ( "context" "crypto/md5" "fmt" - "io/fs" "os" "os/exec" "os/user" @@ -738,64 +737,82 @@ func (w *CurrentWorker) setupHooks(ctx context.Context, jobInfo sdk.WorkflowNode // The error contains 'Executable file not found', the capa is not on the worker continue } + hookFilename := fmt.Sprintf("%d-%s-%s", hookConfig.Priority, integrationName, slug.Convert(hookConfig.Label)) - hookFilePath := path.Join(workingDir, "setup", hookFilename) - log.Info(ctx, "setting up hook %q", hookFilePath) - hookFile, err := fs.Create(hookFilePath) + w.hooks = append(w.hooks, workerHook{ + Config: hookConfig, + SetupPath: path.Join(workingDir, "setup", hookFilename), + TeardownPath: path.Join(workingDir, "teardown", hookFilename), + }) + } + + for _, h := range w.hooks { + infos := []sdk.SpawnInfo{{ + RemoteTime: time.Now(), + Message: sdk.SpawnMsg{ID: sdk.MsgSpawnInfoWorkerHookSetup.ID, Args: []interface{}{h.Config.Label}}, + }} + if err := w.Client().QueueJobSendSpawnInfo(ctx, w.currentJob.wJob.ID, infos); err != nil { + return sdk.WrapError(err, "cannot record QueueJobSendSpawnInfo for job (err spawn): %d", w.currentJob.wJob.ID) + } + + log.Info(ctx, "setting up hook at %q", h.SetupPath) + + hookFile, err := fs.Create(h.SetupPath) if err != nil { - return errors.Errorf("unable to open hook file %q in %q: %v", hookFilePath, w.basedir.Name(), err) + return errors.Errorf("unable to open hook file %q in %q: %v", h.SetupPath, w.basedir.Name(), err) } - if _, err := hookFile.WriteString(hookConfig.Setup); err != nil { + if _, err := hookFile.WriteString(h.Config.Setup); err != nil { _ = hookFile.Close - return errors.Errorf("unable to setup hook %q: %v", hookFilePath, err) + return errors.Errorf("unable to setup hook %q: %v", h.SetupPath, err) } if err := hookFile.Close(); err != nil { - return errors.Errorf("unable to setup hook %q: %v", hookFilePath, err) + return errors.Errorf("unable to setup hook %q: %v", h.SetupPath, err) } - hookFilePath = path.Join(workingDir, "teardown", hookFilename) - hookFile, err = fs.Create(hookFilePath) + hookFile, err = fs.Create(h.TeardownPath) if err != nil { - return errors.Errorf("unable to open hook file %q: %v", hookFilePath, err) + return errors.Errorf("unable to open hook file %q: %v", h.TeardownPath, err) } - if _, err := hookFile.WriteString(hookConfig.Teardown); err != nil { + if _, err := hookFile.WriteString(h.Config.Teardown); err != nil { _ = hookFile.Close - return errors.Errorf("unable to setup hook %q: %v", hookFilePath, err) + return errors.Errorf("unable to setup hook %q: %v", h.TeardownPath, err) } if err := hookFile.Close(); err != nil { - return errors.Errorf("unable to setup hook %q: %v", hookFilePath, err) + return errors.Errorf("unable to setup hook %q: %v", h.TeardownPath, err) } } } return nil } -func (w *CurrentWorker) executeHooksSetup(ctx context.Context, basedir afero.Fs, workingDir string) error { +func (w *CurrentWorker) executeHooksSetup(ctx context.Context, fs afero.Fs, workingDir string) error { if strings.EqualFold(runtime.GOOS, "windows") { log.Warn(ctx, "hooks are not supported on windows") return nil } var result = make(map[string]string) - var setupDir = path.Join(workingDir, "setup") - var absPath string - if x, ok := basedir.(*afero.BasePathFs); ok { - absPath, _ = x.RealPath(setupDir) - absPath, _ = filepath.Abs(path.Dir(absPath)) + basedir, ok := fs.(*afero.BasePathFs) + if !ok { + return sdk.WithStack(fmt.Errorf("invalid given basedir")) } - setupDir = filepath.Join(absPath, filepath.Base(setupDir)) - workerEnv := w.Environ() - err := filepath.Walk(setupDir, func(filepath string, info os.FileInfo, err error) error { + for _, h := range w.hooks { + filepath, err := basedir.RealPath(h.SetupPath) if err != nil { - return err + return sdk.WrapError(err, "cannot get real path for: %s", h.SetupPath) } - if info.IsDir() { - return nil + + infos := []sdk.SpawnInfo{{ + RemoteTime: time.Now(), + Message: sdk.SpawnMsg{ID: sdk.MsgSpawnInfoWorkerHookRun.ID, Args: []interface{}{h.Config.Label}}, + }} + if err := w.Client().QueueJobSendSpawnInfo(ctx, w.currentJob.wJob.ID, infos); err != nil { + return sdk.WrapError(err, "cannot record QueueJobSendSpawnInfo for job (err spawn): %d", w.currentJob.wJob.ID) } str := fmt.Sprintf("source %s ; echo '<<>>' ; env", filepath) @@ -821,22 +838,35 @@ func (w *CurrentWorker) executeHooksSetup(ctx context.Context, basedir afero.Fs, } } } - return nil - }) + } w.currentJob.envFromHooks = result - return errors.WithStack(err) + return nil } -func (w *CurrentWorker) executeHooksTeardown(_ context.Context, basedir afero.Fs, workingDir string) error { - err := afero.Walk(basedir, path.Join(workingDir, "setup"), func(path string, info fs.FileInfo, err error) error { - if info.IsDir() { - return nil +func (w *CurrentWorker) executeHooksTeardown(ctx context.Context, fs afero.Fs, workingDir string) error { + basedir, ok := fs.(*afero.BasePathFs) + if !ok { + return sdk.WithStack(fmt.Errorf("invalid given basedir")) + } + + for _, h := range w.hooks { + filepath, err := basedir.RealPath(h.SetupPath) + if err != nil { + return sdk.WrapError(err, "cannot get real path for: %s", h.SetupPath) + } + + infos := []sdk.SpawnInfo{{ + RemoteTime: time.Now(), + Message: sdk.SpawnMsg{ID: sdk.MsgSpawnInfoWorkerHookRunTeardown.ID, Args: []interface{}{h.Config.Label}}, + }} + if err := w.Client().QueueJobSendSpawnInfo(ctx, w.currentJob.wJob.ID, infos); err != nil { + return sdk.WrapError(err, "cannot record QueueJobSendSpawnInfo for job (err spawn): %d", w.currentJob.wJob.ID) } - cmd := exec.Command("bash", "-c", path) + + cmd := exec.Command("bash", "-c", filepath) if output, err := cmd.CombinedOutput(); err != nil { return errors.WithMessage(err, w.blur.String(string(output))) } - return nil - }) - return err + } + return nil } diff --git a/engine/worker/internal/start_test.go b/engine/worker/internal/start_test.go index 8b74322cc7..4e994b42ea 100644 --- a/engine/worker/internal/start_test.go +++ b/engine/worker/internal/start_test.go @@ -86,6 +86,11 @@ func TestStartWorkerWithABookedJob(t *testing.T) { Status: sdk.StatusBuilding, }) + gock.New("http://cds-api.local").Post("/queue/workflows/42/spawn/infos").Times(3). + HeaderPresent("Authorization"). + Reply(200). + JSON(nil) + gock.New("http://cds-api.local").Get("project/proj_key/workflows/workflow_name/runs/0").Times(1). HeaderPresent("Authorization"). Reply(200). @@ -284,14 +289,14 @@ export FOO_FROM_HOOK=BAR`, var checkRequest gock.ObserverFunc = func(request *http.Request, mock gock.Mock) { bodyContent, err := io.ReadAll(request.Body) - assert.NoError(t, err) + require.NoError(t, err) request.Body = io.NopCloser(bytes.NewReader(bodyContent)) if mock != nil { switch mock.Request().URLStruct.String() { case "http://cds-api.local/queue/workflows/42/step": var result sdk.StepStatus err := json.Unmarshal(bodyContent, &result) - assert.NoError(t, err) + require.NoError(t, err) switch result.StepOrder { case 0: @@ -320,15 +325,14 @@ export FOO_FROM_HOOK=BAR`, } case "http://cds-api.local/queue/workflows/42/result": var result sdk.Result - err := json.Unmarshal(bodyContent, &result) - assert.NoError(t, err) - assert.Equal(t, int64(42), result.BuildID) - assert.Equal(t, sdk.StatusFail, result.Status) + require.NoError(t, json.Unmarshal(bodyContent, &result)) + require.Equal(t, int64(42), result.BuildID) + require.Equal(t, sdk.StatusFail, result.Status) if len(result.NewVariables) > 0 { - assert.Equal(t, "cds.build.newvar", result.NewVariables[0].Name) + require.Equal(t, "cds.build.newvar", result.NewVariables[0].Name) // assert.Equal(t, "cds.semver", result.NewVariables[0].Name) // assert.Equal(t, "git.describe", result.NewVariables[0].Name) - assert.Equal(t, "newval", result.NewVariables[0].Value) + require.Equal(t, "newval", result.NewVariables[0].Value) } else { t.Error("missing new variables") } @@ -363,8 +367,7 @@ export FOO_FROM_HOOK=BAR`, ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second) defer cancel() - err = internal.StartWorker(ctx, w, 42) - assert.NoError(t, err) + require.NoError(t, internal.StartWorker(ctx, w, 42)) var isDone bool if gock.IsDone() { @@ -380,7 +383,7 @@ export FOO_FROM_HOOK=BAR`, } } } - assert.True(t, isDone) + require.True(t, isDone) if gock.HasUnmatchedRequest() { reqs := gock.GetUnmatchedRequests() for _, req := range reqs { @@ -426,5 +429,4 @@ export FOO_FROM_HOOK=BAR`, assert.Equal(t, 1, strings.Count(logBuffer.String(), "HATCHERY_REGION=local-test")) assert.Equal(t, 1, strings.Count(logBuffer.String(), "BASEDIR=")) assert.Equal(t, 1, strings.Count(logBuffer.String(), "FOO_FROM_HOOK=BAR")) - } diff --git a/engine/worker/internal/types.go b/engine/worker/internal/types.go index 88e96a5eb4..96cb0f0e0c 100644 --- a/engine/worker/internal/types.go +++ b/engine/worker/internal/types.go @@ -69,6 +69,13 @@ type CurrentWorker struct { } client cdsclient.WorkerInterface blur *sdk.Blur + hooks []workerHook +} + +type workerHook struct { + Config sdk.WorkerHookSetupTeardownScripts + SetupPath string + TeardownPath string } // BuiltInAction defines builtin action signature diff --git a/sdk/messages.go b/sdk/messages.go index 64b194fda6..680568b146 100644 --- a/sdk/messages.go +++ b/sdk/messages.go @@ -97,6 +97,9 @@ var ( MsgWorkflowV3Preview = &Message{"MsgWorkflowV3Preview", trad{FR: "Le workflow a été généré en version 3 à partir d'une ancienne version", EN: "The workflow was generated in version 3 from an old version"}, nil, RunInfoTypeWarning} MsgSpawnInfoDisableSecretInjection = &Message{"MsgSpawnInfoDisableSecretInjection", trad{EN: "⚠ Project's secrets were not automatically injected for this job because of a region prerequisite: %s"}, nil, RunInfoTypInfo} MsgSpawnInfoManualSecretInjection = &Message{"MsgSpawnInfoManualSecretInjection", trad{EN: "Prerequisites of type secret matched %s secret(s)"}, nil, RunInfoTypInfo} + MsgSpawnInfoWorkerHookSetup = &Message{"MsgSpawnInfoWorkerHookSetup", trad{EN: "Setting up worker hook %q"}, nil, RunInfoTypInfo} + MsgSpawnInfoWorkerHookRun = &Message{"MsgSpawnInfoWorkerHookRun", trad{EN: "Running worker hook %q"}, nil, RunInfoTypInfo} + MsgSpawnInfoWorkerHookRunTeardown = &Message{"MsgSpawnInfoWorkerHookRunTeardown", trad{EN: "Running worker hook %q teardown"}, nil, RunInfoTypInfo} ) // Messages contains all sdk Messages @@ -178,6 +181,9 @@ var Messages = map[string]*Message{ MsgWorkflowV3Preview.ID: MsgWorkflowV3Preview, MsgSpawnInfoDisableSecretInjection.ID: MsgSpawnInfoDisableSecretInjection, MsgSpawnInfoManualSecretInjection.ID: MsgSpawnInfoManualSecretInjection, + MsgSpawnInfoWorkerHookSetup.ID: MsgSpawnInfoWorkerHookSetup, + MsgSpawnInfoWorkerHookRun.ID: MsgSpawnInfoWorkerHookRun, + MsgSpawnInfoWorkerHookRunTeardown.ID: MsgSpawnInfoWorkerHookRunTeardown, } //Message represent a struc format translated messages