Skip to content

Commit

Permalink
Review feedback
Browse files Browse the repository at this point in the history
Signed-off-by: Dr. Stefan Schimanski <[email protected]>

Kubernetes-commit: 68226b0501996fc86c9c2bddb7d61e6a64c91304
  • Loading branch information
sttts authored and k8s-publishing-bot committed Jul 23, 2024
1 parent 2099375 commit 1f27757
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 27 deletions.
48 changes: 26 additions & 22 deletions tools/leaderelection/leasecandidate.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package leaderelection

import (
"context"
"reflect"
"time"

v1 "k8s.io/api/coordination/v1"
Expand All @@ -37,11 +38,15 @@ import (

const requeueInterval = 5 * time.Minute

type CacheSyncWaiter interface {
WaitForCacheSync(stopCh <-chan struct{}) map[reflect.Type]bool
}

type LeaseCandidate struct {
LeaseClient coordinationv1alpha1client.LeaseCandidateInterface
LeaseCandidateInformer cache.SharedIndexInformer
InformerFactory informers.SharedInformerFactory
HasSynced cache.InformerSynced
leaseClient coordinationv1alpha1client.LeaseCandidateInterface
leaseCandidateInformer cache.SharedIndexInformer
informerFactory informers.SharedInformerFactory
hasSynced cache.InformerSynced

// At most there will be one item in this Queue (since we only watch one item)
queue workqueue.TypedRateLimitingInterface[int]
Expand All @@ -52,7 +57,7 @@ type LeaseCandidate struct {
// controller lease
leaseName string

Clock clock.Clock
clock clock.Clock

binaryVersion, emulationVersion string
preferredStrategies []v1.CoordinatedLeaseStrategy
Expand All @@ -62,10 +67,9 @@ func NewCandidate(clientset kubernetes.Interface,
candidateName string,
candidateNamespace string,
targetLease string,
clock clock.Clock,
binaryVersion, emulationVersion string,
preferredStrategies []v1.CoordinatedLeaseStrategy,
) (*LeaseCandidate, error) {
) (*LeaseCandidate, CacheSyncWaiter, error) {
fieldSelector := fields.OneTermEqualSelector("metadata.name", candidateName).String()
// A separate informer factory is required because this must start before informerFactories
// are started for leader elected components
Expand All @@ -78,20 +82,20 @@ func NewCandidate(clientset kubernetes.Interface,
leaseCandidateInformer := informerFactory.Coordination().V1alpha1().LeaseCandidates().Informer()

lc := &LeaseCandidate{
LeaseClient: clientset.CoordinationV1alpha1().LeaseCandidates(candidateNamespace),
LeaseCandidateInformer: leaseCandidateInformer,
InformerFactory: informerFactory,
leaseClient: clientset.CoordinationV1alpha1().LeaseCandidates(candidateNamespace),
leaseCandidateInformer: leaseCandidateInformer,
informerFactory: informerFactory,
name: candidateName,
namespace: candidateNamespace,
leaseName: targetLease,
Clock: clock,
clock: clock.RealClock{},
binaryVersion: binaryVersion,
emulationVersion: emulationVersion,
preferredStrategies: preferredStrategies,
}
lc.queue = workqueue.NewTypedRateLimitingQueueWithConfig(workqueue.DefaultTypedControllerRateLimiter[int](), workqueue.TypedRateLimitingQueueConfig[int]{Name: "leasecandidate"})

synced, err := leaseCandidateInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
h, err := leaseCandidateInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
UpdateFunc: func(oldObj, newObj interface{}) {
if leasecandidate, ok := newObj.(*v1alpha1.LeaseCandidate); ok {
if leasecandidate.Spec.PingTime != nil {
Expand All @@ -101,18 +105,18 @@ func NewCandidate(clientset kubernetes.Interface,
},
})
if err != nil {
return nil, err
return nil, nil, err
}
lc.HasSynced = synced.HasSynced
lc.hasSynced = h.HasSynced

return lc, nil
return lc, informerFactory, nil
}

func (c *LeaseCandidate) Run(ctx context.Context) {
defer c.queue.ShutDown()

go c.InformerFactory.Start(ctx.Done())
if !cache.WaitForNamedCacheSync("leasecandidateclient", ctx.Done(), c.HasSynced) {
go c.informerFactory.Start(ctx.Done())
if !cache.WaitForNamedCacheSync("leasecandidateclient", ctx.Done(), c.hasSynced) {
return
}

Expand Down Expand Up @@ -153,12 +157,12 @@ func (c *LeaseCandidate) enqueueLease() {
// ensureLease creates the lease if it does not exist and renew it if it exists. Returns the lease and
// a bool (true if this call created the lease), or any error that occurs.
func (c *LeaseCandidate) ensureLease(ctx context.Context) error {
lease, err := c.LeaseClient.Get(ctx, c.name, metav1.GetOptions{})
lease, err := c.leaseClient.Get(ctx, c.name, metav1.GetOptions{})
if apierrors.IsNotFound(err) {
klog.V(2).Infof("Creating lease candidate")
// lease does not exist, create it.
leaseToCreate := c.newLease()
_, err := c.LeaseClient.Create(ctx, leaseToCreate, metav1.CreateOptions{})
_, err := c.leaseClient.Create(ctx, leaseToCreate, metav1.CreateOptions{})
if err != nil {
return err
}
Expand All @@ -169,9 +173,9 @@ func (c *LeaseCandidate) ensureLease(ctx context.Context) error {
}
klog.V(2).Infof("lease candidate exists.. renewing")
clone := lease.DeepCopy()
clone.Spec.RenewTime = &metav1.MicroTime{Time: c.Clock.Now()}
clone.Spec.RenewTime = &metav1.MicroTime{Time: c.clock.Now()}
clone.Spec.PingTime = nil
_, err = c.LeaseClient.Update(ctx, clone, metav1.UpdateOptions{})
_, err = c.leaseClient.Update(ctx, clone, metav1.UpdateOptions{})
if err != nil {
return err
}
Expand All @@ -191,6 +195,6 @@ func (c *LeaseCandidate) newLease() *v1alpha1.LeaseCandidate {
PreferredStrategies: c.preferredStrategies,
},
}
lease.Spec.RenewTime = &metav1.MicroTime{Time: c.Clock.Now()}
lease.Spec.RenewTime = &metav1.MicroTime{Time: c.clock.Now()}
return lease
}
7 changes: 2 additions & 5 deletions tools/leaderelection/leasecandidate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes/fake"
"k8s.io/utils/clock"
)

type testcase struct {
Expand All @@ -47,12 +46,11 @@ func TestLeaseCandidateCreation(t *testing.T) {
defer cancel()

client := fake.NewSimpleClientset()
candidate, err := NewCandidate(
candidate, _, err := NewCandidate(
client,
tc.candidateName,
tc.candidateNamespace,
tc.leaseName,
clock.RealClock{},
tc.binaryVersion,
tc.emulationVersion,
[]v1.CoordinatedLeaseStrategy{v1.OldestEmulationVersion},
Expand Down Expand Up @@ -82,12 +80,11 @@ func TestLeaseCandidateAck(t *testing.T) {

client := fake.NewSimpleClientset()

candidate, err := NewCandidate(
candidate, _, err := NewCandidate(
client,
tc.candidateName,
tc.candidateNamespace,
tc.leaseName,
clock.RealClock{},
tc.binaryVersion,
tc.emulationVersion,
[]v1.CoordinatedLeaseStrategy{v1.OldestEmulationVersion},
Expand Down

0 comments on commit 1f27757

Please sign in to comment.