From 56f3db497553a4e0fe9051448a63f8e8a1f92f13 Mon Sep 17 00:00:00 2001 From: Zhenya Tikhonov Date: Mon, 2 Dec 2024 23:50:14 +0400 Subject: [PATCH] feat(venona): report task status to the platform (#522) --- charts/cf-runtime/Chart.yaml | 6 +- charts/cf-runtime/README.md | 2 +- .../_main-container.yaml | 1 + .../runtime/runtime-env-spec-tmpl.yaml | 1 + .../private_registry_test.yaml | 1 + .../tests/runtime/runtime_onprem_test.yaml | 1 + .../tests/runtime/runtime_test.yaml | 1 + venona/VERSION | 2 +- venona/cmd/start.go | 6 +- venona/pkg/agent/agent.go | 43 ++++++++--- venona/pkg/agent/agent_test.go | 14 ++-- venona/pkg/codefresh/codefresh.go | 39 ++++++++-- venona/pkg/codefresh/codefresh_mock.go | 44 +++++++++++ venona/pkg/codefresh/codefresh_test.go | 26 ++++++- venona/pkg/errors/errors.go | 24 ++++++ venona/pkg/kubernetes/kubernetes.go | 52 ++++++++++--- venona/pkg/kubernetes/kubernetes_test.go | 73 +++++++++++++++++++ venona/pkg/queue/queue.go | 46 +++++++++--- venona/pkg/queue/queue_test.go | 4 +- venona/pkg/runtime/runtime.go | 33 ++++++--- venona/pkg/task/task.go | 31 +++++++- venona/pkg/workflow/workflow.go | 4 +- venonactl/VERSION | 2 +- 23 files changed, 389 insertions(+), 67 deletions(-) create mode 100644 venona/pkg/errors/errors.go diff --git a/charts/cf-runtime/Chart.yaml b/charts/cf-runtime/Chart.yaml index fcfe86b0..c9fae26b 100644 --- a/charts/cf-runtime/Chart.yaml +++ b/charts/cf-runtime/Chart.yaml @@ -1,7 +1,7 @@ apiVersion: v2 description: A Helm chart for Codefresh Runner name: cf-runtime -version: 7.1.10 +version: 7.2.0 keywords: - codefresh - runner @@ -17,8 +17,8 @@ annotations: artifacthub.io/containsSecurityUpdates: "false" # Supported kinds: `added`, `changed`, `deprecated`, `removed`, `fixed`, `security`: artifacthub.io/changes: | - - kind: fixed - description: "Using cache for multiple tags (Engine 1.174.19)" + - kind: added + description: "Chart version exposed to the `venona` and `engine` pods" dependencies: - name: cf-common repository: 
oci://quay.io/codefresh/charts diff --git a/charts/cf-runtime/README.md b/charts/cf-runtime/README.md index 60a0d410..a3cfdb44 100644 --- a/charts/cf-runtime/README.md +++ b/charts/cf-runtime/README.md @@ -1,6 +1,6 @@ ## Codefresh Runner -![Version: 7.1.10](https://img.shields.io/badge/Version-7.1.10-informational?style=flat-square) +![Version: 7.2.0](https://img.shields.io/badge/Version-7.2.0-informational?style=flat-square) Helm chart for deploying [Codefresh Runner](https://codefresh.io/docs/docs/installation/codefresh-runner/) to Kubernetes. diff --git a/charts/cf-runtime/templates/_components/runner/environment-variables/_main-container.yaml b/charts/cf-runtime/templates/_components/runner/environment-variables/_main-container.yaml index 4d3f0304..a9e3b78d 100644 --- a/charts/cf-runtime/templates/_components/runner/environment-variables/_main-container.yaml +++ b/charts/cf-runtime/templates/_components/runner/environment-variables/_main-container.yaml @@ -16,6 +16,7 @@ CODEFRESH_TOKEN: name: {{ include "runner.fullname" . 
}} key: agent-codefresh-token DOCKER_REGISTRY: {{ .Values.global.imageRegistry }} +RUNTIME_CHART_VERSION: {{ .Chart.Version }} {{- end }} {{- define "runner.environment-variables" }} diff --git a/charts/cf-runtime/templates/runtime/runtime-env-spec-tmpl.yaml b/charts/cf-runtime/templates/runtime/runtime-env-spec-tmpl.yaml index d1a8bfad..b0d08664 100644 --- a/charts/cf-runtime/templates/runtime/runtime-env-spec-tmpl.yaml +++ b/charts/cf-runtime/templates/runtime/runtime-env-spec-tmpl.yaml @@ -37,6 +37,7 @@ runtimeScheduler: CR_6177_FIXER: {{ include "runtime.runtimeImageName" (dict "registry" $imageRegistry "imageFullName" $engineContext.runtimeImages.CR_6177_FIXER) | squote }} GC_BUILDER_IMAGE: {{ include "runtime.runtimeImageName" (dict "registry" $imageRegistry "imageFullName" $engineContext.runtimeImages.GC_BUILDER_IMAGE) | squote }} COSIGN_IMAGE_SIGNER_IMAGE: {{ include "runtime.runtimeImageName" (dict "registry" $imageRegistry "imageFullName" $engineContext.runtimeImages.COSIGN_IMAGE_SIGNER_IMAGE) | squote }} + RUNTIME_CHART_VERSION: {{ .Chart.Version }} {{- with $engineContext.userEnvVars }} userEnvVars: {{- toYaml . 
| nindent 4 }} {{- end }} diff --git a/charts/cf-runtime/tests/private-registry/private_registry_test.yaml b/charts/cf-runtime/tests/private-registry/private_registry_test.yaml index f6e5ac58..a416dc24 100644 --- a/charts/cf-runtime/tests/private-registry/private_registry_test.yaml +++ b/charts/cf-runtime/tests/private-registry/private_registry_test.yaml @@ -57,6 +57,7 @@ tests: CR_6177_FIXER: 'somedomain.io/alpine:tagoverride' GC_BUILDER_IMAGE: 'somedomain.io/codefresh/cf-gc-builder:tagoverride' COSIGN_IMAGE_SIGNER_IMAGE: 'somedomain.io/codefresh/cf-cosign-image-signer:tagoverride' + RUNTIME_CHART_VERSION: [\w.-]+ workflowLimits: MAXIMUM_ALLOWED_TIME_BEFORE_PRE_STEPS_SUCCESS: 600 MAXIMUM_ALLOWED_WORKFLOW_AGE_BEFORE_TERMINATION: 86400 diff --git a/charts/cf-runtime/tests/runtime/runtime_onprem_test.yaml b/charts/cf-runtime/tests/runtime/runtime_onprem_test.yaml index f37c719b..546950d7 100644 --- a/charts/cf-runtime/tests/runtime/runtime_onprem_test.yaml +++ b/charts/cf-runtime/tests/runtime/runtime_onprem_test.yaml @@ -70,6 +70,7 @@ tests: CR_6177_FIXER: 'alpine:tagoverride' GC_BUILDER_IMAGE: 'quay.io/codefresh/cf-gc-builder:tagoverride' COSIGN_IMAGE_SIGNER_IMAGE: 'quay.io/codefresh/cf-cosign-image-signer:tagoverride' + RUNTIME_CHART_VERSION: [\w.-]+ workflowLimits: MAXIMUM_ALLOWED_TIME_BEFORE_PRE_STEPS_SUCCESS: 600 MAXIMUM_ALLOWED_WORKFLOW_AGE_BEFORE_TERMINATION: 86400 diff --git a/charts/cf-runtime/tests/runtime/runtime_test.yaml b/charts/cf-runtime/tests/runtime/runtime_test.yaml index 3f17dfca..6d963013 100644 --- a/charts/cf-runtime/tests/runtime/runtime_test.yaml +++ b/charts/cf-runtime/tests/runtime/runtime_test.yaml @@ -71,6 +71,7 @@ tests: CR_6177_FIXER: 'alpine:tagoverride' GC_BUILDER_IMAGE: 'quay.io/codefresh/cf-gc-builder:tagoverride' COSIGN_IMAGE_SIGNER_IMAGE: 'quay.io/codefresh/cf-cosign-image-signer:tagoverride' + RUNTIME_CHART_VERSION: [\w.-]+ userEnvVars: - name: ALICE valueFrom: diff --git a/venona/VERSION b/venona/VERSION index 
3cc3669f..227cea21 100644 --- a/venona/VERSION +++ b/venona/VERSION @@ -1 +1 @@ -1.10.8 +2.0.0 diff --git a/venona/cmd/start.go b/venona/cmd/start.go index 566b8f19..c09ff3e5 100644 --- a/venona/cmd/start.go +++ b/venona/cmd/start.go @@ -230,9 +230,13 @@ func run(options startOptions) { httpClient.Transport = monitor.NewRoundTripper(httpClient.Transport) + userAgent := fmt.Sprintf("cf-classic-runner/%s", version) + if runtimeVersion := os.Getenv("RUNTIME_CHART_VERSION"); runtimeVersion != "" { + userAgent += fmt.Sprintf(" cf-classic-runtime/%s", runtimeVersion) + } httpHeaders := http.Header{} { - httpHeaders.Add("User-Agent", fmt.Sprintf("codefresh-runner-%s", version)) + httpHeaders.Add("User-Agent", userAgent) } cf = codefresh.New(codefresh.Options{ diff --git a/venona/pkg/agent/agent.go b/venona/pkg/agent/agent.go index 398d0b12..6956b877 100644 --- a/venona/pkg/agent/agent.go +++ b/venona/pkg/agent/agent.go @@ -125,6 +125,7 @@ func New(opts *Options) (*Agent, error) { Monitor: opts.Monitor, Concurrency: opts.Concurrency, BufferSize: opts.BufferSize, + Codefresh: opts.Codefresh, }) return &Agent{ id: id, @@ -192,7 +193,7 @@ func (a *Agent) startTaskPullerRoutine(ctx context.Context) { // perform all agentTasks (in goroutine) for i := range agentTasks { - a.handleAgentTask(&agentTasks[i]) + a.handleAgentTask(ctx, &agentTasks[i]) } // send all wfTasks to tasksQueue @@ -241,6 +242,25 @@ func (a *Agent) reportStatus(ctx context.Context, status codefresh.AgentStatus) } } +func (a *Agent) reportTaskStatus(ctx context.Context, taskDef task.Task, err error) { + status := task.TaskStatus{ + OccurredAt: time.Now(), + StatusRevision: taskDef.Metadata.CurrentStatusRevision + 1, + } + if err != nil { + status.Status = task.StatusError + status.Reason = err.Error() + status.IsRetriable = true // TODO: make this configurable depending on the error + } else { + status.Status = task.StatusSuccess + } + + statusErr := a.cf.ReportTaskStatus(ctx, taskDef.Id, status) + if statusErr 
!= nil { + a.log.Error("failed reporting task status", "error", statusErr, "task", taskDef.Id, "workflow", taskDef.Metadata.WorkflowId) + } +} + func (a *Agent) getTasks(ctx context.Context) (task.Tasks, []*workflow.Workflow) { tasks := a.pullTasks(ctx) return a.splitTasks(tasks) @@ -276,10 +296,10 @@ func (a *Agent) splitTasks(tasks task.Tasks) (task.Tasks, []*workflow.Workflow) t.Timeline.Pulled = pullTime agentTasks = append(agentTasks, t) case task.TypeCreatePod, task.TypeCreatePVC, task.TypeDeletePod, task.TypeDeletePVC: - wf, ok := wfMap[t.Metadata.Workflow] + wf, ok := wfMap[t.Metadata.WorkflowId] if !ok { wf = workflow.New(t.Metadata) - wfMap[t.Metadata.Workflow] = wf + wfMap[t.Metadata.WorkflowId] = wf } err := wf.AddTask(&t) @@ -287,7 +307,7 @@ func (a *Agent) splitTasks(tasks task.Tasks) (task.Tasks, []*workflow.Workflow) a.log.Error("failed adding task to workflow", "error", err) } default: - a.log.Error("unrecognized task type", "type", t.Type, "tid", t.Metadata.Workflow, "runtime", t.Metadata.ReName) + a.log.Error("unrecognized task type", "type", t.Type, "tid", t.Metadata.WorkflowId, "runtime", t.Metadata.ReName) } } @@ -313,14 +333,14 @@ func (a *Agent) splitTasks(tasks task.Tasks) (task.Tasks, []*workflow.Workflow) return agentTasks, workflows } -func (a *Agent) handleAgentTask(t *task.Task) { - a.log.Info("executing agent task", "tid", t.Metadata.Workflow) +func (a *Agent) handleAgentTask(ctx context.Context, t *task.Task) { + a.log.Info("executing agent task", "tid", t.Metadata.WorkflowId) a.wg.Add(1) go func() { defer a.wg.Done() txn := task.NewTaskTransaction(a.monitor, t.Metadata) defer txn.End() - err := a.executeAgentTask(t) + err := a.executeAgentTask(ctx, t) if err != nil { a.log.Error(err.Error()) @@ -330,7 +350,7 @@ func (a *Agent) handleAgentTask(t *task.Task) { }() } -func (a *Agent) executeAgentTask(t *task.Task) error { +func (a *Agent) executeAgentTask(ctx context.Context, t *task.Task) error { t.Timeline.Started = time.Now() 
specJSON, err := json.Marshal(t.Spec) if err != nil { @@ -348,9 +368,12 @@ func (a *Agent) executeAgentTask(t *task.Task) error { } err = e(&spec, a.log) + if t.Metadata.ShouldReportStatus { + a.reportTaskStatus(ctx, *t, err) + } sinceCreation, inRunner, processed := t.GetLatency() a.log.Info("Done handling agent task", - "tid", t.Metadata.Workflow, + "tid", t.Metadata.WorkflowId, "time since creation", sinceCreation, "time in runner", inRunner, "processing time", processed, @@ -410,7 +433,7 @@ func proxyRequest(t *task.AgentTask, log logger.Logger) error { func groupTasks(tasks task.Tasks) map[string]task.Tasks { candidates := map[string]task.Tasks{} for _, task := range tasks { - name := task.Metadata.Workflow + name := task.Metadata.WorkflowId if name == "" { // If for some reason the task is not related to any workflow // Might heppen in older versions on Codefresh diff --git a/venona/pkg/agent/agent_test.go b/venona/pkg/agent/agent_test.go index 51576efe..dd2bf69e 100644 --- a/venona/pkg/agent/agent_test.go +++ b/venona/pkg/agent/agent_test.go @@ -38,17 +38,17 @@ func Test_groupTasks(t *testing.T) { tasks: task.Tasks{ { Metadata: task.Metadata{ - Workflow: "1", + WorkflowId: "1", }, }, { Metadata: task.Metadata{ - Workflow: "2", + WorkflowId: "2", }, }, { Metadata: task.Metadata{ - Workflow: "1", + WorkflowId: "1", }, }, }, @@ -56,19 +56,19 @@ func Test_groupTasks(t *testing.T) { "1": { { Metadata: task.Metadata{ - Workflow: "1", + WorkflowId: "1", }, }, { Metadata: task.Metadata{ - Workflow: "1", + WorkflowId: "1", }, }, }, "2": { { Metadata: task.Metadata{ - Workflow: "2", + WorkflowId: "2", }, }, }, @@ -259,7 +259,7 @@ func Test_executeAgentTask(t *testing.T) { a := &Agent{ log: logger.New(logger.Options{}), } - err := a.executeAgentTask(tt.task) + err := a.executeAgentTask(context.Background(), tt.task) if err != nil || tt.wantErr != "" { assert.EqualError(t, err, tt.wantErr) } diff --git a/venona/pkg/codefresh/codefresh.go 
b/venona/pkg/codefresh/codefresh.go index be30c787..bc87c6de 100644 --- a/venona/pkg/codefresh/codefresh.go +++ b/venona/pkg/codefresh/codefresh.go @@ -34,6 +34,7 @@ type ( // Codefresh API client Codefresh interface { Tasks(ctx context.Context) (task.Tasks, error) + ReportTaskStatus(ctx context.Context, id string, status task.TaskStatus) error ReportStatus(ctx context.Context, status AgentStatus) error Host() string } @@ -79,7 +80,10 @@ func New(opts Options) Codefresh { // Tasks get from Codefresh all latest tasks func (c cf) Tasks(ctx context.Context) (task.Tasks, error) { - res, err := c.doRequest(ctx, "GET", nil, "api", "agent", c.agentID, "tasks") + query := map[string]string{ + "waitForStatusReport": "true", + } + res, err := c.doRequest(ctx, "GET", nil, query, "api", "agent", c.agentID, "tasks") if err != nil { return nil, err } @@ -92,6 +96,20 @@ func (c cf) Tasks(ctx context.Context) (task.Tasks, error) { return tasks, nil } +func (c cf) ReportTaskStatus(ctx context.Context, id string, status task.TaskStatus) error { + s, err := status.Marshal() + if err != nil { + return fmt.Errorf("failed marshalling when reporting task status: %w", err) + } + + _, err = c.doRequest(ctx, "POST", bytes.NewBuffer(s), nil, "api", "agent", c.agentID, "tasks", string(id), "statuses") + if err != nil { + return fmt.Errorf("failed sending request when reporting task status: %w", err) + } + + return nil +} + // Host returns the host func (c cf) Host() string { return c.host @@ -104,7 +122,7 @@ func (c cf) ReportStatus(ctx context.Context, status AgentStatus) error { return fmt.Errorf("failed marshalling when reporting status: %w", err) } - _, err = c.doRequest(ctx, "PUT", bytes.NewBuffer(s), "api", "agent", c.agentID, "status") + _, err = c.doRequest(ctx, "PUT", bytes.NewBuffer(s), nil, "api", "agent", c.agentID, "status") if err != nil { return fmt.Errorf("failed sending request when reporting status: %w", err) } @@ -119,7 +137,7 @@ func (c cf) buildErrorFromResponse(status 
int, body []byte) error { } } -func (c cf) prepareURL(paths ...string) (*url.URL, error) { +func (c cf) prepareURL(query map[string]string, paths ...string) (*url.URL, error) { u, err := url.Parse(c.host) if err != nil { return nil, err @@ -135,11 +153,18 @@ func (c cf) prepareURL(paths ...string) (*url.URL, error) { u.Path = path.Join(accPath...) u.RawPath = path.Join(accRawPath...) + + rawQuery := url.Values{} + for k, v := range query { + rawQuery.Set(k, v) + } + u.RawQuery = rawQuery.Encode() + return u, nil } -func (c cf) prepareRequest(method string, data io.Reader, apis ...string) (*http.Request, error) { - u, err := c.prepareURL(apis...) +func (c cf) prepareRequest(method string, data io.Reader, query map[string]string, apis ...string) (*http.Request, error) { + u, err := c.prepareURL(query, apis...) if err != nil { return nil, err } @@ -158,8 +183,8 @@ func (c cf) prepareRequest(method string, data io.Reader, apis ...string) (*http return req, nil } -func (c cf) doRequest(ctx context.Context, method string, body io.Reader, apis ...string) ([]byte, error) { - req, err := c.prepareRequest(method, body, apis...) +func (c cf) doRequest(ctx context.Context, method string, body io.Reader, query map[string]string, apis ...string) ([]byte, error) { + req, err := c.prepareRequest(method, body, query, apis...) 
if err != nil { return nil, err } diff --git a/venona/pkg/codefresh/codefresh_mock.go b/venona/pkg/codefresh/codefresh_mock.go index cb94e639..61e5f1ea 100644 --- a/venona/pkg/codefresh/codefresh_mock.go +++ b/venona/pkg/codefresh/codefresh_mock.go @@ -174,6 +174,50 @@ func (_c *MockCodefresh_Tasks_Call) RunAndReturn(run func(context.Context) (task return _c } +// ReportTaskStatus provides a mock function with given fields: ctx, id, status +func (_m *MockCodefresh) ReportTaskStatus(ctx context.Context, id string, status task.TaskStatus) error { + ret := _m.Called(ctx, id, status) + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, string, task.TaskStatus) error); ok { + r0 = rf(ctx, id, status) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// MockCodefresh_ReportTaskStatus_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ReportTaskStatus' +type MockCodefresh_ReportTaskStatus_Call struct { + *mock.Call +} + +// ReportTaskStatus is a helper method to define mock.On call +// - ctx context.Context +// - id string +// - status task.TaskStatus +func (_e *MockCodefresh_Expecter) ReportTaskStatus(ctx interface{}, id interface{}, status interface{}) *MockCodefresh_ReportTaskStatus_Call { + return &MockCodefresh_ReportTaskStatus_Call{Call: _e.mock.On("ReportTaskStatus", ctx, id, status)} +} + +func (_c *MockCodefresh_ReportTaskStatus_Call) Run(run func(ctx context.Context, id string, status task.TaskStatus)) *MockCodefresh_ReportTaskStatus_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(string), args[2].(task.TaskStatus)) + }) + return _c +} + +func (_c *MockCodefresh_ReportTaskStatus_Call) Return(_a0 error) *MockCodefresh_ReportTaskStatus_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockCodefresh_ReportTaskStatus_Call) RunAndReturn(run func(context.Context, string, task.TaskStatus) error) *MockCodefresh_ReportTaskStatus_Call { + _c.Call.Return(run) + 
return _c +} + // NewMockCodefresh creates a new instance of MockCodefresh. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. // The first argument is typically a *testing.T value. func NewMockCodefresh(t interface { diff --git a/venona/pkg/codefresh/codefresh_test.go b/venona/pkg/codefresh/codefresh_test.go index b841d146..ed5672d4 100644 --- a/venona/pkg/codefresh/codefresh_test.go +++ b/venona/pkg/codefresh/codefresh_test.go @@ -69,6 +69,7 @@ func Test_cf_prepareURL(t *testing.T) { tests := map[string]struct { fields args paths []string + query map[string]string want *url.URL wantErr bool }{ @@ -94,6 +95,29 @@ func Test_cf_prepareURL(t *testing.T) { wantErr: false, want: mustURL("http://url/docker:desktop%2Fserver"), }, + "Append query": { + query: map[string]string{ + "key": "value", + "keyTwo": "valueTwo", + }, + paths: []string{"docker:desktop/server"}, + fields: args{ + host: "http://url", + }, + wantErr: false, + want: mustURL("http://url/docker:desktop%2Fserver?key=value&keyTwo=valueTwo"), + }, + "Escape query": { + query: map[string]string{ + "ke+y": "va+lu=e", + }, + paths: []string{"docker:desktop/server"}, + fields: args{ + host: "http://url", + }, + wantErr: false, + want: mustURL("http://url/docker:desktop%2Fserver?ke%2By=va%2Blu%3De"), + }, } for name, tt := range tests { t.Run(name, func(t *testing.T) { @@ -103,7 +127,7 @@ func Test_cf_prepareURL(t *testing.T) { agentID: tt.fields.agentID, httpClient: tt.fields.httpClient, } - url, err := c.prepareURL(tt.paths...) + url, err := c.prepareURL(tt.query, tt.paths...) if tt.wantErr { assert.Error(t, err) } diff --git a/venona/pkg/errors/errors.go b/venona/pkg/errors/errors.go new file mode 100644 index 00000000..81ffb9d9 --- /dev/null +++ b/venona/pkg/errors/errors.go @@ -0,0 +1,24 @@ +// Copyright 2020 The Codefresh Authors. 
+// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package errors + +type RetriableError interface { + IsRetriable() bool +} + +func IsRetriable(err error) bool { + e, ok := err.(RetriableError) + return ok && e.IsRetriable() +} diff --git a/venona/pkg/kubernetes/kubernetes.go b/venona/pkg/kubernetes/kubernetes.go index 5c8cc72c..7590d237 100644 --- a/venona/pkg/kubernetes/kubernetes.go +++ b/venona/pkg/kubernetes/kubernetes.go @@ -26,6 +26,7 @@ import ( "github.com/codefresh-io/go/venona/pkg/task" v1 "k8s.io/api/core/v1" + k8serrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/kubernetes" @@ -33,6 +34,11 @@ import ( "k8s.io/client-go/rest" ) +const ( + TypeK8sCreateResource K8sOperation = "CreateResource" + TypeK8sDeleteResource K8sOperation = "DeleteResource" +) + type ( // Kubernetes API client Kubernetes interface { @@ -65,6 +71,13 @@ type ( log logger.Logger forceDeletePvc bool } + + K8sOperation string + + K8sError struct { + error + isRetriable bool + } ) var ( @@ -73,6 +86,27 @@ var ( removeFinalizersJsonPatch = []byte(`[{ "op": "remove", "path": "/metadata/finalizers" }]`) ) +func (e K8sError) IsRetriable() bool { + return e.isRetriable +} + +func NewK8sError(err error, operation K8sOperation) error { + isNotRetriable := k8serrors.IsBadRequest(err) || + k8serrors.IsForbidden(err) || + k8serrors.IsMethodNotSupported(err) || + 
k8serrors.IsRequestEntityTooLargeError(err) || + k8serrors.IsNotAcceptable(err) || + k8serrors.IsUnsupportedMediaType(err) || + k8serrors.IsUnauthorized(err) || + (operation == TypeK8sCreateResource && k8serrors.IsAlreadyExists(err)) || + (operation == TypeK8sDeleteResource && (k8serrors.IsNotFound(err) || k8serrors.IsGone(err))) + + return &K8sError{ + error: err, + isRetriable: !isNotRetriable, + } +} + // NewInCluster build Kubernetes API based on local in cluster runtime func NewInCluster(log logger.Logger, qps float32, burst int, forceDeletePvc bool) (Kubernetes, error) { client, err := buildKubeInCluster(qps, burst) @@ -101,12 +135,12 @@ func (k kube) CreateResource(ctx context.Context, taskType task.Type, spec inter start := time.Now() bytes, err := json.Marshal(spec) if err != nil { - return fmt.Errorf("failed marshalling when creating resource: %w", err) + return NewK8sError(fmt.Errorf("failed marshalling when creating resource: %w", err), TypeK8sCreateResource) } obj, _, err := kubeDecode(bytes, nil, nil) if err != nil { - return fmt.Errorf("failed decoding when creating resource: %w", err) + return NewK8sError(fmt.Errorf("failed decoding when creating resource: %w", err), TypeK8sCreateResource) } var namespace, name string @@ -115,18 +149,18 @@ func (k kube) CreateResource(ctx context.Context, taskType task.Type, spec inter namespace, name = obj.Namespace, obj.Name _, err = k.client.CoreV1().PersistentVolumeClaims(namespace).Create(ctx, obj, metav1.CreateOptions{}) if err != nil { - return fmt.Errorf("failed creating persistent volume claims \"%s\\%s\": %w", namespace, obj.Name, err) + return NewK8sError(fmt.Errorf("failed creating persistent volume claims \"%s\\%s\": %w", namespace, obj.Name, err), TypeK8sCreateResource) } case *v1.Pod: namespace, name = obj.Namespace, obj.Name _, err = k.client.CoreV1().Pods(namespace).Create(ctx, obj, metav1.CreateOptions{}) if err != nil { - return fmt.Errorf("failed creating pod \"%s\\%s\": %w", namespace, obj.Name, 
err) + return NewK8sError(fmt.Errorf("failed creating pod \"%s\\%s\": %w", namespace, obj.Name, err), TypeK8sCreateResource) } metrics.IncWorkflowRetries(name) default: - return fmt.Errorf("failed creating resource of type %s", obj.GetObjectKind().GroupVersionKind()) + return NewK8sError(fmt.Errorf("failed creating resource of type %s", obj.GetObjectKind().GroupVersionKind()), TypeK8sCreateResource) } processed := time.Since(start) @@ -146,22 +180,22 @@ func (k kube) DeleteResource(ctx context.Context, opts DeleteOptions) error { case task.TypeDeletePVC: err := k.client.CoreV1().PersistentVolumeClaims(opts.Namespace).Delete(ctx, opts.Name, metav1.DeleteOptions{}) if err != nil { - return fmt.Errorf("failed deleting persistent volume claim \"%s\\%s\": %w", opts.Namespace, opts.Name, err) + return NewK8sError(fmt.Errorf("failed deleting persistent volume claim \"%s\\%s\": %w", opts.Namespace, opts.Name, err), TypeK8sDeleteResource) } if k.forceDeletePvc { _, err := k.client.CoreV1().PersistentVolumeClaims(opts.Namespace).Patch(ctx, opts.Name, types.JSONPatchType, removeFinalizersJsonPatch, metav1.PatchOptions{}) if err != nil { - return fmt.Errorf("failed removing finalizers from PVC \"%s\\%s\": %w", opts.Namespace, opts.Name, err) + return NewK8sError(fmt.Errorf("failed removing finalizers from PVC \"%s\\%s\": %w", opts.Namespace, opts.Name, err), TypeK8sDeleteResource) } } case task.TypeDeletePod: err := k.client.CoreV1().Pods(opts.Namespace).Delete(ctx, opts.Name, metav1.DeleteOptions{}) if err != nil { - return fmt.Errorf("failed deleting pod \"%s\\%s\": %w", opts.Namespace, opts.Name, err) + return NewK8sError(fmt.Errorf("failed deleting pod \"%s\\%s\": %w", opts.Namespace, opts.Name, err), TypeK8sDeleteResource) } default: - return fmt.Errorf("failed deleting resource of type %s", opts.Kind) + return NewK8sError(fmt.Errorf("failed deleting resource of type %s", opts.Kind), TypeK8sDeleteResource) } processed := time.Since(start) diff --git 
a/venona/pkg/kubernetes/kubernetes_test.go b/venona/pkg/kubernetes/kubernetes_test.go index 4886a146..7d1cfd31 100644 --- a/venona/pkg/kubernetes/kubernetes_test.go +++ b/venona/pkg/kubernetes/kubernetes_test.go @@ -16,13 +16,16 @@ package kubernetes import ( "context" + "errors" "testing" + ierrors "github.com/codefresh-io/go/venona/pkg/errors" "github.com/codefresh-io/go/venona/pkg/logger" "github.com/codefresh-io/go/venona/pkg/task" "github.com/stretchr/testify/assert" v1 "k8s.io/api/core/v1" + k8serrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes/fake" ) @@ -212,3 +215,73 @@ func Test_kube_DeleteResource(t *testing.T) { }) } } + +func Test_NewK8sError(t *testing.T) { + nonRetriableErrors := []k8serrors.StatusError{ + *k8serrors.NewBadRequest("reason"), + *k8serrors.NewForbidden(v1.Resource("pods"), "some-pod", errors.New("reason")), + *k8serrors.NewMethodNotSupported(v1.Resource("pods"), "some-pod"), + *k8serrors.NewRequestEntityTooLargeError("reason"), + *k8serrors.NewUnauthorized("reason"), + } + + for _, e := range nonRetriableErrors { + t.Run("should return a non-retriable error regardless of the operation", func(t *testing.T) { + err0 := NewK8sError(&e, TypeK8sCreateResource) + assert.False(t, ierrors.IsRetriable(err0)) + err1 := NewK8sError(&e, TypeK8sDeleteResource) + assert.False(t, ierrors.IsRetriable(err1)) + }) + } +} + +func Test_NewK8sError_CreateResource(t *testing.T) { + nonRetriableErrors := []k8serrors.StatusError{ + *k8serrors.NewAlreadyExists(v1.Resource("pods"), "some-pod"), + } + + for _, e := range nonRetriableErrors { + t.Run("should return a non-retriable error for create resource operation", func(t *testing.T) { + err := NewK8sError(&e, TypeK8sCreateResource) + assert.False(t, ierrors.IsRetriable(err)) + }) + } + + retriableErrors := []k8serrors.StatusError{ + *k8serrors.NewInternalError(errors.New("reason")), + *k8serrors.NewTimeoutError("reason", 1), + } + + for _, e 
:= range retriableErrors { + t.Run("should return a retriable error for create resource operation", func(t *testing.T) { + err := NewK8sError(&e, TypeK8sCreateResource) + assert.True(t, ierrors.IsRetriable(err)) + }) + } +} + +func Test_NewK8sError_DeleteResource(t *testing.T) { + nonRetriableErrors := []k8serrors.StatusError{ + *k8serrors.NewNotFound(v1.Resource("pods"), "some-pod"), + *k8serrors.NewGone("reason"), + } + + for _, e := range nonRetriableErrors { + t.Run("should return a non-retriable error for delete resource operation", func(t *testing.T) { + err := NewK8sError(&e, TypeK8sDeleteResource) + assert.False(t, ierrors.IsRetriable(err)) + }) + } + + retriableErrors := []k8serrors.StatusError{ + *k8serrors.NewInternalError(errors.New("reason")), + *k8serrors.NewTimeoutError("reason", 1), + } + + for _, e := range retriableErrors { + t.Run("should return a retriable error for delete resource operation", func(t *testing.T) { + err := NewK8sError(&e, TypeK8sDeleteResource) + assert.True(t, ierrors.IsRetriable(err)) + }) + } +} diff --git a/venona/pkg/queue/queue.go b/venona/pkg/queue/queue.go index 5386f9aa..e0195be8 100644 --- a/venona/pkg/queue/queue.go +++ b/venona/pkg/queue/queue.go @@ -20,6 +20,8 @@ import ( "sync" "time" + "github.com/codefresh-io/go/venona/pkg/codefresh" + ierrors "github.com/codefresh-io/go/venona/pkg/errors" "github.com/codefresh-io/go/venona/pkg/logger" "github.com/codefresh-io/go/venona/pkg/metrics" "github.com/codefresh-io/go/venona/pkg/monitoring" @@ -45,6 +47,7 @@ type ( Monitor monitoring.Monitor Concurrency int BufferSize int + Codefresh codefresh.Codefresh } wfQueueImpl struct { @@ -57,6 +60,7 @@ type ( stop []chan bool activeWorkflows map[string]struct{} mutex sync.Mutex + cf codefresh.Codefresh } ) @@ -73,6 +77,7 @@ func New(opts *Options) WorkflowQueue { concurrency: opts.Concurrency, stop: make([]chan bool, opts.Concurrency), activeWorkflows: make(map[string]struct{}), + cf: opts.Codefresh, } } @@ -116,22 +121,22 @@ 
func (wfq *wfQueueImpl) handleChannel(ctx context.Context, stopChan chan bool, i ctxCancelled = true case wf := <-wfq.queue: wfq.mutex.Lock() - if _, ok := wfq.activeWorkflows[wf.Metadata.Workflow]; ok { + if _, ok := wfq.activeWorkflows[wf.Metadata.WorkflowId]; ok { // Workflow is already being handled, enqueue it again and skip processing wfq.mutex.Unlock() - wfq.log.Info("Workflow", wf.Metadata.Workflow, " is already being handled, enqueue it again and skip processing") + wfq.log.Info("Workflow", wf.Metadata.WorkflowId, " is already being handled, enqueue it again and skip processing") time.Sleep(100 * time.Millisecond) wfq.Enqueue(wf) continue } // Mark the workflow as active - wfq.activeWorkflows[wf.Metadata.Workflow] = struct{}{} + wfq.activeWorkflows[wf.Metadata.WorkflowId] = struct{}{} wfq.mutex.Unlock() - wfq.log.Info("handling workflow", "handlerId", id, "workflow", wf.Metadata.Workflow) + wfq.log.Info("handling workflow", "handlerId", id, "workflow", wf.Metadata.WorkflowId) wfq.handleWorkflow(ctx, wf) wfq.mutex.Lock() - delete(wfq.activeWorkflows, wf.Metadata.Workflow) + delete(wfq.activeWorkflows, wf.Metadata.WorkflowId) wfq.mutex.Unlock() default: if ctxCancelled { @@ -149,7 +154,7 @@ func (wfq *wfQueueImpl) handleWorkflow(ctx context.Context, wf *workflow.Workflo txn := task.NewTaskTransaction(wfq.monitor, wf.Metadata) defer txn.End() - workflow := wf.Metadata.Workflow + workflow := wf.Metadata.WorkflowId reName := wf.Metadata.ReName runtime, ok := wfq.runtimes[reName] if !ok { @@ -159,16 +164,20 @@ func (wfq *wfQueueImpl) handleWorkflow(ctx context.Context, wf *workflow.Workflo } for i := range wf.Tasks { - err := runtime.HandleTask(ctx, wf.Tasks[i]) + taskDef := wf.Tasks[i] + err := runtime.HandleTask(ctx, taskDef) if err != nil { - wfq.log.Error("failed handling task", "error", err, "workflow", workflow) + wfq.log.Error("failed handling task", "error", err, "workflow", workflow, "task", taskDef.Id) txn.NoticeError(errRuntimeNotFound) } + if 
taskDef.Metadata.ShouldReportStatus { + wfq.reportTaskStatus(ctx, *taskDef, err) + } } sinceCreation, inRunner, processed := wf.GetLatency() wfq.log.Info("Done handling workflow", - "workflow", wf.Metadata.Workflow, + "workflow", wf.Metadata.WorkflowId, "runtime", wf.Metadata.ReName, "time since creation", sinceCreation, "time in runner", inRunner, @@ -176,3 +185,22 @@ func (wfq *wfQueueImpl) handleWorkflow(ctx context.Context, wf *workflow.Workflo ) metrics.ObserveWorkflowMetrics(wf.Type, sinceCreation, inRunner, processed) } + +func (wfq *wfQueueImpl) reportTaskStatus(ctx context.Context, taskDef task.Task, err error) { + status := task.TaskStatus{ + OccurredAt: time.Now(), + StatusRevision: taskDef.Metadata.CurrentStatusRevision + 1, + } + if err != nil { + status.Status = task.StatusError + status.Reason = err.Error() + status.IsRetriable = ierrors.IsRetriable(err) + } else { + status.Status = task.StatusSuccess + } + + statusErr := wfq.cf.ReportTaskStatus(ctx, taskDef.Id, status) + if statusErr != nil { + wfq.log.Error("failed reporting task status", "error", statusErr, "task", taskDef.Id, "workflow", taskDef.Metadata.WorkflowId) + } +} diff --git a/venona/pkg/queue/queue_test.go b/venona/pkg/queue/queue_test.go index ff0cf2aa..b8e8f420 100644 --- a/venona/pkg/queue/queue_test.go +++ b/venona/pkg/queue/queue_test.go @@ -33,8 +33,8 @@ import ( func makeWorkflow(wfID string, numOfTasks int) *workflow.Workflow { metadata := task.Metadata{ - Workflow: wfID, - ReName: "some-rt", + WorkflowId: wfID, + ReName: "some-rt", } wf := workflow.New(metadata) for i := 0; i < numOfTasks; i++ { diff --git a/venona/pkg/runtime/runtime.go b/venona/pkg/runtime/runtime.go index 437a7d4f..8e60828f 100644 --- a/venona/pkg/runtime/runtime.go +++ b/venona/pkg/runtime/runtime.go @@ -19,6 +19,7 @@ import ( "encoding/json" "fmt" + ierrors "github.com/codefresh-io/go/venona/pkg/errors" "github.com/codefresh-io/go/venona/pkg/kubernetes" "github.com/codefresh-io/go/venona/pkg/task" ) @@ 
-37,8 +38,24 @@ type ( runtime struct { client kubernetes.Kubernetes } + + HandleTaskError struct { + error + isRetriable bool + } ) +func (e HandleTaskError) IsRetriable() bool { + return e.isRetriable +} + +func NewHandleTaskError(err error, isRetriable bool) error { + return &HandleTaskError{ + error: err, + isRetriable: isRetriable, + } +} + // New creates new Runtime client func New(opts Options) Runtime { return &runtime{ @@ -47,31 +64,29 @@ func New(opts Options) Runtime { } func (r runtime) HandleTask(ctx context.Context, t *task.Task) error { - var err error - switch t.Type { case task.TypeCreatePVC, task.TypeCreatePod: - err = r.client.CreateResource(ctx, t.Type, t.Spec) + err := r.client.CreateResource(ctx, t.Type, t.Spec) if err != nil { - return fmt.Errorf("failed creating resource: %w", err) // TODO: Return already executed tasks in order to terminate them + return NewHandleTaskError(fmt.Errorf("failed creating resource: %w", err), ierrors.IsRetriable(err)) // TODO: Return already executed tasks in order to terminate them } case task.TypeDeletePVC, task.TypeDeletePod: opts := kubernetes.DeleteOptions{} opts.Kind = t.Type b, err := json.Marshal(t.Spec) if err != nil { - return fmt.Errorf("failed to marshal task spec: %w", err) + return NewHandleTaskError(fmt.Errorf("failed to marshal task spec: %w", err), false) } if err := json.Unmarshal(b, &opts); err != nil { - return fmt.Errorf("failed to unmarshal task spec: %w", err) + return NewHandleTaskError(fmt.Errorf("failed to unmarshal task spec: %w", err), false) } - if err = r.client.DeleteResource(ctx, opts); err != nil { - return fmt.Errorf("failed deleting resource: %w", err) + if err := r.client.DeleteResource(ctx, opts); err != nil { + return NewHandleTaskError(fmt.Errorf("failed deleting resource: %w", err), ierrors.IsRetriable(err)) } default: - return fmt.Errorf("unknown task type \"%s\"", t.Type) + return NewHandleTaskError(fmt.Errorf("unknown task type \"%s\"", t.Type), false) } return nil diff 
--git a/venona/pkg/task/task.go b/venona/pkg/task/task.go index 1780d6df..58d3dcbb 100644 --- a/venona/pkg/task/task.go +++ b/venona/pkg/task/task.go @@ -31,6 +31,11 @@ const ( TypeAgentTask Type = "AgentTask" ) +const ( + StatusSuccess Status = "Success" + StatusError Status = "Error" +) + type ( // Tasks array Tasks []Task @@ -38,8 +43,12 @@ type ( // Type of the task Type string + // Task status + Status string + // Task options Task struct { + Id string `json:"_id"` Type Type `json:"type"` Metadata Metadata `json:"metadata"` Spec interface{} `json:"spec"` @@ -50,9 +59,11 @@ type ( // Metadata options Metadata struct { - CreatedAt string `json:"createdAt"` - ReName string `json:"reName"` - Workflow string `json:"workflow"` + CreatedAt string `json:"createdAt"` + ReName string `json:"reName"` + WorkflowId string `json:"workflowId"` + CurrentStatusRevision int `json:"currentStatusRevision"` + ShouldReportStatus bool `json:"shouldReportStatus"` } // Timeline values @@ -66,6 +77,14 @@ type ( Type string `json:"type"` Params map[string]interface{} `json:"params"` } + + TaskStatus struct { + Status Status `json:"status"` + OccurredAt time.Time `json:"occurredAt"` + StatusRevision int `json:"statusRevision"` + IsRetriable bool `json:"isRetriable"` + Reason string `json:"reason,omitempty"` + } ) // UnmarshalTasks with json @@ -80,6 +99,10 @@ func (r *Tasks) Marshal() ([]byte, error) { return json.Marshal(r) } +func (r *TaskStatus) Marshal() ([]byte, error) { + return json.Marshal(r) +} + // Less compares two tasks by their CreatedAt values func Less(task1 Task, task2 Task) bool { return task1.Metadata.CreatedAt < task2.Metadata.CreatedAt @@ -88,7 +111,7 @@ func Less(task1 Task, task2 Task) bool { // NewTaskTransaction creates a new transaction with task-specific attributes func NewTaskTransaction(monitor monitoring.Monitor, m Metadata) monitoring.Transaction { txn := monitor.NewTransaction("runner-tasks-execution") - txn.AddAttribute("tid", m.Workflow) + 
txn.AddAttribute("tid", m.WorkflowId) txn.AddAttribute("runtime-environment", m.ReName) return txn } diff --git a/venona/pkg/workflow/workflow.go b/venona/pkg/workflow/workflow.go index b6e14e46..ea83c337 100644 --- a/venona/pkg/workflow/workflow.go +++ b/venona/pkg/workflow/workflow.go @@ -52,8 +52,8 @@ func New(metadata task.Metadata) *Workflow { // AddTask adds a specific task to its matching parent worklow func (wf *Workflow) AddTask(t *task.Task) error { - if wf.Metadata.ReName != t.Metadata.ReName || wf.Metadata.Workflow != t.Metadata.Workflow { - return fmt.Errorf("mismatch runtime or workflow id, %s/%s is different from %s/%s", wf.Metadata.ReName, wf.Metadata.Workflow, t.Metadata.ReName, t.Metadata.Workflow) + if wf.Metadata.ReName != t.Metadata.ReName || wf.Metadata.WorkflowId != t.Metadata.WorkflowId { + return fmt.Errorf("mismatch runtime or workflow id, %s/%s is different from %s/%s", wf.Metadata.ReName, wf.Metadata.WorkflowId, t.Metadata.ReName, t.Metadata.WorkflowId) } if wf.Metadata.CreatedAt > t.Metadata.CreatedAt { diff --git a/venonactl/VERSION b/venonactl/VERSION index 3cc3669f..227cea21 100644 --- a/venonactl/VERSION +++ b/venonactl/VERSION @@ -1 +1 @@ -1.10.8 +2.0.0