From d14a50597eeafaa541f46ca382d6d230f9d27ef7 Mon Sep 17 00:00:00 2001 From: Jordan Rodgers Date: Fri, 2 Aug 2024 10:16:20 -0700 Subject: [PATCH] fix: ping instead of checking pod readiness --- controllers/rediscluster_controller.go | 4 ++-- controllers/redisreplication_controller.go | 2 +- k8sutils/cluster-scaling.go | 26 ++++++++++++++++++++++ k8sutils/statefulset.go | 20 ++++++++++++----- 4 files changed, 44 insertions(+), 8 deletions(-) diff --git a/controllers/rediscluster_controller.go b/controllers/rediscluster_controller.go index 969d44291..747678512 100644 --- a/controllers/rediscluster_controller.go +++ b/controllers/rediscluster_controller.go @@ -121,7 +121,7 @@ func (r *RedisClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request return intctrlutil.RequeueWithError(err, reqLogger, "") } - if r.IsStatefulSetReady(ctx, instance.Namespace, instance.Name+"-leader") { + if r.IsStatefulSetReady(ctx, r.K8sClient, instance, instance.Name+"-leader", instance.Namespace) { // Mark the cluster status as initializing if there are no follower nodes if (instance.Status.ReadyLeaderReplicas == 0 && instance.Status.ReadyFollowerReplicas == 0) || instance.Status.ReadyFollowerReplicas != followerReplicas { @@ -147,7 +147,7 @@ func (r *RedisClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request } } - if !(r.IsStatefulSetReady(ctx, instance.Namespace, instance.Name+"-leader") && r.IsStatefulSetReady(ctx, instance.Namespace, instance.Name+"-follower")) { + if !(r.IsStatefulSetReady(ctx, r.K8sClient, instance, instance.Name+"-leader", instance.Namespace) && r.IsStatefulSetReady(ctx, r.K8sClient, instance, instance.Name+"-follower", instance.Namespace)) { return intctrlutil.Reconciled() } diff --git a/controllers/redisreplication_controller.go b/controllers/redisreplication_controller.go index 335d0606e..9cd8f16e1 100644 --- a/controllers/redisreplication_controller.go +++ b/controllers/redisreplication_controller.go @@ -57,7 +57,7 @@ func (r *RedisReplicationReconciler) Reconcile(ctx context.Context, req ctrl.Req if err != nil { return intctrlutil.RequeueWithError(err, reqLogger, "") } - if !r.IsStatefulSetReady(ctx, instance.Namespace, instance.Name) { + if !r.IsStatefulSetReady(ctx, r.K8sClient, instance, instance.Name, instance.Namespace) { return intctrlutil.Reconciled() } diff --git a/k8sutils/cluster-scaling.go b/k8sutils/cluster-scaling.go index 4b6edec9e..46e4338b1 100644 --- a/k8sutils/cluster-scaling.go +++ b/k8sutils/cluster-scaling.go @@ -9,6 +9,7 @@ import ( redisv1beta2 "github.com/OT-CONTAINER-KIT/redis-operator/api/v1beta2" "github.com/go-logr/logr" redis "github.com/redis/go-redis/v9" + "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/kubernetes" ) @@ -106,6 +107,31 @@ func getRedisClusterSlots(ctx context.Context, redisClient *redis.Client, logger return strconv.Itoa(totalSlots) } +// pingRedisNode will ping the redis node to check if it is up and running +func pingRedisNode(ctx context.Context, client kubernetes.Interface, logger logr.Logger, cr runtime.Object, pod RedisDetails) bool { + var redisClient *redis.Client + + switch cr := cr.(type) { + case *redisv1beta2.RedisCluster: + redisClient = configureRedisClient(client, logger, cr, pod.PodName) + case *redisv1beta2.RedisReplication: + redisClient = configureRedisReplicationClient(client, logger, cr, pod.PodName) + default: + logger.Error(nil, "Unknown CR type") + return false + } + + defer redisClient.Close() + + pong, err := redisClient.Ping(ctx).Result() + if err != nil || pong != "PONG" { + logger.Error(err, "Failed to ping Redis server") + return false + } + + return true +} + // getRedisNodeID would return nodeID of a redis node by passing pod func getRedisNodeID(ctx context.Context, client kubernetes.Interface, logger logr.Logger, cr *redisv1beta2.RedisCluster, pod RedisDetails) string { redisClient := configureRedisClient(client, logger, cr, pod.PodName) diff --git a/k8sutils/statefulset.go b/k8sutils/statefulset.go index bc31d4975..d230c9c49 100644 --- a/k8sutils/statefulset.go +++ b/k8sutils/statefulset.go @@ -19,13 +19,14 @@ import ( apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/kubernetes" "k8s.io/utils/env" "k8s.io/utils/ptr" ) type StatefulSet interface { - IsStatefulSetReady(ctx context.Context, namespace, name string) bool + IsStatefulSetReady(ctx context.Context, client kubernetes.Interface, cr runtime.Object, name, namespace string) bool } type StatefulSetService struct { @@ -41,7 +42,7 @@ func NewStatefulSetService(kubeClient kubernetes.Interface, log logr.Logger) *St } } -func (s *StatefulSetService) IsStatefulSetReady(ctx context.Context, namespace, name string) bool { +func (s *StatefulSetService) IsStatefulSetReady(ctx context.Context, client kubernetes.Interface, cr runtime.Object, namespace string, name string) bool { var ( partition = 0 replicas = 1 @@ -74,10 +75,19 @@ func (s *StatefulSetService) IsStatefulSetReady(ctx context.Context, namespace, logger.V(1).Info("StatefulSet is not ready", "Status.ObservedGeneration", sts.Status.ObservedGeneration, "ObjectMeta.Generation", sts.ObjectMeta.Generation) return false } - if int(sts.Status.ReadyReplicas) != replicas { - logger.V(1).Info("StatefulSet is not ready", "Status.ReadyReplicas", sts.Status.ReadyReplicas, "Replicas", replicas) - return false + + for i := 0; i < replicas; i++ { + pod := RedisDetails{ + PodName: fmt.Sprintf("%s-%d", name, i), + Namespace: namespace, + } + + if !pingRedisNode(ctx, client, logger, cr, pod) { + logger.V(1).Info("StatefulSet is not ready", "PingRedisNode", pod.PodName) + return false + } } + return true }