Skip to content

Commit

Permalink
UPSTREAM: <carry>: add etcd3RetryingProberMonitor for retrying etcd U…
Browse files Browse the repository at this point in the history
…navailable errors for the etcd health checker client

UPSTREAM: <carry>: replace newETCD3ProberMonitor with etcd3RetryingProberMonitor
  • Loading branch information
p0lyn0mial authored and bertinatto committed Dec 11, 2024
1 parent f2fdd39 commit 9827eb6
Show file tree
Hide file tree
Showing 4 changed files with 197 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -156,13 +156,13 @@ func newETCD3Check(c storagebackend.Config, timeout time.Duration, stopCh <-chan
// retry in a loop in the background until we successfully create the client, storing the client or error encountered

lock := sync.RWMutex{}
var prober *etcd3ProberMonitor
var prober *etcd3RetryingProberMonitor
clientErr := fmt.Errorf("etcd client connection not yet established")

go wait.PollImmediateUntil(time.Second, func() (bool, error) {
lock.Lock()
defer lock.Unlock()
newProber, err := newETCD3ProberMonitor(c)
newProber, err := newRetryingETCD3ProberMonitor(c)
// Ensure that server is already not shutting down.
select {
case <-stopCh:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func CreateProber(c storagebackend.Config) (Prober, error) {
case storagebackend.StorageTypeETCD2:
return nil, fmt.Errorf("%s is no longer a supported storage backend", c.Type)
case storagebackend.StorageTypeUnset, storagebackend.StorageTypeETCD3:
return newETCD3ProberMonitor(c)
return newRetryingETCD3ProberMonitor(c)
default:
return nil, fmt.Errorf("unknown storage type: %s", c.Type)
}
Expand All @@ -80,7 +80,7 @@ func CreateMonitor(c storagebackend.Config) (metrics.Monitor, error) {
case storagebackend.StorageTypeETCD2:
return nil, fmt.Errorf("%s is no longer a supported storage backend", c.Type)
case storagebackend.StorageTypeUnset, storagebackend.StorageTypeETCD3:
return newETCD3ProberMonitor(c)
return newRetryingETCD3ProberMonitor(c)
default:
return nil, fmt.Errorf("unknown storage type: %s", c.Type)
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package factory

import (
"context"

"k8s.io/apiserver/pkg/storage/etcd3/etcd3retry"
"k8s.io/apiserver/pkg/storage/etcd3/metrics"
"k8s.io/apiserver/pkg/storage/storagebackend"
)

// proberMonitor is the union of the Prober and metrics.Monitor interfaces.
// It is the type of the delegate held by etcd3RetryingProberMonitor, so a
// single wrapped value can serve both Probe and Monitor calls.
type proberMonitor interface {
Prober
metrics.Monitor
}

// etcd3RetryingProberMonitor wraps a proberMonitor and forwards Probe,
// Monitor and Close to it. Probe and Monitor are invoked through
// etcd3retry.OnError; Close is forwarded directly.
type etcd3RetryingProberMonitor struct {
// delegate is the wrapped implementation that performs the actual calls.
delegate proberMonitor
}

// newRetryingETCD3ProberMonitor builds an etcd3 prober/monitor for the given
// storage backend config via newETCD3ProberMonitor and wraps it in an
// etcd3RetryingProberMonitor. Any construction error is returned unchanged
// together with a nil wrapper.
func newRetryingETCD3ProberMonitor(c storagebackend.Config) (*etcd3RetryingProberMonitor, error) {
	inner, err := newETCD3ProberMonitor(c)
	if err != nil {
		return nil, err
	}

	wrapped := &etcd3RetryingProberMonitor{delegate: inner}
	return wrapped, nil
}

// Probe calls the delegate's Probe through etcd3retry.OnError, using
// etcd3retry.DefaultRetry as the retry configuration and
// etcd3retry.IsRetriableEtcdError as the predicate passed to it. The error
// returned by OnError is returned as-is.
func (t *etcd3RetryingProberMonitor) Probe(ctx context.Context) error {
	// A single attempt; OnError decides whether it is invoked again.
	probeOnce := func() error {
		return t.delegate.Probe(ctx)
	}

	return etcd3retry.OnError(ctx, etcd3retry.DefaultRetry, etcd3retry.IsRetriableEtcdError, probeOnce)
}

// Monitor calls the delegate's Monitor through etcd3retry.OnError, using
// etcd3retry.DefaultRetry and etcd3retry.IsRetriableEtcdError.
//
// The returned metrics are whatever the most recent delegate call produced,
// including when the final error is non-nil; the value is overwritten on
// every attempt.
func (t *etcd3RetryingProberMonitor) Monitor(ctx context.Context) (metrics.StorageMetrics, error) {
	var latest metrics.StorageMetrics

	// A single attempt; it records the delegate's result in latest so the
	// value survives the retry helper, which only deals in errors.
	monitorOnce := func() (attemptErr error) {
		latest, attemptErr = t.delegate.Monitor(ctx)
		return attemptErr
	}

	err := etcd3retry.OnError(ctx, etcd3retry.DefaultRetry, etcd3retry.IsRetriableEtcdError, monitorOnce)
	return latest, err
}

// Close forwards to the delegate's Close and returns its result. Unlike
// Probe and Monitor, the call is made exactly once, without the retry helper.
func (t *etcd3RetryingProberMonitor) Close() error {
	closeErr := t.delegate.Close()
	return closeErr
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
package factory

import (
"context"
"fmt"
"testing"

etcdrpc "go.etcd.io/etcd/api/v3/v3rpc/rpctypes"

"k8s.io/apiserver/pkg/storage/etcd3/metrics"
)

// getRetryScenarios returns a fresh table of retry test cases. Each case
// carries the function the fake delegate calls to produce its error, the
// number of retries the test expects to observe, and the error expected from
// the wrapper once it gives up or succeeds.
//
// One case holds a stateful closure, so the table must be rebuilt for every
// test that consumes it rather than shared.
func getRetryScenarios() []struct {
	name               string
	retryFnError       func() error
	expectedRetries    int
	expectedFinalError error
} {
	// An alias rather than a new named type, so the slice type returned is
	// exactly the anonymous struct slice declared in the signature.
	type retryScenario = struct {
		name               string
		retryFnError       func() error
		expectedRetries    int
		expectedFinalError error
	}

	alwaysLeaderChanged := func() error {
		return etcdrpc.ErrLeaderChanged
	}

	// Fails with ErrLeaderChanged on every call except the fourth, which
	// succeeds: three failures followed by one success.
	calls := 0
	leaderChangedUntilFourthCall := func() error {
		calls++
		if calls == 4 {
			return nil
		}
		return etcdrpc.ErrLeaderChanged
	}

	succeed := func() error {
		return nil
	}

	randomFailure := func() error {
		return fmt.Errorf("random error")
	}

	return []retryScenario{
		{
			name:               "retry ErrLeaderChanged",
			retryFnError:       alwaysLeaderChanged,
			expectedRetries:    5,
			expectedFinalError: etcdrpc.ErrLeaderChanged,
		},
		{
			name:            "retry ErrLeaderChanged a few times",
			retryFnError:    leaderChangedUntilFourthCall,
			expectedRetries: 3,
		},
		{
			name:         "no retries",
			retryFnError: succeed,
		},
		{
			name:               "no retries for a random error",
			retryFnError:       randomFailure,
			expectedFinalError: fmt.Errorf("random error"),
		},
	}
}

// TestEtcd3RetryingProber runs every retry scenario through
// etcd3RetryingProberMonitor.Probe with a fake delegate and checks both the
// number of retries the delegate observed and the final error.
func TestEtcd3RetryingProber(t *testing.T) {
	for _, tc := range getRetryScenarios() {
		t.Run(tc.name, func(t *testing.T) {
			// actualRetries starts at -1 so that the first call, which is
			// not a retry, brings the counter to zero.
			delegate := &fakeEtcd3RetryingProberMonitor{
				actualRetries: -1,
				probeFn:       tc.retryFnError,
			}
			prober := &etcd3RetryingProberMonitor{delegate: delegate}

			err := prober.Probe(context.TODO())

			if got, want := delegate.actualRetries, tc.expectedRetries; got != want {
				t.Errorf("Unexpected number of retries %v, expected %v", got, want)
			}

			// Errors are compared by message: the expected error of some
			// scenarios is a distinct value with the same text.
			wantErr := tc.expectedFinalError
			switch {
			case (err == nil) != (wantErr == nil):
				t.Errorf("Expected error %v, got %v", wantErr, err)
			case err != nil && err.Error() != wantErr.Error():
				t.Errorf("Expected error %v, got %v", wantErr, err)
			}
		})
	}
}

// TestEtcd3RetryingMonitor runs every retry scenario through
// etcd3RetryingProberMonitor.Monitor with a fake delegate. Besides the retry
// count and the final error, it checks that the metrics returned by the
// wrapper are the ones produced by the delegate's last call: the fake reports
// its own zero-based call index as StorageMetrics.Size, so the expected Size
// equals the expected number of retries.
func TestEtcd3RetryingMonitor(t *testing.T) {
	for _, scenario := range getRetryScenarios() {
		t.Run(scenario.name, func(t *testing.T) {
			ctx := context.TODO()
			expectedRetValue := int64(scenario.expectedRetries)
			targetDelegate := &fakeEtcd3RetryingProberMonitor{
				// we set it to -1 to indicate that the first
				// execution is not a retry
				actualRetries: -1,
				monitorFn: func() func() (metrics.StorageMetrics, error) {
					retryCounter := -1
					return func() (metrics.StorageMetrics, error) {
						retryCounter++
						err := scenario.retryFnError()
						// Keyed field: an unkeyed literal of an imported
						// struct is flagged by go vet and stops compiling
						// if the struct ever gains a field.
						ret := metrics.StorageMetrics{Size: int64(retryCounter)}
						return ret, err
					}
				}(),
			}

			target := &etcd3RetryingProberMonitor{delegate: targetDelegate}
			actualRetValue, err := target.Monitor(ctx)

			if targetDelegate.actualRetries != scenario.expectedRetries {
				t.Errorf("Unexpected number of retries %v, expected %v", targetDelegate.actualRetries, scenario.expectedRetries)
			}
			// Errors are compared by message because the expected error of
			// some scenarios is a distinct value with the same text.
			if (err == nil && scenario.expectedFinalError != nil) || (err != nil && scenario.expectedFinalError == nil) {
				t.Errorf("Expected error %v, got %v", scenario.expectedFinalError, err)
			}
			if err != nil && scenario.expectedFinalError != nil && err.Error() != scenario.expectedFinalError.Error() {
				t.Errorf("Expected error %v, got %v", scenario.expectedFinalError, err)
			}
			if actualRetValue.Size != expectedRetValue {
				t.Errorf("Unexpected value returned actual %v, expected %v", actualRetValue.Size, expectedRetValue)
			}
		})
	}
}

// fakeEtcd3RetryingProberMonitor is a test double used as the delegate of
// etcd3RetryingProberMonitor. It counts how often it is called and hands the
// actual result off to caller-supplied functions.
type fakeEtcd3RetryingProberMonitor struct {
	// actualRetries is incremented on every Probe or Monitor call. Tests
	// initialise it to -1 so that it reads as the number of retries.
	actualRetries int
	// probeFn supplies the result of Probe; it must be set before Probe is called.
	probeFn func() error
	// monitorFn supplies the result of Monitor; it must be set before Monitor is called.
	monitorFn func() (metrics.StorageMetrics, error)
}

// recordCall bumps the call counter shared by Probe and Monitor.
func (f *fakeEtcd3RetryingProberMonitor) recordCall() {
	f.actualRetries++
}

// Probe records the call and returns whatever probeFn returns. The context is ignored.
func (f *fakeEtcd3RetryingProberMonitor) Probe(_ context.Context) error {
	f.recordCall()
	return f.probeFn()
}

// Monitor records the call and returns whatever monitorFn returns. The context is ignored.
func (f *fakeEtcd3RetryingProberMonitor) Monitor(_ context.Context) (metrics.StorageMetrics, error) {
	f.recordCall()
	return f.monitorFn()
}

// Close is not expected to be called by the tests and panics if it is.
func (f *fakeEtcd3RetryingProberMonitor) Close() error {
	panic("not implemented")
}

0 comments on commit 9827eb6

Please sign in to comment.