Skip to content

Commit

Permalink
add function initLeaderElector
Browse files Browse the repository at this point in the history
  • Loading branch information
Alexseij committed Jul 6, 2024
1 parent 48659d9 commit 49d60ff
Showing 1 changed file with 38 additions and 29 deletions.
67 changes: 38 additions & 29 deletions pkg/manager/internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -353,35 +353,7 @@ func (cm *controllerManager) Start(ctx context.Context) (err error) {

var leaderElector *leaderelection.LeaderElector
if cm.resourceLock != nil {
leaderElector, err = leaderelection.NewLeaderElector(leaderelection.LeaderElectionConfig{
Lock: cm.resourceLock,
LeaseDuration: cm.leaseDuration,
RenewDeadline: cm.renewDeadline,
RetryPeriod: cm.retryPeriod,
Callbacks: leaderelection.LeaderCallbacks{
OnStartedLeading: func(_ context.Context) {
if err := cm.startLeaderElectionRunnables(); err != nil {
cm.errChan <- err
return
}
close(cm.elected)
},
OnStoppedLeading: func() {
if cm.onStoppedLeading != nil {
cm.onStoppedLeading()
}
// Make sure graceful shutdown is skipped if we lost the leader lock without
// intending to.
cm.gracefulShutdownTimeout = time.Duration(0)
// Most implementations of leader election log.Fatal() here.
// Since Start is wrapped in log.Fatal when called, we can just return
// an error here which will cause the program to exit.
cm.errChan <- errors.New("leader election lost")
},
},
ReleaseOnCancel: cm.leaderElectionReleaseOnCancel,
Name: cm.leaderElectionID,
})
leaderElector, err = cm.initLeaderElector()
if err != nil {
return fmt.Errorf("failed during initialization leader election process: %w", err)
}
Expand Down Expand Up @@ -603,6 +575,43 @@ func (cm *controllerManager) engageStopProcedure(stopComplete <-chan struct{}) e
return nil
}

func (cm *controllerManager) initLeaderElector() (*leaderelection.LeaderElector, error) {
leaderElector, err := leaderelection.NewLeaderElector(leaderelection.LeaderElectionConfig{
Lock: cm.resourceLock,
LeaseDuration: cm.leaseDuration,
RenewDeadline: cm.renewDeadline,
RetryPeriod: cm.retryPeriod,
Callbacks: leaderelection.LeaderCallbacks{
OnStartedLeading: func(_ context.Context) {
if err := cm.startLeaderElectionRunnables(); err != nil {
cm.errChan <- err
return
}
close(cm.elected)
},
OnStoppedLeading: func() {
if cm.onStoppedLeading != nil {
cm.onStoppedLeading()
}
// Make sure graceful shutdown is skipped if we lost the leader lock without
// intending to.
cm.gracefulShutdownTimeout = time.Duration(0)
// Most implementations of leader election log.Fatal() here.
// Since Start is wrapped in log.Fatal when called, we can just return
// an error here which will cause the program to exit.
cm.errChan <- errors.New("leader election lost")
},
},
ReleaseOnCancel: cm.leaderElectionReleaseOnCancel,
Name: cm.leaderElectionID,
})
if err != nil {
return nil, err
}

return leaderElector, nil
}

func (cm *controllerManager) startLeaderElectionRunnables() error {
return cm.runnables.LeaderElection.Start(cm.internalCtx)
}
Expand Down

0 comments on commit 49d60ff

Please sign in to comment.