Skip to content
This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

Commit

Permalink
cache crd fields instead of wf closure
Browse files Browse the repository at this point in the history
Signed-off-by: Babis Kiosidis <[email protected]>
  • Loading branch information
ckiosidis committed Aug 9, 2022
1 parent 51e9cf2 commit b85ea6a
Show file tree
Hide file tree
Showing 16 changed files with 256 additions and 261 deletions.
4 changes: 2 additions & 2 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,14 @@ import (
lister "github.com/flyteorg/flytepropeller/pkg/client/listers/flyteworkflow/v1alpha1"
"github.com/flyteorg/flytepropeller/pkg/compiler/transformers/k8s"
"github.com/flyteorg/flytepropeller/pkg/controller/config"
"github.com/flyteorg/flytepropeller/pkg/controller/crdfieldsstore"
"github.com/flyteorg/flytepropeller/pkg/controller/executors"
"github.com/flyteorg/flytepropeller/pkg/controller/nodes"
errors3 "github.com/flyteorg/flytepropeller/pkg/controller/nodes/errors"
"github.com/flyteorg/flytepropeller/pkg/controller/nodes/recovery"
"github.com/flyteorg/flytepropeller/pkg/controller/nodes/subworkflow/launchplan"
"github.com/flyteorg/flytepropeller/pkg/controller/nodes/task/catalog"
"github.com/flyteorg/flytepropeller/pkg/controller/workflow"
"github.com/flyteorg/flytepropeller/pkg/controller/workflowclosurestore"
"github.com/flyteorg/flytepropeller/pkg/controller/workflowstore"
leader "github.com/flyteorg/flytepropeller/pkg/leaderelection"
"github.com/flyteorg/flytepropeller/pkg/utils"
Expand Down Expand Up @@ -431,7 +431,7 @@ func New(ctx context.Context, cfg *config.Config, kubeclientset kubernetes.Inter
return nil, stdErrs.Wrapf(errors3.CausedByError, err, "failed to initialize workflow store")
}

workflowClosureStore, err := workflowclosurestore.NewWorkflowClosureStore(ctx, workflowclosurestore.GetConfig(), store, scope.NewSubScope("wfclosuire"))
workflowClosureStore, err := crdfieldsstore.NewWfClosureCrdFieldsStore(ctx, crdfieldsstore.GetConfig(), store, scope.NewSubScope("wfclosuire"))
if err != nil {
return nil, stdErrs.Wrapf(errors3.CausedByError, err, "failed to initialize Workflow Closure store")
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,32 +1,30 @@
package workflowclosurestore
package crdfieldsstore

import (
"context"
"time"

"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core"

"github.com/flyteorg/flytestdlib/promutils"
"github.com/prometheus/client_golang/prometheus"

"github.com/flyteorg/flytepropeller/pkg/apis/flyteworkflow/v1alpha1"
)

type activeWorkflowClosureMetrics struct {
type activeCrdFieldsMetrics struct {
TotalItems prometheus.Gauge
CacheHit prometheus.Counter
CacheMiss prometheus.Counter
ReadError prometheus.Counter
FetchLatency promutils.StopWatch
}

type activeWorkflowClosureStore struct {
workflowClosureStore WorkflowClosureStore
metrics *activeWorkflowClosureMetrics
store map[string]*core.CompiledWorkflowClosure
type activeWfClosureCrdFieldsStore struct {
wfClosureCrdFieldsStore WfClosureCrdFieldsStore
metrics *activeCrdFieldsMetrics
store map[string]*WfClosureCrdFields
}

func (a *activeWorkflowClosureStore) Get(ctx context.Context, dataReference v1alpha1.DataReference) (*core.CompiledWorkflowClosure, error) {
func (a *activeWfClosureCrdFieldsStore) Get(ctx context.Context, dataReference v1alpha1.DataReference) (*WfClosureCrdFields, error) {
location := dataReference.String()
if m, ok := a.store[location]; ok {
a.metrics.CacheHit.Inc()
Expand All @@ -36,20 +34,20 @@ func (a *activeWorkflowClosureStore) Get(ctx context.Context, dataReference v1al
a.metrics.CacheMiss.Inc()

timer := a.metrics.FetchLatency.Start()
workflowClosure, err := a.workflowClosureStore.Get(ctx, dataReference)
wfClosureCrdFields, err := a.wfClosureCrdFieldsStore.Get(ctx, dataReference)
timer.Stop()
if err != nil {
a.metrics.ReadError.Inc()
return nil, err
}

a.metrics.TotalItems.Inc()
a.store[location] = workflowClosure
return workflowClosure, nil
a.store[location] = wfClosureCrdFields
return wfClosureCrdFields, nil
}

func (a *activeWorkflowClosureStore) Remove(ctx context.Context, dataReference v1alpha1.DataReference) error {
if err := a.workflowClosureStore.Remove(ctx, dataReference); err != nil {
func (a *activeWfClosureCrdFieldsStore) Remove(ctx context.Context, dataReference v1alpha1.DataReference) error {
if err := a.wfClosureCrdFieldsStore.Remove(ctx, dataReference); err != nil {
return err
}

Expand All @@ -59,19 +57,19 @@ func (a *activeWorkflowClosureStore) Remove(ctx context.Context, dataReference v
return nil
}

func NewActiveWorkflowClosureStore(workflowClosureStore WorkflowClosureStore, scope promutils.Scope) WorkflowClosureStore {
func NewActiveWfClosureCrdFieldsStore(wfClosureCrdFieldsStore WfClosureCrdFieldsStore, scope promutils.Scope) WfClosureCrdFieldsStore {
activeScope := scope.NewSubScope("active")
metrics := &activeWorkflowClosureMetrics{
metrics := &activeCrdFieldsMetrics{
TotalItems: activeScope.MustNewGauge("total_items", "Total Items in cache"),
FetchLatency: activeScope.MustNewStopWatch("fetch", "Total Time to read from underlying datastorage", time.Millisecond),
CacheHit: activeScope.MustNewCounter("cache_hit", "Number of times object was found in active cache"),
CacheMiss: activeScope.MustNewCounter("cache_miss", "Number of times object was not found in active cache"),
ReadError: activeScope.MustNewCounter("cache_read_error", "Failed to read from underlying storage"),
}

return &activeWorkflowClosureStore{
workflowClosureStore: workflowClosureStore,
metrics: metrics,
store: map[string]*core.CompiledWorkflowClosure{},
return &activeWfClosureCrdFieldsStore{
wfClosureCrdFieldsStore: wfClosureCrdFieldsStore,
metrics: metrics,
store: map[string]*WfClosureCrdFields{},
}
}
Original file line number Diff line number Diff line change
@@ -1,14 +1,12 @@
package workflowclosurestore
package crdfieldsstore

import (
"context"
"fmt"
"reflect"
"testing"

"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core"

"github.com/flyteorg/flytepropeller/pkg/controller/workflowclosurestore/mocks"
"github.com/flyteorg/flytepropeller/pkg/controller/crdfieldsstore/mocks"

"github.com/flyteorg/flytestdlib/promutils"

Expand All @@ -18,65 +16,65 @@ import (

func TestActiveWfClosureStore(t *testing.T) {
ctx := context.TODO()
workflowClosure := core.CompiledWorkflowClosure{}
wfClosureCrdFields := WfClosureCrdFields{}

t.Run("Happy", func(t *testing.T) {
// initialize mocks
mockStore := &mocks.WorkflowClosureStore{}
mockStore.OnGetMatch(mock.Anything, mock.Anything).Return(&workflowClosure, nil)
mockStore := &mocks.WfClosureCrdFieldsStore{}
mockStore.OnGetMatch(mock.Anything, mock.Anything).Return(&wfClosureCrdFields, nil)

scope := promutils.NewTestScope()
activeStore := NewActiveWorkflowClosureStore(mockStore, scope)
activeStore := NewActiveWfClosureCrdFieldsStore(mockStore, scope)

// Get from underlying WorkflowClosureStore
// Get from underlying WfClosureCrdFieldsStore
data, err := activeStore.Get(ctx, "foo")
assert.NoError(t, err)
assert.True(t, reflect.DeepEqual(workflowClosure, *data))
assert.True(t, reflect.DeepEqual(wfClosureCrdFields, *data))
mockStore.AssertNumberOfCalls(t, "Get", 1)

// Get from cache
data, err = activeStore.Get(ctx, "foo")
assert.NoError(t, err)
assert.True(t, reflect.DeepEqual(workflowClosure, *data))
assert.True(t, reflect.DeepEqual(wfClosureCrdFields, *data))
mockStore.AssertNumberOfCalls(t, "Get", 1)
})

t.Run("Remove", func(t *testing.T) {
// initialize mocks
mockStore := &mocks.WorkflowClosureStore{}
mockStore.OnGetMatch(mock.Anything, mock.Anything).Return(&workflowClosure, nil)
mockStore := &mocks.WfClosureCrdFieldsStore{}
mockStore.OnGetMatch(mock.Anything, mock.Anything).Return(&wfClosureCrdFields, nil)
mockStore.OnRemoveMatch(mock.Anything, mock.Anything).Return(nil)

scope := promutils.NewTestScope()
activeStore := NewActiveWorkflowClosureStore(mockStore, scope)
activeStore := NewActiveWfClosureCrdFieldsStore(mockStore, scope)

// Get from underlying WorkflowClosureStore
// Get from underlying WfClosureCrdFieldsStore
data, err := activeStore.Get(ctx, "foo")
assert.NoError(t, err)
assert.True(t, reflect.DeepEqual(workflowClosure, *data))
assert.True(t, reflect.DeepEqual(wfClosureCrdFields, *data))
mockStore.AssertNumberOfCalls(t, "Get", 1)

// Remove from underlying WorkflowClosureStore
// Remove from underlying WfClosureCrdFieldsStore
err = activeStore.Remove(ctx, "foo")
assert.NoError(t, err)
mockStore.AssertNumberOfCalls(t, "Remove", 1)

// Get from cache
data, err = activeStore.Get(ctx, "foo")
assert.NoError(t, err)
assert.True(t, reflect.DeepEqual(workflowClosure, *data))
assert.True(t, reflect.DeepEqual(wfClosureCrdFields, *data))
mockStore.AssertNumberOfCalls(t, "Get", 2)
})

t.Run("UnderlyingError", func(t *testing.T) {
// initialize mocks
mockStore := &mocks.WorkflowClosureStore{}
mockStore := &mocks.WfClosureCrdFieldsStore{}
mockStore.OnGetMatch(mock.Anything, mock.Anything).Return(nil, fmt.Errorf("foo"))

scope := promutils.NewTestScope()
activeStore := NewActiveWorkflowClosureStore(mockStore, scope)
activeStore := NewActiveWfClosureCrdFieldsStore(mockStore, scope)

// Get from underlying WorkflowClosureStore
// Get from underlying WfClosureCrdFieldsStore
data, err := activeStore.Get(ctx, "foo")
assert.Error(t, err)
assert.Nil(t, data)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package workflowclosurestore
package crdfieldsstore

import (
ctrlConfig "github.com/flyteorg/flytepropeller/pkg/controller/config"
Expand All @@ -20,7 +20,7 @@ var (
Size: 1000,
}

configSection = ctrlConfig.MustRegisterSubSection("wfClosureStore", defaultConfig)
configSection = ctrlConfig.MustRegisterSubSection("wfClosureCrdFields", defaultConfig)
)

type Config struct {
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package workflowclosurestore
package crdfieldsstore

import "fmt"

Expand Down
42 changes: 42 additions & 0 deletions pkg/controller/crdfieldsstore/iface.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package crdfieldsstore

import (
"context"
"fmt"

"github.com/flyteorg/flytestdlib/promutils"

"github.com/flyteorg/flytepropeller/pkg/apis/flyteworkflow/v1alpha1"

"github.com/flyteorg/flytestdlib/storage"
)

//go:generate mockery -all -output=mocks -case=underscore

// WfClosureCrdFields interface provides an abstraction for retrieving CompiledWorkflowClosure blobs
// and transforming them to the static CRD fields that it contains.
// Caching the WfClosureCrdFields instead of the CompiledWorkflowClosure to avoid the
// expensive transformations.
type WfClosureCrdFieldsStore interface {
Get(ctx context.Context, dataReference v1alpha1.DataReference) (*WfClosureCrdFields, error)
Remove(ctx context.Context, dataReference v1alpha1.DataReference) error
}

func NewWfClosureCrdFieldsStore(ctx context.Context, cfg *Config, dataStore *storage.DataStore, scope promutils.Scope) (WfClosureCrdFieldsStore, error) {
switch cfg.Policy {
case PolicyActive:
return NewActiveWfClosureCrdFieldsStore(NewPassthroughWfClosureCrdFieldsStore(dataStore), scope), nil
case PolicyLRU:
return NewLRUWfClosureCrdFieldsStore(NewPassthroughWfClosureCrdFieldsStore(dataStore), cfg.Size, scope)
case PolicyPassThrough:
return NewPassthroughWfClosureCrdFieldsStore(dataStore), nil
}

return nil, fmt.Errorf("empty workflow closure store config")
}

type WfClosureCrdFields struct {
*v1alpha1.WorkflowSpec `json:"spec"`
SubWorkflows map[v1alpha1.WorkflowID]*v1alpha1.WorkflowSpec `json:"subWorkflows,omitempty"`
Tasks map[v1alpha1.TaskID]*v1alpha1.TaskSpec `json:"tasks"`
}
Loading

0 comments on commit b85ea6a

Please sign in to comment.