diff --git a/flytepropeller/pkg/controller/nodes/task/backoff/atomic_time.go b/flytepropeller/pkg/controller/nodes/task/backoff/atomic_time.go new file mode 100644 index 0000000000..953b3b8345 --- /dev/null +++ b/flytepropeller/pkg/controller/nodes/task/backoff/atomic_time.go @@ -0,0 +1,31 @@ +package backoff + +import ( + "sync/atomic" + "time" +) + +// AtomicTime represents an atomic.Value that stores time.Time. +type AtomicTime struct { + v atomic.Value +} + +// Loads the underlying time.Time. +func (a *AtomicTime) Load() time.Time { + return a.v.Load().(time.Time) +} + +// Stores time.Time to the underlying atomic.Value +func (a *AtomicTime) Store(t time.Time) { + a.v.Store(t) +} + +// Creates a new Atomic time.Time +func NewAtomicTime(t time.Time) AtomicTime { + v := atomic.Value{} + v.Store(t) + + return AtomicTime{ + v: v, + } +} diff --git a/flytepropeller/pkg/controller/nodes/task/backoff/controller.go b/flytepropeller/pkg/controller/nodes/task/backoff/controller.go index 890e6d3ead..00a599ce3c 100644 --- a/flytepropeller/pkg/controller/nodes/task/backoff/controller.go +++ b/flytepropeller/pkg/controller/nodes/task/backoff/controller.go @@ -5,6 +5,8 @@ import ( "fmt" "time" + stdAtomic "github.com/lyft/flytestdlib/atomic" + "github.com/lyft/flytestdlib/logger" "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/k8s" @@ -24,8 +26,8 @@ func (m *Controller) GetOrCreateHandler(ctx context.Context, key string, backOff SimpleBackOffBlocker: &SimpleBackOffBlocker{ Clock: m.Clock, BackOffBaseSecond: backOffBaseSecond, - BackOffExponent: 0, - NextEligibleTime: m.Clock.Now(), + BackOffExponent: stdAtomic.NewUint32(0), + NextEligibleTime: NewAtomicTime(m.Clock.Now()), MaxBackOffDuration: maxBackOffDuration, }, ComputeResourceCeilings: &ComputeResourceCeilings{ computeResourceCeilings: v1.ResourceList{}, @@ -37,9 +39,11 @@ func (m *Controller) GetOrCreateHandler(ctx context.Context, key string, backOff } else { logger.Infof(ctx, "The back-off handler for [%v] has been created.\n", key) } + if ret, casted := h.(*ComputeResourceAwareBackOffHandler); casted { return ret } + return nil } @@ -47,25 +51,6 @@ func (m *Controller) GetBackOffHandler(key string) (*ComputeResourceAwareBackOff return m.backOffHandlerMap.Get(key) } -func (m *Controller) CreateBackOffHandler(ctx context.Context, key string, backOffBaseSecond int, maxBackOffDuration time.Duration) *ComputeResourceAwareBackOffHandler { - m.backOffHandlerMap.Set(key, &ComputeResourceAwareBackOffHandler{ - SimpleBackOffBlocker: &SimpleBackOffBlocker{ - Clock: m.Clock, - BackOffBaseSecond: backOffBaseSecond, - BackOffExponent: 0, - NextEligibleTime: m.Clock.Now(), - MaxBackOffDuration: maxBackOffDuration, - }, - ComputeResourceCeilings: &ComputeResourceCeilings{ - computeResourceCeilings: v1.ResourceList{}, - }, - }) - h, _ := m.backOffHandlerMap.Get(key) - h.reset() - logger.Infof(ctx, "The back-off handler for [%v] has been created.\n", key) - return h -} - func ComposeResourceKey(o k8s.Resource) string { return fmt.Sprintf("%v,%v", o.GroupVersionKind().String(), o.GetNamespace()) } diff --git a/flytepropeller/pkg/controller/nodes/task/backoff/handler.go b/flytepropeller/pkg/controller/nodes/task/backoff/handler.go index 396d9c6702..941a7443a1 100644 --- a/flytepropeller/pkg/controller/nodes/task/backoff/handler.go +++ b/flytepropeller/pkg/controller/nodes/task/backoff/handler.go @@ -8,6 +8,7 @@ import ( "time" "github.com/lyft/flyteplugins/go/tasks/errors" + stdAtomic "github.com/lyft/flytestdlib/atomic" stdErrors "github.com/lyft/flytestdlib/errors" "github.com/lyft/flytestdlib/logger" v1 "k8s.io/api/core/v1" @@ -24,33 +25,40 @@ var ( type SimpleBackOffBlocker struct { Clock clock.Clock BackOffBaseSecond int - BackOffExponent int - NextEligibleTime time.Time MaxBackOffDuration time.Duration + + // Mutable fields + BackOffExponent stdAtomic.Uint32 + NextEligibleTime AtomicTime } func (b *SimpleBackOffBlocker) isBlocking(t time.Time) bool { - return !b.NextEligibleTime.Before(t) + return !b.NextEligibleTime.Load().Before(t) } func (b *SimpleBackOffBlocker) getBlockExpirationTime() time.Time { - return b.NextEligibleTime + return b.NextEligibleTime.Load() } func (b *SimpleBackOffBlocker) reset() { - b.BackOffExponent = 0 - b.NextEligibleTime = b.Clock.Now() + b.BackOffExponent.Store(0) + b.NextEligibleTime.Store(b.Clock.Now()) } -func (b *SimpleBackOffBlocker) backOff() time.Duration { - backOffDuration := time.Duration(time.Second.Nanoseconds() * int64(math.Pow(float64(b.BackOffBaseSecond), float64(b.BackOffExponent)))) +func (b *SimpleBackOffBlocker) backOff(ctx context.Context) time.Duration { + logger.Debug(ctx, "BackOff params [BackOffBaseSecond: %v] [BackOffExponent: %v] [MaxBackOffDuration: %v]", + b.BackOffBaseSecond, b.BackOffExponent, b.MaxBackOffDuration) + + backOffDuration := time.Duration(time.Second.Nanoseconds() * int64(math.Pow(float64(b.BackOffBaseSecond), + float64(b.BackOffExponent.Load())))) if backOffDuration > b.MaxBackOffDuration { backOffDuration = b.MaxBackOffDuration + } else { + b.BackOffExponent.Inc() } - b.NextEligibleTime = b.Clock.Now().Add(backOffDuration) - b.BackOffExponent++ + b.NextEligibleTime.Store(b.Clock.Now().Add(backOffDuration)) return backOffDuration } @@ -142,7 +150,7 @@ func (h *ComputeResourceAwareBackOffHandler) Handle(ctx context.Context, operati // if the backOffBlocker is not blocking and we are still encountering insufficient resource issue, // we should increase the exponent in the backoff and update the NextEligibleTime - backOffDuration := h.SimpleBackOffBlocker.backOff() + backOffDuration := h.SimpleBackOffBlocker.backOff(ctx) logger.Infof(ctx, "The operation was attempted because the back-off handler is not blocking, but failed due to "+ "insufficient resource (backing off for a duration of [%v] to timestamp [%v])\n", backOffDuration, h.SimpleBackOffBlocker.NextEligibleTime) } else { diff --git a/flytepropeller/pkg/controller/nodes/task/backoff/handler_test.go b/flytepropeller/pkg/controller/nodes/task/backoff/handler_test.go index 6a3dd7bdad..702ffc122d 100644 --- a/flytepropeller/pkg/controller/nodes/task/backoff/handler_test.go +++ b/flytepropeller/pkg/controller/nodes/task/backoff/handler_test.go @@ -7,6 +7,10 @@ import ( "testing" "time" + stdAtomic "github.com/lyft/flytestdlib/atomic" + + "github.com/magiconair/properties/assert" + taskErrors "github.com/lyft/flyteplugins/go/tasks/errors" stdlibErrors "github.com/lyft/flytestdlib/errors" v1 "k8s.io/api/core/v1" @@ -48,7 +52,7 @@ func TestComputeResourceAwareBackOffHandler_Handle(t *testing.T) { args args wantErr bool wantErrCode stdlibErrors.ErrorCode - wantExp int + wantExp uint32 wantNextEligibleTime time.Time wantCeilings v1.ResourceList wantCallCount int @@ -59,8 +63,8 @@ func TestComputeResourceAwareBackOffHandler_Handle(t *testing.T) { SimpleBackOffBlocker: &SimpleBackOffBlocker{ Clock: tc, BackOffBaseSecond: 2, - BackOffExponent: 1, - NextEligibleTime: tc.Now().Add(time.Second * 7), + BackOffExponent: stdAtomic.NewUint32(1), + NextEligibleTime: NewAtomicTime(tc.Now().Add(time.Second * 7)), MaxBackOffDuration: 10 * time.Second, }, ComputeResourceCeilings: &ComputeResourceCeilings{ @@ -84,8 +88,8 @@ func TestComputeResourceAwareBackOffHandler_Handle(t *testing.T) { SimpleBackOffBlocker: &SimpleBackOffBlocker{ Clock: tc, BackOffBaseSecond: 2, - BackOffExponent: 1, - NextEligibleTime: tc.Now().Add(time.Second * 7), + BackOffExponent: stdAtomic.NewUint32(1), + NextEligibleTime: NewAtomicTime(tc.Now().Add(time.Second * 7)), MaxBackOffDuration: 10 * time.Second, }, ComputeResourceCeilings: &ComputeResourceCeilings{ @@ -111,8 +115,8 @@ func TestComputeResourceAwareBackOffHandler_Handle(t *testing.T) { SimpleBackOffBlocker: &SimpleBackOffBlocker{ Clock: tc, BackOffBaseSecond: 2, - BackOffExponent: 1, - NextEligibleTime: tc.Now().Add(time.Second * -2), + BackOffExponent: stdAtomic.NewUint32(1), + NextEligibleTime: NewAtomicTime(tc.Now().Add(time.Second * -2)), MaxBackOffDuration: 10 * time.Second, }, ComputeResourceCeilings: &ComputeResourceCeilings{ @@ -147,10 +151,10 @@ func TestComputeResourceAwareBackOffHandler_Handle(t *testing.T) { t.Errorf("Handle() errorCode = %v, wantErrCode %v", ec, tt.wantErrCode) } } - if tt.wantExp != h.BackOffExponent { + if tt.wantExp != h.BackOffExponent.Load() { t.Errorf("post-Handle() BackOffExponent = %v, wantBackOffExponent %v", h.BackOffExponent, tt.wantExp) } - if tt.wantNextEligibleTime != h.NextEligibleTime { + if tt.wantNextEligibleTime != h.NextEligibleTime.Load() { t.Errorf("post-Handle() NextEligibleTime = %v, wantNextEligibleTime %v", h.NextEligibleTime, tt.wantNextEligibleTime) } if !reflect.DeepEqual(h.computeResourceCeilings, tt.wantCeilings) { @@ -428,14 +432,14 @@ func TestSimpleBackOffBlocker_backOff(t *testing.T) { type fields struct { Clock clock.Clock BackOffBaseSecond int - BackOffExponent int + BackOffExponent uint32 NextEligibleTime time.Time MaxBackOffDuration time.Duration } tests := []struct { name string fields fields - wantExponent int + wantExponent uint32 wantDuration time.Duration }{ {name: "backoff should increase exponent", @@ -450,7 +454,7 @@ func TestSimpleBackOffBlocker_backOff(t *testing.T) { }, {name: "backoff should saturate", fields: fields{Clock: tc, BackOffBaseSecond: 2, BackOffExponent: 10, NextEligibleTime: tc.Now(), MaxBackOffDuration: maxBackOffDuration}, - wantExponent: 11, + wantExponent: 10, wantDuration: maxBackOffDuration, }, } @@ -459,17 +463,33 @@ func TestSimpleBackOffBlocker_backOff(t *testing.T) { b := &SimpleBackOffBlocker{ Clock: tt.fields.Clock, BackOffBaseSecond: tt.fields.BackOffBaseSecond, - BackOffExponent: tt.fields.BackOffExponent, - NextEligibleTime: tt.fields.NextEligibleTime, + BackOffExponent: stdAtomic.NewUint32(tt.fields.BackOffExponent), + NextEligibleTime: NewAtomicTime(tt.fields.NextEligibleTime), MaxBackOffDuration: tt.fields.MaxBackOffDuration, } - if got := b.backOff(); !reflect.DeepEqual(got, tt.wantDuration) { + if got := b.backOff(context.Background()); !reflect.DeepEqual(got, tt.wantDuration) { t.Errorf("backOff() = %v, want %v", got, tt.wantDuration) } - if gotExp := b.BackOffExponent; !reflect.DeepEqual(gotExp, tt.wantExponent) { - t.Errorf("backOffExponent = %v, want %v", gotExp, tt.wantExponent) + if gotExp := b.BackOffExponent; !reflect.DeepEqual(gotExp.Load(), tt.wantExponent) { + t.Errorf("backOffExponent = %v, want %v", gotExp.Load(), tt.wantExponent) } }) } + + t.Run("backoff many times after maxBackOffDuration is hit", func(t *testing.T) { + b := &SimpleBackOffBlocker{ + Clock: tc, + BackOffBaseSecond: 2, + BackOffExponent: stdAtomic.NewUint32(10), + NextEligibleTime: NewAtomicTime(tc.Now()), + MaxBackOffDuration: maxBackOffDuration, + } + + for i := 0; i < 10; i++ { + backOffDuration := b.backOff(context.Background()) + assert.Equal(t, maxBackOffDuration, backOffDuration) + assert.Equal(t, uint32(10), b.BackOffExponent.Load()) + } + }) } diff --git a/flytepropeller/pkg/controller/nodes/task/k8s/plugin_manager_test.go b/flytepropeller/pkg/controller/nodes/task/k8s/plugin_manager_test.go index 4be8a562cc..2574390719 100644 --- a/flytepropeller/pkg/controller/nodes/task/k8s/plugin_manager_test.go +++ b/flytepropeller/pkg/controller/nodes/task/k8s/plugin_manager_test.go @@ -343,7 +343,7 @@ func TestK8sTaskExecutor_Handle_LaunchResource(t *testing.T) { refKey := backoff.ComposeResourceKey(referenceResource) podBackOffHandler, found := backOffController.GetBackOffHandler(refKey) assert.True(t, found) - assert.Equal(t, 1, podBackOffHandler.BackOffExponent) + assert.Equal(t, uint32(1), podBackOffHandler.BackOffExponent.Load()) }) }