Skip to content

Commit

Permalink
Restructure to use channels instead of a pull model
Browse files Browse the repository at this point in the history
Signed-off-by: Vince Prignano <[email protected]>
  • Loading branch information
vincepri committed Nov 3, 2021
1 parent fe8076b commit 423ec74
Show file tree
Hide file tree
Showing 4 changed files with 89 additions and 147 deletions.
12 changes: 6 additions & 6 deletions pkg/manager/internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ func (cm *controllerManager) add(r Runnable) error {
if err := cm.SetFields(r); err != nil {
return err
}
return cm.runnables.Add(r, nil)
return cm.runnables.Add(r)
}

// Deprecated: use the equivalent Options field to set a field. This method will be removed in v0.10.
Expand All @@ -216,7 +216,7 @@ func (cm *controllerManager) AddMetricsExtraHandler(path string, handler http.Ha
defer cm.Unlock()

if cm.started {
return fmt.Errorf("uunable to add new metrics handler because metrics endpoint has already been created")
return fmt.Errorf("unable to add new metrics handler because metrics endpoint has already been created")
}

if path == defaultMetricsEndpoint {
Expand Down Expand Up @@ -456,21 +456,21 @@ func (cm *controllerManager) Start(ctx context.Context) (err error) {
// WARNING: Webhooks MUST start before any cache is populated, otherwise there is a race condition
// between conversion webhooks and the cache sync (usually initial list) which causes the webhooks
// to never start because no cache can be populated.
if err := cm.runnables.Webhooks.StartAndWaitReady(cm.internalCtx); err != nil {
if err := cm.runnables.Webhooks.Start(cm.internalCtx); err != nil {
if err != wait.ErrWaitTimeout {
return err
}
}

// Start and wait for caches.
if err := cm.runnables.Caches.StartAndWaitReady(cm.internalCtx); err != nil {
if err := cm.runnables.Caches.Start(cm.internalCtx); err != nil {
if err != wait.ErrWaitTimeout {
return err
}
}

// Start the non-leaderelection Runnables after the cache has synced.
if err := cm.runnables.Others.StartAndWaitReady(cm.internalCtx); err != nil {
if err := cm.runnables.Others.Start(cm.internalCtx); err != nil {
if err != wait.ErrWaitTimeout {
return err
}
Expand Down Expand Up @@ -601,7 +601,7 @@ func (cm *controllerManager) engageStopProcedure(stopComplete <-chan struct{}) e
}

func (cm *controllerManager) startLeaderElectionRunnables() error {
return cm.runnables.LeaderElection.StartAndWaitReady(cm.internalCtx)
return cm.runnables.LeaderElection.Start(cm.internalCtx)
}

func (cm *controllerManager) startLeaderElection(ctx context.Context) (err error) {
Expand Down
3 changes: 0 additions & 3 deletions pkg/manager/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1817,9 +1817,6 @@ func (c *startSignalingInformer) Start(ctx context.Context) error {

func (c *startSignalingInformer) WaitForCacheSync(ctx context.Context) bool {
defer func() {
for !c.started() {
continue
}
c.mu.Lock()
c.wasSynced = true
c.mu.Unlock()
Expand Down
152 changes: 69 additions & 83 deletions pkg/manager/runnable_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,7 @@ import (
"context"
"errors"
"sync"
"time"

"k8s.io/apimachinery/pkg/util/wait"
"sigs.k8s.io/controller-runtime/pkg/webhook"
)

Expand All @@ -18,8 +16,8 @@ var (
// a ready check.
type readyRunnable struct {
Runnable
Check runnableCheck
Ready bool
Check runnableCheck
signalReady bool
}

// runnableCheck can be passed to Add() to let the runnable group determine that a
Expand All @@ -46,33 +44,27 @@ func newRunnables(errChan chan error) *runnables {
}
}

// Add adds a runnable and its ready check to the closest
// group of runnable that they belong to.
// Add adds a runnable to closest group of runnable that they belong to.
//
// Add should be able to be called before and after Start, but not after StopAndWait.
// Add should return an error when called during StopAndWait.
// The runnables added before Start are started when Start is called.
// The runnables added after Start are started directly.
func (r *runnables) Add(fn Runnable, ready runnableCheck) error {
if ready == nil {
// If we don't have a readiness check, always return true.
ready = func(_ context.Context) bool { return true }
}

func (r *runnables) Add(fn Runnable) error {
switch runnable := fn.(type) {
case hasCache:
return r.Caches.Add(fn, func(ctx context.Context) bool {
return ready(ctx) && runnable.GetCache().WaitForCacheSync(ctx)
return runnable.GetCache().WaitForCacheSync(ctx)
})
case *webhook.Server:
return r.Webhooks.Add(fn, ready)
return r.Webhooks.Add(fn, nil)
case LeaderElectionRunnable:
if !runnable.NeedLeaderElection() {
return r.Others.Add(fn, ready)
return r.Others.Add(fn, nil)
}
return r.LeaderElection.Add(fn, ready)
return r.LeaderElection.Add(fn, nil)
default:
return r.LeaderElection.Add(fn, ready)
return r.LeaderElection.Add(fn, nil)
}
}

Expand All @@ -85,9 +77,11 @@ type runnableGroup struct {
ctx context.Context
cancel context.CancelFunc

start sync.Mutex
startOnce sync.Once
started bool
start sync.Mutex
startOnce sync.Once
started bool
startQueue []*readyRunnable
startReadyCh chan *readyRunnable

stop sync.RWMutex
stopOnce sync.Once
Expand All @@ -104,23 +98,14 @@ type runnableGroup struct {
// wg is an internal sync.WaitGroup that allows us to properly stop
// and wait for all the runnables to finish before returning.
wg *sync.WaitGroup

// group is a sync.Map that contains every runnable ever.
// The key of the map is the runnable itself (key'd by pointer),
// while the value is its ready state.
//
// The group of runnable is append-only, runnables scheduled
// through this group are going to be stored in this internal map
// until the application exits. The limit is the available memory.
group *sync.Map
}

func newRunnableGroup(errChan chan error) *runnableGroup {
r := &runnableGroup{
errChan: errChan,
ch: make(chan *readyRunnable),
wg: new(sync.WaitGroup),
group: new(sync.Map),
startReadyCh: make(chan *readyRunnable),
errChan: errChan,
ch: make(chan *readyRunnable),
wg: new(sync.WaitGroup),
}
r.ctx, r.cancel = context.WithCancel(context.Background())
return r
Expand All @@ -133,25 +118,57 @@ func (r *runnableGroup) Started() bool {
return r.started
}

// StartAndWaitReady starts all the runnables previously
// added to the group and waits for all to report ready.
func (r *runnableGroup) StartAndWaitReady(ctx context.Context) error {
r.Start()
return r.WaitReady(ctx)
}
// Start starts the group and waits for all
// initially registered runnables to start.
// It can only be called once, subsequent calls have no effect.
func (r *runnableGroup) Start(ctx context.Context) error {
var retErr error

// Start starts the group, it can only be called once.
func (r *runnableGroup) Start() {
r.startOnce.Do(func() {
defer close(r.startReadyCh)

// Start the internal reconciler.
go r.reconcile()

// Start the group and queue up all
// the runnables that were added prior.
r.start.Lock()
r.started = true
r.group.Range(func(key, _ interface{}) bool {
r.ch <- key.(*readyRunnable)
return true
})
for _, rn := range r.startQueue {
rn.signalReady = true
r.ch <- rn
}
r.start.Unlock()

// If we don't have any queue, return.
if len(r.startQueue) == 0 {
return
}

// Wait for all runnables to signal.
for {
select {
case <-ctx.Done():
if err := ctx.Err(); !errors.Is(err, context.Canceled) {
retErr = err
}
case rn := <-r.startReadyCh:
for i, existing := range r.startQueue {
if existing == rn {
// Remove the item from the start queue.
r.startQueue = append(r.startQueue[:i], r.startQueue[i+1:]...)
break
}
}
// We're done waiting if the queue is empty, return.
if len(r.startQueue) == 0 {
return
}
}
}
})

return retErr
}

// reconcile is our main entrypoint for every runnable added
Expand Down Expand Up @@ -185,26 +202,17 @@ func (r *runnableGroup) reconcile() {
go func(rn *readyRunnable) {
go func() {
if rn.Check(r.ctx) {
r.group.Store(rn, true)
if rn.signalReady {
r.startReadyCh <- rn
}
}
}()

// If we return, the runnable ended cleanly
// or returned an error to the channel.
//
// We should always decrement the WaitGroup and
// mark the runnable as ready.
//
// Think about the group as an append-only system.
//
// A runnable is marked as ready if:
// - The health check return true.
// - The runnable Start() method returned and
// it either finished cleanly (e.g. one shot operations)
// or it failed to run and it returned an error which
// gets propagated to the manager.
// We should always decrement the WaitGroup here.
defer r.wg.Done()
defer r.group.Store(rn, true)

// Start the runnable.
if err := rn.Start(r.ctx); err != nil {
Expand All @@ -214,27 +222,6 @@ func (r *runnableGroup) reconcile() {
}
}

// WaitReady polls until the group is ready or until the context is cancelled.
func (r *runnableGroup) WaitReady(ctx context.Context) error {
return wait.PollImmediateInfiniteWithContext(ctx,
100*time.Millisecond,
func(_ context.Context) (bool, error) {
if !r.Started() {
return false, nil
}
ready, total := 0, 0
r.group.Range(func(_, value interface{}) bool {
total++
if rd, ok := value.(bool); ok && rd {
ready++
}
return true
})
return ready == total, nil
},
)
}

// Add should be able to be called before and after Start, but not after StopAndWait.
// Add should return an error when called during StopAndWait.
func (r *runnableGroup) Add(rn Runnable, ready runnableCheck) error {
Expand All @@ -261,11 +248,10 @@ func (r *runnableGroup) Add(rn Runnable, ready runnableCheck) error {
{
r.start.Lock()

// Store the runnable in the internal buffer.
r.group.Store(readyRunnable, false)

// Check if we're already started.
if !r.started {
// Store the runnable in the internal if not.
r.startQueue = append(r.startQueue, readyRunnable)
r.start.Unlock()
return nil
}
Expand All @@ -283,7 +269,7 @@ func (r *runnableGroup) StopAndWait(ctx context.Context) {
// Close the reconciler channel once we're done.
defer close(r.ch)

r.Start()
_ = r.Start(ctx)
r.stop.Lock()
// Store the stopped variable so we don't accept any new
// runnables for the time being.
Expand Down
Loading

0 comments on commit 423ec74

Please sign in to comment.