Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: enforce revisionHistoryLimit for resource snapshots #426

Merged
merged 5 commits into from
Jul 14, 2023
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
212 changes: 163 additions & 49 deletions pkg/controllers/clusterresourceplacement/controller.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
/*
Copyright (c) Microsoft Corporation.
Licensed under the MIT license.
*/

// Package clusterresourceplacement features a controller to reconcile the clusterResourcePlacement changes.
package clusterresourceplacement

import (
Expand All @@ -17,6 +23,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"

fleetv1beta1 "go.goms.io/fleet/apis/placement/v1beta1"
"go.goms.io/fleet/pkg/utils"
"go.goms.io/fleet/pkg/utils/controller"
)

Expand Down Expand Up @@ -86,7 +93,18 @@ func (r *Reconciler) deleteClusterResourceSnapshots(ctx context.Context, crp *fl
// clusterSchedulingPolicySnapshot status and work status.
// If the error type is ErrUnexpectedBehavior, the controller will skip the reconciling.
func (r *Reconciler) handleUpdate(ctx context.Context, crp *fleetv1beta1.ClusterResourcePlacement) (ctrl.Result, error) {
_, err := r.getOrCreateClusterSchedulingPolicySnapshot(ctx, crp)
revisionLimit := fleetv1beta1.RevisionHistoryLimitDefaultValue
if crp.Spec.RevisionHistoryLimit != nil {
revisionLimit = *crp.Spec.RevisionHistoryLimit
if revisionLimit <= 0 {
err := fmt.Errorf("invalid clusterResourcePlacement %s: invalid revisionHistoryLimit %d", crp.Name, revisionLimit)
klog.ErrorS(controller.NewUnexpectedBehaviorError(err), "Invalid revisionHistoryLimit value and using default value instead", "clusterResourcePlacement", klog.KObj(crp))
// use the default value instead
revisionLimit = fleetv1beta1.RevisionHistoryLimitDefaultValue
}
}

_, err := r.getOrCreateClusterSchedulingPolicySnapshot(ctx, crp, int(revisionLimit))
if err != nil {
return ctrl.Result{}, err
}
Expand All @@ -97,7 +115,7 @@ func (r *Reconciler) handleUpdate(ctx context.Context, crp *fleetv1beta1.Cluster
resourceSnapshotSpec := fleetv1beta1.ResourceSnapshotSpec{
SelectedResources: selectedResources,
}
_, err = r.getOrCreateClusterResourceSnapshot(ctx, crp, &resourceSnapshotSpec)
_, err = r.getOrCreateClusterResourceSnapshot(ctx, crp, &resourceSnapshotSpec, int(revisionLimit))
if err != nil {
return ctrl.Result{}, err
}
Expand All @@ -107,11 +125,13 @@ func (r *Reconciler) handleUpdate(ctx context.Context, crp *fleetv1beta1.Cluster
return ctrl.Result{}, nil
}

func (r *Reconciler) getOrCreateClusterSchedulingPolicySnapshot(ctx context.Context, crp *fleetv1beta1.ClusterResourcePlacement) (*fleetv1beta1.ClusterSchedulingPolicySnapshot, error) {
func (r *Reconciler) getOrCreateClusterSchedulingPolicySnapshot(ctx context.Context, crp *fleetv1beta1.ClusterResourcePlacement, revisionHistoryLimit int) (*fleetv1beta1.ClusterSchedulingPolicySnapshot, error) {
crpKObj := klog.KObj(crp)
schedulingPolicy := *crp.Spec.Policy // will exclude the numberOfClusters
schedulingPolicy.NumberOfClusters = nil
policyHash, err := generatePolicyHash(&schedulingPolicy)
schedulingPolicy := crp.Spec.Policy.DeepCopy()
if schedulingPolicy != nil {
schedulingPolicy.NumberOfClusters = nil // will exclude the numberOfClusters
}
policyHash, err := generatePolicyHash(schedulingPolicy)
if err != nil {
klog.ErrorS(err, "Failed to generate policy hash of crp", "clusterResourcePlacement", crpKObj)
return nil, controller.NewUnexpectedBehaviorError(err)
Expand Down Expand Up @@ -145,7 +165,7 @@ func (r *Reconciler) getOrCreateClusterSchedulingPolicySnapshot(ctx context.Cont

// delete redundant snapshot revisions before creating a new snapshot to guarantee that the number of snapshots
// won't exceed the limit.
if err := r.deleteRedundantSchedulingPolicySnapshots(ctx, crp); err != nil {
if err := r.deleteRedundantSchedulingPolicySnapshots(ctx, crp, revisionHistoryLimit); err != nil {
return nil, err
}

Expand All @@ -161,7 +181,7 @@ func (r *Reconciler) getOrCreateClusterSchedulingPolicySnapshot(ctx context.Cont
},
},
Spec: fleetv1beta1.SchedulingPolicySnapshotSpec{
Policy: &schedulingPolicy,
Policy: schedulingPolicy,
PolicyHash: []byte(policyHash),
},
}
Expand All @@ -187,38 +207,79 @@ func (r *Reconciler) getOrCreateClusterSchedulingPolicySnapshot(ctx context.Cont
return latestPolicySnapshot, nil
}

func (r *Reconciler) deleteRedundantSchedulingPolicySnapshots(ctx context.Context, crp *fleetv1beta1.ClusterResourcePlacement) error {
func (r *Reconciler) deleteRedundantSchedulingPolicySnapshots(ctx context.Context, crp *fleetv1beta1.ClusterResourcePlacement, revisionHistoryLimit int) error {
sortedList, err := r.listSortedClusterSchedulingPolicySnapshots(ctx, crp)
if err != nil {
return err
}
if len(sortedList.Items) < revisionHistoryLimit {
return nil
}

crpKObj := klog.KObj(crp)
// respect the revisionHistoryLimit field
revisionLimit := fleetv1beta1.RevisionHistoryLimitDefaultValue
if crp.Spec.RevisionHistoryLimit != nil {
revisionLimit = *crp.Spec.RevisionHistoryLimit
if revisionLimit <= 0 {
err := fmt.Errorf("invalid clusterResourcePlacement %s: invalid revisionHistoryLimit %d", crpKObj, revisionLimit)
klog.ErrorS(controller.NewExpectedBehaviorError(err), "Invalid revisionHistoryLimit value and using default value instead", "clusterResourcePlacement", crpKObj)
// use the default value instead
revisionLimit = fleetv1beta1.RevisionHistoryLimitDefaultValue
if len(sortedList.Items)-revisionHistoryLimit > 0 {
michaelawyu marked this conversation as resolved.
Show resolved Hide resolved
// We always delete before creating a new snapshot, the snapshot size should never exceed the limit as there is
// no finalizer added and object should be deleted immediately.
klog.Warningf("The number of clusterSchedulingPolicySnapshots exceeds the revisionHistoryLimit and it should never happen", "clusterResourcePlacement", klog.KObj(crp), "numberOfSnapshots", len(sortedList.Items), "revisionHistoryLimit", revisionHistoryLimit)
}

for i := 0; i <= len(sortedList.Items)-revisionHistoryLimit; i++ { // need to reserve one slot for the new snapshot
if err := r.Client.Delete(ctx, &sortedList.Items[i]); err != nil && !errors.IsNotFound(err) {
klog.ErrorS(err, "Failed to delete clusterSchedulingPolicySnapshot", "clusterResourcePlacement", klog.KObj(crp), "clusterSchedulingPolicySnapshot", klog.KObj(&sortedList.Items[i]))
return controller.NewAPIServerError(false, err)
}
}
if len(sortedList.Items) < int(revisionLimit) {
return nil
}

// deleteRedundantResourceSnapshots handles multiple snapshots in a group.
func (r *Reconciler) deleteRedundantResourceSnapshots(ctx context.Context, crp *fleetv1beta1.ClusterResourcePlacement, revisionHistoryLimit int) error {
sortedList, err := r.listSortedResourceSnapshots(ctx, crp)
if err != nil {
return err
}

if len(sortedList.Items) < revisionHistoryLimit {
// If the number of existing snapshots is less than the limit no matter how many snapshots in a group, we don't
// need to delete any snapshots.
// Skip the checking and deleting.
return nil
}
for i := 0; i <= len(sortedList.Items)-int(revisionLimit); i++ {

crpKObj := klog.KObj(crp)
lastGroupIndex := -1
groupCounter := 0

// delete the snapshots from the end as there are could be multiple snapshots in a group in order to keep the latest
// snapshots from the end.
for i := len(sortedList.Items) - 1; i >= 0; i-- {
zhiying-lin marked this conversation as resolved.
Show resolved Hide resolved
snapshotKObj := klog.KObj(&sortedList.Items[i])
ii, err := parseResourceIndexFromLabel(&sortedList.Items[i])
if err != nil {
klog.ErrorS(err, "Failed to parse the resource index label", "clusterResourcePlacement", crpKObj, "clusterResourceSnapshot", snapshotKObj)
return controller.NewUnexpectedBehaviorError(err)
}
if ii != lastGroupIndex {
groupCounter++
lastGroupIndex = ii
}
if groupCounter < revisionHistoryLimit { // need to reserve one slot for the new snapshot
continue
}
if err := r.Client.Delete(ctx, &sortedList.Items[i]); err != nil && !errors.IsNotFound(err) {
zhiying-lin marked this conversation as resolved.
Show resolved Hide resolved
klog.ErrorS(err, "Failed to delete clusterSchedulingPolicySnapshot", "clusterResourcePlacement", crpKObj, "clusterSchedulingPolicySnapshot", klog.KObj(&sortedList.Items[i]))
klog.ErrorS(err, "Failed to delete clusterResourceSnapshot", "clusterResourcePlacement", crpKObj, "clusterResourceSnapshot", snapshotKObj)
return controller.NewAPIServerError(false, err)
}
}
if groupCounter-revisionHistoryLimit > 0 {
// We always delete before creating a new snapshot, the snapshot group size should never exceed the limit
// as there is no finalizer added and the object should be deleted immediately.
klog.Warningf("The number of clusterResourceSnapshot groups exceeds the revisionHistoryLimit and it should never happen", "clusterResourcePlacement", klog.KObj(crp), "numberOfSnapshotGroup", groupCounter, "revisionHistoryLimit", revisionHistoryLimit)
zhiying-lin marked this conversation as resolved.
Show resolved Hide resolved
}
return nil
}

// TODO handle all the resources selected by placement larger than 1MB size limit of k8s objects.
func (r *Reconciler) getOrCreateClusterResourceSnapshot(ctx context.Context, crp *fleetv1beta1.ClusterResourcePlacement, resourceSnapshotSpec *fleetv1beta1.ResourceSnapshotSpec) (*fleetv1beta1.ClusterResourceSnapshot, error) {
func (r *Reconciler) getOrCreateClusterResourceSnapshot(ctx context.Context, crp *fleetv1beta1.ClusterResourcePlacement, resourceSnapshotSpec *fleetv1beta1.ResourceSnapshotSpec, revisionHistoryLimit int) (*fleetv1beta1.ClusterResourceSnapshot, error) {
resourceHash, err := generateResourceHash(resourceSnapshotSpec)
if err != nil {
klog.ErrorS(err, "Failed to generate resource hash of crp", "clusterResourcePlacement", klog.KObj(crp))
Expand Down Expand Up @@ -259,6 +320,11 @@ func (r *Reconciler) getOrCreateClusterResourceSnapshot(ctx context.Context, crp
return nil, controller.NewAPIServerError(false, err)
}
}
// delete redundant snapshot revisions before creating a new snapshot to guarantee that the number of snapshots
// won't exceed the limit.
if err := r.deleteRedundantResourceSnapshots(ctx, crp, revisionHistoryLimit); err != nil {
return nil, err
}

// create a new resource snapshot
latestResourceSnapshotIndex++
Expand All @@ -272,6 +338,8 @@ func (r *Reconciler) getOrCreateClusterResourceSnapshot(ctx context.Context, crp
},
Annotations: map[string]string{
fleetv1beta1.ResourceGroupHashAnnotation: resourceHash,
// TODO: needs to be updated once we support multiple snapshots
zhiying-lin marked this conversation as resolved.
Show resolved Hide resolved
fleetv1beta1.NumberOfResourceSnapshotsAnnotation: "1",
},
},
Spec: *resourceSnapshotSpec,
Expand Down Expand Up @@ -307,7 +375,7 @@ func (r *Reconciler) ensureLatestPolicySnapshot(ctx context.Context, crp *fleetv
if crp.Spec.Policy != nil &&
crp.Spec.Policy.PlacementType == fleetv1beta1.PickNPlacementType &&
crp.Spec.Policy.NumberOfClusters != nil {
oldCount, err := parseNumberOfClustersFromAnnotation(latest)
oldCount, err := utils.ExtractNumOfClustersFromPolicySnapshot(latest)
if err != nil {
klog.ErrorS(err, "Failed to parse the numberOfClusterAnnotation", "clusterSchedulingPolicySnapshot", klog.KObj(latest))
return controller.NewUnexpectedBehaviorError(err)
Expand Down Expand Up @@ -463,29 +531,85 @@ func (r *Reconciler) lookupLatestResourceSnapshot(ctx context.Context, crp *flee
klog.ErrorS(err, "Invalid clusterResourceSnapshots", "clusterResourcePlacement", crpKObj)
return nil, -1, controller.NewUnexpectedBehaviorError(err)
}
// When there are no active snapshots, find the one who has the largest resource index.
if err := r.Client.List(ctx, snapshotList, client.MatchingLabels{fleetv1beta1.CRPTrackingLabel: crp.Name}); err != nil {
klog.ErrorS(err, "Failed to list all clusterResourceSnapshots", "clusterResourcePlacement", crpKObj)
return nil, -1, controller.NewAPIServerError(false, err)
// When there are no active snapshots, find the first snapshot who has the largest resource index.
// It should be rare only when CRP is crashed before creating the new active snapshot.
sortedList, err := r.listSortedResourceSnapshots(ctx, crp)
if err != nil {
return nil, -1, err
}
if len(snapshotList.Items) == 0 {
if len(sortedList.Items) == 0 {
// The resource index of the first snapshot will start from 0.
return nil, -1, nil
}
index := -1 // the index of the cluster resource snapshot array
lastResourceIndex := -1 // the assigned resource index of the cluster resource snapshot
for i := range snapshotList.Items {
resourceIndex, err := parseResourceIndexFromLabel(&snapshotList.Items[i])
latestSnapshot := &sortedList.Items[len(sortedList.Items)-1]
resourceIndex, err := parseResourceIndexFromLabel(latestSnapshot)
if err != nil {
klog.ErrorS(err, "Failed to parse the resource index label", "clusterResourcePlacement", crpKObj, "clusterResourceSnapshot", klog.KObj(latestSnapshot))
return nil, -1, controller.NewUnexpectedBehaviorError(err)
}
return latestSnapshot, resourceIndex, nil
}

// listSortedResourceSnapshots returns the resource snapshots sorted by its index and its subindex.
// The resourceSnapshot is less than the other one when resourceIndex is less.
// When the resourceIndex is equal, then order by the subindex.
// Note: the snapshot does not have subindex is the largest of a group and there should be only one in a group.
func (r *Reconciler) listSortedResourceSnapshots(ctx context.Context, crp *fleetv1beta1.ClusterResourcePlacement) (*fleetv1beta1.ClusterResourceSnapshotList, error) {
snapshotList := &fleetv1beta1.ClusterResourceSnapshotList{}
crpKObj := klog.KObj(crp)
if err := r.Client.List(ctx, snapshotList, client.MatchingLabels{fleetv1beta1.CRPTrackingLabel: crp.Name}); err != nil {
klog.ErrorS(err, "Failed to list all clusterResourceSnapshots", "clusterResourcePlacement", crpKObj)
return nil, controller.NewAPIServerError(false, err)
}
var errs []error
sort.Slice(snapshotList.Items, func(i, j int) bool {
iKObj := klog.KObj(&snapshotList.Items[i])
jKObj := klog.KObj(&snapshotList.Items[j])
ii, err := parseResourceIndexFromLabel(&snapshotList.Items[i])
if err != nil {
klog.ErrorS(err, "Failed to parse the resource index label", "clusterResourceSnapshot", klog.KObj(&snapshotList.Items[i]))
return nil, -1, controller.NewUnexpectedBehaviorError(err)
klog.ErrorS(err, "Failed to parse the resource index label", "clusterResourcePlacement", crpKObj, "clusterResourceSnapshot", iKObj)
errs = append(errs, err)
}
ji, err := parseResourceIndexFromLabel(&snapshotList.Items[j])
if err != nil {
klog.ErrorS(err, "Failed to parse the resource index label", "clusterResourcePlacement", crpKObj, "clusterResourceSnapshot", jKObj)
errs = append(errs, err)
}
if lastResourceIndex < resourceIndex {
index = i
lastResourceIndex = resourceIndex
if ii != ji {
return ii < ji
}

iDoesExist, iSubindex, err := utils.ExtractSubindexFromClusterResourceSnapshot(&snapshotList.Items[i])
if err != nil {
klog.ErrorS(err, "Failed to parse the subindex index", "clusterResourcePlacement", crpKObj, "clusterResourceSnapshot", iKObj)
errs = append(errs, err)
}
jDoesExist, jSubindex, err := utils.ExtractSubindexFromClusterResourceSnapshot(&snapshotList.Items[j])
if err != nil {
klog.ErrorS(err, "Failed to parse the subindex index", "clusterResourcePlacement", crpKObj, "clusterResourceSnapshot", jKObj)
errs = append(errs, err)
}

// Both of the snapshots do not have subindex, which should not happen.
if !iDoesExist && !jDoesExist {
klog.ErrorS(err, "There are more than one resource snapshot which do not have subindex in a group", "clusterResourcePlacement", crpKObj, "clusterResourceSnapshot", iKObj, "clusterResourceSnapshot", jKObj)
errs = append(errs, err)
}

if !iDoesExist { // check if it's the first snapshot
zhiying-lin marked this conversation as resolved.
Show resolved Hide resolved
return false
}
if !jDoesExist { // check if it's the first snapshot
return true
}
return iSubindex < jSubindex
})

if len(errs) > 0 {
return nil, controller.NewUnexpectedBehaviorError(utilerrors.NewAggregate(errs))
}
return &snapshotList.Items[index], lastResourceIndex, nil

return snapshotList, nil
}

// parsePolicyIndexFromLabel returns error when parsing the label which should never return error in production.
Expand All @@ -508,16 +632,6 @@ func parseResourceIndexFromLabel(s *fleetv1beta1.ClusterResourceSnapshot) (int,
return v, nil
}

// parseNumberOfClustersFromAnnotation returns the number of clusters recorded in the snapshot's
// NumberOfClustersAnnotation, or an error when the annotation is missing, non-numeric, or negative.
// It should never return an error in production.
func parseNumberOfClustersFromAnnotation(s *fleetv1beta1.ClusterSchedulingPolicySnapshot) (int, error) {
	n := s.Annotations[fleetv1beta1.NumberOfClustersAnnotation]
	v, err := strconv.Atoi(n)
	if err != nil {
		return -1, fmt.Errorf("invalid numberOfCluster %q, error: %w", n, err)
	}
	// BUG fix: the negative case previously reused the %w format with a nil error, rendering "%!w(<nil>)".
	if v < 0 {
		return -1, fmt.Errorf("invalid numberOfCluster %q: value must not be negative", n)
	}
	return v, nil
}

func generatePolicyHash(policy *fleetv1beta1.PlacementPolicy) (string, error) {
jsonBytes, err := json.Marshal(policy)
if err != nil {
Expand Down
Loading