Skip to content

Commit

Permalink
controller: properly update lastAppliedSpec and track failed status
Browse files Browse the repository at this point in the history
Previously, operator update lastAppliedSpec only when object was successfully reconciled.
It made impossible to properly detect changes between prev and current object state. And diff was calculated incorrectly.
Since operator executes Update requests only on object diff, failed changes cannot be removed and rollback was not possible.

 Also, operator updated `failed` status to `expanding` on each reconcile loop. It made inefficient error exponential backoff.

 This commit applies `lastAppliedSpec` as soon as operator executes first reconcile loop. Which makes possible incorrect configuration rollback.

 It also keeps `failed` updateStatus during reconcile loops. Since it's unrecoverable by operator and must be fixed by user or Kubernetes cluster administrator.

Signed-off-by: f41gh7 <[email protected]>
  • Loading branch information
f41gh7 committed Dec 23, 2024
1 parent 2bee208 commit 471f183
Show file tree
Hide file tree
Showing 14 changed files with 71 additions and 24 deletions.
2 changes: 1 addition & 1 deletion api/operator/v1beta1/vmcluster_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -926,7 +926,7 @@ func (cr *VMCluster) SetUpdateStatusTo(ctx context.Context, r client.Client, sta
crStatus: &cr.Status,
maybeErr: maybeErr,
mutateCurrentBeforeCompare: func(vs *VMClusterStatus) {
vs.LegacyStatus = status
vs.LegacyStatus = vs.UpdateStatus
},
})
}
Expand Down
4 changes: 2 additions & 2 deletions api/operator/v1beta1/vmextra_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -1331,6 +1331,7 @@ func updateObjectStatus[T client.Object, ST any](ctx context.Context, rclient cl
currentStatus := opts.crStatus
prevStatus := opts.crStatus.DeepCopy()
currMeta := currentStatus.GetStatusMetadata()
newUpdateStatus := opts.actualStatus
switch opts.actualStatus {
case UpdateStatusExpanding, UpdateStatusPaused:
case UpdateStatusFailed:
Expand All @@ -1344,7 +1345,6 @@ func updateObjectStatus[T client.Object, ST any](ctx context.Context, rclient cl
}

currMeta.ObservedGeneration = opts.cr.GetGeneration()

if opts.mutateCurrentBeforeCompare != nil {
opts.mutateCurrentBeforeCompare(opts.crStatus.(ST))
}
Expand All @@ -1353,7 +1353,7 @@ func updateObjectStatus[T client.Object, ST any](ctx context.Context, rclient cl
if equality.Semantic.DeepEqual(currentStatus, prevStatus) && currMeta.UpdateStatus == opts.actualStatus {
return nil
}
currMeta.UpdateStatus = opts.actualStatus
currMeta.UpdateStatus = newUpdateStatus

pr, err := buildStatusPatch(currentStatus)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion api/operator/v1beta1/vmsingle_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,7 @@ func (cr *VMSingle) SetUpdateStatusTo(ctx context.Context, r client.Client, stat
crStatus: &cr.Status,
maybeErr: maybeErr,
mutateCurrentBeforeCompare: func(vs *VMSingleStatus) {
vs.LegacyStatus = status
vs.LegacyStatus = vs.UpdateStatus
},
})
}
Expand Down
3 changes: 3 additions & 0 deletions docs/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,11 @@ aliases:
## tip

* BUGFIX: [vmoperator](https://docs.victoriametrics.com/operator/): keep `resourceVersion` and other significant `metadata` fields during `update` objects requests. See [this issue](https://github.com/VictoriaMetrics/operator/issues/1200) for details.
* BUGFIX: [vmoperator](https://docs.victoriametrics.com/operator/): properly update `updateStatus: failed` field. It fixes excessive errors logging and amount of created Kubernetes `Events`.
* BUGFIX: [vmoperator](https://docs.victoriametrics.com/operator/): Properly rollback incorrect object configuration. Previously diff for objects could be calculate incorrectly and update request could be skipped.
* BUGFIX: [vmcluster](https://docs.victoriametrics.com/operator/resources/vmcluster/) and [vmsingle](https://docs.victoriametrics.com/operator/resources/vmsingle/): restore deprecated fields `status.clusterStatus` and `status.singleStatus` removed at `v0.51.0`. Those deprecated fields will be removed at upcoming `v0.52.0` version.


## [v0.51.0](https://github.com/VictoriaMetrics/operator/releases/tag/v0.51.0)

**Release date:** 19 Dec 2024
Expand Down
36 changes: 25 additions & 11 deletions internal/controller/operator/controllers.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,6 @@ import (
"sync"
"time"

vmv1beta1 "github.com/VictoriaMetrics/operator/api/operator/v1beta1"
"github.com/VictoriaMetrics/operator/internal/config"
"github.com/VictoriaMetrics/operator/internal/controller/operator/factory/logger"
"github.com/google/uuid"
"github.com/prometheus/client_golang/prometheus"
corev1 "k8s.io/api/core/v1"
Expand All @@ -25,6 +22,11 @@ import (
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/metrics"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

vmv1beta1 "github.com/VictoriaMetrics/operator/api/operator/v1beta1"
"github.com/VictoriaMetrics/operator/internal/config"
"github.com/VictoriaMetrics/operator/internal/controller/operator/factory/logger"
operatorreconcile "github.com/VictoriaMetrics/operator/internal/controller/operator/factory/reconcile"
)

// BindFlags binds package flags to the given flagSet
Expand Down Expand Up @@ -270,6 +272,9 @@ func createGenericEventForObject(ctx context.Context, c client.Client, object cl
return nil
}

// TODO :@f41gh7 replace object with generic type
// it allows to use DeepClone method to prevent hidden object updates
// made by controller-runtime client
func reconcileAndTrackStatus(ctx context.Context, c client.Client, object objectWithStatusTrack, cb func() (ctrl.Result, error)) (result ctrl.Result, resultErr error) {
if object.Paused() {
if err := object.SetUpdateStatusTo(ctx, c, vmv1beta1.UpdateStatusPaused, nil); err != nil {
Expand All @@ -285,15 +290,27 @@ func reconcileAndTrackStatus(ctx context.Context, c client.Client, object object
}
var diffPatch client.Patch
if specChanged {
// TODO: @f41gh7 replace error prone patch
// with client.Status.Update()
// it should simplify code
// and reduce surface of possible errors
diffPatch, err = object.LastAppliedSpecAsPatch()
if err != nil {
resultErr = fmt.Errorf("cannot parse last applied spec for cluster: %w", err)
return
}

if err := object.SetUpdateStatusTo(ctx, c, vmv1beta1.UpdateStatusExpanding, nil); err != nil {
resultErr = fmt.Errorf("failed to update object status: %w", err)
return
}
// update lastAppliedSpec as soon as operator receives it
// it allows to properly build diff with previous object state
// and rollback bad configurations
if err := c.Patch(ctx, object, diffPatch); err != nil {
resultErr = fmt.Errorf("cannot update cluster with last applied spec: %w", err)
return
}
if err := createGenericEventForObject(ctx, c, object, "starting object update"); err != nil {
logger.WithContext(ctx).Error(err, " cannot create k8s api event")
}
Expand All @@ -307,25 +324,22 @@ func reconcileAndTrackStatus(ctx context.Context, c client.Client, object object
if apierrors.IsConflict(err) {
return
}
if updateErr := object.SetUpdateStatusTo(ctx, c, vmv1beta1.UpdateStatusFailed, err); updateErr != nil {
desiredStatus := vmv1beta1.UpdateStatusFailed
if operatorreconcile.IsErrorWaitTimeout(err) {
desiredStatus = vmv1beta1.UpdateStatusExpanding
}
if updateErr := object.SetUpdateStatusTo(ctx, c, desiredStatus, err); updateErr != nil {
resultErr = fmt.Errorf("failed to update object status: %q, origin err: %w", updateErr, err)
return
}

return result, err
}
if specChanged {
// use patch instead of update, only 1 field must be changed.
if err := c.Patch(ctx, object, diffPatch); err != nil {
resultErr = fmt.Errorf("cannot update cluster with last applied spec: %w", err)
return
}

if err := createGenericEventForObject(ctx, c, object, "reconcile of object finished successfully"); err != nil {
logger.WithContext(ctx).Error(err, " cannot create k8s api event")
}
logger.WithContext(ctx).Info("object was successfully reconciled")

}
if err := object.SetUpdateStatusTo(ctx, c, vmv1beta1.UpdateStatusOperational, nil); err != nil {
resultErr = fmt.Errorf("failed to update object status: %w", err)
Expand Down
10 changes: 8 additions & 2 deletions internal/controller/operator/factory/reconcile/deploy.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ func Deployment(ctx context.Context, rclient client.Client, newDeploy, prevDeplo

// waitDeploymentReady waits until deployment's replicaSet rollouts and all new pods is ready
func waitDeploymentReady(ctx context.Context, rclient client.Client, dep *appsv1.Deployment, deadline time.Duration) error {
var isErrDealine bool
err := wait.PollUntilContextTimeout(ctx, time.Second, deadline, false, func(ctx context.Context) (done bool, err error) {
var actualDeploy appsv1.Deployment
if err := rclient.Get(ctx, types.NamespacedName{Namespace: dep.Namespace, Name: dep.Name}, &actualDeploy); err != nil {
Expand All @@ -94,7 +95,8 @@ func waitDeploymentReady(ctx context.Context, rclient client.Client, dep *appsv1
}
cond := getDeploymentCondition(actualDeploy.Status, appsv1.DeploymentProgressing)
if cond != nil && cond.Reason == "ProgressDeadlineExceeded" {
return false, fmt.Errorf("deployment %s/%s has exceeded its progress deadline", dep.Namespace, dep.Name)
isErrDealine = true
return true, fmt.Errorf("deployment %s/%s has exceeded its progress deadline", dep.Namespace, dep.Name)
}
if actualDeploy.Spec.Replicas != nil && actualDeploy.Status.UpdatedReplicas < *actualDeploy.Spec.Replicas {
// Waiting for deployment rollout to finish: part of new replicas have been updated...
Expand All @@ -111,7 +113,11 @@ func waitDeploymentReady(ctx context.Context, rclient client.Client, dep *appsv1
return true, nil
})
if err != nil {
return reportFirstNotReadyPodOnError(ctx, rclient, fmt.Errorf("cannot wait for deployment to become ready: %w", err), dep.Namespace, labels.SelectorFromSet(dep.Spec.Selector.MatchLabels), dep.Spec.MinReadySeconds)
podErr := reportFirstNotReadyPodOnError(ctx, rclient, fmt.Errorf("cannot wait for deployment to become ready: %w", err), dep.Namespace, labels.SelectorFromSet(dep.Spec.Selector.MatchLabels), dep.Spec.MinReadySeconds)
if isErrDealine {
return err
}
return &errWaitReady{origin: podErr}
}
return nil
}
Expand Down
24 changes: 24 additions & 0 deletions internal/controller/operator/factory/reconcile/reconcile.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package reconcile

import (
"errors"
"fmt"
"time"

"sigs.k8s.io/controller-runtime/pkg/client"
Expand Down Expand Up @@ -82,3 +84,25 @@ func cloneSignificantMetadata(newObj, currObj client.Object) {
newObj.SetUID(currObj.GetUID())
newObj.SetSelfLink(currObj.GetSelfLink())
}

// IsErrorWaitTimeout determines if the err is an error which indicates that timeout for app
// transition into Ready state reached and should be continued at the next reconcile loop
func IsErrorWaitTimeout(err error) bool {
var et *errWaitReady
return errors.As(err, &et)
}

type errWaitReady struct {
origin error
}

// Error implements errors.Error interface
func (err *errWaitReady) Error() string {
return fmt.Sprintf(": %q",
err.origin)
}

// Unwrap implements error.Unwrap interface
func (err *errWaitReady) Unwrap() error {
return err.origin
}
2 changes: 1 addition & 1 deletion internal/controller/operator/vlogs_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func (r *VLogsReconciler) Reconcile(ctx context.Context, req ctrl.Request) (resu
}
r.Client.Scheme().Default(instance)

result, err = reconcileAndTrackStatus(ctx, r.Client, instance, func() (ctrl.Result, error) {
result, err = reconcileAndTrackStatus(ctx, r.Client, instance.DeepCopy(), func() (ctrl.Result, error) {

if err = vlogs.CreateOrUpdateVLogs(ctx, r, instance); err != nil {
return result, fmt.Errorf("failed create or update vlogs: %w", err)
Expand Down
2 changes: 1 addition & 1 deletion internal/controller/operator/vmagent_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func (r *VMAgentReconciler) Reconcile(ctx context.Context, req ctrl.Request) (re
}
r.Client.Scheme().Default(instance)

result, err = reconcileAndTrackStatus(ctx, r.Client, instance, func() (ctrl.Result, error) {
result, err = reconcileAndTrackStatus(ctx, r.Client, instance.DeepCopy(), func() (ctrl.Result, error) {
if err = vmagent.CreateOrUpdateVMAgent(ctx, instance, r); err != nil {
return result, err
}
Expand Down
2 changes: 1 addition & 1 deletion internal/controller/operator/vmalert_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func (r *VMAlertReconciler) Reconcile(ctx context.Context, req ctrl.Request) (re
}
r.Client.Scheme().Default(instance)

result, resultErr = reconcileAndTrackStatus(ctx, r.Client, instance, func() (ctrl.Result, error) {
result, resultErr = reconcileAndTrackStatus(ctx, r.Client, instance.DeepCopy(), func() (ctrl.Result, error) {
maps, err := vmalert.CreateOrUpdateRuleConfigMaps(ctx, instance, r)
if err != nil {
return result, err
Expand Down
2 changes: 1 addition & 1 deletion internal/controller/operator/vmalertmanager_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func (r *VMAlertmanagerReconciler) Reconcile(ctx context.Context, req ctrl.Reque
}
r.Client.Scheme().Default(instance)

result, err = reconcileAndTrackStatus(ctx, r.Client, instance, func() (ctrl.Result, error) {
result, err = reconcileAndTrackStatus(ctx, r.Client, instance.DeepCopy(), func() (ctrl.Result, error) {
if err := alertmanager.CreateAMConfig(ctx, instance, r.Client); err != nil {
return result, err
}
Expand Down
2 changes: 1 addition & 1 deletion internal/controller/operator/vmauth_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func (r *VMAuthReconciler) Reconcile(ctx context.Context, req ctrl.Request) (res
}
r.Client.Scheme().Default(instance)

result, err = reconcileAndTrackStatus(ctx, r.Client, instance, func() (ctrl.Result, error) {
result, err = reconcileAndTrackStatus(ctx, r.Client, instance.DeepCopy(), func() (ctrl.Result, error) {
if err := vmauth.CreateOrUpdateVMAuth(ctx, instance, r); err != nil {
return result, fmt.Errorf("cannot create or update vmauth deploy: %w", err)
}
Expand Down
2 changes: 1 addition & 1 deletion internal/controller/operator/vmcluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func (r *VMClusterReconciler) Reconcile(ctx context.Context, request ctrl.Reques
}
r.Client.Scheme().Default(instance)

result, err = reconcileAndTrackStatus(ctx, r.Client, instance, func() (ctrl.Result, error) {
result, err = reconcileAndTrackStatus(ctx, r.Client, instance.DeepCopy(), func() (ctrl.Result, error) {
err = vmcluster.CreateOrUpdateVMCluster(ctx, instance, r.Client)
if err != nil {
return result, fmt.Errorf("failed create or update vmcluster: %w", err)
Expand Down
2 changes: 1 addition & 1 deletion internal/controller/operator/vmsingle_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func (r *VMSingleReconciler) Reconcile(ctx context.Context, req ctrl.Request) (r
}
r.Client.Scheme().Default(instance)

result, err = reconcileAndTrackStatus(ctx, r.Client, instance, func() (ctrl.Result, error) {
result, err = reconcileAndTrackStatus(ctx, r.Client, instance.DeepCopy(), func() (ctrl.Result, error) {
if err = vmsingle.CreateOrUpdateVMSingle(ctx, instance, r); err != nil {
return result, fmt.Errorf("failed create or update single: %w", err)
}
Expand Down

0 comments on commit 471f183

Please sign in to comment.