Skip to content

Commit

Permalink
Fix exponent overflow (flyteorg#100)
Browse files Browse the repository at this point in the history
Backoff controller has two issues: 1) backoff() can overflow the exponent value leading to 0 backoff duration all the time. 2) There is a race condition in updating fields of the simple backoff handler. this PR should address both.
  • Loading branch information
EngHabu authored Mar 31, 2020
1 parent e1e9ca2 commit fd3571d
Show file tree
Hide file tree
Showing 5 changed files with 94 additions and 50 deletions.
31 changes: 31 additions & 0 deletions pkg/controller/nodes/task/backoff/atomic_time.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package backoff

import (
"sync/atomic"
"time"
)

// AtomicTime represents an atomic.Value that stores time.Time.
type AtomicTime struct {
v atomic.Value
}

// Loads the underlying time.Time.
func (a *AtomicTime) Load() time.Time {
return a.v.Load().(time.Time)
}

// Stores time.Time to the underlying atomic.Value
func (a *AtomicTime) Store(t time.Time) {
a.v.Store(t)
}

// Creates a new Atomic time.Time
func NewAtomicTime(t time.Time) AtomicTime {
v := atomic.Value{}
v.Store(t)

return AtomicTime{
v: v,
}
}
27 changes: 6 additions & 21 deletions pkg/controller/nodes/task/backoff/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"fmt"
"time"

stdAtomic "github.com/lyft/flytestdlib/atomic"

"github.com/lyft/flytestdlib/logger"

"github.com/lyft/flyteplugins/go/tasks/pluginmachinery/k8s"
Expand All @@ -24,8 +26,8 @@ func (m *Controller) GetOrCreateHandler(ctx context.Context, key string, backOff
SimpleBackOffBlocker: &SimpleBackOffBlocker{
Clock: m.Clock,
BackOffBaseSecond: backOffBaseSecond,
BackOffExponent: 0,
NextEligibleTime: m.Clock.Now(),
BackOffExponent: stdAtomic.NewUint32(0),
NextEligibleTime: NewAtomicTime(m.Clock.Now()),
MaxBackOffDuration: maxBackOffDuration,
}, ComputeResourceCeilings: &ComputeResourceCeilings{
computeResourceCeilings: v1.ResourceList{},
Expand All @@ -37,35 +39,18 @@ func (m *Controller) GetOrCreateHandler(ctx context.Context, key string, backOff
} else {
logger.Infof(ctx, "The back-off handler for [%v] has been created.\n", key)
}

if ret, casted := h.(*ComputeResourceAwareBackOffHandler); casted {
return ret
}

return nil
}

func (m *Controller) GetBackOffHandler(key string) (*ComputeResourceAwareBackOffHandler, bool) {
return m.backOffHandlerMap.Get(key)
}

func (m *Controller) CreateBackOffHandler(ctx context.Context, key string, backOffBaseSecond int, maxBackOffDuration time.Duration) *ComputeResourceAwareBackOffHandler {
m.backOffHandlerMap.Set(key, &ComputeResourceAwareBackOffHandler{
SimpleBackOffBlocker: &SimpleBackOffBlocker{
Clock: m.Clock,
BackOffBaseSecond: backOffBaseSecond,
BackOffExponent: 0,
NextEligibleTime: m.Clock.Now(),
MaxBackOffDuration: maxBackOffDuration,
},
ComputeResourceCeilings: &ComputeResourceCeilings{
computeResourceCeilings: v1.ResourceList{},
},
})
h, _ := m.backOffHandlerMap.Get(key)
h.reset()
logger.Infof(ctx, "The back-off handler for [%v] has been created.\n", key)
return h
}

func ComposeResourceKey(o k8s.Resource) string {
return fmt.Sprintf("%v,%v", o.GroupVersionKind().String(), o.GetNamespace())
}
Expand Down
30 changes: 19 additions & 11 deletions pkg/controller/nodes/task/backoff/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"time"

"github.com/lyft/flyteplugins/go/tasks/errors"
stdAtomic "github.com/lyft/flytestdlib/atomic"
stdErrors "github.com/lyft/flytestdlib/errors"
"github.com/lyft/flytestdlib/logger"
v1 "k8s.io/api/core/v1"
Expand All @@ -24,33 +25,40 @@ var (
type SimpleBackOffBlocker struct {
Clock clock.Clock
BackOffBaseSecond int
BackOffExponent int
NextEligibleTime time.Time
MaxBackOffDuration time.Duration

// Mutable fields
BackOffExponent stdAtomic.Uint32
NextEligibleTime AtomicTime
}

func (b *SimpleBackOffBlocker) isBlocking(t time.Time) bool {
return !b.NextEligibleTime.Before(t)
return !b.NextEligibleTime.Load().Before(t)
}

func (b *SimpleBackOffBlocker) getBlockExpirationTime() time.Time {
return b.NextEligibleTime
return b.NextEligibleTime.Load()
}

func (b *SimpleBackOffBlocker) reset() {
b.BackOffExponent = 0
b.NextEligibleTime = b.Clock.Now()
b.BackOffExponent.Store(0)
b.NextEligibleTime.Store(b.Clock.Now())
}

func (b *SimpleBackOffBlocker) backOff() time.Duration {
backOffDuration := time.Duration(time.Second.Nanoseconds() * int64(math.Pow(float64(b.BackOffBaseSecond), float64(b.BackOffExponent))))
func (b *SimpleBackOffBlocker) backOff(ctx context.Context) time.Duration {
logger.Debug(ctx, "BackOff params [BackOffBaseSecond: %v] [BackOffExponent: %v] [MaxBackOffDuration: %v]",
b.BackOffBaseSecond, b.BackOffExponent, b.MaxBackOffDuration)

backOffDuration := time.Duration(time.Second.Nanoseconds() * int64(math.Pow(float64(b.BackOffBaseSecond),
float64(b.BackOffExponent.Load()))))

if backOffDuration > b.MaxBackOffDuration {
backOffDuration = b.MaxBackOffDuration
} else {
b.BackOffExponent.Inc()
}

b.NextEligibleTime = b.Clock.Now().Add(backOffDuration)
b.BackOffExponent++
b.NextEligibleTime.Store(b.Clock.Now().Add(backOffDuration))
return backOffDuration
}

Expand Down Expand Up @@ -142,7 +150,7 @@ func (h *ComputeResourceAwareBackOffHandler) Handle(ctx context.Context, operati
// if the backOffBlocker is not blocking and we are still encountering insufficient resource issue,
// we should increase the exponent in the backoff and update the NextEligibleTime

backOffDuration := h.SimpleBackOffBlocker.backOff()
backOffDuration := h.SimpleBackOffBlocker.backOff(ctx)
logger.Infof(ctx, "The operation was attempted because the back-off handler is not blocking, but failed due to "+
"insufficient resource (backing off for a duration of [%v] to timestamp [%v])\n", backOffDuration, h.SimpleBackOffBlocker.NextEligibleTime)
} else {
Expand Down
54 changes: 37 additions & 17 deletions pkg/controller/nodes/task/backoff/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ import (
"testing"
"time"

stdAtomic "github.com/lyft/flytestdlib/atomic"

"github.com/magiconair/properties/assert"

taskErrors "github.com/lyft/flyteplugins/go/tasks/errors"
stdlibErrors "github.com/lyft/flytestdlib/errors"
v1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -48,7 +52,7 @@ func TestComputeResourceAwareBackOffHandler_Handle(t *testing.T) {
args args
wantErr bool
wantErrCode stdlibErrors.ErrorCode
wantExp int
wantExp uint32
wantNextEligibleTime time.Time
wantCeilings v1.ResourceList
wantCallCount int
Expand All @@ -59,8 +63,8 @@ func TestComputeResourceAwareBackOffHandler_Handle(t *testing.T) {
SimpleBackOffBlocker: &SimpleBackOffBlocker{
Clock: tc,
BackOffBaseSecond: 2,
BackOffExponent: 1,
NextEligibleTime: tc.Now().Add(time.Second * 7),
BackOffExponent: stdAtomic.NewUint32(1),
NextEligibleTime: NewAtomicTime(tc.Now().Add(time.Second * 7)),
MaxBackOffDuration: 10 * time.Second,
},
ComputeResourceCeilings: &ComputeResourceCeilings{
Expand All @@ -84,8 +88,8 @@ func TestComputeResourceAwareBackOffHandler_Handle(t *testing.T) {
SimpleBackOffBlocker: &SimpleBackOffBlocker{
Clock: tc,
BackOffBaseSecond: 2,
BackOffExponent: 1,
NextEligibleTime: tc.Now().Add(time.Second * 7),
BackOffExponent: stdAtomic.NewUint32(1),
NextEligibleTime: NewAtomicTime(tc.Now().Add(time.Second * 7)),
MaxBackOffDuration: 10 * time.Second,
},
ComputeResourceCeilings: &ComputeResourceCeilings{
Expand All @@ -111,8 +115,8 @@ func TestComputeResourceAwareBackOffHandler_Handle(t *testing.T) {
SimpleBackOffBlocker: &SimpleBackOffBlocker{
Clock: tc,
BackOffBaseSecond: 2,
BackOffExponent: 1,
NextEligibleTime: tc.Now().Add(time.Second * -2),
BackOffExponent: stdAtomic.NewUint32(1),
NextEligibleTime: NewAtomicTime(tc.Now().Add(time.Second * -2)),
MaxBackOffDuration: 10 * time.Second,
},
ComputeResourceCeilings: &ComputeResourceCeilings{
Expand Down Expand Up @@ -147,10 +151,10 @@ func TestComputeResourceAwareBackOffHandler_Handle(t *testing.T) {
t.Errorf("Handle() errorCode = %v, wantErrCode %v", ec, tt.wantErrCode)
}
}
if tt.wantExp != h.BackOffExponent {
if tt.wantExp != h.BackOffExponent.Load() {
t.Errorf("post-Handle() BackOffExponent = %v, wantBackOffExponent %v", h.BackOffExponent, tt.wantExp)
}
if tt.wantNextEligibleTime != h.NextEligibleTime {
if tt.wantNextEligibleTime != h.NextEligibleTime.Load() {
t.Errorf("post-Handle() NextEligibleTime = %v, wantNextEligibleTime %v", h.NextEligibleTime, tt.wantNextEligibleTime)
}
if !reflect.DeepEqual(h.computeResourceCeilings, tt.wantCeilings) {
Expand Down Expand Up @@ -428,14 +432,14 @@ func TestSimpleBackOffBlocker_backOff(t *testing.T) {
type fields struct {
Clock clock.Clock
BackOffBaseSecond int
BackOffExponent int
BackOffExponent uint32
NextEligibleTime time.Time
MaxBackOffDuration time.Duration
}
tests := []struct {
name string
fields fields
wantExponent int
wantExponent uint32
wantDuration time.Duration
}{
{name: "backoff should increase exponent",
Expand All @@ -450,7 +454,7 @@ func TestSimpleBackOffBlocker_backOff(t *testing.T) {
},
{name: "backoff should saturate",
fields: fields{Clock: tc, BackOffBaseSecond: 2, BackOffExponent: 10, NextEligibleTime: tc.Now(), MaxBackOffDuration: maxBackOffDuration},
wantExponent: 11,
wantExponent: 10,
wantDuration: maxBackOffDuration,
},
}
Expand All @@ -459,17 +463,33 @@ func TestSimpleBackOffBlocker_backOff(t *testing.T) {
b := &SimpleBackOffBlocker{
Clock: tt.fields.Clock,
BackOffBaseSecond: tt.fields.BackOffBaseSecond,
BackOffExponent: tt.fields.BackOffExponent,
NextEligibleTime: tt.fields.NextEligibleTime,
BackOffExponent: stdAtomic.NewUint32(tt.fields.BackOffExponent),
NextEligibleTime: NewAtomicTime(tt.fields.NextEligibleTime),
MaxBackOffDuration: tt.fields.MaxBackOffDuration,
}

if got := b.backOff(); !reflect.DeepEqual(got, tt.wantDuration) {
if got := b.backOff(context.Background()); !reflect.DeepEqual(got, tt.wantDuration) {
t.Errorf("backOff() = %v, want %v", got, tt.wantDuration)
}
if gotExp := b.BackOffExponent; !reflect.DeepEqual(gotExp, tt.wantExponent) {
t.Errorf("backOffExponent = %v, want %v", gotExp, tt.wantExponent)
if gotExp := b.BackOffExponent; !reflect.DeepEqual(gotExp.Load(), tt.wantExponent) {
t.Errorf("backOffExponent = %v, want %v", gotExp.Load(), tt.wantExponent)
}
})
}

t.Run("backoff many times after maxBackOffDuration is hit", func(t *testing.T) {
b := &SimpleBackOffBlocker{
Clock: tc,
BackOffBaseSecond: 2,
BackOffExponent: stdAtomic.NewUint32(10),
NextEligibleTime: NewAtomicTime(tc.Now()),
MaxBackOffDuration: maxBackOffDuration,
}

for i := 0; i < 10; i++ {
backOffDuration := b.backOff(context.Background())
assert.Equal(t, maxBackOffDuration, backOffDuration)
assert.Equal(t, uint32(10), b.BackOffExponent.Load())
}
})
}
2 changes: 1 addition & 1 deletion pkg/controller/nodes/task/k8s/plugin_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,7 @@ func TestK8sTaskExecutor_Handle_LaunchResource(t *testing.T) {
refKey := backoff.ComposeResourceKey(referenceResource)
podBackOffHandler, found := backOffController.GetBackOffHandler(refKey)
assert.True(t, found)
assert.Equal(t, 1, podBackOffHandler.BackOffExponent)
assert.Equal(t, uint32(1), podBackOffHandler.BackOffExponent.Load())
})
}

Expand Down

0 comments on commit fd3571d

Please sign in to comment.