Skip to content

Commit

Permalink
🐛 Prevent LeaderElector setup error from being swallowed (#2876)
Browse files Browse the repository at this point in the history
* fix leader election bug

* fix tests

* fix

* add function initLeaderElector

* add test case for Start function bug

* add comment for new leader elector code

* fix comment

* fix test
  • Loading branch information
Alexseij authored Aug 3, 2024
1 parent 7394f7b commit abb2d86
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 22 deletions.
51 changes: 29 additions & 22 deletions pkg/manager/internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,16 @@ func (cm *controllerManager) Start(ctx context.Context) (err error) {
// Initialize the internal context.
cm.internalCtx, cm.internalCancel = context.WithCancel(ctx)

// Leader elector must be created before defer that contains engageStopProcedure function
// https://github.com/kubernetes-sigs/controller-runtime/issues/2873
var leaderElector *leaderelection.LeaderElector
if cm.resourceLock != nil {
leaderElector, err = cm.initLeaderElector()
if err != nil {
return fmt.Errorf("failed during initialization leader election process: %w", err)
}
}

// This chan indicates that stop is complete, in other words all runnables have returned or timeout on stop request
stopComplete := make(chan struct{})
defer close(stopComplete)
Expand Down Expand Up @@ -433,19 +443,22 @@ func (cm *controllerManager) Start(ctx context.Context) (err error) {
{
ctx, cancel := context.WithCancel(context.Background())
cm.leaderElectionCancel = cancel
go func() {
if cm.resourceLock != nil {
if err := cm.startLeaderElection(ctx); err != nil {
cm.errChan <- err
}
} else {
if leaderElector != nil {
// Start the leader elector process
go func() {
leaderElector.Run(ctx)
<-ctx.Done()
close(cm.leaderElectionStopped)
}()
} else {
go func() {
// Treat not having leader election enabled the same as being elected.
if err := cm.startLeaderElectionRunnables(); err != nil {
cm.errChan <- err
}
close(cm.elected)
}
}()
}()
}
}

ready = true
Expand Down Expand Up @@ -564,12 +577,8 @@ func (cm *controllerManager) engageStopProcedure(stopComplete <-chan struct{}) e
return nil
}

func (cm *controllerManager) startLeaderElectionRunnables() error {
return cm.runnables.LeaderElection.Start(cm.internalCtx)
}

func (cm *controllerManager) startLeaderElection(ctx context.Context) (err error) {
l, err := leaderelection.NewLeaderElector(leaderelection.LeaderElectionConfig{
func (cm *controllerManager) initLeaderElector() (*leaderelection.LeaderElector, error) {
leaderElector, err := leaderelection.NewLeaderElector(leaderelection.LeaderElectionConfig{
Lock: cm.resourceLock,
LeaseDuration: cm.leaseDuration,
RenewDeadline: cm.renewDeadline,
Expand Down Expand Up @@ -599,16 +608,14 @@ func (cm *controllerManager) startLeaderElection(ctx context.Context) (err error
Name: cm.leaderElectionID,
})
if err != nil {
return err
return nil, err
}

// Start the leader elector process
go func() {
l.Run(ctx)
<-ctx.Done()
close(cm.leaderElectionStopped)
}()
return nil
return leaderElector, nil
}

func (cm *controllerManager) startLeaderElectionRunnables() error {
return cm.runnables.LeaderElection.Start(cm.internalCtx)
}

func (cm *controllerManager) Elected() <-chan struct{} {
Expand Down
17 changes: 17 additions & 0 deletions pkg/manager/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1165,6 +1165,23 @@ var _ = Describe("manger.Manager", func() {
cm.onStoppedLeading = func() {}
},
)

It("should return an error if leader election param incorrect", func() {
renewDeadline := time.Second * 20
m, err := New(cfg, Options{
LeaderElection: true,
LeaderElectionID: "controller-runtime",
LeaderElectionNamespace: "default",
newResourceLock: fakeleaderelection.NewResourceLock,
RenewDeadline: &renewDeadline,
})
Expect(err).NotTo(HaveOccurred())
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
defer cancel()
err = m.Start(ctx)
Expect(err).To(HaveOccurred())
Expect(errors.Is(err, context.DeadlineExceeded)).NotTo(BeTrue())
})
})

Context("should start serving metrics", func() {
Expand Down

0 comments on commit abb2d86

Please sign in to comment.