Skip to content

Commit

Permalink
Refactor manager internal around RunnableGroup(s)
Browse files Browse the repository at this point in the history
Signed-off-by: Vince Prignano <[email protected]>
  • Loading branch information
vincepri committed Oct 12, 2021
1 parent 3e870eb commit 7c59ac6
Show file tree
Hide file tree
Showing 6 changed files with 473 additions and 112 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ module sigs.k8s.io/controller-runtime
go 1.16

require (
github.com/davecgh/go-spew v1.1.1
github.com/evanphx/json-patch v4.11.0+incompatible
github.com/fsnotify/fsnotify v1.4.9
github.com/go-logr/logr v0.4.0
Expand Down
151 changes: 46 additions & 105 deletions pkg/manager/internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"sync"
"time"

"github.com/davecgh/go-spew/spew"
"github.com/go-logr/logr"
"github.com/prometheus/client_golang/prometheus/promhttp"
"k8s.io/apimachinery/pkg/api/meta"
Expand Down Expand Up @@ -105,10 +106,9 @@ type controllerManager struct {
healthzHandler *healthz.Handler

mu sync.Mutex
started bool
startedLeader bool
healthzStarted bool
errChan chan error
runnables *runnables

// controllerOptions are the global controller options.
controllerOptions v1alpha1.ControllerConfigurationSpec
Expand All @@ -134,8 +134,6 @@ type controllerManager struct {
// election was configured.
elected chan struct{}

caches []hasCache

// port is the port that the webhook server serves at.
port int
// host is the hostname that the webhook server binds to.
Expand All @@ -160,10 +158,6 @@ type controllerManager struct {
// between tries of actions.
retryPeriod time.Duration

// waitForRunnable is holding the number of runnables currently running so that
// we can wait for them to exit before quitting the manager
waitForRunnable sync.WaitGroup

// gracefulShutdownTimeout is the duration given to runnable to stop
// before the manager actually returns on stop.
gracefulShutdownTimeout time.Duration
Expand Down Expand Up @@ -194,6 +188,7 @@ type hasCache interface {
func (cm *controllerManager) Add(r Runnable) error {
cm.mu.Lock()
defer cm.mu.Unlock()

if cm.stopProcedureEngaged {
return errors.New("can't accept new runnable as stop procedure is already engaged")
}
Expand All @@ -203,31 +198,14 @@ func (cm *controllerManager) Add(r Runnable) error {
return err
}

var shouldStart bool

// Add the runnable to the leader election or the non-leaderelection list
if leRunnable, ok := r.(LeaderElectionRunnable); ok && !leRunnable.NeedLeaderElection() {
shouldStart = cm.started
cm.nonLeaderElectionRunnables = append(cm.nonLeaderElectionRunnables, r)
} else if hasCache, ok := r.(hasCache); ok {
cm.caches = append(cm.caches, hasCache)
if cm.started {
cm.startRunnable(hasCache)
if !hasCache.GetCache().WaitForCacheSync(cm.internalCtx) {
return fmt.Errorf("could not sync cache")
return cm.runnables.Add(r, func(ctx context.Context) bool {
if cache, ok := r.(hasCache); ok {
if !cache.GetCache().WaitForCacheSync(cm.internalCtx) {
return false
}
}
} else {
shouldStart = cm.startedLeader
cm.leaderElectionRunnables = append(cm.leaderElectionRunnables, r)
}

if shouldStart {
// If already started, start the controller
cm.startRunnable(r)
}

return nil
return true
})
}

// Deprecated: use the equivalent Options field to set a field. This method will be removed in v0.10.
Expand Down Expand Up @@ -385,13 +363,13 @@ func (cm *controllerManager) serveMetrics() {
Handler: mux,
}
// Run the server
cm.startRunnable(RunnableFunc(func(_ context.Context) error {
cm.runnables.Add(RunnableFunc(func(_ context.Context) error {
cm.logger.Info("starting metrics server", "path", defaultMetricsEndpoint)
if err := server.Serve(cm.metricsListener); err != nil && err != http.ErrServerClosed {
return err
}
return nil
}))
}), nil)

// Shutdown the server when stop is closed
<-cm.internalProceduresStop
Expand Down Expand Up @@ -422,12 +400,12 @@ func (cm *controllerManager) serveHealthProbes() {
}

// Run server
cm.startRunnable(RunnableFunc(func(_ context.Context) error {
cm.runnables.Add(RunnableFunc(func(_ context.Context) error {
if err := server.Serve(cm.healthProbeListener); err != nil && err != http.ErrServerClosed {
return err
}
return nil
}))
}), nil)
cm.healthzStarted = true
}()

Expand All @@ -438,11 +416,30 @@ func (cm *controllerManager) serveHealthProbes() {
}
}

// Start starts the manager and locks indefinitely.
// There is only two ways to have start return:
// An error has occurred during in one of the internal operations,
// such as leader election, cache start, webhooks, and so on.
// Or, the context is cancelled.
func (cm *controllerManager) Start(ctx context.Context) (err error) {
cm.mu.Lock()
{
// Initialize the internal context.
cm.internalCtx, cm.internalCancel = context.WithCancel(ctx)

// initialize this here so that we reset the signal channel state on every start
// Everything that might write into this channel must be started in a new goroutine,
// because otherwise we might block this routine trying to write into the full channel
// and will not be able to enter the deferred cm.engageStopProcedure() which drains
// it.
cm.errChan = make(chan error)
}
cm.mu.Unlock()

// Add the cluster runnable.
if err := cm.Add(cm.cluster); err != nil {
return fmt.Errorf("failed to add cluster to runnables: %w", err)
}
cm.internalCtx, cm.internalCancel = context.WithCancel(ctx)

// This chan indicates that stop is complete, in other words all runnables have returned or timeout on stop request
stopComplete := make(chan struct{})
Expand All @@ -463,13 +460,6 @@ func (cm *controllerManager) Start(ctx context.Context) (err error) {
}
}()

// initialize this here so that we reset the signal channel state on every start
// Everything that might write into this channel must be started in a new goroutine,
// because otherwise we might block this routine trying to write into the full channel
// and will not be able to enter the deferred cm.engageStopProcedure() which drains
// it.
cm.errChan = make(chan error)

// Metrics should be served whether the controller is leader or not.
// (If we don't serve metrics for non-leaders, prometheus will still scrape
// the pod but will get a connection refused)
Expand Down Expand Up @@ -568,7 +558,10 @@ func (cm *controllerManager) waitForRunnableToEnd(shutdownCancel context.CancelF
}()

go func() {
cm.waitForRunnable.Wait()
cm.runnables.others.StopAndWait()
cm.runnables.caches.StopAndWait()
cm.runnables.leaderElection.StopAndWait()
cm.runnables.webhooks.StopAndWait()
shutdownCancel()
}()

Expand All @@ -580,71 +573,29 @@ func (cm *controllerManager) waitForRunnableToEnd(shutdownCancel context.CancelF
}

func (cm *controllerManager) startNonLeaderElectionRunnables() {
cm.mu.Lock()
defer cm.mu.Unlock()

// First start any webhook servers, which includes conversion, validation, and defaulting
// webhooks that are registered.
//
// WARNING: Webhooks MUST start before any cache is populated, otherwise there is a race condition
// between conversion webhooks and the cache sync (usually initial list) which causes the webhooks
// to never start because no cache can be populated.
for _, c := range cm.nonLeaderElectionRunnables {
if _, ok := c.(*webhook.Server); ok {
cm.startRunnable(c)
}
}
cm.runnables.webhooks.Start(cm.internalCtx, cm.errChan)
cm.runnables.webhooks.WaitReady(cm.internalCtx)

// Start and wait for caches.
cm.waitForCache(cm.internalCtx)
cm.runnables.caches.WaitReady(cm.internalCtx)

// Start the non-leaderelection Runnables after the cache has synced
for _, c := range cm.nonLeaderElectionRunnables {
if _, ok := c.(*webhook.Server); ok {
continue
}

// Controllers block, but we want to return an error if any have an error starting.
// Write any Start errors to a channel so we can return them
cm.startRunnable(c)
}
cm.runnables.others.Start(cm.internalCtx, cm.errChan)
}

func (cm *controllerManager) startLeaderElectionRunnables() {
cm.mu.Lock()
defer cm.mu.Unlock()

cm.waitForCache(cm.internalCtx)

// Start the leader election Runnables after the cache has synced
for _, c := range cm.leaderElectionRunnables {
// Controllers block, but we want to return an error if any have an error starting.
// Write any Start errors to a channel so we can return them
cm.startRunnable(c)
}

cm.startedLeader = true
}
spew.Dump("STARTING THE CACHES!!!")
cm.runnables.caches.Start(cm.internalCtx, cm.errChan)
cm.runnables.caches.WaitReady(cm.internalCtx)

func (cm *controllerManager) waitForCache(ctx context.Context) {
if cm.started {
return
}

for _, cache := range cm.caches {
cm.startRunnable(cache)
}

// Wait for the caches to sync.
// TODO(community): Check the return value and write a test
for _, cache := range cm.caches {
cache.GetCache().WaitForCacheSync(ctx)
}
// TODO: This should be the return value of cm.cache.WaitForCacheSync but we abuse
// cm.started as check if we already started the cache so it must always become true.
// Making sure that the cache doesn't get started twice is needed to not get a "close
// of closed channel" panic
cm.started = true
cm.runnables.leaderElection.Start(cm.internalCtx, cm.errChan)
cm.runnables.leaderElection.WaitReady(cm.internalCtx)
}

func (cm *controllerManager) startLeaderElection() (err error) {
Expand Down Expand Up @@ -694,13 +645,3 @@ func (cm *controllerManager) startLeaderElection() (err error) {
func (cm *controllerManager) Elected() <-chan struct{} {
return cm.elected
}

func (cm *controllerManager) startRunnable(r Runnable) {
cm.waitForRunnable.Add(1)
go func() {
defer cm.waitForRunnable.Done()
if err := r.Start(cm.internalCtx); err != nil {
cm.errChan <- err
}
}()
}
1 change: 1 addition & 0 deletions pkg/manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -367,6 +367,7 @@ func New(config *rest.Config, options Options) (Manager, error) {

return &controllerManager{
cluster: cluster,
runnables: newRunnables(),
recorderProvider: recorderProvider,
resourceLock: resourceLock,
metricsListener: metricsListener,
Expand Down
Loading

0 comments on commit 7c59ac6

Please sign in to comment.