Skip to content

Commit

Permalink
Kubeclient has a filter to prevent unnecessary calls to KubeAPI #minor (
Browse files Browse the repository at this point in the history
flyteorg#339)

* [wip] Kubeclient has a filter to prevent unnecessary calls to KubeAPI

- The filter is an existence check and must produce zero false positives;
false negatives are OK
- Every object that was created or deleted is recorded, so that in
subsequent rounds a repeated create or delete of that object can be skipped

Signed-off-by: Ketan Umare <[email protected]>

* updated

Signed-off-by: Ketan Umare <[email protected]>

* Added a test

Signed-off-by: Ketan Umare <[email protected]>

* fixed

Signed-off-by: Ketan Umare <[email protected]>

* lint fix

Signed-off-by: Ketan Umare <[email protected]>

* updated

Signed-off-by: Ketan Umare <[email protected]>

* updated

Signed-off-by: Ketan Umare <[email protected]>

* comment updated

Signed-off-by: Ketan Umare <[email protected]>

* comment updated

Signed-off-by: Ketan Umare <[email protected]>

* updated size

Signed-off-by: Ketan Umare <[email protected]>

* removed erroneous print

Signed-off-by: Ketan Umare <[email protected]>

* Improved error message

Signed-off-by: Ketan Umare <[email protected]>
  • Loading branch information
kumare3 authored Oct 19, 2021
1 parent 871e09c commit bc032fa
Show file tree
Hide file tree
Showing 9 changed files with 218 additions and 16 deletions.
2 changes: 1 addition & 1 deletion cmd/controller/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ func executeRootCmd(cfg *config2.Config) {
mgr, err := manager.New(kubecfg, manager.Options{
Namespace: limitNamespace,
SyncPeriod: &cfg.DownstreamEval.Duration,
ClientBuilder: executors.NewFallbackClientBuilder(),
ClientBuilder: executors.NewFallbackClientBuilder(propellerScope.NewSubScope("kube")),
})
if err != nil {
logger.Fatalf(ctx, "Failed to initialize controller run-time manager. Error: %v", err)
Expand Down
5 changes: 3 additions & 2 deletions cmd/controller/cmd/webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ func runWebhook(origContext context.Context, propellerCfg *config.Config, cfg *w

// Add the propeller subscope because the MetricsPrefix only has "flyte:" to get uniform collection of metrics.
propellerScope := promutils.NewScope(cfg.MetricsPrefix).NewSubScope("propeller").NewSubScope(safeMetricName(propellerCfg.LimitNamespace))
webhookScope := propellerScope.NewSubScope("webhook")

go func() {
err := profutils.StartProfilingServerWithDefaultHandlers(ctx, propellerCfg.ProfilerPort.Port, nil)
Expand All @@ -125,7 +126,7 @@ func runWebhook(origContext context.Context, propellerCfg *config.Config, cfg *w
limitNamespace = propellerCfg.LimitNamespace
}

secretsWebhook := webhook.NewPodMutator(cfg, propellerScope.NewSubScope("webhook"))
secretsWebhook := webhook.NewPodMutator(cfg, webhookScope)

// Creates a MutationConfig to instruct ApiServer to call this service whenever a Pod is being created.
err = createMutationConfig(ctx, kubeClient, secretsWebhook)
Expand All @@ -138,7 +139,7 @@ func runWebhook(origContext context.Context, propellerCfg *config.Config, cfg *w
CertDir: cfg.CertDir,
Namespace: limitNamespace,
SyncPeriod: &propellerCfg.DownstreamEval.Duration,
ClientBuilder: executors.NewFallbackClientBuilder(),
ClientBuilder: executors.NewFallbackClientBuilder(webhookScope),
})

if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ require (
github.com/fatih/color v1.10.0
github.com/flyteorg/flyteidl v0.21.4
github.com/flyteorg/flyteplugins v0.7.1
github.com/flyteorg/flytestdlib v0.3.36
github.com/flyteorg/flytestdlib v0.4.0
github.com/ghodss/yaml v1.0.0
github.com/go-redis/redis v6.15.7+incompatible
github.com/go-test/deep v1.0.7
Expand Down
3 changes: 2 additions & 1 deletion go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -241,8 +241,9 @@ github.com/flyteorg/flyteidl v0.21.4/go.mod h1:576W2ViEyjTpT+kEVHAGbrTP3HARNUZ/e
github.com/flyteorg/flyteplugins v0.7.1 h1:YdCEQtdPeol7u6LkopGTIfPLAhy3KcclQa+DZFauK8w=
github.com/flyteorg/flyteplugins v0.7.1/go.mod h1:kOiuXk1ddIEVSPoHcc4kBfVQcLuyf8jw3vWJT2Was90=
github.com/flyteorg/flytestdlib v0.3.13/go.mod h1:Tz8JCECAbX6VWGwFT6cmEQ+RJpZ/6L9pswu3fzWs220=
github.com/flyteorg/flytestdlib v0.3.36 h1:XLvc7kfc9XkQBpPvNXevh5+Ijbgmd7gEOHTWhdEY5eA=
github.com/flyteorg/flytestdlib v0.3.36/go.mod h1:7cDWkY3v7xsoesFcDdu6DSW5Q2U2W5KlHUbUHSwBG1Q=
github.com/flyteorg/flytestdlib v0.4.0 h1:cEMkNfjocCuBSLzM9tKjsODhkr5gXTZAGl6k62FrT60=
github.com/flyteorg/flytestdlib v0.4.0/go.mod h1:7cDWkY3v7xsoesFcDdu6DSW5Q2U2W5KlHUbUHSwBG1Q=
github.com/form3tech-oss/jwt-go v3.2.2+incompatible h1:TcekIExNqud5crz4xD2pavyTgWiPvpYe4Xau31I0PRk=
github.com/form3tech-oss/jwt-go v3.2.2+incompatible/go.mod h1:pbq4aXjuKjdthFRnoDwaVPLA+WlJuPGy+QneDUgJi2k=
github.com/franela/goblin v0.0.0-20200105215937-c9ffbefa60db/go.mod h1:7dvUGVsVBjqR7JHJk0brhHOZYGmfBYOrK0ZhYMEtBr4=
Expand Down
4 changes: 2 additions & 2 deletions pkg/controller/events/node_execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@ import (

//go:generate mockery -all -output=mocks -case=underscore

// Recorder for Node events
// NodeEventRecorder records Node events
type NodeEventRecorder interface {
// Records node execution events indicating the node has undergone a phase change and additional metadata.
// RecordNodeEvent records execution events indicating the node has undergone a phase change and additional metadata.
RecordNodeEvent(ctx context.Context, event *event.NodeExecutionEvent, eventConfig *config.EventConfig) error
}

Expand Down
74 changes: 70 additions & 4 deletions pkg/controller/executors/kube.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@ package executors

import (
"context"
"fmt"

"github.com/flyteorg/flytestdlib/fastcheck"
"github.com/flyteorg/flytestdlib/promutils"

"k8s.io/client-go/rest"

Expand All @@ -11,7 +15,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client"
)

// A friendly controller-runtime client that gets passed to executors
// Client is a friendlier controller-runtime client that gets passed to executors
type Client interface {
// GetClient returns a client configured with the Config
GetClient() client.Client
Expand All @@ -20,6 +24,8 @@ type Client interface {
GetCache() cache.Cache
}

// fallbackClientReader reads from the cache first and if not found then reads from the configured reader, which
// directly reads from the API
type fallbackClientReader struct {
orderedClients []client.Reader
}
Expand All @@ -46,6 +52,7 @@ func (c fallbackClientReader) List(ctx context.Context, list client.ObjectList,

type FallbackClientBuilder struct {
uncached []client.Object
scope promutils.Scope
}

func (f *FallbackClientBuilder) WithUncached(objs ...client.Object) cluster.ClientBuilder {
Expand All @@ -59,6 +66,11 @@ func (f FallbackClientBuilder) Build(cache cache.Cache, config *rest.Config, opt
return nil, err
}

c, err = newWriteThroughCachingWriter(c, 50000, f.scope)
if err != nil {
return nil, err
}

return client.NewDelegatingClient(client.NewDelegatingClientInput{
Client: c,
CacheReader: fallbackClientReader{
Expand All @@ -70,8 +82,62 @@ func (f FallbackClientBuilder) Build(cache cache.Cache, config *rest.Config, opt
})
}

// Creates a new k8s client that uses the cached client for reads and falls back to making API
// NewFallbackClientBuilder creates a new k8s client that uses the cached client for reads and falls back to making API
// calls if it failed. Write calls will always go to raw client directly.
func NewFallbackClientBuilder() *FallbackClientBuilder {
return &FallbackClientBuilder{}
func NewFallbackClientBuilder(scope promutils.Scope) *FallbackClientBuilder {
return &FallbackClientBuilder{
scope: scope,
}
}

// writeThroughCachingWriter wraps a client.Client and keeps a filter of the create and delete operations that
// already succeeded, so that its Create and Delete methods can skip a repeated call for the same object.
type writeThroughCachingWriter struct {
// Client is the wrapped client; every method not overridden on writeThroughCachingWriter is promoted from it.
client.Client
// filter holds the IDs (see IDFromObject) of operations that completed successfully.
filter fastcheck.Filter
}

// IDFromObject builds the filter key that identifies operation op applied to obj. The key is the
// colon-separated concatenation of the object's cluster name, GroupVersionKind string, namespace, name and op.
func IDFromObject(obj client.Object, op string) []byte {
	cluster := obj.GetClusterName()
	gvk := obj.GetObjectKind().GroupVersionKind()
	key := fmt.Sprintf("%s:%s:%s:%s:%s", cluster, gvk.String(), obj.GetNamespace(), obj.GetName(), op)
	return []byte(key)
}

// Create forwards obj to the wrapped client unless the filter already holds a record of a successful create
// for the same object, in which case it returns nil without calling the wrapped client. The create is recorded
// in the filter only after the wrapped client reports success; errors from the wrapped client are returned as-is.
func (w writeThroughCachingWriter) Create(ctx context.Context, obj client.Object, opts ...client.CreateOption) error {
	// The "c" suffix marks this key as a create record, distinct from a delete record of the same object.
	key := IDFromObject(obj, "c")
	if w.filter.Contains(ctx, key) {
		return nil
	}

	if err := w.Client.Create(ctx, obj, opts...); err != nil {
		return err
	}

	w.filter.Add(ctx, key)
	return nil
}

// Delete forwards obj to the wrapped client unless the filter already holds a record of a successful delete
// for the same object, in which case it returns nil without calling the wrapped client. The delete is recorded
// in the filter only after the wrapped client reports success; errors from the wrapped client are returned as-is.
func (w writeThroughCachingWriter) Delete(ctx context.Context, obj client.Object, opts ...client.DeleteOption) error {
	// The "d" suffix marks this key as a delete record, distinct from a create record of the same object.
	key := IDFromObject(obj, "d")
	if w.filter.Contains(ctx, key) {
		return nil
	}

	if err := w.Client.Delete(ctx, obj, opts...); err != nil {
		return err
	}

	w.filter.Add(ctx, key)
	return nil
}

// newWriteThroughCachingWriter wraps c in a writeThroughCachingWriter whose filter is created with the given
// cacheSize and reports its metrics under the "kube_filter" sub-scope of scope. If the filter cannot be
// created, the zero-value writer is returned together with the error.
func newWriteThroughCachingWriter(c client.Client, cacheSize int, scope promutils.Scope) (writeThroughCachingWriter, error) {
	var w writeThroughCachingWriter

	f, err := fastcheck.NewOppoBloomFilter(cacheSize, scope.NewSubScope("kube_filter"))
	if err != nil {
		return w, err
	}

	w.Client = c
	w.filter = f
	return w, nil
}
134 changes: 134 additions & 0 deletions pkg/controller/executors/kube_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
package executors

import (
"context"
"fmt"
"reflect"
"testing"

"github.com/flyteorg/flytestdlib/contextutils"
"github.com/flyteorg/flytestdlib/promutils"
"github.com/flyteorg/flytestdlib/promutils/labeled"
"github.com/stretchr/testify/assert"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"sigs.k8s.io/controller-runtime/pkg/client"
)

// TestIdFromObject checks the exact byte layout of the key produced by IDFromObject for pods that differ in
// namespace, cluster name and operation.
func TestIdFromObject(t *testing.T) {
	type args struct {
		ns      string
		name    string
		cluster string
		kind    string
		op      string
	}
	cases := []struct {
		name string
		args args
		want string
	}{
		{
			name: "default",
			args: args{ns: "default", name: "name", cluster: "", kind: "pod", op: "c"},
			want: ":/v1, Kind=pod:default:name:c",
		},
		{
			name: "no-cluster",
			args: args{ns: "my-ns", name: "name", cluster: "", kind: "pod", op: "c"},
			want: ":/v1, Kind=pod:my-ns:name:c",
		},
		{
			name: "differ-oper",
			args: args{ns: "default", name: "name", cluster: "", kind: "pod", op: "d"},
			want: ":/v1, Kind=pod:default:name:d",
		},
		{
			name: "withcluster",
			args: args{ns: "default", name: "name", cluster: "cluster", kind: "pod", op: "d"},
			want: "cluster:/v1, Kind=pod:default:name:d",
		},
	}
	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			pod := &v1.Pod{
				TypeMeta: metav1.TypeMeta{
					Kind:       tc.args.kind,
					APIVersion: "v1",
				},
				ObjectMeta: metav1.ObjectMeta{
					Namespace:   tc.args.ns,
					Name:        tc.args.name,
					ClusterName: tc.args.cluster,
				},
			}
			got := IDFromObject(pod, tc.args.op)
			if !reflect.DeepEqual(got, []byte(tc.want)) {
				t.Errorf("IDFromObject() = %s, want %s", string(got), tc.want)
			}
		})
	}
}

// singleInvokeClient is a test double for client.Client whose Create and Delete methods succeed on the first
// call and return an error on every later call.
type singleInvokeClient struct {
// Client is embedded only to satisfy the client.Client interface; the tests construct it as nil.
client.Client
// createCalled is set once Create has been invoked.
createCalled bool
// deleteCalled is set once Delete has been invoked.
deleteCalled bool
}

// Create records that it was invoked and returns nil on the first call; any later call returns an error.
func (f *singleInvokeClient) Create(ctx context.Context, obj client.Object, opts ...client.CreateOption) error {
	calledBefore := f.createCalled
	f.createCalled = true
	if calledBefore {
		return fmt.Errorf("create called more than once")
	}
	return nil
}

// Delete records that it was invoked and returns nil on the first call; any later call returns an error.
func (f *singleInvokeClient) Delete(ctx context.Context, obj client.Object, opts ...client.DeleteOption) error {
	calledBefore := f.deleteCalled
	f.deleteCalled = true
	if calledBefore {
		return fmt.Errorf("delete called more than once")
	}
	return nil
}

// TestWriteThroughCachingWriter_Create verifies that the first Create reaches the wrapped client and that a
// second Create for the same object is answered from the filter.
func TestWriteThroughCachingWriter_Create(t *testing.T) {
	ctx := context.TODO()
	underlying := &singleInvokeClient{}
	writer, err := newWriteThroughCachingWriter(underlying, 1000, promutils.NewTestScope())
	assert.NoError(t, err)

	pod := &v1.Pod{
		TypeMeta: metav1.TypeMeta{
			Kind:       "pod",
			APIVersion: "v1",
		},
		ObjectMeta: metav1.ObjectMeta{
			Namespace:   "ns",
			Name:        "name",
			ClusterName: "cluster",
		},
	}

	// First call must be forwarded to the wrapped client.
	assert.NoError(t, writer.Create(ctx, pod))
	assert.True(t, underlying.createCalled)

	// singleInvokeClient fails on a second invocation, so a nil error here means the call was skipped.
	assert.NoError(t, writer.Create(ctx, pod))
}

// TestWriteThroughCachingWriter_Delete verifies that the first Delete reaches the wrapped client and that a
// second Delete for the same object is answered from the filter.
func TestWriteThroughCachingWriter_Delete(t *testing.T) {
	ctx := context.TODO()
	underlying := &singleInvokeClient{}
	writer, err := newWriteThroughCachingWriter(underlying, 1000, promutils.NewTestScope())
	assert.NoError(t, err)

	pod := &v1.Pod{
		TypeMeta: metav1.TypeMeta{
			Kind:       "pod",
			APIVersion: "v1",
		},
		ObjectMeta: metav1.ObjectMeta{
			Namespace:   "ns",
			Name:        "name",
			ClusterName: "cluster",
		},
	}

	// First call must be forwarded to the wrapped client.
	assert.NoError(t, writer.Delete(ctx, pod))
	assert.True(t, underlying.deleteCalled)

	// singleInvokeClient fails on a second invocation, so a nil error here means the call was skipped.
	assert.NoError(t, writer.Delete(ctx, pod))
}

// init passes contextutils.ExecIDKey to labeled.SetMetricKeys when the test binary's package is initialized,
// i.e. before any test in this package runs.
// NOTE(review): presumably required because the metrics created under the test scope rely on the labeled
// metric keys being set — confirm against the flytestdlib labeled package.
func init() {
labeled.SetMetricKeys(contextutils.ExecIDKey)
}
4 changes: 2 additions & 2 deletions pkg/controller/executors/mocks/fake.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (

func NewFakeKubeClient() *Client {
c := Client{}
c.On("GetClient").Return(fake.NewClientBuilder().WithRuntimeObjects().Build())
c.On("GetCache").Return(&informertest.FakeInformers{})
c.OnGetClient().Return(fake.NewClientBuilder().WithRuntimeObjects().Build())
c.OnGetCache().Return(&informertest.FakeInformers{})
return &c
}
6 changes: 3 additions & 3 deletions pkg/controller/nodes/executor.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Core Nodes Executor implementation
// Package nodes contains the Core Nodes Executor implementation and a subpackage for every node kind
// This module implements the core Nodes executor.
// This executor is the starting point for executing any node in the workflow. Since Nodes in a workflow are composable,
// i.e., one node may contain other nodes, the Node Handler is recursive in nature.
Expand All @@ -9,7 +9,7 @@
// - Task: Arguably the most important handler as it handles all tasks. These include all plugins. The goal of the workflow
// is to run tasks, thus every workflow will contain at least one TaskNode (except for the case where the workflow
// is purely a meta-workflow and can run other workflows)
// - SubWorkflow: This is one of the most important handlers. It can executes Workflows that are nested inside a workflow
// - SubWorkflow: This is one of the most important handlers. It can execute Workflows that are nested inside a workflow
// - DynamicTask Handler: This is just a decorator on the Task Handler. It handles cases, in which the Task returns a futures
// file. Every Task is actually executed through the DynamicTaskHandler
// - Branch Handler: This handler is used to execute branches
Expand Down Expand Up @@ -129,7 +129,7 @@ func (c *nodeExecutor) IdempotentRecordEvent(ctx context.Context, nodeEvent *eve
return fmt.Errorf("event recording attempt of with nil node Event ID")
}

logger.Infof(ctx, "Recording event p[%+v]", nodeEvent)
logger.Infof(ctx, "Recording NodeEvent [%s] phase[%s]", nodeEvent.GetId().String(), nodeEvent.Phase.String())
err := c.nodeRecorder.RecordNodeEvent(ctx, nodeEvent, c.eventConfig)
if err != nil {
if nodeEvent.GetId().NodeId == v1alpha1.EndNodeID {
Expand Down

0 comments on commit bc032fa

Please sign in to comment.