Skip to content

Commit

Permalink
fix the binding watcher
Browse files Browse the repository at this point in the history
  • Loading branch information
Ryan Zhang committed Jul 16, 2024
1 parent 572ab98 commit 9502209
Show file tree
Hide file tree
Showing 18 changed files with 844 additions and 142 deletions.
8 changes: 5 additions & 3 deletions pkg/controllers/clusterresourcebindingwatcher/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/envtest"
"sigs.k8s.io/controller-runtime/pkg/log/zap"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/metrics/server"

Expand All @@ -44,10 +43,13 @@ func TestAPIs(t *testing.T) {
}

var _ = BeforeSuite(func() {
klog.SetLogger(zap.New(zap.WriteTo(GinkgoWriter), zap.UseDevMode(true)))

ctx, cancel = context.WithCancel(context.TODO())

By("Setup klog")
fs := flag.NewFlagSet("klog", flag.ContinueOnError)
klog.InitFlags(fs)
Expect(fs.Parse([]string{"--v", "5", "-add_dir_header", "true"})).Should(Succeed())

By("bootstrapping test environment")
testEnv = &envtest.Environment{
CRDDirectoryPaths: []string{filepath.Join("../../../", "config", "crd", "bases")},
Expand Down
13 changes: 9 additions & 4 deletions pkg/controllers/clusterresourcebindingwatcher/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/predicate"

fleetv1beta1 "go.goms.io/fleet/apis/placement/v1beta1"
"go.goms.io/fleet/pkg/utils"
"go.goms.io/fleet/pkg/utils/condition"
"go.goms.io/fleet/pkg/utils/controller"
)
Expand Down Expand Up @@ -99,7 +100,7 @@ func (r *Reconciler) SetupWithManager(mgr ctrl.Manager) error {
klog.ErrorS(err, "Failed to process update event")
return false
}
return areConditionsUpdated(oldBinding, newBinding)
return isBindingStatusUpdated(oldBinding, newBinding)
},
}

Expand All @@ -109,14 +110,18 @@ func (r *Reconciler) SetupWithManager(mgr ctrl.Manager) error {
Complete(r)
}

func areConditionsUpdated(oldBinding, newBinding *fleetv1beta1.ClusterResourceBinding) bool {
func isBindingStatusUpdated(oldBinding, newBinding *fleetv1beta1.ClusterResourceBinding) bool {
for i := condition.RolloutStartedCondition; i < condition.TotalCondition; i++ {
oldCond := oldBinding.GetCondition(string(i.ResourceBindingConditionType()))
newCond := newBinding.GetCondition(string(i.ResourceBindingConditionType()))
// oldCond.ObservedGeneration will always be less than or equal to newCond.ObservedGeneration.
if !condition.EqualCondition(oldCond, newCond) {
if !condition.EqualCondition(newCond, oldCond) {
klog.V(2).InfoS("The binding condition has changed, need to update the corresponding CRP", "oldBinding", klog.KObj(oldBinding), "newBinding", klog.KObj(newBinding))
return true
}
}
if !utils.IsFailedResourcePlacementsEqual(oldBinding.Status.FailedPlacements, newBinding.Status.FailedPlacements) {
klog.V(2).InfoS("The binding failed placement has changed, need to update the corresponding CRP", "oldBinding", klog.KObj(oldBinding), "newBinding", klog.KObj(newBinding))
return true
}
return false
}
Original file line number Diff line number Diff line change
Expand Up @@ -64,12 +64,11 @@ var _ = Describe("Test ClusterResourceBinding Watcher - update metadata", Serial
})

AfterEach(func() {
crb.Name = testCRBName
By("Deleting the clusterResourceBinding")
Expect(k8sClient.Delete(ctx, crb)).Should(Succeed(), "failed to delete cluster resource binding")
})

It("Should not enqueue the clusterResourcePlacement name for reconciling, when clusterResourceBinding spec, status doesn't change", func() {
It("Should not enqueue the clusterResourcePlacement name for reconciling, when only meta data changed", func() {
Expect(k8sClient.Get(ctx, types.NamespacedName{Name: testCRBName}, crb)).Should(Succeed(), "failed to get cluster resource binding")
labels := crb.GetLabels()
labels["test-key"] = "test-value"
Expand All @@ -79,12 +78,23 @@ var _ = Describe("Test ClusterResourceBinding Watcher - update metadata", Serial
By("Checking placement controller queue")
consistentlyCheckPlacementControllerQueueIsEmpty()
})

It("Should not enqueue the clusterResourcePlacement name for reconciling, when only spec changed", func() {
Expect(k8sClient.Get(ctx, types.NamespacedName{Name: testCRBName}, crb)).Should(Succeed(), "failed to get cluster resource binding")
crb.Spec.State = fleetv1beta1.BindingStateBound
Expect(k8sClient.Update(ctx, crb)).Should(Succeed(), "failed to update cluster resource binding")

By("Checking placement controller queue")
consistentlyCheckPlacementControllerQueueIsEmpty()
})
})

// This container cannot be run in parallel with other ITs because it uses a shared fakePlacementController. These tests are also ordered.
var _ = Describe("Test ClusterResourceBinding Watcher - update status", Serial, Ordered, func() {
var crb *fleetv1beta1.ClusterResourceBinding
var currentTime metav1.Time
BeforeAll(func() {
currentTime = metav1.Now()
fakePlacementController.ResetQueue()
By("Creating a new clusterResourceBinding")
crb = clusterResourceBindingForTest()
Expand Down Expand Up @@ -123,30 +133,129 @@ var _ = Describe("Test ClusterResourceBinding Watcher - update status", Serial,
validateWhenUpdateClusterResourceBindingStatusWithCondition(fleetv1beta1.ResourceBindingAvailable, crb.Generation, metav1.ConditionFalse, testReason1)
})

It("Should enqueue the clusterResourcePlacement name for reconciling, when condition's observed generation changes", func() {
validateWhenUpdateClusterResourceBindingStatusWithCondition(fleetv1beta1.ResourceBindingRolloutStarted, crb.Generation+1, metav1.ConditionFalse, testReason1)
})

It("Should enqueue the clusterResourcePlacement name for reconciling, when condition's reason changes", func() {
validateWhenUpdateClusterResourceBindingStatusWithCondition(fleetv1beta1.ResourceBindingOverridden, crb.Generation, metav1.ConditionFalse, testReason2)
})

It("Should not enqueue the clusterResourcePlacement name for reconciling, when only condition's last transition time changes", func() {
It("Should not enqueue the clusterResourcePlacement name for reconciling, when condition's observed generation changes", func() {
crb := &fleetv1beta1.ClusterResourceBinding{}
Expect(k8sClient.Get(ctx, types.NamespacedName{Name: testCRBName}, crb)).Should(Succeed(), "failed to get cluster resource binding")
condition := metav1.Condition{
Type: string(fleetv1beta1.ResourceBindingOverridden),
ObservedGeneration: crb.Generation + 1,
Status: metav1.ConditionFalse,
Reason: testReason2,
LastTransitionTime: currentTime,
}
By(fmt.Sprintf("Updating the clusterResourceBinding status - %s, %d, %s, %s", fleetv1beta1.ResourceBindingOverridden, crb.Generation, metav1.ConditionFalse, testReason2))
crb.SetConditions(condition)
Expect(k8sClient.Status().Update(ctx, crb)).Should(Succeed(), "failed to update cluster resource binding status")

consistentlyCheckPlacementControllerQueueIsEmpty()
})

It("Should not enqueue the clusterResourcePlacement name for reconciling, when only condition's last transition time changes", func() {
crb := &fleetv1beta1.ClusterResourceBinding{}
Expect(k8sClient.Get(ctx, types.NamespacedName{Name: testCRBName}, crb)).Should(Succeed(), "failed to get cluster resource binding")
newTime := metav1.NewTime(currentTime.Add(10 * time.Second))
condition := metav1.Condition{
Type: string(fleetv1beta1.ResourceBindingOverridden),
ObservedGeneration: crb.Generation,
ObservedGeneration: crb.Generation + 1,
Status: metav1.ConditionFalse,
Reason: testReason2,
LastTransitionTime: metav1.Now(),
LastTransitionTime: newTime,
}
By(fmt.Sprintf("Updating the clusterResourceBinding status - %s, %d, %s, %s", fleetv1beta1.ResourceBindingOverridden, crb.Generation, metav1.ConditionFalse, testReason2))
crb.SetConditions(condition)
Expect(k8sClient.Status().Update(ctx, crb)).Should(Succeed(), "failed to update cluster resource binding status")

consistentlyCheckPlacementControllerQueueIsEmpty()
})

Context("Should enqueue the clusterResourcePlacement name for reconciling, when the failed placement list has changed", Serial, Ordered, func() {
It("Should enqueue the clusterResourcePlacement name for reconciling, when there are new failed placements", func() {
crb := &fleetv1beta1.ClusterResourceBinding{}
Expect(k8sClient.Get(ctx, types.NamespacedName{Name: testCRBName}, crb)).Should(Succeed(), "failed to get cluster resource binding")
crb.Status.FailedPlacements = []fleetv1beta1.FailedResourcePlacement{
{
ResourceIdentifier: fleetv1beta1.ResourceIdentifier{
Group: "",
Version: "v1",
Kind: "Service",
Name: "svc-name",
Namespace: "svc-namespace",
},
Condition: metav1.Condition{
Type: fleetv1beta1.WorkConditionTypeAvailable,
Status: metav1.ConditionFalse,
Reason: "fakeFailedAvailableReason",
Message: "fakeFailedAvailableMessage",
LastTransitionTime: metav1.Now(),
},
},
{
ResourceIdentifier: fleetv1beta1.ResourceIdentifier{
Group: "",
Version: "v1",
Kind: "ConfigMap",
Name: "config-name",
Namespace: "config-namespace",
},
Condition: metav1.Condition{
Type: fleetv1beta1.WorkConditionTypeAvailable,
Status: metav1.ConditionFalse,
Reason: "fakeFailedAvailableReason",
Message: "fakeFailedAvailableMessage",
LastTransitionTime: metav1.Now(),
},
},
}
Expect(k8sClient.Status().Update(ctx, crb)).Should(Succeed(), "failed to update cluster resource binding status")

By("Checking placement controller queue")
eventuallyCheckPlacementControllerQueue(crb.GetLabels()[fleetv1beta1.CRPTrackingLabel])
fakePlacementController.ResetQueue()
})

It("Should enqueue the clusterResourcePlacement name for reconciling, when there are one less failed placements", func() {
crb := &fleetv1beta1.ClusterResourceBinding{}
Expect(k8sClient.Get(ctx, types.NamespacedName{Name: testCRBName}, crb)).Should(Succeed(), "failed to get cluster resource binding")
crb.Status.FailedPlacements = []fleetv1beta1.FailedResourcePlacement{
{
ResourceIdentifier: fleetv1beta1.ResourceIdentifier{
Group: "",
Version: "v1",
Kind: "Service",
Name: "svc-name",
Namespace: "svc-namespace",
},
Condition: metav1.Condition{
Type: fleetv1beta1.WorkConditionTypeAvailable,
Status: metav1.ConditionFalse,
Reason: "fakeFailedAvailableReason",
Message: "fakeFailedAvailableMessage",
LastTransitionTime: metav1.Now(),
},
},
}
Expect(k8sClient.Status().Update(ctx, crb)).Should(Succeed(), "failed to update cluster resource binding status")

By("Checking placement controller queue")
eventuallyCheckPlacementControllerQueue(crb.GetLabels()[fleetv1beta1.CRPTrackingLabel])
fakePlacementController.ResetQueue()
})

It("Should enqueue the clusterResourcePlacement name for reconciling, when there are no more failed placements", func() {
crb := &fleetv1beta1.ClusterResourceBinding{}
Expect(k8sClient.Get(ctx, types.NamespacedName{Name: testCRBName}, crb)).Should(Succeed(), "failed to get cluster resource binding")
crb.Status.FailedPlacements = []fleetv1beta1.FailedResourcePlacement{}
Expect(k8sClient.Status().Update(ctx, crb)).Should(Succeed(), "failed to update cluster resource binding status")

By("Checking placement controller queue")
eventuallyCheckPlacementControllerQueue(crb.GetLabels()[fleetv1beta1.CRPTrackingLabel])
fakePlacementController.ResetQueue()
})
})
})

func clusterResourceBindingForTest() *fleetv1beta1.ClusterResourceBinding {
Expand Down
5 changes: 1 addition & 4 deletions pkg/controllers/workgenerator/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@ import (
"strings"
"time"

"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"go.uber.org/atomic"
"golang.org/x/sync/errgroup"
corev1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -888,8 +886,7 @@ func (r *Reconciler) SetupWithManager(mgr controllerruntime.Manager) error {
// we need to compare the failed placement if the work is not applied or available
oldFailedPlacements := extractFailedResourcePlacementsFromWork(oldWork)
newFailedPlacements := extractFailedResourcePlacementsFromWork(newWork)
if cmp.Equal(oldFailedPlacements, newFailedPlacements, cmp.Options{cmpopts.SortSlices(condition.LessFuncResourceIdentifier),
cmpopts.SortSlices(condition.LessFuncFailedResourcePlacements), condition.IgnoreConditionLTTAndMessageFields, cmpopts.EquateEmpty()}) {
if utils.IsFailedResourcePlacementsEqual(oldFailedPlacements, newFailedPlacements) {
klog.V(2).InfoS("The failed placement list didn't change on failed work, no need to reconcile", "oldWork", klog.KObj(oldWork), "newWork", klog.KObj(newWork))
return
}
Expand Down
3 changes: 1 addition & 2 deletions pkg/controllers/workgenerator/controller_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,7 @@ var (
validResourceOverrideSnapshot placementv1alpha1.ResourceOverrideSnapshot
invalidClusterResourceOverrideSnapshot placementv1alpha1.ClusterResourceOverrideSnapshot

cmpConditionOption = cmp.Options{cmpopts.SortSlices(condition.LessFuncResourceIdentifier),
cmpopts.SortSlices(condition.LessFuncFailedResourcePlacements), condition.IgnoreConditionLTTAndMessageFields, cmpopts.EquateEmpty()}
cmpConditionOption = cmp.Options{cmpopts.SortSlices(utils.LessFuncFailedResourcePlacements), utils.IgnoreConditionLTTAndMessageFields, cmpopts.EquateEmpty()}

fakeFailedAppliedReason = "fakeApplyFailureReason"
fakeFailedAppliedMessage = "fake apply failure message"
Expand Down
53 changes: 53 additions & 0 deletions pkg/utils/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"crypto/rand"
"fmt"
"math/big"
"sort"
"strings"
"time"

Expand All @@ -18,6 +19,7 @@ import (
discoveryv1 "k8s.io/api/discovery/v1"
rbacv1 "k8s.io/api/rbac/v1"
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
"k8s.io/apimachinery/pkg/api/equality"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
Expand All @@ -34,6 +36,7 @@ import (
placementv1alpha1 "go.goms.io/fleet/apis/placement/v1alpha1"
placementv1beta1 "go.goms.io/fleet/apis/placement/v1beta1"
fleetv1alpha1 "go.goms.io/fleet/apis/v1alpha1"
"go.goms.io/fleet/pkg/utils/condition"
"go.goms.io/fleet/pkg/utils/controller"
"go.goms.io/fleet/pkg/utils/informer"
)
Expand Down Expand Up @@ -84,6 +87,9 @@ const (

// ResourceIdentifierStringFormat is the format of the resource identifier string.
ResourceIdentifierStringFormat = "%s/%s/%s/%s/%s"

// ResourceIdentifierWithEnvelopeIdentifierStringFormat is the format of the resource identifier string with envelope identifier.
ResourceIdentifierWithEnvelopeIdentifierStringFormat = "%s/%s/%s/%s/%s/%s/%s/%s"
)

var (
Expand Down Expand Up @@ -486,3 +492,50 @@ func GenerateGroupString(groups []string) string {
}
return groupString
}

// LessFuncResourceIdentifier is a less function for sorting resource identifiers.
// Identifiers are ordered lexicographically by their formatted
// group/version/kind/namespace/name string.
var LessFuncResourceIdentifier = func(a, b placementv1beta1.ResourceIdentifier) bool {
	keyOf := func(r placementv1beta1.ResourceIdentifier) string {
		return fmt.Sprintf(ResourceIdentifierStringFormat, r.Group, r.Version, r.Kind, r.Namespace, r.Name)
	}
	return keyOf(a) < keyOf(b)
}

// LessFuncFailedResourcePlacements is a less function for sorting failed resource placements.
// Each placement is keyed by its formatted identifier string; placements wrapped in an
// envelope additionally include the envelope's type, namespace, and name in the key.
var LessFuncFailedResourcePlacements = func(a, b placementv1beta1.FailedResourcePlacement) bool {
	keyOf := func(p placementv1beta1.FailedResourcePlacement) string {
		if p.Envelope == nil {
			return fmt.Sprintf(ResourceIdentifierStringFormat, p.Group, p.Version, p.Kind, p.Namespace, p.Name)
		}
		return fmt.Sprintf(ResourceIdentifierWithEnvelopeIdentifierStringFormat, p.Group, p.Version, p.Kind, p.Namespace, p.Name, p.Envelope.Type, p.Envelope.Namespace, p.Envelope.Name)
	}
	return keyOf(a) < keyOf(b)
}

// IsFailedResourcePlacementsEqual reports whether the two failed resource placement
// lists contain the same entries, ignoring order. Entries are matched pairwise after
// sorting; two entries are equal when their resource identifiers are semantically
// equal and their conditions are equal per condition.EqualCondition.
// The input slices are never modified: sorting is done on copies, so callers may
// safely pass slices taken directly from an object's status.
func IsFailedResourcePlacementsEqual(oldFailedResourcePlacements, newFailedResourcePlacements []placementv1beta1.FailedResourcePlacement) bool {
	if len(oldFailedResourcePlacements) != len(newFailedResourcePlacements) {
		return false
	}
	// Sort copies instead of the inputs; sorting in place would reorder the callers'
	// status slices as a hidden side effect of an equality check.
	oldSorted := make([]placementv1beta1.FailedResourcePlacement, len(oldFailedResourcePlacements))
	copy(oldSorted, oldFailedResourcePlacements)
	newSorted := make([]placementv1beta1.FailedResourcePlacement, len(newFailedResourcePlacements))
	copy(newSorted, newFailedResourcePlacements)
	sort.Slice(oldSorted, func(i, j int) bool {
		return LessFuncFailedResourcePlacements(oldSorted[i], oldSorted[j])
	})
	sort.Slice(newSorted, func(i, j int) bool {
		return LessFuncFailedResourcePlacements(newSorted[i], newSorted[j])
	})
	for i := range oldSorted {
		if !equality.Semantic.DeepEqual(oldSorted[i].ResourceIdentifier, newSorted[i].ResourceIdentifier) {
			return false
		}
		// Keep the (new, old) argument order: EqualCondition is presumably
		// order-sensitive w.r.t. observed generations — TODO confirm against its definition.
		if !condition.EqualCondition(&newSorted[i].Condition, &oldSorted[i].Condition) {
			return false
		}
	}
	return true
}
Loading

0 comments on commit 9502209

Please sign in to comment.