Skip to content

Commit

Permalink
Reintroduce k8s client fallback to cache lookups (#4733)
Browse files Browse the repository at this point in the history
* replaced kube executors

Signed-off-by: Daniel Rammer <[email protected]>

* creating a delegating client reader setup to emulate previous

Signed-off-by: Daniel Rammer <[email protected]>

* fix unit tests and lint

Signed-off-by: Daniel Rammer <[email protected]>

* fixing client ordering

Signed-off-by: Daniel Rammer <[email protected]>

* removed error mocking

Signed-off-by: Daniel Rammer <[email protected]>

---------

Signed-off-by: Daniel Rammer <[email protected]>
  • Loading branch information
hamersaw authored Jan 17, 2024
1 parent 0c8dc61 commit ada7695
Show file tree
Hide file tree
Showing 5 changed files with 62 additions and 154 deletions.
21 changes: 3 additions & 18 deletions cmd/single/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
adminScheduler "github.com/flyteorg/flyte/flyteadmin/scheduler"
propellerEntrypoint "github.com/flyteorg/flyte/flytepropeller/pkg/controller"
propellerConfig "github.com/flyteorg/flyte/flytepropeller/pkg/controller/config"
"github.com/flyteorg/flyte/flytepropeller/pkg/controller/executors"
"github.com/flyteorg/flyte/flytepropeller/pkg/signals"
webhookEntrypoint "github.com/flyteorg/flyte/flytepropeller/pkg/webhook"
webhookConfig "github.com/flyteorg/flyte/flytepropeller/pkg/webhook/config"
Expand All @@ -33,9 +34,7 @@ import (
"github.com/spf13/cobra"
"golang.org/x/sync/errgroup"
_ "gorm.io/driver/postgres" // Required to import database driver.
"k8s.io/client-go/rest"
"sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/metrics"
)
Expand Down Expand Up @@ -122,22 +121,8 @@ func startPropeller(ctx context.Context, cfg Propeller) error {
SyncPeriod: &propellerCfg.DownstreamEval.Duration,
DefaultNamespaces: namespaceConfigs,
},
NewCache: func(config *rest.Config, options cache.Options) (cache.Cache, error) {
k8sCache, err := cache.New(config, options)
if err != nil {
return k8sCache, err
}

return otelutils.WrapK8sCache(k8sCache), nil
},
NewClient: func(config *rest.Config, options client.Options) (client.Client, error) {
k8sClient, err := client.New(config, options)
if err != nil {
return k8sClient, err
}

return otelutils.WrapK8sClient(k8sClient), nil
},
NewCache: executors.NewCache,
NewClient: executors.NewClient,
Metrics: metricsserver.Options{
// Disable metrics serving
BindAddress: "0",
Expand Down
21 changes: 3 additions & 18 deletions flytepropeller/cmd/controller/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,15 @@ import (
"github.com/spf13/cobra"
"github.com/spf13/pflag"
"golang.org/x/sync/errgroup"
"k8s.io/client-go/rest"
"k8s.io/klog"
"sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/metrics"
metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server"

"github.com/flyteorg/flyte/flytepropeller/pkg/controller"
config2 "github.com/flyteorg/flyte/flytepropeller/pkg/controller/config"
"github.com/flyteorg/flyte/flytepropeller/pkg/controller/executors"
"github.com/flyteorg/flyte/flytepropeller/pkg/signals"
"github.com/flyteorg/flyte/flytestdlib/config"
"github.com/flyteorg/flyte/flytestdlib/config/viper"
Expand Down Expand Up @@ -144,22 +143,8 @@ func executeRootCmd(baseCtx context.Context, cfg *config2.Config) error {
SyncPeriod: &cfg.DownstreamEval.Duration,
DefaultNamespaces: namespaceConfigs,
},
NewCache: func(config *rest.Config, options cache.Options) (cache.Cache, error) {
k8sCache, err := cache.New(config, options)
if err != nil {
return k8sCache, err
}

return otelutils.WrapK8sCache(k8sCache), nil
},
NewClient: func(config *rest.Config, options client.Options) (client.Client, error) {
k8sClient, err := client.New(config, options)
if err != nil {
return k8sClient, err
}

return otelutils.WrapK8sClient(k8sClient), nil
},
NewCache: executors.NewCache,
NewClient: executors.NewClient,
Metrics: metricsserver.Options{
// Disable metrics serving
BindAddress: "0",
Expand Down
7 changes: 2 additions & 5 deletions flytepropeller/cmd/controller/cmd/webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,7 @@ import (

"github.com/spf13/cobra"
"golang.org/x/sync/errgroup"
"k8s.io/client-go/rest"
"sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/manager"
metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server"
ctrlWebhook "sigs.k8s.io/controller-runtime/pkg/webhook"
Expand Down Expand Up @@ -110,9 +108,8 @@ func runWebhook(origContext context.Context, propellerCfg *config.Config, cfg *w
SyncPeriod: &propellerCfg.DownstreamEval.Duration,
DefaultNamespaces: namespaceConfigs,
},
NewClient: func(config *rest.Config, options client.Options) (client.Client, error) {
return executors.NewFallbackClientBuilder(webhookScope).Build(nil, config, options)
},
NewCache: executors.NewCache,
NewClient: executors.NewClient,
Metrics: metricsserver.Options{
// Disable metrics serving
BindAddress: "0",
Expand Down
67 changes: 54 additions & 13 deletions flytepropeller/pkg/controller/executors/kube.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client"

"github.com/flyteorg/flyte/flytestdlib/fastcheck"
"github.com/flyteorg/flyte/flytestdlib/otelutils"
"github.com/flyteorg/flyte/flytestdlib/promutils"
)

Expand All @@ -23,26 +24,66 @@ type Client interface {
GetCache() cache.Cache
}

// ClientBuilder builder is the interface for the client builder.
type ClientBuilder interface {
// Build returns a new client.
Build(cache cache.Cache, config *rest.Config, options client.Options) (client.Client, error)
var NewCache = func(config *rest.Config, options cache.Options) (cache.Cache, error) {
k8sCache, err := cache.New(config, options)
if err != nil {
return k8sCache, err
}

return otelutils.WrapK8sCache(k8sCache), nil
}

var NewClient = func(config *rest.Config, options client.Options) (client.Client, error) {
var reader *fallbackClientReader
if options.Cache != nil && options.Cache.Reader != nil {
// if caching is enabled we create a fallback reader so we can attempt the client if the cache
// reader does not have the object
reader = &fallbackClientReader{
orderedClients: []client.Reader{options.Cache.Reader},
}

options.Cache.Reader = reader
}

// create the k8s client
k8sClient, err := client.New(config, options)
if err != nil {
return k8sClient, err
}

k8sOtelClient := otelutils.WrapK8sClient(k8sClient)
if reader != nil {
// once the k8s client is created we set the fallback reader's client to the k8s client
reader.orderedClients = append(reader.orderedClients, k8sOtelClient)
}

return k8sOtelClient, nil
}

type FallbackClientBuilder struct {
scope promutils.Scope
// fallbackClientReader reads from the cache first and if not found then reads from the configured reader, which
// directly reads from the API
type fallbackClientReader struct {
orderedClients []client.Reader
}

func (f *FallbackClientBuilder) Build(_ cache.Cache, config *rest.Config, options client.Options) (client.Client, error) {
return client.New(config, options)
func (c fallbackClientReader) Get(ctx context.Context, key client.ObjectKey, out client.Object, opts ...client.GetOption) (err error) {
for _, k8sClient := range c.orderedClients {
if err = k8sClient.Get(ctx, key, out, opts...); err == nil {
return nil
}
}

return
}

// NewFallbackClientBuilder Creates a new k8s client that uses the cached client for reads and falls back to making API
// calls if it failed. Write calls will always go to raw client directly.
func NewFallbackClientBuilder(scope promutils.Scope) *FallbackClientBuilder {
return &FallbackClientBuilder{
scope: scope,
func (c fallbackClientReader) List(ctx context.Context, list client.ObjectList, opts ...client.ListOption) (err error) {
for _, k8sClient := range c.orderedClients {
if err = k8sClient.List(ctx, list, opts...); err == nil {
return nil
}
}

return
}

type writeThroughCachingWriter struct {
Expand Down
100 changes: 0 additions & 100 deletions flytepropeller/pkg/controller/executors/mocks/client_builder.go

This file was deleted.

0 comments on commit ada7695

Please sign in to comment.