Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Added redis-role selector for consistent access to 'master' or 'slave' #785

Open
wants to merge 14 commits into
base: master
Choose a base branch
from
33 changes: 33 additions & 0 deletions controllers/rediscluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,39 @@
return ctrl.Result{RequeueAfter: time.Second * 10}, nil
}

if followerReplicas < instance.Status.ReadyFollowerReplicas {
reqLogger.Info("Redis cluster is downscaling...", "Ready.ReadyFollowerReplicas", instance.Status.ReadyFollowerReplicas, "Expected.ReadFollowerReplicas", followerReplicas)

Check warning on line 103 in controllers/rediscluster_controller.go

View check run for this annotation

Codecov / codecov/patch

controllers/rediscluster_controller.go#L103

Added line #L103 was not covered by tests

// loop count times to remove the latest leader/follower pod
count := instance.Status.ReadyLeaderReplicas - leaderReplicas
for i := int32(0); i < count; i++ {
reqLogger.Info("Redis cluster is downscaling", "The times of loop", i)

Check warning on line 108 in controllers/rediscluster_controller.go

View check run for this annotation

Codecov / codecov/patch

controllers/rediscluster_controller.go#L106-L108

Added lines #L106 - L108 were not covered by tests

// Imp if the last index of leader sts is not leader make it then
// check whether the redis is leader or not ?
// if not true then make it leader pod
if !(k8sutils.VerifyLeaderPod(ctx, r.K8sClient, r.Log, instance)) {

Check warning on line 113 in controllers/rediscluster_controller.go

View check run for this annotation

Codecov / codecov/patch

controllers/rediscluster_controller.go#L113

Added line #L113 was not covered by tests
// lastLeaderPod is slaving right now Make it the master Pod
// We have to bring a manual failover here to make it a leaderPod
// clusterFailover should also include the clusterReplicate since we have to map the followers to new leader
k8sutils.ClusterFailover(ctx, r.K8sClient, r.Log, instance)

Check warning on line 117 in controllers/rediscluster_controller.go

View check run for this annotation

Codecov / codecov/patch

controllers/rediscluster_controller.go#L117

Added line #L117 was not covered by tests
}
// Step 1 Remove the Follower Node
k8sutils.RemoveRedisFollowerNodesFromCluster(ctx, r.K8sClient, r.Log, instance)

Check warning on line 120 in controllers/rediscluster_controller.go

View check run for this annotation

Codecov / codecov/patch

controllers/rediscluster_controller.go#L120

Added line #L120 was not covered by tests
// Step 2 Reshard the Cluster
k8sutils.ReshardRedisCluster(r.K8sClient, r.Log, instance, true)

Check warning on line 122 in controllers/rediscluster_controller.go

View check run for this annotation

Codecov / codecov/patch

controllers/rediscluster_controller.go#L122

Added line #L122 was not covered by tests
}
reqLogger.Info("Redis cluster is downscaled... Rebalancing the cluster")

Check warning on line 124 in controllers/rediscluster_controller.go

View check run for this annotation

Codecov / codecov/patch

controllers/rediscluster_controller.go#L124

Added line #L124 was not covered by tests
// Step 3 Rebalance the cluster
k8sutils.RebalanceRedisCluster(r.K8sClient, r.Log, instance)
reqLogger.Info("Redis cluster is downscaled... Rebalancing the cluster is done")
err = k8sutils.UpdateRedisClusterStatus(instance, status.RedisClusterReady, status.ReadyClusterReason, leaderReplicas, leaderReplicas, r.Dk8sClient)
if err != nil {
return ctrl.Result{}, err

Check warning on line 130 in controllers/rediscluster_controller.go

View check run for this annotation

Codecov / codecov/patch

controllers/rediscluster_controller.go#L126-L130

Added lines #L126 - L130 were not covered by tests
}
return ctrl.Result{RequeueAfter: time.Second * 60}, nil

Check warning on line 132 in controllers/rediscluster_controller.go

View check run for this annotation

Codecov / codecov/patch

controllers/rediscluster_controller.go#L132

Added line #L132 was not covered by tests
}

// Mark the cluster status as initializing if there are no leader or follower nodes
if (instance.Status.ReadyLeaderReplicas == 0 && instance.Status.ReadyFollowerReplicas == 0) ||
instance.Status.ReadyLeaderReplicas != leaderReplicas {
Expand Down
42 changes: 37 additions & 5 deletions controllers/redisreplication_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,26 @@

// Check that the Leader and Follower are ready in redis replication
if redisReplicationInfo.Status.ReadyReplicas != totalReplicas {
reqLogger.Info("Redis replication nodes are not ready yet", "Ready.Replicas", strconv.Itoa(int(redisReplicationInfo.Status.ReadyReplicas)), "Expected.Replicas", totalReplicas)
return ctrl.Result{RequeueAfter: time.Second * 60}, nil
var realMaster string
masterNodes := k8sutils.GetRedisNodesByRole(ctx, r.K8sClient, r.Log, instance, "master")
slaveNodes := k8sutils.GetRedisNodesByRole(ctx, r.K8sClient, r.Log, instance, "slave")

if redisReplicationInfo.Status.ReadyReplicas == 0 {
reqLogger.Info("Redis replication nodes are not ready yet", "Ready.Replicas", strconv.Itoa(int(redisReplicationInfo.Status.ReadyReplicas)), "Expected.Replicas", totalReplicas)
return ctrl.Result{RequeueAfter: time.Second * 60}, nil
}

reqLogger.Info("The number of Redis replication replicas is less than the desired status\n", "Ready.Replicas", strconv.Itoa(int(redisReplicationInfo.Status.ReadyReplicas)), "Expected.Replicas", totalReplicas)
if len(masterNodes) == int(leaderReplicas) && followerReplicas != 0 && len(slaveNodes) != 0 {
realMaster = k8sutils.GetRedisReplicationRealMaster(ctx, r.K8sClient, r.Log, instance, masterNodes)
if err = k8sutils.UpdateRoleLabelPod(ctx, r.K8sClient, r.Log, instance, "master", []string{realMaster}); err != nil {
return ctrl.Result{Requeue: true}, err

Check warning on line 89 in controllers/redisreplication_controller.go

View check run for this annotation

Codecov / codecov/patch

controllers/redisreplication_controller.go#L85-L89

Added lines #L85 - L89 were not covered by tests
}
if err = k8sutils.UpdateRoleLabelPod(ctx, r.K8sClient, r.Log, instance, "slave", slaveNodes); err != nil {
return ctrl.Result{RequeueAfter: time.Second * 1}, err

Check warning on line 92 in controllers/redisreplication_controller.go

View check run for this annotation

Codecov / codecov/patch

controllers/redisreplication_controller.go#L91-L92

Added lines #L91 - L92 were not covered by tests
}
}
return ctrl.Result{RequeueAfter: time.Second * 1}, nil

Check warning on line 95 in controllers/redisreplication_controller.go

View check run for this annotation

Codecov / codecov/patch

controllers/redisreplication_controller.go#L95

Added line #L95 was not covered by tests
}

var realMaster string
Expand All @@ -86,15 +104,29 @@
if len(slaveNodes) == 0 {
realMaster = masterNodes[0]
}
err := k8sutils.CreateMasterSlaveReplication(ctx, r.K8sClient, r.Log, instance, masterNodes, realMaster)
if err = k8sutils.UpdateRoleLabelPod(ctx, r.K8sClient, r.Log, instance, "master", []string{realMaster}); err != nil {
return ctrl.Result{Requeue: true}, err

Check warning on line 108 in controllers/redisreplication_controller.go

View check run for this annotation

Codecov / codecov/patch

controllers/redisreplication_controller.go#L107-L108

Added lines #L107 - L108 were not covered by tests
}
err = k8sutils.CreateMasterSlaveReplication(ctx, r.K8sClient, r.Log, instance, masterNodes, realMaster)

Check warning on line 110 in controllers/redisreplication_controller.go

View check run for this annotation

Codecov / codecov/patch

controllers/redisreplication_controller.go#L110

Added line #L110 was not covered by tests
if err != nil {
return ctrl.Result{RequeueAfter: time.Second * 60}, err
return ctrl.Result{Requeue: true}, err

Check warning on line 112 in controllers/redisreplication_controller.go

View check run for this annotation

Codecov / codecov/patch

controllers/redisreplication_controller.go#L112

Added line #L112 was not covered by tests
}
if err = k8sutils.UpdateRoleLabelPod(ctx, r.K8sClient, r.Log, instance, "slave", slaveNodes); err != nil {
return ctrl.Result{Requeue: true}, err

Check warning on line 115 in controllers/redisreplication_controller.go

View check run for this annotation

Codecov / codecov/patch

controllers/redisreplication_controller.go#L114-L115

Added lines #L114 - L115 were not covered by tests
}
}
realMaster = k8sutils.GetRedisReplicationRealMaster(ctx, r.K8sClient, r.Log, instance, masterNodes)
if err := r.UpdateRedisReplicationMaster(ctx, instance, realMaster); err != nil {
slaveNodes := k8sutils.GetRedisNodesByRole(ctx, r.K8sClient, r.Log, instance, "slave")
if err = r.UpdateRedisReplicationMaster(ctx, instance, realMaster); err != nil {
return ctrl.Result{}, err

Check warning on line 121 in controllers/redisreplication_controller.go

View check run for this annotation

Codecov / codecov/patch

controllers/redisreplication_controller.go#L119-L121

Added lines #L119 - L121 were not covered by tests
}
if err = k8sutils.UpdateRoleLabelPod(ctx, r.K8sClient, r.Log, instance, "master", []string{realMaster}); err != nil {

Check warning on line 123 in controllers/redisreplication_controller.go

View check run for this annotation

Codecov / codecov/patch

controllers/redisreplication_controller.go#L123

Added line #L123 was not covered by tests
return ctrl.Result{}, err
}
if err = k8sutils.UpdateRoleLabelPod(ctx, r.K8sClient, r.Log, instance, "slave", slaveNodes); err != nil {
return ctrl.Result{}, err

Check warning on line 127 in controllers/redisreplication_controller.go

View check run for this annotation

Codecov / codecov/patch

controllers/redisreplication_controller.go#L126-L127

Added lines #L126 - L127 were not covered by tests
}

reqLogger.Info("Will reconcile redis operator in again 10 seconds")
return ctrl.Result{RequeueAfter: time.Second * 10}, nil
}
Expand Down
30 changes: 30 additions & 0 deletions k8sutils/redis-replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
redisv1beta2 "github.com/OT-CONTAINER-KIT/redis-operator/api/v1beta2"
"github.com/OT-CONTAINER-KIT/redis-operator/pkg/util"
"github.com/go-logr/logr"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/kubernetes"
"k8s.io/utils/ptr"
Expand All @@ -31,6 +32,8 @@
}
objectMetaInfo := generateObjectMetaInformation(cr.ObjectMeta.Name, cr.Namespace, labels, annotations)
headlessObjectMetaInfo := generateObjectMetaInformation(cr.ObjectMeta.Name+"-headless", cr.Namespace, labels, annotations)
masterObjectMetaInfo := generateObjectMetaInformation(cr.ObjectMeta.Name+"-leader", cr.Namespace, labels, annotations)
slaveObjectMetaInfo := generateObjectMetaInformation(cr.ObjectMeta.Name+"-follower", cr.Namespace, labels, annotations)

Check warning on line 36 in k8sutils/redis-replication.go

View check run for this annotation

Codecov / codecov/patch

k8sutils/redis-replication.go#L35-L36

Added lines #L35 - L36 were not covered by tests
additionalObjectMetaInfo := generateObjectMetaInformation(cr.ObjectMeta.Name+"-additional", cr.Namespace, labels, generateServiceAnots(cr.ObjectMeta, additionalServiceAnnotations, epp))
err := CreateOrUpdateService(cr.Namespace, headlessObjectMetaInfo, redisReplicationAsOwner(cr), disableMetrics, true, "ClusterIP", redisPort, cl)
if err != nil {
Expand All @@ -51,6 +54,14 @@
logger.Error(err, "Cannot create additional service for Redis Replication")
return err
}
err = CreateOrUpdateService(cr.Namespace, masterObjectMetaInfo, redisReplicationAsOwner(cr), disableMetrics, false, additionalServiceType, redisPort, cl)
if err != nil {
logger.Error(err, "Cannot create additional service for Redis Replication")

Check warning on line 59 in k8sutils/redis-replication.go

View check run for this annotation

Codecov / codecov/patch

k8sutils/redis-replication.go#L57-L59

Added lines #L57 - L59 were not covered by tests
}
err = CreateOrUpdateService(cr.Namespace, slaveObjectMetaInfo, redisReplicationAsOwner(cr), disableMetrics, false, additionalServiceType, redisPort, cl)
if err != nil {
logger.Error(err, "Cannot create additional service for Redis Replication")

Check warning on line 63 in k8sutils/redis-replication.go

View check run for this annotation

Codecov / codecov/patch

k8sutils/redis-replication.go#L61-L63

Added lines #L61 - L63 were not covered by tests
}
return nil
}

Expand Down Expand Up @@ -218,3 +229,22 @@
}
return true
}

func UpdateRoleLabelPod(ctx context.Context, cl kubernetes.Interface, logger logr.Logger, cr *redisv1beta2.RedisReplication, role string, nodes []string) error {
for _, node := range nodes {
pod, err := cl.CoreV1().Pods(cr.Namespace).Get(context.TODO(), node, metav1.GetOptions{})
if err != nil {
logger.Error(err, "Cannot get redis replication pod")
return err

Check warning on line 238 in k8sutils/redis-replication.go

View check run for this annotation

Codecov / codecov/patch

k8sutils/redis-replication.go#L233-L238

Added lines #L233 - L238 were not covered by tests
}
// set Label redis-role
metav1.SetMetaDataLabel(&pod.ObjectMeta, "redis-role", role)

Check warning on line 241 in k8sutils/redis-replication.go

View check run for this annotation

Codecov / codecov/patch

k8sutils/redis-replication.go#L241

Added line #L241 was not covered by tests
// update Label
_, err = cl.CoreV1().Pods(cr.Namespace).Update(context.TODO(), pod, metav1.UpdateOptions{})
if err != nil {
logger.Error(err, "Cannot update redis replication pod")
return err

Check warning on line 246 in k8sutils/redis-replication.go

View check run for this annotation

Codecov / codecov/patch

k8sutils/redis-replication.go#L243-L246

Added lines #L243 - L246 were not covered by tests
}
}
return nil

Check warning on line 249 in k8sutils/redis-replication.go

View check run for this annotation

Codecov / codecov/patch

k8sutils/redis-replication.go#L249

Added line #L249 was not covered by tests
}
12 changes: 11 additions & 1 deletion k8sutils/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,23 @@
} else {
PortName = "redis-client"
}
selectorLabels := serviceMeta.GetLabels()
if serviceMeta.GetName() == "redis-replication-leader" {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the svc always name:"redis-replication-leader", We should create svc based on the specific instance name.

selectorLabels["redis-role"] = "master"

Check warning on line 41 in k8sutils/services.go

View check run for this annotation

Codecov / codecov/patch

k8sutils/services.go#L41

Added line #L41 was not covered by tests
}
if serviceMeta.GetName() == "redis-replication-follower" {
selectorLabels["redis-role"] = "slave"

Check warning on line 44 in k8sutils/services.go

View check run for this annotation

Codecov / codecov/patch

k8sutils/services.go#L44

Added line #L44 was not covered by tests
}
if serviceMeta.GetName() == "redis-replication-follower" {
selectorLabels["redis-role"] = "slave"

Check warning on line 47 in k8sutils/services.go

View check run for this annotation

Codecov / codecov/patch

k8sutils/services.go#L47

Added line #L47 was not covered by tests
}
service := &corev1.Service{
TypeMeta: generateMetaInformation("Service", "v1"),
ObjectMeta: serviceMeta,
Spec: corev1.ServiceSpec{
Type: generateServiceType(serviceType),
ClusterIP: "",
Selector: serviceMeta.GetLabels(),
Selector: selectorLabels,
Ports: []corev1.ServicePort{
{
Name: PortName,
Expand Down
84 changes: 84 additions & 0 deletions k8sutils/services_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,90 @@ func TestGenerateServiceDef(t *testing.T) {
},
},
},
{
name: "Test redis-replication-leader with ClusterIP service type and metrics enabled",
serviceMeta: metav1.ObjectMeta{
Name: "test-redis-replication-leader",
Labels: map[string]string{
"redis-role": "master",
},
},
enableMetrics: defaultExporterPortProvider,
headless: false,
serviceType: "ClusterIP",
port: redisPort,
expected: &corev1.Service{
TypeMeta: metav1.TypeMeta{
Kind: "Service",
APIVersion: "v1",
},
ObjectMeta: metav1.ObjectMeta{
Name: "test-redis-replication-leader",
Labels: map[string]string{
"redis-role": "master",
},
OwnerReferences: []metav1.OwnerReference{
{},
},
},
Spec: corev1.ServiceSpec{
Ports: []corev1.ServicePort{
{
Name: "redis-client",
Port: redisPort,
TargetPort: intstr.FromInt(int(redisPort)),
Protocol: corev1.ProtocolTCP,
},
*enableMetricsPort(redisExporterPort),
},
Selector: map[string]string{"redis-role": "master"},
ClusterIP: "",
Type: corev1.ServiceTypeClusterIP,
},
},
},
{
name: "Test redis-replication-follower with ClusterIP service type and metrics enabled",
serviceMeta: metav1.ObjectMeta{
Name: "test-redis-replication-follower",
Labels: map[string]string{
"redis-role": "slave",
},
},
enableMetrics: defaultExporterPortProvider,
headless: false,
serviceType: "ClusterIP",
port: redisPort,
expected: &corev1.Service{
TypeMeta: metav1.TypeMeta{
Kind: "Service",
APIVersion: "v1",
},
ObjectMeta: metav1.ObjectMeta{
Name: "test-redis-replication-follower",
Labels: map[string]string{
"redis-role": "slave",
},
OwnerReferences: []metav1.OwnerReference{
{},
},
},
Spec: corev1.ServiceSpec{
Ports: []corev1.ServicePort{
{
Name: "redis-client",
Port: redisPort,
TargetPort: intstr.FromInt(int(redisPort)),
Protocol: corev1.ProtocolTCP,
},
*enableMetricsPort(redisExporterPort),
},
Selector: map[string]string{"redis-role": "slave"},
ClusterIP: "",
Type: corev1.ServiceTypeClusterIP,
},
},
},
}

for _, tt := range tests {
Expand Down
Loading