Skip to content

Commit

Permalink
✨ Add TransformFuncByObject Option for Informer Cache (#1805)
Browse files Browse the repository at this point in the history
* add new cache.Options field to customize transform

* fixup! add new cache.Options field to customize transform

* fixup! add new cache.Options field to customize transform

flatten arrow code
  • Loading branch information
Alex Zielenski authored Apr 20, 2022
1 parent eb39b8e commit da9d35c
Show file tree
Hide file tree
Showing 5 changed files with 323 additions and 11 deletions.
34 changes: 33 additions & 1 deletion pkg/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,18 @@ type Options struct {
// Be very careful with this, when enabled you must DeepCopy any object before mutating it,
// otherwise you will mutate the object in the cache.
UnsafeDisableDeepCopyByObject DisableDeepCopyByObject

// TransformByObject is a map from GVKs to transformer functions which
// get applied when objects of the transformation are about to be committed
// to cache.
//
// This function is called both for new objects to enter the cache,
// and for updated objects.
TransformByObject TransformByObject

// DefaultTransform is the transform used for all GVKs which do
// not have an explicit transform func set in TransformByObject
DefaultTransform toolscache.TransformFunc
}

var defaultResyncTime = 10 * time.Hour
Expand All @@ -146,7 +158,12 @@ func New(config *rest.Config, opts Options) (Cache, error) {
if err != nil {
return nil, err
}
im := internal.NewInformersMap(config, opts.Scheme, opts.Mapper, *opts.Resync, opts.Namespace, selectorsByGVK, disableDeepCopyByGVK)
transformByGVK, err := convertToTransformByKindAndGVK(opts.TransformByObject, opts.DefaultTransform, opts.Scheme)
if err != nil {
return nil, err
}

im := internal.NewInformersMap(config, opts.Scheme, opts.Mapper, *opts.Resync, opts.Namespace, selectorsByGVK, disableDeepCopyByGVK, transformByGVK)
return &informerCache{InformersMap: im}, nil
}

Expand Down Expand Up @@ -241,3 +258,18 @@ func convertToDisableDeepCopyByGVK(disableDeepCopyByObject DisableDeepCopyByObje
}
return disableDeepCopyByGVK, nil
}

// TransformByObject associate a client.Object's GVK to a transformer function
// to be applied when storing the object into the cache.
type TransformByObject map[client.Object]toolscache.TransformFunc

func convertToTransformByKindAndGVK(t TransformByObject, defaultTransform toolscache.TransformFunc, scheme *runtime.Scheme) (internal.TransformFuncByObject, error) {
result := internal.NewTransformFuncByObject()
for obj, transformation := range t {
if err := result.Set(obj, scheme, transformation); err != nil {
return nil, err
}
}
result.SetDefault(defaultTransform)
return result, nil
}
217 changes: 217 additions & 0 deletions pkg/cache/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,12 @@ import (

corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
kscheme "k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/rest"
Expand Down Expand Up @@ -121,6 +123,221 @@ var _ = Describe("Multi-Namespace Informer Cache", func() {
var _ = Describe("Informer Cache without DeepCopy", func() {
CacheTest(cache.New, cache.Options{UnsafeDisableDeepCopyByObject: cache.DisableDeepCopyByObject{cache.ObjectAll{}: true}})
})

var _ = Describe("Cache with transformers", func() {
var (
informerCache cache.Cache
informerCacheCtx context.Context
informerCacheCancel context.CancelFunc
knownPod1 client.Object
knownPod2 client.Object
knownPod3 client.Object
knownPod4 client.Object
knownPod5 client.Object
knownPod6 client.Object
)

getTransformValue := func(obj client.Object) string {
accessor, err := meta.Accessor(obj)
if err == nil {
annotations := accessor.GetAnnotations()
if val, exists := annotations["transformed"]; exists {
return val
}
}
return ""
}

BeforeEach(func() {
informerCacheCtx, informerCacheCancel = context.WithCancel(context.Background())
Expect(cfg).NotTo(BeNil())

By("creating three pods")
cl, err := client.New(cfg, client.Options{})
Expect(err).NotTo(HaveOccurred())
err = ensureNode(testNodeOne, cl)
Expect(err).NotTo(HaveOccurred())
err = ensureNamespace(testNamespaceOne, cl)
Expect(err).NotTo(HaveOccurred())
err = ensureNamespace(testNamespaceTwo, cl)
Expect(err).NotTo(HaveOccurred())
err = ensureNamespace(testNamespaceThree, cl)
Expect(err).NotTo(HaveOccurred())
// Includes restart policy since these objects are indexed on this field.
knownPod1 = createPod("test-pod-1", testNamespaceOne, corev1.RestartPolicyNever)
knownPod2 = createPod("test-pod-2", testNamespaceTwo, corev1.RestartPolicyAlways)
knownPod3 = createPodWithLabels("test-pod-3", testNamespaceTwo, corev1.RestartPolicyOnFailure, map[string]string{"common-label": "common"})
knownPod4 = createPodWithLabels("test-pod-4", testNamespaceThree, corev1.RestartPolicyNever, map[string]string{"common-label": "common"})
knownPod5 = createPod("test-pod-5", testNamespaceOne, corev1.RestartPolicyNever)
knownPod6 = createPod("test-pod-6", testNamespaceTwo, corev1.RestartPolicyAlways)

podGVK := schema.GroupVersionKind{
Kind: "Pod",
Version: "v1",
}

knownPod1.GetObjectKind().SetGroupVersionKind(podGVK)
knownPod2.GetObjectKind().SetGroupVersionKind(podGVK)
knownPod3.GetObjectKind().SetGroupVersionKind(podGVK)
knownPod4.GetObjectKind().SetGroupVersionKind(podGVK)
knownPod5.GetObjectKind().SetGroupVersionKind(podGVK)
knownPod6.GetObjectKind().SetGroupVersionKind(podGVK)

By("creating the informer cache")
informerCache, err = cache.New(cfg, cache.Options{
DefaultTransform: func(i interface{}) (interface{}, error) {
obj := i.(runtime.Object)
Expect(obj).NotTo(BeNil())

accessor, err := meta.Accessor(obj)
Expect(err).To(BeNil())
annotations := accessor.GetAnnotations()

if _, exists := annotations["transformed"]; exists {
// Avoid performing transformation multiple times.
return i, nil
}

if annotations == nil {
annotations = make(map[string]string)
}
annotations["transformed"] = "default"
accessor.SetAnnotations(annotations)
return i, nil
},
TransformByObject: cache.TransformByObject{
&corev1.Pod{}: func(i interface{}) (interface{}, error) {
obj := i.(runtime.Object)
Expect(obj).NotTo(BeNil())
accessor, err := meta.Accessor(obj)
Expect(err).To(BeNil())

annotations := accessor.GetAnnotations()
if _, exists := annotations["transformed"]; exists {
// Avoid performing transformation multiple times.
return i, nil
}

if annotations == nil {
annotations = make(map[string]string)
}
annotations["transformed"] = "explicit"
accessor.SetAnnotations(annotations)
return i, nil
},
},
})
Expect(err).NotTo(HaveOccurred())
By("running the cache and waiting for it to sync")
// pass as an arg so that we don't race between close and re-assign
go func(ctx context.Context) {
defer GinkgoRecover()
Expect(informerCache.Start(ctx)).To(Succeed())
}(informerCacheCtx)
Expect(informerCache.WaitForCacheSync(informerCacheCtx)).To(BeTrue())
})

AfterEach(func() {
By("cleaning up created pods")
deletePod(knownPod1)
deletePod(knownPod2)
deletePod(knownPod3)
deletePod(knownPod4)
deletePod(knownPod5)
deletePod(knownPod6)

informerCacheCancel()
})

Context("with structured objects", func() {
It("should apply transformers to explicitly specified GVKS", func() {
By("listing pods")
out := corev1.PodList{}
Expect(informerCache.List(context.Background(), &out)).To(Succeed())

By("verifying that the returned pods were transformed")
for i := 0; i < len(out.Items); i++ {
Expect(getTransformValue(&out.Items[i])).To(BeIdenticalTo("explicit"))
}
})

It("should apply default transformer to objects when none is specified", func() {
By("getting the Kubernetes service")
svc := &corev1.Service{}
svcKey := client.ObjectKey{Namespace: "default", Name: "kubernetes"}
Expect(informerCache.Get(context.Background(), svcKey, svc)).To(Succeed())

By("verifying that the returned service was transformed")
Expect(getTransformValue(svc)).To(BeIdenticalTo("default"))
})
})

Context("with unstructured objects", func() {
It("should apply transformers to explicitly specified GVKS", func() {
By("listing pods")
out := unstructured.UnstructuredList{}
out.SetGroupVersionKind(schema.GroupVersionKind{
Group: "",
Version: "v1",
Kind: "PodList",
})
Expect(informerCache.List(context.Background(), &out)).To(Succeed())

By("verifying that the returned pods were transformed")
for i := 0; i < len(out.Items); i++ {
Expect(getTransformValue(&out.Items[i])).To(BeIdenticalTo("explicit"))
}
})

It("should apply default transformer to objects when none is specified", func() {
By("getting the Kubernetes service")
svc := &unstructured.Unstructured{}
svc.SetGroupVersionKind(schema.GroupVersionKind{
Group: "",
Version: "v1",
Kind: "Service",
})
svcKey := client.ObjectKey{Namespace: "default", Name: "kubernetes"}
Expect(informerCache.Get(context.Background(), svcKey, svc)).To(Succeed())

By("verifying that the returned service was transformed")
Expect(getTransformValue(svc)).To(BeIdenticalTo("default"))
})
})

Context("with metadata-only objects", func() {
It("should apply transformers to explicitly specified GVKS", func() {
By("listing pods")
out := metav1.PartialObjectMetadataList{}
out.SetGroupVersionKind(schema.GroupVersionKind{
Group: "",
Version: "v1",
Kind: "PodList",
})
Expect(informerCache.List(context.Background(), &out)).To(Succeed())

By("verifying that the returned pods were transformed")
for i := 0; i < len(out.Items); i++ {
Expect(getTransformValue(&out.Items[i])).To(BeIdenticalTo("explicit"))
}
})
It("should apply default transformer to objects when none is specified", func() {
By("getting the Kubernetes service")
svc := &metav1.PartialObjectMetadata{}
svc.SetGroupVersionKind(schema.GroupVersionKind{
Group: "",
Version: "v1",
Kind: "Service",
})
svcKey := client.ObjectKey{Namespace: "default", Name: "kubernetes"}
Expect(informerCache.Get(context.Background(), svcKey, svc)).To(Succeed())

By("verifying that the returned service was transformed")
Expect(getTransformValue(svc)).To(BeIdenticalTo("default"))
})
})
})

var _ = Describe("Cache with selectors", func() {
defer GinkgoRecover()
var (
Expand Down
19 changes: 10 additions & 9 deletions pkg/cache/internal/deleg_map.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,12 @@ func NewInformersMap(config *rest.Config,
namespace string,
selectors SelectorsByGVK,
disableDeepCopy DisableDeepCopyByGVK,
transformers TransformFuncByObject,
) *InformersMap {
return &InformersMap{
structured: newStructuredInformersMap(config, scheme, mapper, resync, namespace, selectors, disableDeepCopy),
unstructured: newUnstructuredInformersMap(config, scheme, mapper, resync, namespace, selectors, disableDeepCopy),
metadata: newMetadataInformersMap(config, scheme, mapper, resync, namespace, selectors, disableDeepCopy),
structured: newStructuredInformersMap(config, scheme, mapper, resync, namespace, selectors, disableDeepCopy, transformers),
unstructured: newUnstructuredInformersMap(config, scheme, mapper, resync, namespace, selectors, disableDeepCopy, transformers),
metadata: newMetadataInformersMap(config, scheme, mapper, resync, namespace, selectors, disableDeepCopy, transformers),

Scheme: scheme,
}
Expand Down Expand Up @@ -108,18 +109,18 @@ func (m *InformersMap) Get(ctx context.Context, gvk schema.GroupVersionKind, obj

// newStructuredInformersMap creates a new InformersMap for structured objects.
func newStructuredInformersMap(config *rest.Config, scheme *runtime.Scheme, mapper meta.RESTMapper, resync time.Duration,
namespace string, selectors SelectorsByGVK, disableDeepCopy DisableDeepCopyByGVK) *specificInformersMap {
return newSpecificInformersMap(config, scheme, mapper, resync, namespace, selectors, disableDeepCopy, createStructuredListWatch)
namespace string, selectors SelectorsByGVK, disableDeepCopy DisableDeepCopyByGVK, transformers TransformFuncByObject) *specificInformersMap {
return newSpecificInformersMap(config, scheme, mapper, resync, namespace, selectors, disableDeepCopy, transformers, createStructuredListWatch)
}

// newUnstructuredInformersMap creates a new InformersMap for unstructured objects.
func newUnstructuredInformersMap(config *rest.Config, scheme *runtime.Scheme, mapper meta.RESTMapper, resync time.Duration,
namespace string, selectors SelectorsByGVK, disableDeepCopy DisableDeepCopyByGVK) *specificInformersMap {
return newSpecificInformersMap(config, scheme, mapper, resync, namespace, selectors, disableDeepCopy, createUnstructuredListWatch)
namespace string, selectors SelectorsByGVK, disableDeepCopy DisableDeepCopyByGVK, transformers TransformFuncByObject) *specificInformersMap {
return newSpecificInformersMap(config, scheme, mapper, resync, namespace, selectors, disableDeepCopy, transformers, createUnstructuredListWatch)
}

// newMetadataInformersMap creates a new InformersMap for metadata-only objects.
func newMetadataInformersMap(config *rest.Config, scheme *runtime.Scheme, mapper meta.RESTMapper, resync time.Duration,
namespace string, selectors SelectorsByGVK, disableDeepCopy DisableDeepCopyByGVK) *specificInformersMap {
return newSpecificInformersMap(config, scheme, mapper, resync, namespace, selectors, disableDeepCopy, createMetadataListWatch)
namespace string, selectors SelectorsByGVK, disableDeepCopy DisableDeepCopyByGVK, transformers TransformFuncByObject) *specificInformersMap {
return newSpecificInformersMap(config, scheme, mapper, resync, namespace, selectors, disableDeepCopy, transformers, createMetadataListWatch)
}
14 changes: 13 additions & 1 deletion pkg/cache/internal/informers_map.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,9 @@ func newSpecificInformersMap(config *rest.Config,
namespace string,
selectors SelectorsByGVK,
disableDeepCopy DisableDeepCopyByGVK,
createListWatcher createListWatcherFunc) *specificInformersMap {
transformers TransformFuncByObject,
createListWatcher createListWatcherFunc,
) *specificInformersMap {
ip := &specificInformersMap{
config: config,
Scheme: scheme,
Expand All @@ -68,6 +70,7 @@ func newSpecificInformersMap(config *rest.Config,
namespace: namespace,
selectors: selectors.forGVK,
disableDeepCopy: disableDeepCopy,
transformers: transformers,
}
return ip
}
Expand Down Expand Up @@ -135,6 +138,9 @@ type specificInformersMap struct {

// disableDeepCopy indicates not to deep copy objects during get or list objects.
disableDeepCopy DisableDeepCopyByGVK

// transform funcs are applied to objects before they are committed to the cache
transformers TransformFuncByObject
}

// Start calls Run on each of the informers and sets started to true. Blocks on the context.
Expand Down Expand Up @@ -227,6 +233,12 @@ func (ip *specificInformersMap) addInformerToMap(gvk schema.GroupVersionKind, ob
ni := cache.NewSharedIndexInformer(lw, obj, resyncPeriod(ip.resync)(), cache.Indexers{
cache.NamespaceIndex: cache.MetaNamespaceIndexFunc,
})

// Check to see if there is a transformer for this gvk
if err := ni.SetTransform(ip.transformers.Get(gvk)); err != nil {
return nil, false, err
}

rm, err := ip.mapper.RESTMapping(gvk.GroupKind(), gvk.Version)
if err != nil {
return nil, false, err
Expand Down
Loading

0 comments on commit da9d35c

Please sign in to comment.