Skip to content

Commit

Permalink
fix(trafficrouting): Do not block the switch of service selectors for…
Browse files Browse the repository at this point in the history
… single pod failures (#2441)

* fix(trafficrouter): WIP on not setting weight if not available

Signed-off-by: zachaller <[email protected]>

* fix tests

Signed-off-by: zachaller <[email protected]>

* try bailing vs setting weight

Signed-off-by: zachaller <[email protected]>

* work with experiments that do not set any weights

Signed-off-by: zachaller <[email protected]>

* fix test by commenting out code

Signed-off-by: zachaller <[email protected]>

* lint

Signed-off-by: zachaller <[email protected]>

* simplify logic

Signed-off-by: zachaller <[email protected]>

* switch logic

Signed-off-by: zachaller <[email protected]>

* add more comments

Signed-off-by: zachaller <[email protected]>

* add more comments

Signed-off-by: zachaller <[email protected]>

* add more test

Signed-off-by: zachaller <[email protected]>

* refactor test

Signed-off-by: zachaller <[email protected]>

* refactor code to reduce duplication

Signed-off-by: zachaller <[email protected]>

* change comments a bit

Signed-off-by: zachaller <[email protected]>

* remove else

Signed-off-by: zachaller <[email protected]>

Signed-off-by: zachaller <[email protected]>
  • Loading branch information
zachaller committed Dec 13, 2022
1 parent 6b755ff commit 7da69cf
Show file tree
Hide file tree
Showing 7 changed files with 149 additions and 13 deletions.
42 changes: 31 additions & 11 deletions rollout/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,6 @@ import (
"context"
"fmt"

appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
patchtypes "k8s.io/apimachinery/pkg/types"
"k8s.io/utils/pointer"

"github.com/argoproj/argo-rollouts/pkg/apis/rollouts/v1alpha1"
"github.com/argoproj/argo-rollouts/rollout/trafficrouting"
"github.com/argoproj/argo-rollouts/utils/annotations"
Expand All @@ -21,6 +15,11 @@ import (
replicasetutil "github.com/argoproj/argo-rollouts/utils/replicaset"
rolloututils "github.com/argoproj/argo-rollouts/utils/rollout"
serviceutil "github.com/argoproj/argo-rollouts/utils/service"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
patchtypes "k8s.io/apimachinery/pkg/types"
"k8s.io/utils/pointer"
)

const (
Expand Down Expand Up @@ -266,7 +265,10 @@ func (c *rolloutContext) reconcileStableAndCanaryService() error {
}

// ensureSVCTargets updates the service with the given name to point to the given ReplicaSet,
// but only if that ReplicaSet has full availability.
// but only if that ReplicaSet has proper availability. There is still an edge case with this function if
// in the small window of time between a rollout being completed, and we try to update the service selector, we lose 100%
// of the pods availability. We will not switch service selector but still go and reconcile the traffic router, setting the
// stable weight to zero. This really only affects dynamic stable scale.
func (c *rolloutContext) ensureSVCTargets(svcName string, rs *appsv1.ReplicaSet, checkRsAvailability bool) error {
if rs == nil || svcName == "" {
return nil
Expand All @@ -277,13 +279,31 @@ func (c *rolloutContext) ensureSVCTargets(svcName string, rs *appsv1.ReplicaSet,
}
currSelector := svc.Spec.Selector[v1alpha1.DefaultRolloutUniqueLabelKey]
desiredSelector := rs.Labels[v1alpha1.DefaultRolloutUniqueLabelKey]
logCtx := c.log.WithField(logutil.ServiceKey, svc.Name)

if currSelector != desiredSelector {
// ensure ReplicaSet is fully available, otherwise we will point the service to nothing or an underprovisioned ReplicaSet
if checkRsAvailability && !replicasetutil.IsReplicaSetAvailable(rs) {
logCtx := c.log.WithField(logutil.ServiceKey, svc.Name)
logCtx.Infof("delaying service switch from %s to %s: ReplicaSet not fully available", currSelector, desiredSelector)
if _, ok := svc.Annotations[v1alpha1.ManagedByRolloutsKey]; !ok {
// This block will be entered only when adopting a service that already exists, because the current annotation
// will be empty at that point. When we are adopting a service, we want to make sure that the replicaset is fully
// available before we start routing traffic to it, so we do not overload it.
// See PR: https://github.com/argoproj/argo-rollouts/pull/1777

// ensure ReplicaSet is fully available, otherwise we will point the service to nothing or an underprovisioned ReplicaSet
if checkRsAvailability && !replicasetutil.IsReplicaSetAvailable(rs) {
logCtx.Infof("delaying service switch from %s to %s: ReplicaSet not fully available", currSelector, desiredSelector)
return nil
}
logCtx.Infof("adopting service %s", svc.Name)
}

// When we are at the end of a rollout we generally will have enough capacity to handle the traffic, so we do not
// need to check the full availability of the ReplicaSet. We do still want to make sure we have at least one pod
// available, so we do not point the service to nothing, but losing a pod or two should be tolerable to still switch service selectors.
if checkRsAvailability && !replicasetutil.IsReplicaSetPartiallyAvailable(rs) {
logCtx.Infof("delaying service switch from %s to %s: ReplicaSet has zero availability", currSelector, desiredSelector)
return nil
}

err = c.switchServiceSelector(svc, desiredSelector, c.rollout)
if err != nil {
return err
Expand Down
71 changes: 71 additions & 0 deletions rollout/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -787,6 +787,22 @@ func TestDelayCanaryStableServiceLabelInjection(t *testing.T) {
_, stableInjected := stableSvc.Spec.Selector[v1alpha1.DefaultRolloutUniqueLabelKey]
assert.False(t, stableInjected)
}
{
// ensure we don't update service because new/stable are both partially available on an adoption of service reconcile
ctrl, _, _ := f.newController(noResyncPeriodFunc)
roCtx, err := ctrl.newRolloutContext(ro1)
assert.NoError(t, err)

roCtx.newRS = newReplicaSetWithStatus(ro1, 3, 1)
roCtx.stableRS = newReplicaSetWithStatus(ro2, 3, 1)

err = roCtx.reconcileStableAndCanaryService()
assert.NoError(t, err)
_, canaryInjected := canarySvc.Spec.Selector[v1alpha1.DefaultRolloutUniqueLabelKey]
assert.False(t, canaryInjected)
_, stableInjected := stableSvc.Spec.Selector[v1alpha1.DefaultRolloutUniqueLabelKey]
assert.False(t, stableInjected)
}
{
// next ensure we do update service because new/stable are now available
ctrl, _, _ := f.newController(noResyncPeriodFunc)
Expand All @@ -805,3 +821,58 @@ func TestDelayCanaryStableServiceLabelInjection(t *testing.T) {
}

}

// TestDelayCanaryStableServiceDelayOnAdoptedService verifies that partial pod readiness is enough
// to switch selector labels on adopted services, while zero readiness blocks the switch entirely.
func TestDelayCanaryStableServiceDelayOnAdoptedService(t *testing.T) {
	rollout1 := newCanaryRollout("foo", 3, nil, nil, nil, intstr.FromInt(1), intstr.FromInt(1))
	rollout1.Spec.Strategy.Canary.CanaryService = "canary"
	rollout1.Spec.Strategy.Canary.StableService = "stable"
	// Build services that already carry a rollout owner, i.e. they are adopted by the rollouts.
	svcStable := newService("stable", 80, rollout1.Spec.Selector.MatchLabels, rollout1)
	rollout2 := bumpVersion(rollout1)
	svcCanary := newService("canary", 80, rollout1.Spec.Selector.MatchLabels, rollout2)

	fixture := newFixture(t)
	defer fixture.Close()
	fixture.kubeobjects = append(fixture.kubeobjects, svcCanary, svcStable)
	fixture.serviceLister = append(fixture.serviceLister, svcCanary, svcStable)

	t.Run("AdoptedService No Availability", func(t *testing.T) {
		// With zero available pods on both ReplicaSets, neither selector may be updated.
		ctrl, _, _ := fixture.newController(noResyncPeriodFunc)
		roCtx, err := ctrl.newRolloutContext(rollout1)
		assert.NoError(t, err)

		roCtx.newRS = newReplicaSetWithStatus(rollout1, 3, 0)
		roCtx.stableRS = newReplicaSetWithStatus(rollout2, 3, 0)

		err = roCtx.reconcileStableAndCanaryService()
		assert.NoError(t, err)
		canaryHash, hasCanaryLabel := svcCanary.Spec.Selector[v1alpha1.DefaultRolloutUniqueLabelKey]
		assert.False(t, hasCanaryLabel)
		fmt.Println(canaryHash)
		stableHash, hasStableLabel := svcStable.Spec.Selector[v1alpha1.DefaultRolloutUniqueLabelKey]
		assert.False(t, hasStableLabel)
		fmt.Println(stableHash)
	})
	t.Run("AdoptedService Partial Availability", func(t *testing.T) {
		// One or two available pods out of three is enough: both selectors should switch.
		ctrl, _, _ := fixture.newController(noResyncPeriodFunc)
		roCtx, err := ctrl.newRolloutContext(rollout1)
		assert.NoError(t, err)

		roCtx.newRS = newReplicaSetWithStatus(rollout1, 3, 1)
		roCtx.stableRS = newReplicaSetWithStatus(rollout2, 3, 2)

		err = roCtx.reconcileStableAndCanaryService()
		assert.NoError(t, err)
		canaryHash, hasCanaryLabel := svcCanary.Spec.Selector[v1alpha1.DefaultRolloutUniqueLabelKey]
		assert.True(t, hasCanaryLabel)
		fmt.Println(canaryHash)
		stableHash, hasStableLabel := svcStable.Spec.Selector[v1alpha1.DefaultRolloutUniqueLabelKey]
		assert.True(t, hasStableLabel)
		fmt.Println(stableHash)
	})
}
1 change: 1 addition & 0 deletions test/e2e/alb/rollout-alb-experiment-no-setweight.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ spec:
servicePort: 80
steps:
- experiment:
duration: 15s
templates:
- name: experiment-alb-canary
specRef: canary
Expand Down
2 changes: 1 addition & 1 deletion test/e2e/aws_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ func (s *AWSSuite) TestALBExperimentStepNoSetWeight() {
When().
PromoteRollout().
WaitForRolloutStatus("Healthy").
Sleep(1 * time.Second). // stable is currently set first, and then changes made to VirtualServices/DestinationRules
Sleep(2 * time.Second). // stable is currently set first, and then changes made to VirtualServices/DestinationRules
Then().
Assert(assertWeights(s, "alb-rollout-canary", "alb-rollout-stable", 0, 100))
}
2 changes: 1 addition & 1 deletion test/e2e/functional_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,7 @@ func (s *FunctionalSuite) TestRolloutPDBRestart() {
s.Given().
HealthyRollout(`
---
apiVersion: policy/v1beta1
apiVersion: policy/v1
kind: PodDisruptionBudget
metadata:
name: rollout-pdb-restart
Expand Down
5 changes: 5 additions & 0 deletions utils/replicaset/replicaset.go
Original file line number Diff line number Diff line change
Expand Up @@ -646,3 +646,8 @@ func IsReplicaSetAvailable(rs *appsv1.ReplicaSet) bool {
availableReplicas := rs.Status.AvailableReplicas
return replicas != nil && *replicas != 0 && availableReplicas != 0 && *replicas <= availableReplicas
}

// IsReplicaSetPartiallyAvailable reports whether a ReplicaSet has at least one available pod.
// Unlike IsReplicaSetAvailable it does not require the full desired replica count to be
// available — a single available pod is enough. Note it only inspects status: it does not
// check spec.replicas, so a ReplicaSet being scaled down but still holding one available
// pod also satisfies this predicate.
func IsReplicaSetPartiallyAvailable(rs *appsv1.ReplicaSet) bool {
	return rs.Status.AvailableReplicas > 0
}
39 changes: 39 additions & 0 deletions utils/replicaset/replicaset_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1336,3 +1336,42 @@ func TestIsReplicaSetAvailable(t *testing.T) {
assert.False(t, IsReplicaSetAvailable(&rs))
}
}

// TestIsReplicaSetPartiallyAvailable covers zero, partial, and full availability:
// only the zero-availability case should report false.
func TestIsReplicaSetPartiallyAvailable(t *testing.T) {
	cases := []struct {
		name      string
		ready     int32
		available int32
		want      bool
	}{
		{name: "No Availability", ready: 0, available: 0, want: false},
		{name: "Partial Availability", ready: 2, available: 1, want: true},
		{name: "Full Availability", ready: 2, available: 2, want: true},
	}
	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			rs := appsv1.ReplicaSet{
				Spec: appsv1.ReplicaSetSpec{
					Replicas: pointer.Int32Ptr(2),
				},
				Status: appsv1.ReplicaSetStatus{
					ReadyReplicas:     tc.ready,
					AvailableReplicas: tc.available,
				},
			}
			assert.Equal(t, tc.want, IsReplicaSetPartiallyAvailable(&rs))
		})
	}
}

0 comments on commit 7da69cf

Please sign in to comment.