Skip to content

Commit

Permalink
Add webhook for ClusterResourcePlacement
Browse files Browse the repository at this point in the history
  • Loading branch information
Fei-Guo authored and ryanzhang-oss committed Aug 26, 2022
1 parent 9fba06b commit 5cf6968
Show file tree
Hide file tree
Showing 13 changed files with 200 additions and 38 deletions.
97 changes: 97 additions & 0 deletions apis/v1alpha1/clusterresourceplacement_webhook.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
/*
Copyright (c) Microsoft Corporation.
Licensed under the MIT license.
*/

package v1alpha1

import (
"fmt"

"github.com/pkg/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
apiErrors "k8s.io/apimachinery/pkg/util/errors"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/webhook"

"go.goms.io/fleet/pkg/utils/informermanager"
)

var ResourceInformer informermanager.InformerManager
var restMapper meta.RESTMapper

func (c *ClusterResourcePlacement) SetupWebhookWithManager(mgr ctrl.Manager) error {
restMapper = mgr.GetRESTMapper()
return ctrl.NewWebhookManagedBy(mgr).
For(c).
Complete()
}

var _ webhook.Validator = &ClusterResourcePlacement{}

func (c *ClusterResourcePlacement) ValidateCreate() error {
return ValidateClusterResourcePlacement(c)
}

func (c *ClusterResourcePlacement) ValidateUpdate(old runtime.Object) error {
// TODO: validate changes against old if needed
return ValidateClusterResourcePlacement(c)
}

func (c *ClusterResourcePlacement) ValidateDelete() error {
// do nothing for delete request
return nil
}

// ValidateClusterResourcePlacement validate a ClusterResourcePlacement object
func ValidateClusterResourcePlacement(clusterResourcePlacement *ClusterResourcePlacement) error {
allErr := make([]error, 0)

for _, selector := range clusterResourcePlacement.Spec.ResourceSelectors {
//TODO: make sure the selector's gvk is valid
if selector.LabelSelector != nil {
if len(selector.Name) != 0 {
allErr = append(allErr, fmt.Errorf("the labelSelector and name fields are mutually exclusive in selector %+v", selector))
}
if _, err := metav1.LabelSelectorAsSelector(selector.LabelSelector); err != nil {
allErr = append(allErr, errors.Wrap(err, fmt.Sprintf("the labelSelector in resource selector %+v is invalid", selector)))
}
}
}

if clusterResourcePlacement.Spec.Policy != nil && clusterResourcePlacement.Spec.Policy.Affinity != nil &&
clusterResourcePlacement.Spec.Policy.Affinity.ClusterAffinity != nil {
for _, selector := range clusterResourcePlacement.Spec.Policy.Affinity.ClusterAffinity.ClusterSelectorTerms {
if _, err := metav1.LabelSelectorAsSelector(&selector.LabelSelector); err != nil {
allErr = append(allErr, errors.Wrap(err, fmt.Sprintf("the labelSelector in cluster selector %+v is invalid", selector)))
}
}
}

// we leverage the informermanager in the changedetector controller to do the resource scope validation
if ResourceInformer == nil {
allErr = append(allErr, fmt.Errorf("cannot perform resource scope check for now, please retry"))
} else {
for _, selector := range clusterResourcePlacement.Spec.ResourceSelectors {
gk := schema.GroupKind{
Group: selector.Group,
Kind: selector.Kind,
}

restMapping, err := restMapper.RESTMapping(gk, selector.Version)
if err != nil {
allErr = append(allErr, errors.Wrap(err, "Failed to get GVR of object"))
continue
}

if !ResourceInformer.IsClusterScopedResources(restMapping.Resource) {
allErr = append(allErr, fmt.Errorf("the placed resource is not found in schema or it is not a cluster scoped resource: %v", restMapping.Resource))
}
}
}

return apiErrors.NewAggregate(allErr)
}
2 changes: 1 addition & 1 deletion apis/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion cmd/hubagent/workload/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"go.goms.io/fleet/pkg/resourcewatcher"
"go.goms.io/fleet/pkg/utils"
"go.goms.io/fleet/pkg/utils/controller"
"go.goms.io/fleet/pkg/utils/informermanager"
)

const (
Expand Down Expand Up @@ -79,7 +80,8 @@ func SetupControllers(ctx context.Context, mgr ctrl.Manager, config *rest.Config
}

// the manager for all the dynamically created informers
dynamicInformerManager := utils.NewInformerManager(dynamicClient, opts.ResyncPeriod.Duration, ctx.Done())
dynamicInformerManager := informermanager.NewInformerManager(dynamicClient, opts.ResyncPeriod.Duration, ctx.Done())
fleetv1alpha1.ResourceInformer = dynamicInformerManager // webhook needs this to check resource scope

// Set up a custom controller to reconcile cluster resource placement
klog.Info("Setting up clusterResourcePlacement controller")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
fleetv1alpha1 "go.goms.io/fleet/apis/v1alpha1"
"go.goms.io/fleet/pkg/utils"
"go.goms.io/fleet/pkg/utils/controller"
"go.goms.io/fleet/pkg/utils/informermanager"
"go.goms.io/fleet/pkg/utils/validator"
)

Expand All @@ -42,7 +43,7 @@ var (
// Reconciler reconciles a cluster resource placement object
type Reconciler struct {
// the informer contains the cache for all the resources we need
InformerManager utils.InformerManager
InformerManager informermanager.InformerManager

RestMapper meta.RESTMapper

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,13 @@ import (
fleetv1alpha1 "go.goms.io/fleet/apis/v1alpha1"
"go.goms.io/fleet/pkg/utils"
"go.goms.io/fleet/pkg/utils/controller"
"go.goms.io/fleet/pkg/utils/informermanager"
)

// Reconciler reconciles a MemberCluster object
type Reconciler struct {
// the informer contains the cache for all the resources we need
InformerManager utils.InformerManager
InformerManager informermanager.InformerManager

// PlacementController maintains a rate limited queue which used to store
// the name of the clusterResourcePlacement and a reconcile function to consume the items in queue.
Expand Down
3 changes: 2 additions & 1 deletion pkg/controllers/resourcechange/resourcechange_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
fleetv1alpha1 "go.goms.io/fleet/apis/v1alpha1"
"go.goms.io/fleet/pkg/utils"
"go.goms.io/fleet/pkg/utils/controller"
"go.goms.io/fleet/pkg/utils/informermanager"
"go.goms.io/fleet/pkg/utils/keys"
"go.goms.io/fleet/pkg/utils/validator"
)
Expand All @@ -39,7 +40,7 @@ type Reconciler struct {
RestMapper meta.RESTMapper

// InformerManager holds all the informers that we can use to read from
InformerManager utils.InformerManager
InformerManager informermanager.InformerManager

// PlacementController exposes the placement queue for the reconciler to push to
PlacementController controller.Controller
Expand Down
11 changes: 6 additions & 5 deletions pkg/resourcewatcher/change_dector.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (

"go.goms.io/fleet/pkg/utils"
"go.goms.io/fleet/pkg/utils/controller"
"go.goms.io/fleet/pkg/utils/informermanager"
"go.goms.io/fleet/pkg/utils/keys"
)

Expand Down Expand Up @@ -52,7 +53,7 @@ type ChangeDetector struct {
MemberClusterPlacementController controller.Controller

// InformerManager manages all the dynamic informers created by the discovery client
InformerManager utils.InformerManager
InformerManager informermanager.InformerManager

// DisabledResourceConfig contains all the api resources that we won't select
DisabledResourceConfig *utils.DisabledResourceConfig
Expand Down Expand Up @@ -80,7 +81,7 @@ func (d *ChangeDetector) Start(ctx context.Context) error {
clusterPlacementEventHandler := newHandlerOnEvents(d.onClusterResourcePlacementAdded,
d.onClusterResourcePlacementUpdated, d.onClusterResourcePlacementDeleted)
d.InformerManager.AddStaticResource(
utils.APIResourceMeta{
informermanager.APIResourceMeta{
GroupVersionResource: utils.ClusterResourcePlacementGVR,
IsClusterScoped: true,
}, clusterPlacementEventHandler)
Expand All @@ -89,7 +90,7 @@ func (d *ChangeDetector) Start(ctx context.Context) error {
// the placement queue. We don't need to handle the add event as they are placed by the placement controller.
workEventHandler := newHandlerOnEvents(nil, d.onWorkUpdated, d.onWorkDeleted)
d.InformerManager.AddStaticResource(
utils.APIResourceMeta{
informermanager.APIResourceMeta{
GroupVersionResource: utils.WorkGVR,
IsClusterScoped: false,
}, workEventHandler)
Expand All @@ -98,7 +99,7 @@ func (d *ChangeDetector) Start(ctx context.Context) error {
// delete event as the work resources in this cluster will all get deleted which will trigger placement reconcile.
memberClusterEventHandler := newHandlerOnEvents(nil, d.onMemberClusterUpdated, nil)
d.InformerManager.AddStaticResource(
utils.APIResourceMeta{
informermanager.APIResourceMeta{
GroupVersionResource: utils.MemberClusterGVR,
IsClusterScoped: true,
}, memberClusterEventHandler)
Expand Down Expand Up @@ -144,7 +145,7 @@ func (d *ChangeDetector) discoverAPIResourcesLoop(ctx context.Context, period ti
// discoverResources goes through all the api resources in the cluster and create informers on selected types
func (d *ChangeDetector) discoverResources(dynamicResourceEventHandler cache.ResourceEventHandler) {
newResources, err := d.getWatchableResources()
var dynamicResources []utils.APIResourceMeta
var dynamicResources []informermanager.APIResourceMeta
if err != nil {
klog.ErrorS(err, "Failed to get all the api resources from the cluster")
}
Expand Down
8 changes: 4 additions & 4 deletions pkg/resourcewatcher/resource_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,14 @@ import (
"k8s.io/client-go/discovery"
"k8s.io/klog/v2"

"go.goms.io/fleet/pkg/utils"
"go.goms.io/fleet/pkg/utils/informermanager"
)

// getWatchableResources returns all api resources from discoveryClient that we can watch.
// More specifically, all api resources which support the 'list', and 'watch' verbs.
// All discovery errors are considered temporary. Upon encountering any error,
// getWatchableResources will log and return any discovered resources it was able to process (which may be none).
func (d *ChangeDetector) getWatchableResources() ([]utils.APIResourceMeta, error) {
func (d *ChangeDetector) getWatchableResources() ([]informermanager.APIResourceMeta, error) {
// Get all the resources this cluster has. We only need to care about the preferred version as the informers watch
// the preferred version will get watch event for resources on the other versions since there is only one version in etcd.
allResources, discoverError := d.DiscoveryClient.ServerPreferredResources()
Expand All @@ -35,7 +35,7 @@ func (d *ChangeDetector) getWatchableResources() ([]utils.APIResourceMeta, error
return nil, discoverError
}

watchableGroupVersionResources := make([]utils.APIResourceMeta, 0)
watchableGroupVersionResources := make([]informermanager.APIResourceMeta, 0)

// This is extracted from discovery.GroupVersionResources to only watch watchable resources
watchableResources := discovery.FilteredBy(discovery.SupportsAllVerbs{Verbs: []string{"list", "watch"}}, allResources)
Expand All @@ -48,7 +48,7 @@ func (d *ChangeDetector) getWatchableResources() ([]utils.APIResourceMeta, error
}
for i := range rl.APIResources {
gvr := schema.GroupVersionResource{Group: gv.Group, Version: gv.Version, Resource: rl.APIResources[i].Name}
watchableGroupVersionResources = append(watchableGroupVersionResources, utils.APIResourceMeta{
watchableGroupVersionResources = append(watchableGroupVersionResources, informermanager.APIResourceMeta{
GroupVersionResource: gvr,
IsClusterScoped: !rl.APIResources[i].Namespaced,
})
Expand Down
23 changes: 2 additions & 21 deletions pkg/utils/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ Licensed under the MIT license.
package utils

import (
"context"
"crypto/rand"
"fmt"
"log"
Expand All @@ -27,6 +26,7 @@ import (
workv1alpha1 "sigs.k8s.io/work-api/pkg/apis/v1alpha1"

fleetv1alpha1 "go.goms.io/fleet/apis/v1alpha1"
"go.goms.io/fleet/pkg/utils/informermanager"
)

const (
Expand Down Expand Up @@ -155,25 +155,6 @@ func RandStr() string {
return string(ret)
}

// ContextForChannel derives a child context from a parent channel.
//
// The derived context's Done channel is closed when the returned cancel function
// is called or when the parent channel is closed, whichever happens first.
//
// Note the caller must *always* call the CancelFunc, otherwise resources may be leaked.
func ContextForChannel(parentCh <-chan struct{}) (context.Context, context.CancelFunc) {
ctx, cancel := context.WithCancel(context.Background())

go func() {
select {
case <-parentCh:
cancel()
case <-ctx.Done():
}
}()
return ctx, cancel
}

// CheckCRDInstalled checks if the custom resource definition is installed
func CheckCRDInstalled(discoveryClient discovery.DiscoveryInterface, gvk schema.GroupVersionKind) error {
startTime := time.Now()
Expand All @@ -197,7 +178,7 @@ func CheckCRDInstalled(discoveryClient discovery.DiscoveryInterface, gvk schema.
}

// ShouldPropagateObj decides if one should propagate the object
func ShouldPropagateObj(informerManager InformerManager, uObj *unstructured.Unstructured) (bool, error) {
func ShouldPropagateObj(informerManager informermanager.InformerManager, uObj *unstructured.Unstructured) (bool, error) {
// TODO: add more special handling for different resource kind
switch uObj.GroupVersionKind() {
case corev1.SchemeGroupVersion.WithKind("ConfigMap"):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ Copyright (c) Microsoft Corporation.
Licensed under the MIT license.
*/

package utils
package informermanager

import (
"context"
Expand Down Expand Up @@ -211,3 +211,22 @@ func (s *informerManagerImpl) IsClusterScopedResources(resource schema.GroupVers
func (s *informerManagerImpl) Stop() {
s.cancel()
}

// ContextForChannel derives a child context from a parent channel.
//
// The derived context's Done channel is closed when the returned cancel function
// is called or when the parent channel is closed, whichever happens first.
//
// Note the caller must *always* call the CancelFunc, otherwise resources may be leaked.
func ContextForChannel(parentCh <-chan struct{}) (context.Context, context.CancelFunc) {
ctx, cancel := context.WithCancel(context.Background())

go func() {
select {
case <-parentCh:
cancel()
case <-ctx.Done():
}
}()
return ctx, cancel
}
15 changes: 15 additions & 0 deletions pkg/webhook/add_clusterresourceplacement.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
/*
Copyright (c) Microsoft Corporation.
Licensed under the MIT license.
*/

package webhook

import (
"go.goms.io/fleet/pkg/webhook/clusterresourceplacement"
)

func init() {
// AddToManagerFuncs is a list of functions to create webhook and add them to a manager.
AddToManagerFuncs = append(AddToManagerFuncs, clusterresourceplacement.Add)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
/*
Copyright (c) Microsoft Corporation.
Licensed under the MIT license.
*/

package clusterresourceplacement

import (
"sigs.k8s.io/controller-runtime/pkg/manager"

fleetv1alpha1 "go.goms.io/fleet/apis/v1alpha1"
)

func Add(mgr manager.Manager) error {
return (&fleetv1alpha1.ClusterResourcePlacement{}).SetupWebhookWithManager(mgr)
}
Loading

0 comments on commit 5cf6968

Please sign in to comment.