From 656c61063e4b4158a4f8b295622efc6cac84ac28 Mon Sep 17 00:00:00 2001 From: Daniel Rammer Date: Tue, 16 Jan 2024 19:33:58 -0600 Subject: [PATCH 1/5] replaced kube executors Signed-off-by: Daniel Rammer --- flytepropeller/cmd/controller/cmd/root.go | 1 + .../pkg/controller/executors/kube.go | 60 ++++++++++++++++++- flytestdlib/otelutils/k8s.go | 22 ++++++- 3 files changed, 78 insertions(+), 5 deletions(-) diff --git a/flytepropeller/cmd/controller/cmd/root.go b/flytepropeller/cmd/controller/cmd/root.go index a3db18833c..0cb5aabd62 100644 --- a/flytepropeller/cmd/controller/cmd/root.go +++ b/flytepropeller/cmd/controller/cmd/root.go @@ -154,6 +154,7 @@ func executeRootCmd(baseCtx context.Context, cfg *config2.Config) error { }, NewClient: func(config *rest.Config, options client.Options) (client.Client, error) { k8sClient, err := client.New(config, options) + //k8sClient, err := executors.NewFallbackClientBuilder(propellerScope.NewSubScope("kube")).Build(nil, config, options) if err != nil { return k8sClient, err } diff --git a/flytepropeller/pkg/controller/executors/kube.go b/flytepropeller/pkg/controller/executors/kube.go index acd4f5c4f3..7dc62135b6 100644 --- a/flytepropeller/pkg/controller/executors/kube.go +++ b/flytepropeller/pkg/controller/executors/kube.go @@ -23,18 +23,72 @@ type Client interface { GetCache() cache.Cache } +// fallbackClientReader reads from the cache first and if not found then reads from the configured reader, which +// directly reads from the API +type fallbackClientReader struct { + orderedClients []client.Reader +} + +func (c fallbackClientReader) Get(ctx context.Context, key client.ObjectKey, out client.Object) (err error) { + for _, k8sClient := range c.orderedClients { + if err = k8sClient.Get(ctx, key, out); err == nil { + return nil + } + } + + return +} + +func (c fallbackClientReader) List(ctx context.Context, list client.ObjectList, opts ...client.ListOption) (err error) { + for _, k8sClient := range c.orderedClients { + if err = k8sClient.List(ctx, list, opts...); err == nil { + return nil + } + } + + return +} + // ClientBuilder builder is the interface for the client builder. type ClientBuilder interface { + // WithUncached takes a list of runtime objects (plain or lists) that users don't want to cache + // for this client. This function can be called multiple times, it should append to an internal slice. + WithUncached(objs ...client.Object) ClientBuilder + // Build returns a new client. Build(cache cache.Cache, config *rest.Config, options client.Options) (client.Client, error) } type FallbackClientBuilder struct { - scope promutils.Scope + uncached []client.Object + scope promutils.Scope +} + +func (f *FallbackClientBuilder) WithUncached(objs ...client.Object) ClientBuilder { + f.uncached = append(f.uncached, objs...) + return f } -func (f *FallbackClientBuilder) Build(_ cache.Cache, config *rest.Config, options client.Options) (client.Client, error) { - return client.New(config, options) +func (f FallbackClientBuilder) Build(cache cache.Cache, config *rest.Config, options client.Options) (client.Client, error) { + c, err := client.New(config, options) + if err != nil { + return nil, err + } + + c, err = newWriteThroughCachingWriter(c, 50000, f.scope) + if err != nil { + return nil, err + } + + return client.NewDelegatingClient(client.NewDelegatingClientInput{ + Client: c, + CacheReader: fallbackClientReader{ + orderedClients: []client.Reader{cache, c}, + }, + UncachedObjects: f.uncached, + // TODO figure out if this should be true? + // CacheUnstructured: true, + }) } // NewFallbackClientBuilder Creates a new k8s client that uses the cached client for reads and falls back to making API diff --git a/flytestdlib/otelutils/k8s.go b/flytestdlib/otelutils/k8s.go index 13afee9308..bed120d30b 100644 --- a/flytestdlib/otelutils/k8s.go +++ b/flytestdlib/otelutils/k8s.go @@ -3,7 +3,10 @@ package otelutils import ( "context" "fmt" + "time" + k8serrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/runtime/schema" "sigs.k8s.io/controller-runtime/pkg/cache" "sigs.k8s.io/controller-runtime/pkg/client" ) @@ -21,7 +24,15 @@ func WrapK8sCache(c cache.Cache) cache.Cache { func (c *K8sCacheWrapper) Get(ctx context.Context, key client.ObjectKey, obj client.Object, opts ...client.GetOption) error { ctx, span := NewSpan(ctx, K8sClientTracer, fmt.Sprintf("%s.Cache/Get", k8sSpanPathPrefix)) defer span.End() - return c.Cache.Get(ctx, key, obj, opts...) + + err := c.Cache.Get(ctx, key, obj, opts...) + if time.Since(obj.GetCreationTimestamp().Time) < (time.Second * 3) { + obj = nil + return k8serrors.NewNotFound(schema.GroupResource{Group: "foo", Resource: "bar"}, key.Name) + } + + return err + //return c.Cache.Get(ctx, key, obj, opts...) } func (c *K8sCacheWrapper) List(ctx context.Context, list client.ObjectList, opts ...client.ListOption) error { @@ -43,7 +54,14 @@ func WrapK8sClient(c client.Client) client.Client { func (c *K8sClientWrapper) Get(ctx context.Context, key client.ObjectKey, obj client.Object, opts ...client.GetOption) error { ctx, span := NewSpan(ctx, K8sClientTracer, fmt.Sprintf("%s.Client/Get", k8sSpanPathPrefix)) defer span.End() - return c.Client.Get(ctx, key, obj, opts...) + err := c.Client.Get(ctx, key, obj, opts...) + if time.Since(obj.GetCreationTimestamp().Time) < (time.Second * 3) { + obj = nil + return k8serrors.NewNotFound(schema.GroupResource{Group: "foo", Resource: "bar"}, key.Name) + } + + return err + //return c.Client.Get(ctx, key, obj, opts...) } func (c *K8sClientWrapper) List(ctx context.Context, list client.ObjectList, opts ...client.ListOption) error { From f2b1171cbd85ba427d232b4e512a1840cc72cb0e Mon Sep 17 00:00:00 2001 From: Daniel Rammer Date: Tue, 16 Jan 2024 22:07:39 -0600 Subject: [PATCH 2/5] creating a delegating client reader setup to emulate previous Signed-off-by: Daniel Rammer --- cmd/single/start.go | 21 +---- flytepropeller/cmd/controller/cmd/root.go | 22 +---- flytepropeller/cmd/controller/cmd/webhook.go | 7 +- .../pkg/controller/executors/kube.go | 93 ++++++++----------- 4 files changed, 49 insertions(+), 94 deletions(-) diff --git a/cmd/single/start.go b/cmd/single/start.go index 3ad8038cd6..a786c0b7e4 100644 --- a/cmd/single/start.go +++ b/cmd/single/start.go @@ -18,6 +18,7 @@ import ( adminScheduler "github.com/flyteorg/flyte/flyteadmin/scheduler" propellerEntrypoint "github.com/flyteorg/flyte/flytepropeller/pkg/controller" propellerConfig "github.com/flyteorg/flyte/flytepropeller/pkg/controller/config" + "github.com/flyteorg/flyte/flytepropeller/pkg/controller/executors" "github.com/flyteorg/flyte/flytepropeller/pkg/signals" webhookEntrypoint "github.com/flyteorg/flyte/flytepropeller/pkg/webhook" webhookConfig "github.com/flyteorg/flyte/flytepropeller/pkg/webhook/config" @@ -33,9 +34,7 @@ import ( "github.com/spf13/cobra" "golang.org/x/sync/errgroup" _ "gorm.io/driver/postgres" // Required to import database driver. - "k8s.io/client-go/rest" "sigs.k8s.io/controller-runtime/pkg/cache" - "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/manager" "sigs.k8s.io/controller-runtime/pkg/metrics" ) @@ -122,22 +121,8 @@ func startPropeller(ctx context.Context, cfg Propeller) error { SyncPeriod: &propellerCfg.DownstreamEval.Duration, DefaultNamespaces: namespaceConfigs, }, - NewCache: func(config *rest.Config, options cache.Options) (cache.Cache, error) { - k8sCache, err := cache.New(config, options) - if err != nil { - return k8sCache, err - } - - return otelutils.WrapK8sCache(k8sCache), nil - }, - NewClient: func(config *rest.Config, options client.Options) (client.Client, error) { - k8sClient, err := client.New(config, options) - if err != nil { - return k8sClient, err - } - - return otelutils.WrapK8sClient(k8sClient), nil - }, + NewCache: executors.NewCache, + NewClient: executors.NewClient, Metrics: metricsserver.Options{ // Disable metrics serving BindAddress: "0", diff --git a/flytepropeller/cmd/controller/cmd/root.go b/flytepropeller/cmd/controller/cmd/root.go index 0cb5aabd62..8f4a96a7a6 100644 --- a/flytepropeller/cmd/controller/cmd/root.go +++ b/flytepropeller/cmd/controller/cmd/root.go @@ -12,16 +12,15 @@ import ( "github.com/spf13/cobra" "github.com/spf13/pflag" "golang.org/x/sync/errgroup" - "k8s.io/client-go/rest" "k8s.io/klog" "sigs.k8s.io/controller-runtime/pkg/cache" - "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/manager" "sigs.k8s.io/controller-runtime/pkg/metrics" metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server" "github.com/flyteorg/flyte/flytepropeller/pkg/controller" config2 "github.com/flyteorg/flyte/flytepropeller/pkg/controller/config" + "github.com/flyteorg/flyte/flytepropeller/pkg/controller/executors" "github.com/flyteorg/flyte/flytepropeller/pkg/signals" "github.com/flyteorg/flyte/flytestdlib/config" "github.com/flyteorg/flyte/flytestdlib/config/viper" @@ -144,23 +143,8 @@ func executeRootCmd(baseCtx context.Context, cfg *config2.Config) error { SyncPeriod: &cfg.DownstreamEval.Duration, DefaultNamespaces: namespaceConfigs, }, - NewCache: func(config *rest.Config, options cache.Options) (cache.Cache, error) { - k8sCache, err := cache.New(config, options) - if err != nil { - return k8sCache, err - } - - return otelutils.WrapK8sCache(k8sCache), nil - }, - NewClient: func(config *rest.Config, options client.Options) (client.Client, error) { - k8sClient, err := client.New(config, options) - //k8sClient, err := executors.NewFallbackClientBuilder(propellerScope.NewSubScope("kube")).Build(nil, config, options) - if err != nil { - return k8sClient, err - } - - return otelutils.WrapK8sClient(k8sClient), nil - }, + NewCache: executors.NewCache, + NewClient: executors.NewClient, Metrics: metricsserver.Options{ // Disable metrics serving BindAddress: "0", diff --git a/flytepropeller/cmd/controller/cmd/webhook.go b/flytepropeller/cmd/controller/cmd/webhook.go index f34f21d12c..f43641dd59 100644 --- a/flytepropeller/cmd/controller/cmd/webhook.go +++ b/flytepropeller/cmd/controller/cmd/webhook.go @@ -5,9 +5,7 @@ import ( "github.com/spf13/cobra" "golang.org/x/sync/errgroup" - "k8s.io/client-go/rest" "sigs.k8s.io/controller-runtime/pkg/cache" - "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/manager" metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server" ctrlWebhook "sigs.k8s.io/controller-runtime/pkg/webhook" @@ -110,9 +108,8 @@ func runWebhook(origContext context.Context, propellerCfg *config.Config, cfg *w SyncPeriod: &propellerCfg.DownstreamEval.Duration, DefaultNamespaces: namespaceConfigs, }, - NewClient: func(config *rest.Config, options client.Options) (client.Client, error) { - return executors.NewFallbackClientBuilder(webhookScope).Build(nil, config, options) - }, + NewCache: executors.NewCache, + NewClient: executors.NewClient, Metrics: metricsserver.Options{ // Disable metrics serving BindAddress: "0", diff --git a/flytepropeller/pkg/controller/executors/kube.go b/flytepropeller/pkg/controller/executors/kube.go index 7dc62135b6..e7ab86e921 100644 --- a/flytepropeller/pkg/controller/executors/kube.go +++ b/flytepropeller/pkg/controller/executors/kube.go @@ -9,6 +9,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" "github.com/flyteorg/flyte/flytestdlib/fastcheck" + "github.com/flyteorg/flyte/flytestdlib/otelutils" "github.com/flyteorg/flyte/flytestdlib/promutils" ) @@ -23,15 +24,53 @@ type Client interface { GetCache() cache.Cache } +var NewCache = func(config *rest.Config, options cache.Options) (cache.Cache, error) { + k8sCache, err := cache.New(config, options) + if err != nil { + return k8sCache, err + } + + return otelutils.WrapK8sCache(k8sCache), nil +} + +var NewClient = func(config *rest.Config, options client.Options) (client.Client, error) { + var reader *fallbackClientReader + if options.Cache != nil && options.Cache.Reader != nil { + // if caching is enabled we create a fallback reader so we can attempt the client if the cache + // reader does not have the object + reader = &fallbackClientReader{ + orderedClients: []client.Reader{options.Cache.Reader}, + } + + options.Cache.Reader = reader + } + + // create the k8s client + k8sClient, err := client.New(config, options) + if err != nil { + return k8sClient, err + } + + // TODO - should we wrap this in a writeThroughCachingWriter as well? + k8sOtelClient := otelutils.WrapK8sClient(k8sClient) + if reader != nil { + // once the k8s client is created we set the fallback reader's client to the k8s client + reader.orderedClients = append([]client.Reader{k8sOtelClient}, reader.orderedClients...) + } + + return k8sOtelClient, nil +} + // fallbackClientReader reads from the cache first and if not found then reads from the configured reader, which // directly reads from the API type fallbackClientReader struct { orderedClients []client.Reader } -func (c fallbackClientReader) Get(ctx context.Context, key client.ObjectKey, out client.Object) (err error) { + +func (c fallbackClientReader) Get(ctx context.Context, key client.ObjectKey, out client.Object, opts ...client.GetOption) (err error) { for _, k8sClient := range c.orderedClients { - if err = k8sClient.Get(ctx, key, out); err == nil { + if err = k8sClient.Get(ctx, key, out, opts...); err == nil { return nil } } @@ -49,56 +88,6 @@ func (c fallbackClientReader) List(ctx context.Context, list client.ObjectList, return } -// ClientBuilder builder is the interface for the client builder. -type ClientBuilder interface { - // WithUncached takes a list of runtime objects (plain or lists) that users don't want to cache - // for this client. This function can be called multiple times, it should append to an internal slice. - WithUncached(objs ...client.Object) ClientBuilder - - // Build returns a new client. - Build(cache cache.Cache, config *rest.Config, options client.Options) (client.Client, error) -} - -type FallbackClientBuilder struct { - uncached []client.Object - scope promutils.Scope -} - -func (f *FallbackClientBuilder) WithUncached(objs ...client.Object) ClientBuilder { - f.uncached = append(f.uncached, objs...) - return f -} - -func (f FallbackClientBuilder) Build(cache cache.Cache, config *rest.Config, options client.Options) (client.Client, error) { - c, err := client.New(config, options) - if err != nil { - return nil, err - } - - c, err = newWriteThroughCachingWriter(c, 50000, f.scope) - if err != nil { - return nil, err - } - - return client.NewDelegatingClient(client.NewDelegatingClientInput{ - Client: c, - CacheReader: fallbackClientReader{ - orderedClients: []client.Reader{cache, c}, - }, - UncachedObjects: f.uncached, - // TODO figure out if this should be true? - // CacheUnstructured: true, - }) -} - -// NewFallbackClientBuilder Creates a new k8s client that uses the cached client for reads and falls back to making API -// calls if it failed. Write calls will always go to raw client directly. -func NewFallbackClientBuilder(scope promutils.Scope) *FallbackClientBuilder { - return &FallbackClientBuilder{ - scope: scope, - } -} - type writeThroughCachingWriter struct { client.Client filter fastcheck.Filter From 674cdc009f3da0162bb612787aef71702e26f66c Mon Sep 17 00:00:00 2001 From: Daniel Rammer Date: Tue, 16 Jan 2024 22:12:42 -0600 Subject: [PATCH 3/5] fix unit tests and lint Signed-off-by: Daniel Rammer --- flytepropeller/cmd/controller/cmd/root.go | 2 +- flytepropeller/cmd/controller/cmd/webhook.go | 2 +- .../pkg/controller/executors/kube.go | 1 - .../executors/mocks/client_builder.go | 100 ------------------ 4 files changed, 2 insertions(+), 103 deletions(-) delete mode 100644 flytepropeller/pkg/controller/executors/mocks/client_builder.go diff --git a/flytepropeller/cmd/controller/cmd/root.go b/flytepropeller/cmd/controller/cmd/root.go index 8f4a96a7a6..8696f3993a 100644 --- a/flytepropeller/cmd/controller/cmd/root.go +++ b/flytepropeller/cmd/controller/cmd/root.go @@ -143,7 +143,7 @@ func executeRootCmd(baseCtx context.Context, cfg *config2.Config) error { SyncPeriod: &cfg.DownstreamEval.Duration, DefaultNamespaces: namespaceConfigs, }, - NewCache: executors.NewCache, + NewCache: executors.NewCache, NewClient: executors.NewClient, Metrics: metricsserver.Options{ // Disable metrics serving diff --git a/flytepropeller/cmd/controller/cmd/webhook.go b/flytepropeller/cmd/controller/cmd/webhook.go index f43641dd59..e3c29ae3d9 100644 --- a/flytepropeller/cmd/controller/cmd/webhook.go +++ b/flytepropeller/cmd/controller/cmd/webhook.go @@ -108,7 +108,7 @@ func runWebhook(origContext context.Context, propellerCfg *config.Config, cfg *w SyncPeriod: &propellerCfg.DownstreamEval.Duration, DefaultNamespaces: namespaceConfigs, }, - NewCache: executors.NewCache, + NewCache: executors.NewCache, NewClient: executors.NewClient, Metrics: metricsserver.Options{ // Disable metrics serving diff --git a/flytepropeller/pkg/controller/executors/kube.go b/flytepropeller/pkg/controller/executors/kube.go index e7ab86e921..c2ea3139cd 100644 --- a/flytepropeller/pkg/controller/executors/kube.go +++ b/flytepropeller/pkg/controller/executors/kube.go @@ -67,7 +67,6 @@ type fallbackClientReader struct { orderedClients []client.Reader } - func (c fallbackClientReader) Get(ctx context.Context, key client.ObjectKey, out client.Object, opts ...client.GetOption) (err error) { for _, k8sClient := range c.orderedClients { if err = k8sClient.Get(ctx, key, out, opts...); err == nil { diff --git a/flytepropeller/pkg/controller/executors/mocks/client_builder.go b/flytepropeller/pkg/controller/executors/mocks/client_builder.go deleted file mode 100644 index 3180f480fd..0000000000 --- a/flytepropeller/pkg/controller/executors/mocks/client_builder.go +++ /dev/null @@ -1,100 +0,0 @@ -// Code generated by mockery v1.0.1. DO NOT EDIT. - -package mocks - -import ( - cache "sigs.k8s.io/controller-runtime/pkg/cache" - client "sigs.k8s.io/controller-runtime/pkg/client" - - executors "github.com/flyteorg/flyte/flytepropeller/pkg/controller/executors" - - mock "github.com/stretchr/testify/mock" - - rest "k8s.io/client-go/rest" -) - -// ClientBuilder is an autogenerated mock type for the ClientBuilder type -type ClientBuilder struct { - mock.Mock -} - -type ClientBuilder_Build struct { - *mock.Call -} - -func (_m ClientBuilder_Build) Return(_a0 client.Client, _a1 error) *ClientBuilder_Build { - return &ClientBuilder_Build{Call: _m.Call.Return(_a0, _a1)} -} - -func (_m *ClientBuilder) OnBuild(_a0 cache.Cache, config *rest.Config, options client.Options) *ClientBuilder_Build { - c_call := _m.On("Build", _a0, config, options) - return &ClientBuilder_Build{Call: c_call} -} - -func (_m *ClientBuilder) OnBuildMatch(matchers ...interface{}) *ClientBuilder_Build { - c_call := _m.On("Build", matchers...) - return &ClientBuilder_Build{Call: c_call} -} - -// Build provides a mock function with given fields: _a0, config, options -func (_m *ClientBuilder) Build(_a0 cache.Cache, config *rest.Config, options client.Options) (client.Client, error) { - ret := _m.Called(_a0, config, options) - - var r0 client.Client - if rf, ok := ret.Get(0).(func(cache.Cache, *rest.Config, client.Options) client.Client); ok { - r0 = rf(_a0, config, options) - } else { - if ret.Get(0) != nil { - r0 = ret.Get(0).(client.Client) - } - } - - var r1 error - if rf, ok := ret.Get(1).(func(cache.Cache, *rest.Config, client.Options) error); ok { - r1 = rf(_a0, config, options) - } else { - r1 = ret.Error(1) - } - - return r0, r1 -} - -type ClientBuilder_WithUncached struct { - *mock.Call -} - -func (_m ClientBuilder_WithUncached) Return(_a0 executors.ClientBuilder) *ClientBuilder_WithUncached { - return &ClientBuilder_WithUncached{Call: _m.Call.Return(_a0)} -} - -func (_m *ClientBuilder) OnWithUncached(objs ...client.Object) *ClientBuilder_WithUncached { - c_call := _m.On("WithUncached", objs) - return &ClientBuilder_WithUncached{Call: c_call} -} - -func (_m *ClientBuilder) OnWithUncachedMatch(matchers ...interface{}) *ClientBuilder_WithUncached { - c_call := _m.On("WithUncached", matchers...) - return &ClientBuilder_WithUncached{Call: c_call} -} - -// WithUncached provides a mock function with given fields: objs -func (_m *ClientBuilder) WithUncached(objs ...client.Object) executors.ClientBuilder { - _va := make([]interface{}, len(objs)) - for _i := range objs { - _va[_i] = objs[_i] - } - var _ca []interface{} - _ca = append(_ca, _va...) - ret := _m.Called(_ca...) - - var r0 executors.ClientBuilder - if rf, ok := ret.Get(0).(func(...client.Object) executors.ClientBuilder); ok { - r0 = rf(objs...) - } else { - if ret.Get(0) != nil { - r0 = ret.Get(0).(executors.ClientBuilder) - } - } - - return r0 -} From c359d67f80e29c471f2342ac7cfa730cc2f9a12a Mon Sep 17 00:00:00 2001 From: Daniel Rammer Date: Wed, 17 Jan 2024 01:01:55 -0600 Subject: [PATCH 4/5] fixing client ordering Signed-off-by: Daniel Rammer --- flytepropeller/pkg/controller/executors/kube.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/flytepropeller/pkg/controller/executors/kube.go b/flytepropeller/pkg/controller/executors/kube.go index c2ea3139cd..bdab0d91be 100644 --- a/flytepropeller/pkg/controller/executors/kube.go +++ b/flytepropeller/pkg/controller/executors/kube.go @@ -51,11 +51,10 @@ var NewClient = func(config *rest.Config, options client.Options) (client.Client return k8sClient, err } - // TODO - should we wrap this in a writeThroughCachingWriter as well? k8sOtelClient := otelutils.WrapK8sClient(k8sClient) if reader != nil { // once the k8s client is created we set the fallback reader's client to the k8s client - reader.orderedClients = append([]client.Reader{k8sOtelClient}, reader.orderedClients...) + reader.orderedClients = append(reader.orderedClients, k8sOtelClient) } return k8sOtelClient, nil From 70853299783424993b9ddcf9afed0c224cdcdca0 Mon Sep 17 00:00:00 2001 From: Daniel Rammer Date: Wed, 17 Jan 2024 01:10:18 -0600 Subject: [PATCH 5/5] removed error mocking Signed-off-by: Daniel Rammer --- flytestdlib/otelutils/k8s.go | 22 ++-------------------- 1 file changed, 2 insertions(+), 20 deletions(-) diff --git a/flytestdlib/otelutils/k8s.go b/flytestdlib/otelutils/k8s.go index bed120d30b..13afee9308 100644 --- a/flytestdlib/otelutils/k8s.go +++ b/flytestdlib/otelutils/k8s.go @@ -3,10 +3,7 @@ package otelutils import ( "context" "fmt" - "time" - k8serrors "k8s.io/apimachinery/pkg/api/errors" - "k8s.io/apimachinery/pkg/runtime/schema" "sigs.k8s.io/controller-runtime/pkg/cache" "sigs.k8s.io/controller-runtime/pkg/client" ) @@ -24,15 +21,7 @@ func WrapK8sCache(c cache.Cache) cache.Cache { func (c *K8sCacheWrapper) Get(ctx context.Context, key client.ObjectKey, obj client.Object, opts ...client.GetOption) error { ctx, span := NewSpan(ctx, K8sClientTracer, fmt.Sprintf("%s.Cache/Get", k8sSpanPathPrefix)) defer span.End() - - err := c.Cache.Get(ctx, key, obj, opts...) - if time.Since(obj.GetCreationTimestamp().Time) < (time.Second * 3) { - obj = nil - return k8serrors.NewNotFound(schema.GroupResource{Group: "foo", Resource: "bar"}, key.Name) - } - - return err - //return c.Cache.Get(ctx, key, obj, opts...) + return c.Cache.Get(ctx, key, obj, opts...) } func (c *K8sCacheWrapper) List(ctx context.Context, list client.ObjectList, opts ...client.ListOption) error { @@ -54,14 +43,7 @@ func WrapK8sClient(c client.Client) client.Client { func (c *K8sClientWrapper) Get(ctx context.Context, key client.ObjectKey, obj client.Object, opts ...client.GetOption) error { ctx, span := NewSpan(ctx, K8sClientTracer, fmt.Sprintf("%s.Client/Get", k8sSpanPathPrefix)) defer span.End() - err := c.Client.Get(ctx, key, obj, opts...) - if time.Since(obj.GetCreationTimestamp().Time) < (time.Second * 3) { - obj = nil - return k8serrors.NewNotFound(schema.GroupResource{Group: "foo", Resource: "bar"}, key.Name) - } - - return err - //return c.Client.Get(ctx, key, obj, opts...) + return c.Client.Get(ctx, key, obj, opts...) } func (c *K8sClientWrapper) List(ctx context.Context, list client.ObjectList, opts ...client.ListOption) error {