diff --git a/cmd/single/start.go b/cmd/single/start.go index 3ad8038cd6..a786c0b7e4 100644 --- a/cmd/single/start.go +++ b/cmd/single/start.go @@ -18,6 +18,7 @@ import ( adminScheduler "github.com/flyteorg/flyte/flyteadmin/scheduler" propellerEntrypoint "github.com/flyteorg/flyte/flytepropeller/pkg/controller" propellerConfig "github.com/flyteorg/flyte/flytepropeller/pkg/controller/config" + "github.com/flyteorg/flyte/flytepropeller/pkg/controller/executors" "github.com/flyteorg/flyte/flytepropeller/pkg/signals" webhookEntrypoint "github.com/flyteorg/flyte/flytepropeller/pkg/webhook" webhookConfig "github.com/flyteorg/flyte/flytepropeller/pkg/webhook/config" @@ -33,9 +34,7 @@ import ( "github.com/spf13/cobra" "golang.org/x/sync/errgroup" _ "gorm.io/driver/postgres" // Required to import database driver. - "k8s.io/client-go/rest" "sigs.k8s.io/controller-runtime/pkg/cache" - "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/manager" "sigs.k8s.io/controller-runtime/pkg/metrics" ) @@ -122,22 +121,8 @@ func startPropeller(ctx context.Context, cfg Propeller) error { SyncPeriod: &propellerCfg.DownstreamEval.Duration, DefaultNamespaces: namespaceConfigs, }, - NewCache: func(config *rest.Config, options cache.Options) (cache.Cache, error) { - k8sCache, err := cache.New(config, options) - if err != nil { - return k8sCache, err - } - - return otelutils.WrapK8sCache(k8sCache), nil - }, - NewClient: func(config *rest.Config, options client.Options) (client.Client, error) { - k8sClient, err := client.New(config, options) - if err != nil { - return k8sClient, err - } - - return otelutils.WrapK8sClient(k8sClient), nil - }, + NewCache: executors.NewCache, + NewClient: executors.NewClient, Metrics: metricsserver.Options{ // Disable metrics serving BindAddress: "0", diff --git a/flytepropeller/cmd/controller/cmd/root.go b/flytepropeller/cmd/controller/cmd/root.go index a3db18833c..8696f3993a 100644 --- a/flytepropeller/cmd/controller/cmd/root.go +++ b/flytepropeller/cmd/controller/cmd/root.go @@ -12,16 +12,15 @@ import ( "github.com/spf13/cobra" "github.com/spf13/pflag" "golang.org/x/sync/errgroup" - "k8s.io/client-go/rest" "k8s.io/klog" "sigs.k8s.io/controller-runtime/pkg/cache" - "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/manager" "sigs.k8s.io/controller-runtime/pkg/metrics" metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server" "github.com/flyteorg/flyte/flytepropeller/pkg/controller" config2 "github.com/flyteorg/flyte/flytepropeller/pkg/controller/config" + "github.com/flyteorg/flyte/flytepropeller/pkg/controller/executors" "github.com/flyteorg/flyte/flytepropeller/pkg/signals" "github.com/flyteorg/flyte/flytestdlib/config" "github.com/flyteorg/flyte/flytestdlib/config/viper" @@ -144,22 +143,8 @@ func executeRootCmd(baseCtx context.Context, cfg *config2.Config) error { SyncPeriod: &cfg.DownstreamEval.Duration, DefaultNamespaces: namespaceConfigs, }, - NewCache: func(config *rest.Config, options cache.Options) (cache.Cache, error) { - k8sCache, err := cache.New(config, options) - if err != nil { - return k8sCache, err - } - - return otelutils.WrapK8sCache(k8sCache), nil - }, - NewClient: func(config *rest.Config, options client.Options) (client.Client, error) { - k8sClient, err := client.New(config, options) - if err != nil { - return k8sClient, err - } - - return otelutils.WrapK8sClient(k8sClient), nil - }, + NewCache: executors.NewCache, + NewClient: executors.NewClient, Metrics: metricsserver.Options{ // Disable metrics serving BindAddress: "0", diff --git a/flytepropeller/cmd/controller/cmd/webhook.go b/flytepropeller/cmd/controller/cmd/webhook.go index f34f21d12c..e3c29ae3d9 100644 --- a/flytepropeller/cmd/controller/cmd/webhook.go +++ b/flytepropeller/cmd/controller/cmd/webhook.go @@ -5,9 +5,7 @@ import ( "github.com/spf13/cobra" "golang.org/x/sync/errgroup" - "k8s.io/client-go/rest" "sigs.k8s.io/controller-runtime/pkg/cache" - "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/manager" metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server" ctrlWebhook "sigs.k8s.io/controller-runtime/pkg/webhook" @@ -110,9 +108,8 @@ func runWebhook(origContext context.Context, propellerCfg *config.Config, cfg *w SyncPeriod: &propellerCfg.DownstreamEval.Duration, DefaultNamespaces: namespaceConfigs, }, - NewClient: func(config *rest.Config, options client.Options) (client.Client, error) { - return executors.NewFallbackClientBuilder(webhookScope).Build(nil, config, options) - }, + NewCache: executors.NewCache, + NewClient: executors.NewClient, Metrics: metricsserver.Options{ // Disable metrics serving BindAddress: "0", diff --git a/flytepropeller/pkg/controller/executors/kube.go b/flytepropeller/pkg/controller/executors/kube.go index acd4f5c4f3..bdab0d91be 100644 --- a/flytepropeller/pkg/controller/executors/kube.go +++ b/flytepropeller/pkg/controller/executors/kube.go @@ -9,6 +9,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" "github.com/flyteorg/flyte/flytestdlib/fastcheck" + "github.com/flyteorg/flyte/flytestdlib/otelutils" "github.com/flyteorg/flyte/flytestdlib/promutils" ) @@ -23,26 +24,66 @@ type Client interface { GetCache() cache.Cache } -// ClientBuilder builder is the interface for the client builder. -type ClientBuilder interface { - // Build returns a new client. - Build(cache cache.Cache, config *rest.Config, options client.Options) (client.Client, error) +var NewCache = func(config *rest.Config, options cache.Options) (cache.Cache, error) { + k8sCache, err := cache.New(config, options) + if err != nil { + return k8sCache, err + } + + return otelutils.WrapK8sCache(k8sCache), nil +} + +var NewClient = func(config *rest.Config, options client.Options) (client.Client, error) { + var reader *fallbackClientReader + if options.Cache != nil && options.Cache.Reader != nil { + // if caching is enabled we create a fallback reader so we can attempt the client if the cache + // reader does not have the object + reader = &fallbackClientReader{ + orderedClients: []client.Reader{options.Cache.Reader}, + } + + options.Cache.Reader = reader + } + + // create the k8s client + k8sClient, err := client.New(config, options) + if err != nil { + return k8sClient, err + } + + k8sOtelClient := otelutils.WrapK8sClient(k8sClient) + if reader != nil { + // once the k8s client is created we set the fallback reader's client to the k8s client + reader.orderedClients = append(reader.orderedClients, k8sOtelClient) + } + + return k8sOtelClient, nil } -type FallbackClientBuilder struct { - scope promutils.Scope +// fallbackClientReader reads from the cache first and if not found then reads from the configured reader, which +// directly reads from the API +type fallbackClientReader struct { + orderedClients []client.Reader } -func (f *FallbackClientBuilder) Build(_ cache.Cache, config *rest.Config, options client.Options) (client.Client, error) { - return client.New(config, options) +func (c fallbackClientReader) Get(ctx context.Context, key client.ObjectKey, out client.Object, opts ...client.GetOption) (err error) { + for _, k8sClient := range c.orderedClients { + if err = k8sClient.Get(ctx, key, out, opts...); err == nil { + return nil + } + } + + return } -// NewFallbackClientBuilder Creates a new k8s client that uses the cached client for reads and falls back to making API -// calls if it failed. Write calls will always go to raw client directly. -func NewFallbackClientBuilder(scope promutils.Scope) *FallbackClientBuilder { - return &FallbackClientBuilder{ - scope: scope, +func (c fallbackClientReader) List(ctx context.Context, list client.ObjectList, opts ...client.ListOption) (err error) { + for _, k8sClient := range c.orderedClients { + if err = k8sClient.List(ctx, list, opts...); err == nil { + return nil + } } + + return } type writeThroughCachingWriter struct { diff --git a/flytepropeller/pkg/controller/executors/mocks/client_builder.go b/flytepropeller/pkg/controller/executors/mocks/client_builder.go deleted file mode 100644 index 3180f480fd..0000000000 --- a/flytepropeller/pkg/controller/executors/mocks/client_builder.go +++ /dev/null @@ -1,100 +0,0 @@ -// Code generated by mockery v1.0.1. DO NOT EDIT. - -package mocks - -import ( - cache "sigs.k8s.io/controller-runtime/pkg/cache" - client "sigs.k8s.io/controller-runtime/pkg/client" - - executors "github.com/flyteorg/flyte/flytepropeller/pkg/controller/executors" - - mock "github.com/stretchr/testify/mock" - - rest "k8s.io/client-go/rest" -) - -// ClientBuilder is an autogenerated mock type for the ClientBuilder type -type ClientBuilder struct { - mock.Mock -} - -type ClientBuilder_Build struct { - *mock.Call -} - -func (_m ClientBuilder_Build) Return(_a0 client.Client, _a1 error) *ClientBuilder_Build { - return &ClientBuilder_Build{Call: _m.Call.Return(_a0, _a1)} -} - -func (_m *ClientBuilder) OnBuild(_a0 cache.Cache, config *rest.Config, options client.Options) *ClientBuilder_Build { - c_call := _m.On("Build", _a0, config, options) - return &ClientBuilder_Build{Call: c_call} -} - -func (_m *ClientBuilder) OnBuildMatch(matchers ...interface{}) *ClientBuilder_Build { - c_call := _m.On("Build", matchers...) - return &ClientBuilder_Build{Call: c_call} -} - -// Build provides a mock function with given fields: _a0, config, options -func (_m *ClientBuilder) Build(_a0 cache.Cache, config *rest.Config, options client.Options) (client.Client, error) { - ret := _m.Called(_a0, config, options) - - var r0 client.Client - if rf, ok := ret.Get(0).(func(cache.Cache, *rest.Config, client.Options) client.Client); ok { - r0 = rf(_a0, config, options) - } else { - if ret.Get(0) != nil { - r0 = ret.Get(0).(client.Client) - } - } - - var r1 error - if rf, ok := ret.Get(1).(func(cache.Cache, *rest.Config, client.Options) error); ok { - r1 = rf(_a0, config, options) - } else { - r1 = ret.Error(1) - } - - return r0, r1 -} - -type ClientBuilder_WithUncached struct { - *mock.Call -} - -func (_m ClientBuilder_WithUncached) Return(_a0 executors.ClientBuilder) *ClientBuilder_WithUncached { - return &ClientBuilder_WithUncached{Call: _m.Call.Return(_a0)} -} - -func (_m *ClientBuilder) OnWithUncached(objs ...client.Object) *ClientBuilder_WithUncached { - c_call := _m.On("WithUncached", objs) - return &ClientBuilder_WithUncached{Call: c_call} -} - -func (_m *ClientBuilder) OnWithUncachedMatch(matchers ...interface{}) *ClientBuilder_WithUncached { - c_call := _m.On("WithUncached", matchers...) - return &ClientBuilder_WithUncached{Call: c_call} -} - -// WithUncached provides a mock function with given fields: objs -func (_m *ClientBuilder) WithUncached(objs ...client.Object) executors.ClientBuilder { - _va := make([]interface{}, len(objs)) - for _i := range objs { - _va[_i] = objs[_i] - } - var _ca []interface{} - _ca = append(_ca, _va...) - ret := _m.Called(_ca...) - - var r0 executors.ClientBuilder - if rf, ok := ret.Get(0).(func(...client.Object) executors.ClientBuilder); ok { - r0 = rf(objs...) - } else { - if ret.Get(0) != nil { - r0 = ret.Get(0).(executors.ClientBuilder) - } - } - - return r0 -}