Skip to content
This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

Commit

Permalink
Tracking terminated workflows in LRU cache (#450)
Browse files Browse the repository at this point in the history
* added lru filter to track terminated workflows

Signed-off-by: Daniel Rammer <[email protected]>

* updated unit tests

Signed-off-by: Daniel Rammer <[email protected]>

* add test on terminated filter

Signed-off-by: Daniel Rammer <[email protected]>

Signed-off-by: Daniel Rammer <[email protected]>
  • Loading branch information
hamersaw authored Sep 1, 2022
1 parent 87c289c commit aa3e9a6
Show file tree
Hide file tree
Showing 5 changed files with 71 additions and 14 deletions.
5 changes: 5 additions & 0 deletions pkg/controller/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,11 @@ func (p *Propeller) Handle(ctx context.Context, namespace, name string) error {
logger.Warningf(ctx, "Workflow namespace[%v]/name[%v] not found, may be deleted.", namespace, name)
return nil
}
if workflowstore.IsWorkflowTerminated(fetchErr) {
p.metrics.RoundSkipped.Inc()
logger.Warningf(ctx, "Workflow namespace[%v]/name[%v] has already been terminated.", namespace, name)
return nil
}
if workflowstore.IsWorkflowStale(fetchErr) {
p.metrics.RoundSkipped.Inc()
logger.Warningf(ctx, "Workflow namespace[%v]/name[%v] Stale.", namespace, name)
Expand Down
8 changes: 8 additions & 0 deletions pkg/controller/workflowstore/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ var ErrStaleWorkflowError = fmt.Errorf("stale Workflow Found error")
// ErrWorkflowNotFound indicates that the workflow does not exist and it is safe to ignore the event
var ErrWorkflowNotFound = fmt.Errorf("workflow not-found error")

// ErrWorkflowTerminated indicates that the workflow being operated on has previously been stored in a terminal state.
var ErrWorkflowTerminated = fmt.Errorf("workflow has already been terminated")

// ErrWorkflowToLarge is returned in cased an update operation fails because the Workflow object (CRD) has surpassed the Datastores
// supported limit.
var ErrWorkflowToLarge = fmt.Errorf("workflow too large")
Expand All @@ -27,6 +30,11 @@ func IsWorkflowStale(err error) bool {
return errors.Cause(err) == ErrStaleWorkflowError
}

// IsWorkflowTerminated returns true if the error is caused by ErrWorkflowTerminated
func IsWorkflowTerminated(err error) bool {
return errors.Cause(err) == ErrWorkflowTerminated
}

// IsWorkflowTooLarge returns true if the error is caused by ErrWorkflowToLarge
func IsWorkflowTooLarge(err error) bool {
return errors.Cause(err) == ErrWorkflowToLarge
Expand Down
17 changes: 13 additions & 4 deletions pkg/controller/workflowstore/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,23 @@ import (
func NewWorkflowStore(ctx context.Context, cfg *Config, lister v1alpha1.FlyteWorkflowLister,
workflows flyteworkflowv1alpha1.FlyteworkflowV1alpha1Interface, scope promutils.Scope) (FlyteWorkflow, error) {

var workflowStore FlyteWorkflow
var err error

switch cfg.Policy {
case PolicyInMemory:
return NewInMemoryWorkflowStore(), nil
workflowStore = NewInMemoryWorkflowStore()
case PolicyPassThrough:
return NewPassthroughWorkflowStore(ctx, scope, workflows, lister), nil
workflowStore = NewPassthroughWorkflowStore(ctx, scope, workflows, lister)
case PolicyResourceVersionCache:
return NewResourceVersionCachingStore(ctx, scope, NewPassthroughWorkflowStore(ctx, scope, workflows, lister)), nil
workflowStore, err = NewResourceVersionCachingStore(ctx, scope, NewPassthroughWorkflowStore(ctx, scope, workflows, lister))
}

if err != nil {
return nil, err
} else if workflowStore == nil {
return nil, fmt.Errorf("empty workflow store config")
}

return nil, fmt.Errorf("empty workflow store config")
return workflowStore, err
}
31 changes: 27 additions & 4 deletions pkg/controller/workflowstore/resource_version_caching.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,11 @@ import (
"fmt"
"sync"

"github.com/flyteorg/flytestdlib/promutils/labeled"

"github.com/flyteorg/flytepropeller/pkg/apis/flyteworkflow/v1alpha1"

"github.com/flyteorg/flytestdlib/fastcheck"
"github.com/flyteorg/flytestdlib/promutils"
"github.com/flyteorg/flytestdlib/promutils/labeled"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
Expand All @@ -32,6 +33,7 @@ type resourceVersionCaching struct {
w FlyteWorkflow
metrics *resourceVersionMetrics
lastUpdatedResourceVersionCache sync.Map
terminatedFilter fastcheck.Filter
}

func (r *resourceVersionCaching) updateRevisionCache(ctx context.Context, namespace, name, resourceVersion string, isTerminated bool) {
Expand All @@ -56,6 +58,13 @@ func (r *resourceVersionCaching) isResourceVersionSameAsPrevious(ctx context.Con
}

func (r *resourceVersionCaching) Get(ctx context.Context, namespace, name string) (*v1alpha1.FlyteWorkflow, error) {
// Check if the resource version key has already been stored in a terminal phase. Processing
// terminated FlyteWorkflows can occur when workflow updates are reported after a workflow
// has already completed.
if r.terminatedFilter.Contains(ctx, []byte(resourceVersionKey(namespace, name))) {
return nil, ErrWorkflowTerminated
}

w, err := r.w.Get(ctx, namespace, name)
if err != nil {
return nil, err
Expand Down Expand Up @@ -89,6 +98,10 @@ func (r *resourceVersionCaching) UpdateStatus(ctx context.Context, workflow *v1a
} else {
r.metrics.workflowRedundantUpdatesCount.Inc(ctx)
}

if newWF.GetExecutionStatus().IsTerminated() {
r.terminatedFilter.Add(ctx, []byte(resourceVersionKey(workflow.Namespace, workflow.Name)))
}
}

return newWF, nil
Expand Down Expand Up @@ -117,12 +130,21 @@ func (r *resourceVersionCaching) Update(ctx context.Context, workflow *v1alpha1.
} else {
r.metrics.workflowRedundantUpdatesCount.Inc(ctx)
}

if newWF.GetExecutionStatus().IsTerminated() {
r.terminatedFilter.Add(ctx, []byte(resourceVersionKey(workflow.Namespace, workflow.Name)))
}
}

return newWF, nil
}

func NewResourceVersionCachingStore(_ context.Context, scope promutils.Scope, workflowStore FlyteWorkflow) FlyteWorkflow {
func NewResourceVersionCachingStore(_ context.Context, scope promutils.Scope, workflowStore FlyteWorkflow) (FlyteWorkflow, error) {
filter, err := fastcheck.NewLRUCacheFilter(1000, scope.NewSubScope("terminated_filter"))
if err != nil {
return nil, err
}

return &resourceVersionCaching{
w: workflowStore,
metrics: &resourceVersionMetrics{
Expand All @@ -131,5 +153,6 @@ func NewResourceVersionCachingStore(_ context.Context, scope promutils.Scope, wo
workflowRedundantUpdatesCount: labeled.NewCounter("wf_redundant", "Workflow Update called but ectd. detected no actual update to the workflow.", scope, labeled.EmitUnlabeledMetric),
},
lastUpdatedResourceVersionCache: sync.Map{},
}
terminatedFilter: filter,
}, nil
}
24 changes: 18 additions & 6 deletions pkg/controller/workflowstore/resource_version_caching_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ func TestResourceVersionCaching_Get_NotInCache(t *testing.T) {

scope := promutils.NewTestScope()
l := &mockWFNamespaceLister{}
wfStore := NewResourceVersionCachingStore(ctx, scope, NewPassthroughWorkflowStore(ctx, scope, mockClient, &mockWFLister{V: l}))
wfStore, err := NewResourceVersionCachingStore(ctx, scope, NewPassthroughWorkflowStore(ctx, scope, mockClient, &mockWFLister{V: l}))
assert.NoError(t, err)

t.Run("notFound", func(t *testing.T) {
l.GetCb = func(name string) (*v1alpha1.FlyteWorkflow, error) {
Expand Down Expand Up @@ -162,7 +163,8 @@ func TestResourceVersionCaching_Get_UpdateAndRead(t *testing.T) {
newWf := wf.DeepCopy()
newWf.Status.Phase = v1alpha1.WorkflowPhaseSucceeding

wfStore := NewResourceVersionCachingStore(ctx, scope, NewPassthroughWorkflowStore(ctx, scope, mockClient, &mockWFLister{V: l}))
wfStore, err := NewResourceVersionCachingStore(ctx, scope, NewPassthroughWorkflowStore(ctx, scope, mockClient, &mockWFLister{V: l}))
assert.NoError(t, err)
// Insert a new workflow with R1
_, err = wfStore.Update(ctx, newWf, PriorityClassCritical)
assert.NoError(t, err)
Expand All @@ -181,9 +183,10 @@ func TestResourceVersionCaching_Get_UpdateAndRead(t *testing.T) {

scope := promutils.NewTestScope()
l := &mockWFNamespaceLister{}
wfStore := NewResourceVersionCachingStore(ctx, scope, NewPassthroughWorkflowStore(ctx, scope, mockClient, &mockWFLister{V: l}))
wfStore, err := NewResourceVersionCachingStore(ctx, scope, NewPassthroughWorkflowStore(ctx, scope, mockClient, &mockWFLister{V: l}))
assert.NoError(t, err)
// Insert a new workflow with R1
_, err := wfStore.Update(ctx, wf, PriorityClassCritical)
_, err = wfStore.Update(ctx, wf, PriorityClassCritical)
assert.NoError(t, err)

// Update the workflow version
Expand Down Expand Up @@ -213,7 +216,8 @@ func TestResourceVersionCaching_Get_UpdateAndRead(t *testing.T) {

scope := promutils.NewTestScope()
l := &mockWFNamespaceLister{}
wfStore := NewResourceVersionCachingStore(ctx, scope, NewPassthroughWorkflowStore(ctx, scope, mockClient, &mockWFLister{V: l}))
wfStore, err := NewResourceVersionCachingStore(ctx, scope, NewPassthroughWorkflowStore(ctx, scope, mockClient, &mockWFLister{V: l}))
assert.NoError(t, err)
// Insert a new workflow with R1
_, err = wfStore.Update(ctx, wf, PriorityClassCritical)
assert.NoError(t, err)
Expand Down Expand Up @@ -250,7 +254,8 @@ func TestResourceVersionCaching_UpdateTerminated(t *testing.T) {

scope := promutils.NewTestScope()
l := &mockWFNamespaceLister{}
wfStore := NewResourceVersionCachingStore(ctx, scope, NewPassthroughWorkflowStore(ctx, scope, mockClient, &mockWFLister{V: l}))
wfStore, err := NewResourceVersionCachingStore(ctx, scope, NewPassthroughWorkflowStore(ctx, scope, mockClient, &mockWFLister{V: l}))
assert.NoError(t, err)
// Insert a new workflow with R1
_, err = wfStore.Update(ctx, newWf, PriorityClassCritical)
assert.NoError(t, err)
Expand All @@ -269,4 +274,11 @@ func TestResourceVersionCaching_UpdateTerminated(t *testing.T) {
assert.False(t, ok)
assert.Nil(t, v)

// validate that terminated workflows are not retrievable
terminated := rvStore.terminatedFilter.Contains(ctx, []byte(resourceVersionKey(namespace, name)))
assert.True(t, terminated)

terminatedWf, err := wfStore.Get(ctx, namespace, name)
assert.Nil(t, terminatedWf)
assert.True(t, IsWorkflowTerminated(err))
}

0 comments on commit aa3e9a6

Please sign in to comment.