diff --git a/venona/pkg/agent/agent_test.go b/venona/pkg/agent/agent_test.go index 2d6a1438..dd2bf69e 100644 --- a/venona/pkg/agent/agent_test.go +++ b/venona/pkg/agent/agent_test.go @@ -259,7 +259,7 @@ func Test_executeAgentTask(t *testing.T) { a := &Agent{ log: logger.New(logger.Options{}), } - err := a.executeAgentTask(tt.task) + err := a.executeAgentTask(context.Background(), tt.task) if err != nil || tt.wantErr != "" { assert.EqualError(t, err, tt.wantErr) } diff --git a/venona/pkg/codefresh/codefresh_mock.go b/venona/pkg/codefresh/codefresh_mock.go index cb94e639..61e5f1ea 100644 --- a/venona/pkg/codefresh/codefresh_mock.go +++ b/venona/pkg/codefresh/codefresh_mock.go @@ -174,6 +174,50 @@ func (_c *MockCodefresh_Tasks_Call) RunAndReturn(run func(context.Context) (task return _c } +// ReportTaskStatus provides a mock function with given fields: ctx, id, status +func (_m *MockCodefresh) ReportTaskStatus(ctx context.Context, id string, status task.TaskStatus) error { + ret := _m.Called(ctx, id, status) + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, string, task.TaskStatus) error); ok { + r0 = rf(ctx, id, status) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// MockCodefresh_ReportTaskStatus_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ReportTaskStatus' +type MockCodefresh_ReportTaskStatus_Call struct { + *mock.Call +} + +// ReportTaskStatus is a helper method to define mock.On call +// - ctx context.Context +// - id string +// - status task.TaskStatus +func (_e *MockCodefresh_Expecter) ReportTaskStatus(ctx interface{}, id interface{}, status interface{}) *MockCodefresh_ReportTaskStatus_Call { + return &MockCodefresh_ReportTaskStatus_Call{Call: _e.mock.On("ReportTaskStatus", ctx, id, status)} +} + +func (_c *MockCodefresh_ReportTaskStatus_Call) Run(run func(ctx context.Context, id string, status task.TaskStatus)) *MockCodefresh_ReportTaskStatus_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(string), args[2].(task.TaskStatus)) + }) + return _c +} + +func (_c *MockCodefresh_ReportTaskStatus_Call) Return(_a0 error) *MockCodefresh_ReportTaskStatus_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockCodefresh_ReportTaskStatus_Call) RunAndReturn(run func(context.Context, string, task.TaskStatus) error) *MockCodefresh_ReportTaskStatus_Call { + _c.Call.Return(run) + return _c +} + // NewMockCodefresh creates a new instance of MockCodefresh. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. // The first argument is typically a *testing.T value. func NewMockCodefresh(t interface { diff --git a/venona/pkg/errors/errors.go b/venona/pkg/errors/errors.go new file mode 100644 index 00000000..81ffb9d9 --- /dev/null +++ b/venona/pkg/errors/errors.go @@ -0,0 +1,24 @@ +// Copyright 2020 The Codefresh Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package errors + +type RetriableError interface { + IsRetriable() bool +} + +func IsRetriable(err error) bool { + e, ok := err.(RetriableError) + return ok && e.IsRetriable() +} diff --git a/venona/pkg/kubernetes/kubernetes.go b/venona/pkg/kubernetes/kubernetes.go index e064aa40..7590d237 100644 --- a/venona/pkg/kubernetes/kubernetes.go +++ b/venona/pkg/kubernetes/kubernetes.go @@ -42,8 +42,8 @@ const ( type ( // Kubernetes API client Kubernetes interface { - CreateResource(ctx context.Context, taskType task.Type, spec interface{}) *K8sError - DeleteResource(ctx context.Context, opts DeleteOptions) *K8sError + CreateResource(ctx context.Context, taskType task.Type, spec interface{}) error + DeleteResource(ctx context.Context, opts DeleteOptions) error } // Options for Kubernetes @@ -76,7 +76,7 @@ type ( K8sError struct { error - IsRetriable bool + isRetriable bool } ) @@ -86,7 +86,11 @@ var ( removeFinalizersJsonPatch = []byte(`[{ "op": "remove", "path": "/metadata/finalizers" }]`) ) -func NewK8sError(err error, operation K8sOperation) *K8sError { +func (e K8sError) IsRetriable() bool { + return e.isRetriable +} + +func NewK8sError(err error, operation K8sOperation) error { isNotRetriable := k8serrors.IsBadRequest(err) || k8serrors.IsForbidden(err) || k8serrors.IsMethodNotSupported(err) || @@ -99,7 +103,7 @@ func NewK8sError(err error, operation K8sOperation) *K8sError { return &K8sError{ error: err, - IsRetriable: !isNotRetriable, + isRetriable: !isNotRetriable, } } @@ -127,7 +131,7 @@ func New(opts Options) (Kubernetes, error) { }, err } -func (k kube) CreateResource(ctx context.Context, taskType task.Type, spec interface{}) *K8sError { +func (k kube) CreateResource(ctx context.Context, taskType task.Type, spec interface{}) error { start := time.Now() bytes, err := json.Marshal(spec) if err != nil { @@ -170,7 +174,7 @@ func (k kube) CreateResource(ctx context.Context, taskType task.Type, spec inter return nil } -func (k kube) DeleteResource(ctx context.Context, opts DeleteOptions) *K8sError { +func (k kube) DeleteResource(ctx context.Context, opts DeleteOptions) error { start := time.Now() switch opts.Kind { case task.TypeDeletePVC: diff --git a/venona/pkg/queue/queue.go b/venona/pkg/queue/queue.go index 9665c238..9abdcb0a 100644 --- a/venona/pkg/queue/queue.go +++ b/venona/pkg/queue/queue.go @@ -21,6 +21,7 @@ import ( "time" "github.com/codefresh-io/go/venona/pkg/codefresh" + ierrors "github.com/codefresh-io/go/venona/pkg/errors" "github.com/codefresh-io/go/venona/pkg/logger" "github.com/codefresh-io/go/venona/pkg/metrics" "github.com/codefresh-io/go/venona/pkg/monitoring" @@ -185,7 +186,7 @@ func (wfq *wfQueueImpl) handleWorkflow(ctx context.Context, wf *workflow.Workflo metrics.ObserveWorkflowMetrics(wf.Type, sinceCreation, inRunner, processed) } -func (wfq *wfQueueImpl) reportTaskStatus(ctx context.Context, taskDef task.Task, err *runtime.HandleTaskError) { +func (wfq *wfQueueImpl) reportTaskStatus(ctx context.Context, taskDef task.Task, err error) { status := task.TaskStatus{ OccurredAt: time.Now(), StatusRevision: taskDef.Metadata.CurrentStatusRevision + 1, @@ -193,7 +194,7 @@ func (wfq *wfQueueImpl) reportTaskStatus(ctx context.Context, taskDef task.Task, if err != nil { status.Status = task.StatusError status.Reason = err.Error() - status.IsRetriable = err.IsRetriable + status.IsRetriable = ierrors.IsRetriable(err) } else { status.Status = task.StatusSuccess } diff --git a/venona/pkg/runtime/runtime.go b/venona/pkg/runtime/runtime.go index 2237a099..8e60828f 100644 --- a/venona/pkg/runtime/runtime.go +++ b/venona/pkg/runtime/runtime.go @@ -19,6 +19,7 @@ import ( "encoding/json" "fmt" + ierrors "github.com/codefresh-io/go/venona/pkg/errors" "github.com/codefresh-io/go/venona/pkg/kubernetes" "github.com/codefresh-io/go/venona/pkg/task" ) @@ -26,7 +27,7 @@ import ( type ( // Runtime API client Runtime interface { - HandleTask(ctx context.Context, t *task.Task) *HandleTaskError + HandleTask(ctx context.Context, t *task.Task) error } // Options for runtime @@ -40,14 +41,18 @@ type ( HandleTaskError struct { error - IsRetriable bool + isRetriable bool } ) -func NewHandleTaskError(err error, isRetriable bool) *HandleTaskError { +func (e HandleTaskError) IsRetriable() bool { + return e.isRetriable +} + +func NewHandleTaskError(err error, isRetriable bool) error { return &HandleTaskError{ error: err, - IsRetriable: isRetriable, + isRetriable: isRetriable, } } @@ -58,12 +63,12 @@ func New(opts Options) Runtime { } } -func (r runtime) HandleTask(ctx context.Context, t *task.Task) *HandleTaskError { +func (r runtime) HandleTask(ctx context.Context, t *task.Task) error { switch t.Type { case task.TypeCreatePVC, task.TypeCreatePod: err := r.client.CreateResource(ctx, t.Type, t.Spec) if err != nil { - return NewHandleTaskError(fmt.Errorf("failed creating resource: %w", err), err.IsRetriable) // TODO: Return already executed tasks in order to terminate them + return NewHandleTaskError(fmt.Errorf("failed creating resource: %w", err), ierrors.IsRetriable(err)) // TODO: Return already executed tasks in order to terminate them } case task.TypeDeletePVC, task.TypeDeletePod: opts := kubernetes.DeleteOptions{} @@ -78,7 +83,7 @@ func (r runtime) HandleTask(ctx context.Context, t *task.Task) *HandleTaskError } if err := r.client.DeleteResource(ctx, opts); err != nil { - return NewHandleTaskError(fmt.Errorf("failed deleting resource: %w", err), err.IsRetriable) + return NewHandleTaskError(fmt.Errorf("failed deleting resource: %w", err), ierrors.IsRetriable(err)) } default: return NewHandleTaskError(fmt.Errorf("unknown task type \"%s\"", t.Type), false)