diff --git a/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/etcd3.go b/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/etcd3.go index 9d52eacd23838..5245a8eeaf4dd 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/etcd3.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/etcd3.go @@ -156,13 +156,13 @@ func newETCD3Check(c storagebackend.Config, timeout time.Duration, stopCh <-chan // retry in a loop in the background until we successfully create the client, storing the client or error encountered lock := sync.RWMutex{} - var prober *etcd3ProberMonitor + var prober *etcd3RetryingProberMonitor clientErr := fmt.Errorf("etcd client connection not yet established") go wait.PollImmediateUntil(time.Second, func() (bool, error) { lock.Lock() defer lock.Unlock() - newProber, err := newETCD3ProberMonitor(c) + newProber, err := newRetryingETCD3ProberMonitor(c) // Ensure that server is already not shutting down. select { case <-stopCh: diff --git a/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/factory.go b/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/factory.go index 2bf3727e8a77d..0967a84cbe83f 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/factory.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/factory.go @@ -69,7 +69,7 @@ func CreateProber(c storagebackend.Config) (Prober, error) { case storagebackend.StorageTypeETCD2: return nil, fmt.Errorf("%s is no longer a supported storage backend", c.Type) case storagebackend.StorageTypeUnset, storagebackend.StorageTypeETCD3: - return newETCD3ProberMonitor(c) + return newRetryingETCD3ProberMonitor(c) default: return nil, fmt.Errorf("unknown storage type: %s", c.Type) } @@ -80,7 +80,7 @@ func CreateMonitor(c storagebackend.Config) (metrics.Monitor, error) { case storagebackend.StorageTypeETCD2: return nil, fmt.Errorf("%s is no longer a supported storage backend", c.Type) case storagebackend.StorageTypeUnset, storagebackend.StorageTypeETCD3: - return newETCD3ProberMonitor(c) + return newRetryingETCD3ProberMonitor(c) default: return nil, fmt.Errorf("unknown storage type: %s", c.Type) } diff --git a/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/retry_etcdprobemonitor.go b/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/retry_etcdprobemonitor.go new file mode 100644 index 0000000000000..ab210464f50fa --- /dev/null +++ b/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/retry_etcdprobemonitor.go @@ -0,0 +1,46 @@ +package factory + +import ( + "context" + + "k8s.io/apiserver/pkg/storage/etcd3/etcd3retry" + "k8s.io/apiserver/pkg/storage/etcd3/metrics" + "k8s.io/apiserver/pkg/storage/storagebackend" +) + +type proberMonitor interface { + Prober + metrics.Monitor +} + +type etcd3RetryingProberMonitor struct { + delegate proberMonitor +} + +func newRetryingETCD3ProberMonitor(c storagebackend.Config) (*etcd3RetryingProberMonitor, error) { + delegate, err := newETCD3ProberMonitor(c) + if err != nil { + return nil, err + } + return &etcd3RetryingProberMonitor{delegate: delegate}, nil +} + +func (t *etcd3RetryingProberMonitor) Probe(ctx context.Context) error { + return etcd3retry.OnError(ctx, etcd3retry.DefaultRetry, etcd3retry.IsRetriableEtcdError, func() error { + return t.delegate.Probe(ctx) + }) +} + +func (t *etcd3RetryingProberMonitor) Monitor(ctx context.Context) (metrics.StorageMetrics, error) { + var ret metrics.StorageMetrics + err := etcd3retry.OnError(ctx, etcd3retry.DefaultRetry, etcd3retry.IsRetriableEtcdError, func() error { + var innerErr error + ret, innerErr = t.delegate.Monitor(ctx) + return innerErr + }) + return ret, err +} + +func (t *etcd3RetryingProberMonitor) Close() error { + return t.delegate.Close() +} diff --git a/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/retry_etcdprobemonitor_test.go b/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/retry_etcdprobemonitor_test.go new file mode 100644 index 0000000000000..db6819ef5f684 --- /dev/null +++ b/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/retry_etcdprobemonitor_test.go @@ -0,0 +1,147 @@ +package factory + +import ( + "context" + "fmt" + "testing" + + etcdrpc "go.etcd.io/etcd/api/v3/v3rpc/rpctypes" + + "k8s.io/apiserver/pkg/storage/etcd3/metrics" +) + +func getRetryScenarios() []struct { + name string + retryFnError func() error + expectedRetries int + expectedFinalError error +} { + return []struct { + name string + retryFnError func() error + expectedRetries int + expectedFinalError error + }{ + { + name: "retry ErrLeaderChanged", + retryFnError: func() error { + return etcdrpc.ErrLeaderChanged + }, + expectedRetries: 5, + expectedFinalError: etcdrpc.ErrLeaderChanged, + }, + { + name: "retry ErrLeaderChanged a few times", + retryFnError: func() func() error { + retryCounter := -1 + return func() error { + retryCounter++ + if retryCounter == 3 { + return nil + } + return etcdrpc.ErrLeaderChanged + } + }(), + expectedRetries: 3, + }, + { + name: "no retries", + retryFnError: func() error { + return nil + }, + }, + { + name: "no retries for a random error", + retryFnError: func() error { + return fmt.Errorf("random error") + }, + expectedFinalError: fmt.Errorf("random error"), + }, + } +} + +func TestEtcd3RetryingProber(t *testing.T) { + for _, scenario := range getRetryScenarios() { + t.Run(scenario.name, func(t *testing.T) { + ctx := context.TODO() + targetDelegate := &fakeEtcd3RetryingProberMonitor{ + // we set it to -1 to indicate that the first + // execution is not a retry + actualRetries: -1, + probeFn: scenario.retryFnError, + } + + target := &etcd3RetryingProberMonitor{delegate: targetDelegate} + err := target.Probe(ctx) + + if targetDelegate.actualRetries != scenario.expectedRetries { + t.Errorf("Unexpected number of retries %v, expected %v", targetDelegate.actualRetries, scenario.expectedRetries) + } + if (err == nil && scenario.expectedFinalError != nil) || (err != nil && scenario.expectedFinalError == nil) { + t.Errorf("Expected error %v, got %v", scenario.expectedFinalError, err) + } + if err != nil && scenario.expectedFinalError != nil && err.Error() != scenario.expectedFinalError.Error() { + t.Errorf("Expected error %v, got %v", scenario.expectedFinalError, err) + } + }) + } +} + +func TestEtcd3RetryingMonitor(t *testing.T) { + for _, scenario := range getRetryScenarios() { + t.Run(scenario.name, func(t *testing.T) { + ctx := context.TODO() + expectedRetValue := int64(scenario.expectedRetries) + targetDelegate := &fakeEtcd3RetryingProberMonitor{ + // we set it to -1 to indicate that the first + // execution is not a retry + actualRetries: -1, + monitorFn: func() func() (metrics.StorageMetrics, error) { + retryCounter := -1 + return func() (metrics.StorageMetrics, error) { + retryCounter++ + err := scenario.retryFnError() + ret := metrics.StorageMetrics{int64(retryCounter)} + return ret, err + } + }(), + } + + target := &etcd3RetryingProberMonitor{delegate: targetDelegate} + actualRetValue, err := target.Monitor(ctx) + + if targetDelegate.actualRetries != scenario.expectedRetries { + t.Errorf("Unexpected number of retries %v, expected %v", targetDelegate.actualRetries, scenario.expectedRetries) + } + if (err == nil && scenario.expectedFinalError != nil) || (err != nil && scenario.expectedFinalError == nil) { + t.Errorf("Expected error %v, got %v", scenario.expectedFinalError, err) + } + if err != nil && scenario.expectedFinalError != nil && err.Error() != scenario.expectedFinalError.Error() { + t.Errorf("Expected error %v, got %v", scenario.expectedFinalError, err) + } + if actualRetValue.Size != expectedRetValue { + t.Errorf("Unexpected value returned actual %v, expected %v", actualRetValue.Size, expectedRetValue) + } + }) + } +} + +type fakeEtcd3RetryingProberMonitor struct { + actualRetries int + probeFn func() error + monitorFn func() (metrics.StorageMetrics, error) +} + +func (f *fakeEtcd3RetryingProberMonitor) Probe(_ context.Context) error { + f.actualRetries++ + return f.probeFn() +} + +func (f *fakeEtcd3RetryingProberMonitor) Monitor(_ context.Context) (metrics.StorageMetrics, error) { + f.actualRetries++ + return f.monitorFn() +} + +func (f *fakeEtcd3RetryingProberMonitor) Close() error { + panic("not implemented") +}