Skip to content
This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

Commit

Permalink
add metrics/formatting
Browse files Browse the repository at this point in the history
Signed-off-by: Babis Kiosidis <[email protected]>
  • Loading branch information
Babis Kiosidis committed Jul 28, 2022
1 parent 82cb636 commit f877e36
Show file tree
Hide file tree
Showing 9 changed files with 75 additions and 23 deletions.
4 changes: 2 additions & 2 deletions pkg/apis/flyteworkflow/v1alpha1/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -353,8 +353,8 @@ func (in *WorkflowSpec) GetNodes() []NodeID {

type StaticWorkflowData struct {
*WorkflowSpec `json:"spec"`
SubWorkflows map[WorkflowID]*WorkflowSpec `json:"subWorkflows,omitempty"`
Tasks map[TaskID]*TaskSpec `json:"tasks"`
SubWorkflows map[WorkflowID]*WorkflowSpec `json:"subWorkflows,omitempty"`
Tasks map[TaskID]*TaskSpec `json:"tasks"`
}

// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
Expand Down
4 changes: 2 additions & 2 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,13 @@ import (
lister "github.com/flyteorg/flytepropeller/pkg/client/listers/flyteworkflow/v1alpha1"
"github.com/flyteorg/flytepropeller/pkg/compiler/transformers/k8s"
"github.com/flyteorg/flytepropeller/pkg/controller/config"
"github.com/flyteorg/flytepropeller/pkg/controller/crdoffloadstore"
"github.com/flyteorg/flytepropeller/pkg/controller/executors"
"github.com/flyteorg/flytepropeller/pkg/controller/nodes"
errors3 "github.com/flyteorg/flytepropeller/pkg/controller/nodes/errors"
"github.com/flyteorg/flytepropeller/pkg/controller/nodes/recovery"
"github.com/flyteorg/flytepropeller/pkg/controller/nodes/subworkflow/launchplan"
"github.com/flyteorg/flytepropeller/pkg/controller/nodes/task/catalog"
"github.com/flyteorg/flytepropeller/pkg/controller/crdoffloadstore"
"github.com/flyteorg/flytepropeller/pkg/controller/workflow"
"github.com/flyteorg/flytepropeller/pkg/controller/workflowstore"
leader "github.com/flyteorg/flytepropeller/pkg/leaderelection"
Expand Down Expand Up @@ -429,7 +429,7 @@ func New(ctx context.Context, cfg *config.Config, kubeclientset kubernetes.Inter
return nil, stdErrs.Wrapf(errors3.CausedByError, err, "failed to initialize workflow store")
}

crdOffloadStore, err := crdoffloadstore.NewCRDOffloadStore(ctx, crdoffloadstore.GetConfig(), store)
crdOffloadStore, err := crdoffloadstore.NewCRDOffloadStore(ctx, crdoffloadstore.GetConfig(), store, scope.NewSubScope("crdoffload"))
if err != nil {
return nil, stdErrs.Wrapf(errors3.CausedByError, err, "failed to initialize CRD offload store")
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/controller/crdoffloadstore/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ import (
type Policy = string

const (
PolicyInMemory = "InMemory" // TODO - we need to change this name
PolicyLRU = "LRU"
PolicyInMemory = "InMemory" // TODO - we need to change this name
PolicyLRU = "LRU"
PolicyPassThrough = "PassThrough"
)

Expand Down
8 changes: 5 additions & 3 deletions pkg/controller/crdoffloadstore/iface.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"context"
"fmt"

"github.com/flyteorg/flytestdlib/promutils"

"github.com/flyteorg/flytepropeller/pkg/apis/flyteworkflow/v1alpha1"

"github.com/flyteorg/flytestdlib/storage"
Expand All @@ -18,12 +20,12 @@ type CRDOffloadStore interface {
Remove(ctx context.Context, dataReference v1alpha1.DataReference) error
}

func NewCRDOffloadStore(ctx context.Context, cfg *Config, dataStore *storage.DataStore) (CRDOffloadStore, error) {
func NewCRDOffloadStore(ctx context.Context, cfg *Config, dataStore *storage.DataStore, scope promutils.Scope) (CRDOffloadStore, error) {
switch cfg.Policy {
case PolicyInMemory:
return NewInmemoryCRDOffloadStore(NewPassthroughCRDOffloadStore(dataStore)), nil
return NewInmemoryCRDOffloadStore(NewPassthroughCRDOffloadStore(dataStore), scope), nil
case PolicyLRU:
return NewLRUCRDOffloadStore(NewPassthroughCRDOffloadStore(dataStore), cfg.Size)
return NewLRUCRDOffloadStore(NewPassthroughCRDOffloadStore(dataStore), cfg.Size, scope)
case PolicyPassThrough:
return NewPassthroughCRDOffloadStore(dataStore), nil
}
Expand Down
32 changes: 30 additions & 2 deletions pkg/controller/crdoffloadstore/inmemory.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,26 +2,43 @@ package crdoffloadstore

import (
"context"
"time"

"github.com/flyteorg/flytestdlib/promutils"
"github.com/prometheus/client_golang/prometheus"

"github.com/flyteorg/flytepropeller/pkg/apis/flyteworkflow/v1alpha1"
)

type inmemoryCRDOffloadMetrics struct {
TotalItems prometheus.Gauge
CacheHit prometheus.Counter
CacheMiss prometheus.Counter
ReadError prometheus.Counter
FetchLatency promutils.StopWatch
}
type inmemoryCRDOffloadStore struct {
crdOffloadStore CRDOffloadStore
store map[string]*v1alpha1.StaticWorkflowData
store map[string]*v1alpha1.StaticWorkflowData
metrics *inmemoryCRDOffloadMetrics
}

func (i *inmemoryCRDOffloadStore) Get(ctx context.Context, dataReference v1alpha1.DataReference) (*v1alpha1.StaticWorkflowData, error) {
location := dataReference.String()
if m, ok := i.store[location]; ok {
i.metrics.CacheHit.Inc()
return m, nil
}

timer := i.metrics.FetchLatency.Start()
staticWorkflowData, err := i.crdOffloadStore.Get(ctx, dataReference)
timer.Stop()
if err != nil {
i.metrics.ReadError.Inc()
return nil, err
}

i.metrics.TotalItems.Inc()
i.store[location] = staticWorkflowData
return staticWorkflowData, nil
}
Expand All @@ -31,13 +48,24 @@ func (i *inmemoryCRDOffloadStore) Remove(ctx context.Context, dataReference v1al
return err
}

i.metrics.TotalItems.Dec()
delete(i.store, dataReference.String())

return nil
}

func NewInmemoryCRDOffloadStore(crdOffloadStore CRDOffloadStore) CRDOffloadStore {
func NewInmemoryCRDOffloadStore(crdOffloadStore CRDOffloadStore, scope promutils.Scope) CRDOffloadStore {
inmemoryScope := scope.NewSubScope("inmemory")
metrics := &inmemoryCRDOffloadMetrics{
TotalItems: inmemoryScope.MustNewGauge("total_items", "Total Items in cache"),
FetchLatency: inmemoryScope.MustNewStopWatch("fetch", "Total Time to read from underlying datastorage", time.Millisecond),
CacheHit: inmemoryScope.MustNewCounter("cache_hit", "Number of times object was found in inmemory cache"),
CacheMiss: inmemoryScope.MustNewCounter("cache_miss", "Number of times object was not found in inmemory cache"),
ReadError: inmemoryScope.MustNewCounter("cache_read_error", "Failed to read from underlying storage"),
}
return &inmemoryCRDOffloadStore{
crdOffloadStore: crdOffloadStore,
store: map[string]*v1alpha1.StaticWorkflowData{},
metrics: metrics,
}
}
32 changes: 28 additions & 4 deletions pkg/controller/crdoffloadstore/lru.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,27 @@ package crdoffloadstore
import (
"context"
"fmt"
"time"

"github.com/flyteorg/flytestdlib/promutils"
"github.com/prometheus/client_golang/prometheus"

"github.com/flyteorg/flytepropeller/pkg/apis/flyteworkflow/v1alpha1"

"github.com/hashicorp/golang-lru"
lru "github.com/hashicorp/golang-lru"
)

type crdoffloadstoreMetrics struct {
CacheHit prometheus.Counter
CacheMiss prometheus.Counter
CacheReadError prometheus.Counter
FetchLatency promutils.StopWatch
}

type lruCRDOffloadStore struct {
cache *lru.Cache
crdOffloadStore CRDOffloadStore
metrics *crdoffloadstoreMetrics
}

func (l *lruCRDOffloadStore) Get(ctx context.Context, dataReference v1alpha1.DataReference) (*v1alpha1.StaticWorkflowData, error) {
Expand All @@ -20,18 +32,22 @@ func (l *lruCRDOffloadStore) Get(ctx context.Context, dataReference v1alpha1.Dat
if ok {
staticWorkflowData, ok := s.(*v1alpha1.StaticWorkflowData)
if !ok {
l.metrics.CacheReadError.Inc()
return nil, fmt.Errorf("cached item in crd offload store is not expected type '*v1alpha1.StaticWorkflowData'")
}

l.metrics.CacheHit.Inc()
return staticWorkflowData, nil
}

timer := l.metrics.FetchLatency.Start()
// retrieve StaticWorkflowData from underlying CRDOffloadStore
staticWorkflowData, err := l.crdOffloadStore.Get(ctx, dataReference)
timer.Stop()
if err != nil {
return nil, err
}

l.metrics.CacheMiss.Inc()
// add StaticWorkflowData to cache and return
l.cache.Add(dataReference, staticWorkflowData)
return staticWorkflowData, nil
Expand All @@ -47,14 +63,22 @@ func (l *lruCRDOffloadStore) Remove(ctx context.Context, dataReference v1alpha1.
return nil
}

func NewLRUCRDOffloadStore(crdOffloadStore CRDOffloadStore, size int) (CRDOffloadStore, error) {
func NewLRUCRDOffloadStore(crdOffloadStore CRDOffloadStore, size int, scope promutils.Scope) (CRDOffloadStore, error) {
cache, err := lru.New(size)
if err != nil {
return nil, err
}

return &lruCRDOffloadStore {
lruScope := scope.NewSubScope("lru")
metrics := &crdoffloadstoreMetrics{
FetchLatency: lruScope.MustNewStopWatch("fetch", "Total Time to read from underlying datastorage", time.Millisecond),
CacheHit: lruScope.MustNewCounter("cache_hit", "Number of times object was found in lru cache"),
CacheMiss: lruScope.MustNewCounter("cache_miss", "Number of times object was not found in lru cache"),
CacheReadError: lruScope.MustNewCounter("cache_read_error", "Failed to read from lru cache"),
}
return &lruCRDOffloadStore{
cache: cache,
crdOffloadStore: crdOffloadStore,
metrics: metrics,
}, nil
}
1 change: 0 additions & 1 deletion pkg/controller/crdoffloadstore/passthrough.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"encoding/json"

"github.com/flyteorg/flytepropeller/pkg/apis/flyteworkflow/v1alpha1"

"github.com/flyteorg/flytestdlib/storage"
)

Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ import (
eventsErr "github.com/flyteorg/flytepropeller/events/errors"
"github.com/flyteorg/flytepropeller/pkg/apis/flyteworkflow/v1alpha1"
"github.com/flyteorg/flytepropeller/pkg/controller/config"
"github.com/flyteorg/flytepropeller/pkg/controller/executors"
"github.com/flyteorg/flytepropeller/pkg/controller/crdoffloadstore"
"github.com/flyteorg/flytepropeller/pkg/controller/executors"
"github.com/flyteorg/flytepropeller/pkg/controller/workflowstore"

"github.com/flyteorg/flytestdlib/contextutils"
Expand Down
11 changes: 5 additions & 6 deletions pkg/controller/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,7 @@ import (
"testing"
"time"

"github.com/flyteorg/flytepropeller/pkg/apis/flyteworkflow/static"
staticobjmock "github.com/flyteorg/flytepropeller/pkg/controller/staticobjstore/mocks"
crdoffloadstoremock "github.com/flyteorg/flytepropeller/pkg/controller/crdoffloadstore/mocks"

"github.com/flyteorg/flytepropeller/pkg/controller/workflowstore/mocks"
"github.com/pkg/errors"
Expand Down Expand Up @@ -834,8 +833,8 @@ func TestPropellerHandler_OffloadedCrd(t *testing.T) {
MaxWorkflowRetries: 0,
}

staticObjMock := &staticobjmock.WorkflowStaticObjectStore{}
p := NewPropellerHandler(ctx, cfg, s, staticObjMock, exec, scope)
offloadmock := &crdoffloadstoremock.CRDOffloadStore{}
p := NewPropellerHandler(ctx, cfg, s, offloadmock, exec, scope)

const namespace = "test"
const name = "123"
Expand All @@ -846,14 +845,14 @@ func TestPropellerHandler_OffloadedCrd(t *testing.T) {
Name: name,
Namespace: namespace,
},
WorkflowStaticExecutionObj: "some-file-location",
OffloadDataReference: "some-file-location",
}))
exec.HandleCb = func(ctx context.Context, w *v1alpha1.FlyteWorkflow) error {
w.GetExecutionStatus().UpdatePhase(v1alpha1.WorkflowPhaseSucceeding, "done", nil)
return nil
}

staticObjMock.OnGetMatch(mock.Anything, mock.Anything).Return(&static.WorkflowStaticExecutionObj{
offloadmock.OnGetMatch(mock.Anything, mock.Anything).Return(&v1alpha1.StaticWorkflowData{
WorkflowSpec: &v1alpha1.WorkflowSpec{ID: "static-id"},
SubWorkflows: nil,
Tasks: nil,
Expand Down

0 comments on commit f877e36

Please sign in to comment.