From 7c59ac65d5b9f0c46ab800759287a2cc89920a67 Mon Sep 17 00:00:00 2001 From: Vince Prignano Date: Tue, 12 Oct 2021 06:58:15 -0700 Subject: [PATCH] Refactor manager internal around RunnableGroup(s) Signed-off-by: Vince Prignano --- go.mod | 1 + pkg/manager/internal.go | 151 ++++++----------- pkg/manager/manager.go | 1 + pkg/manager/manager_test.go | 26 ++- pkg/manager/runnable_group.go | 252 +++++++++++++++++++++++++++++ pkg/manager/runnable_group_test.go | 154 ++++++++++++++++++ 6 files changed, 473 insertions(+), 112 deletions(-) create mode 100644 pkg/manager/runnable_group.go create mode 100644 pkg/manager/runnable_group_test.go diff --git a/go.mod b/go.mod index fabf9b8f66..a3909cbadb 100644 --- a/go.mod +++ b/go.mod @@ -3,6 +3,7 @@ module sigs.k8s.io/controller-runtime go 1.16 require ( + github.com/davecgh/go-spew v1.1.1 github.com/evanphx/json-patch v4.11.0+incompatible github.com/fsnotify/fsnotify v1.4.9 github.com/go-logr/logr v0.4.0 diff --git a/pkg/manager/internal.go b/pkg/manager/internal.go index c3e7076d61..6496246e8c 100644 --- a/pkg/manager/internal.go +++ b/pkg/manager/internal.go @@ -25,6 +25,7 @@ import ( "sync" "time" + "github.com/davecgh/go-spew/spew" "github.com/go-logr/logr" "github.com/prometheus/client_golang/prometheus/promhttp" "k8s.io/apimachinery/pkg/api/meta" @@ -105,10 +106,9 @@ type controllerManager struct { healthzHandler *healthz.Handler mu sync.Mutex - started bool - startedLeader bool healthzStarted bool errChan chan error + runnables *runnables // controllerOptions are the global controller options. controllerOptions v1alpha1.ControllerConfigurationSpec @@ -134,8 +134,6 @@ type controllerManager struct { // election was configured. elected chan struct{} - caches []hasCache - // port is the port that the webhook server serves at. port int // host is the hostname that the webhook server binds to. @@ -160,10 +158,6 @@ type controllerManager struct { // between tries of actions. retryPeriod time.Duration - // waitForRunnable is holding the number of runnables currently running so that - // we can wait for them to exit before quitting the manager - waitForRunnable sync.WaitGroup - // gracefulShutdownTimeout is the duration given to runnable to stop // before the manager actually returns on stop. gracefulShutdownTimeout time.Duration @@ -194,6 +188,7 @@ type hasCache interface { func (cm *controllerManager) Add(r Runnable) error { cm.mu.Lock() defer cm.mu.Unlock() + if cm.stopProcedureEngaged { return errors.New("can't accept new runnable as stop procedure is already engaged") } @@ -203,31 +198,14 @@ func (cm *controllerManager) Add(r Runnable) error { return err } - var shouldStart bool - - // Add the runnable to the leader election or the non-leaderelection list - if leRunnable, ok := r.(LeaderElectionRunnable); ok && !leRunnable.NeedLeaderElection() { - shouldStart = cm.started - cm.nonLeaderElectionRunnables = append(cm.nonLeaderElectionRunnables, r) - } else if hasCache, ok := r.(hasCache); ok { - cm.caches = append(cm.caches, hasCache) - if cm.started { - cm.startRunnable(hasCache) - if !hasCache.GetCache().WaitForCacheSync(cm.internalCtx) { - return fmt.Errorf("could not sync cache") + return cm.runnables.Add(r, func(ctx context.Context) bool { + if cache, ok := r.(hasCache); ok { + if !cache.GetCache().WaitForCacheSync(cm.internalCtx) { + return false } } - } else { - shouldStart = cm.startedLeader - cm.leaderElectionRunnables = append(cm.leaderElectionRunnables, r) - } - - if shouldStart { - // If already started, start the controller - cm.startRunnable(r) - } - - return nil + return true + }) } // Deprecated: use the equivalent Options field to set a field. This method will be removed in v0.10. @@ -385,13 +363,13 @@ func (cm *controllerManager) serveMetrics() { Handler: mux, } // Run the server - cm.startRunnable(RunnableFunc(func(_ context.Context) error { + cm.runnables.Add(RunnableFunc(func(_ context.Context) error { cm.logger.Info("starting metrics server", "path", defaultMetricsEndpoint) if err := server.Serve(cm.metricsListener); err != nil && err != http.ErrServerClosed { return err } return nil - })) + }), nil) // Shutdown the server when stop is closed <-cm.internalProceduresStop @@ -422,12 +400,12 @@ func (cm *controllerManager) serveHealthProbes() { } // Run server - cm.startRunnable(RunnableFunc(func(_ context.Context) error { + cm.runnables.Add(RunnableFunc(func(_ context.Context) error { if err := server.Serve(cm.healthProbeListener); err != nil && err != http.ErrServerClosed { return err } return nil - })) + }), nil) cm.healthzStarted = true }() @@ -438,11 +416,30 @@ func (cm *controllerManager) serveHealthProbes() { } } +// Start starts the manager and locks indefinitely. +// There is only two ways to have start return: +// An error has occurred during in one of the internal operations, +// such as leader election, cache start, webhooks, and so on. +// Or, the context is cancelled. func (cm *controllerManager) Start(ctx context.Context) (err error) { + cm.mu.Lock() + { + // Initialize the internal context. + cm.internalCtx, cm.internalCancel = context.WithCancel(ctx) + + // initialize this here so that we reset the signal channel state on every start + // Everything that might write into this channel must be started in a new goroutine, + // because otherwise we might block this routine trying to write into the full channel + // and will not be able to enter the deferred cm.engageStopProcedure() which drains + // it. + cm.errChan = make(chan error) + } + cm.mu.Unlock() + + // Add the cluster runnable. if err := cm.Add(cm.cluster); err != nil { return fmt.Errorf("failed to add cluster to runnables: %w", err) } - cm.internalCtx, cm.internalCancel = context.WithCancel(ctx) // This chan indicates that stop is complete, in other words all runnables have returned or timeout on stop request stopComplete := make(chan struct{}) @@ -463,13 +460,6 @@ func (cm *controllerManager) Start(ctx context.Context) (err error) { } }() - // initialize this here so that we reset the signal channel state on every start - // Everything that might write into this channel must be started in a new goroutine, - // because otherwise we might block this routine trying to write into the full channel - // and will not be able to enter the deferred cm.engageStopProcedure() which drains - // it. - cm.errChan = make(chan error) - // Metrics should be served whether the controller is leader or not. // (If we don't serve metrics for non-leaders, prometheus will still scrape // the pod but will get a connection refused) @@ -568,7 +558,10 @@ func (cm *controllerManager) waitForRunnableToEnd(shutdownCancel context.CancelF }() go func() { - cm.waitForRunnable.Wait() + cm.runnables.others.StopAndWait() + cm.runnables.caches.StopAndWait() + cm.runnables.leaderElection.StopAndWait() + cm.runnables.webhooks.StopAndWait() shutdownCancel() }() @@ -580,71 +573,29 @@ func (cm *controllerManager) waitForRunnableToEnd(shutdownCancel context.CancelF } func (cm *controllerManager) startNonLeaderElectionRunnables() { - cm.mu.Lock() - defer cm.mu.Unlock() - // First start any webhook servers, which includes conversion, validation, and defaulting // webhooks that are registered. // // WARNING: Webhooks MUST start before any cache is populated, otherwise there is a race condition // between conversion webhooks and the cache sync (usually initial list) which causes the webhooks // to never start because no cache can be populated. - for _, c := range cm.nonLeaderElectionRunnables { - if _, ok := c.(*webhook.Server); ok { - cm.startRunnable(c) - } - } + cm.runnables.webhooks.Start(cm.internalCtx, cm.errChan) + cm.runnables.webhooks.WaitReady(cm.internalCtx) // Start and wait for caches. - cm.waitForCache(cm.internalCtx) + cm.runnables.caches.WaitReady(cm.internalCtx) // Start the non-leaderelection Runnables after the cache has synced - for _, c := range cm.nonLeaderElectionRunnables { - if _, ok := c.(*webhook.Server); ok { - continue - } - - // Controllers block, but we want to return an error if any have an error starting. - // Write any Start errors to a channel so we can return them - cm.startRunnable(c) - } + cm.runnables.others.Start(cm.internalCtx, cm.errChan) } func (cm *controllerManager) startLeaderElectionRunnables() { - cm.mu.Lock() - defer cm.mu.Unlock() - - cm.waitForCache(cm.internalCtx) - - // Start the leader election Runnables after the cache has synced - for _, c := range cm.leaderElectionRunnables { - // Controllers block, but we want to return an error if any have an error starting. - // Write any Start errors to a channel so we can return them - cm.startRunnable(c) - } - - cm.startedLeader = true -} + spew.Dump("STARTING THE CACHES!!!") + cm.runnables.caches.Start(cm.internalCtx, cm.errChan) + cm.runnables.caches.WaitReady(cm.internalCtx) -func (cm *controllerManager) waitForCache(ctx context.Context) { - if cm.started { - return - } - - for _, cache := range cm.caches { - cm.startRunnable(cache) - } - - // Wait for the caches to sync. - // TODO(community): Check the return value and write a test - for _, cache := range cm.caches { - cache.GetCache().WaitForCacheSync(ctx) - } - // TODO: This should be the return value of cm.cache.WaitForCacheSync but we abuse - // cm.started as check if we already started the cache so it must always become true. - // Making sure that the cache doesn't get started twice is needed to not get a "close - // of closed channel" panic - cm.started = true + cm.runnables.leaderElection.Start(cm.internalCtx, cm.errChan) + cm.runnables.leaderElection.WaitReady(cm.internalCtx) } func (cm *controllerManager) startLeaderElection() (err error) { @@ -694,13 +645,3 @@ func (cm *controllerManager) startLeaderElection() (err error) { func (cm *controllerManager) Elected() <-chan struct{} { return cm.elected } - -func (cm *controllerManager) startRunnable(r Runnable) { - cm.waitForRunnable.Add(1) - go func() { - defer cm.waitForRunnable.Done() - if err := r.Start(cm.internalCtx); err != nil { - cm.errChan <- err - } - }() -} diff --git a/pkg/manager/manager.go b/pkg/manager/manager.go index 2d2733f0a6..6e243573cb 100644 --- a/pkg/manager/manager.go +++ b/pkg/manager/manager.go @@ -367,6 +367,7 @@ func New(config *rest.Config, options Options) (Manager, error) { return &controllerManager{ cluster: cluster, + runnables: newRunnables(), recorderProvider: recorderProvider, resourceLock: resourceLock, metricsListener: metricsListener, diff --git a/pkg/manager/manager_test.go b/pkg/manager/manager_test.go index 2cb2c72560..d8e0c3a9bf 100644 --- a/pkg/manager/manager_test.go +++ b/pkg/manager/manager_test.go @@ -28,6 +28,7 @@ import ( "sync/atomic" "time" + "github.com/davecgh/go-spew/spew" "github.com/go-logr/logr" . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" @@ -285,8 +286,10 @@ var _ = Describe("manger.Manager", func() { runnableDone := make(chan struct{}) slowRunnable := RunnableFunc(func(ctx context.Context) error { + spew.Dump("START RUNNABLE") <-ctx.Done() - time.Sleep(100 * time.Millisecond) + time.Sleep(80 * time.Millisecond) + spew.Dump("DONE RUNNABLE") close(runnableDone) return nil }) @@ -296,6 +299,7 @@ var _ = Describe("manger.Manager", func() { cm.gracefulShutdownTimeout = time.Second leaderElectionDone := make(chan struct{}) cm.onStoppedLeading = func() { + spew.Dump("CLOSE LE") close(leaderElectionDone) } @@ -306,7 +310,7 @@ var _ = Describe("manger.Manager", func() { Expect(m.Start(ctx)).To(BeNil()) close(mgrDone) }() - <-cm.elected + <-cm.Elected() cancel() select { case <-leaderElectionDone: @@ -435,6 +439,7 @@ var _ = Describe("manger.Manager", func() { Expect(m).To(BeNil()) Expect(err).To(MatchError(ContainSubstring("expected error"))) }) + It("should return an error if namespace not set and not running in cluster", func() { m, err := New(cfg, Options{LeaderElection: true, LeaderElectionID: "controller-runtime"}) Expect(m).To(BeNil()) @@ -653,7 +658,7 @@ var _ = Describe("manger.Manager", func() { } mgr, ok := m.(*controllerManager) Expect(ok).To(BeTrue()) - mgr.caches = []hasCache{&cacheProvider{cache: &informertest.FakeInformers{Error: fmt.Errorf("expected error")}}} + mgr.Add(&cacheProvider{cache: &informertest.FakeInformers{Error: fmt.Errorf("expected error")}}) ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -663,6 +668,7 @@ var _ = Describe("manger.Manager", func() { It("should start the cache before starting anything else", func() { fakeCache := &startSignalingInformer{Cache: &informertest.FakeInformers{}} options.NewCache = func(_ *rest.Config, _ cache.Options) (cache.Cache, error) { + spew.Dump("RETURNED CACHE") return fakeCache, nil } m, err := New(cfg, options) @@ -673,10 +679,13 @@ var _ = Describe("manger.Manager", func() { runnableWasStarted := make(chan struct{}) Expect(m.Add(RunnableFunc(func(ctx context.Context) error { + spew.Dump("RUNNABLE START") defer GinkgoRecover() if !fakeCache.wasSynced { + spew.Dump("RUNNABLE NOT SYNCED") return errors.New("runnable got started before cache was synced") } + spew.Dump("RUNNABLE DONE") close(runnableWasStarted) return nil }))).To(Succeed()) @@ -801,8 +810,9 @@ var _ = Describe("manger.Manager", func() { Expect(err).NotTo(HaveOccurred()) Expect(m.Add(fakeCluster)).NotTo(HaveOccurred()) - Expect(fakeCluster.informer.wasStarted).To(BeTrue()) - Expect(fakeCluster.informer.wasSynced).To(BeTrue()) + Eventually(func() bool { + return fakeCluster.informer.wasStarted && fakeCluster.informer.wasSynced + }).Should(BeTrue()) }) It("should wait for runnables to stop", func() { @@ -1392,7 +1402,7 @@ var _ = Describe("manger.Manager", func() { Eventually(func() bool { mgr.mu.Lock() defer mgr.mu.Unlock() - return mgr.started + return mgr.runnables.caches.Started() }).Should(BeTrue()) // Add another component after starting @@ -1423,7 +1433,7 @@ var _ = Describe("manger.Manager", func() { Eventually(func() bool { mgr.mu.Lock() defer mgr.mu.Unlock() - return mgr.started + return mgr.runnables.caches.Started() }).Should(BeTrue()) c1 := make(chan struct{}) @@ -1577,6 +1587,8 @@ var _ = Describe("manger.Manager", func() { defer close(doneCh) Expect(m.Start(ctx)).To(Succeed()) }() + <-m.Elected() + Eventually(func() *corev1.Event { evts, err := clientset.CoreV1().Events("").Search(m.GetScheme(), &ns) Expect(err).NotTo(HaveOccurred()) diff --git a/pkg/manager/runnable_group.go b/pkg/manager/runnable_group.go new file mode 100644 index 0000000000..8ee75b8045 --- /dev/null +++ b/pkg/manager/runnable_group.go @@ -0,0 +1,252 @@ +package manager + +import ( + "context" + "errors" + "sync" + "sync/atomic" + "time" + + "github.com/davecgh/go-spew/spew" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/utils/pointer" + "sigs.k8s.io/controller-runtime/pkg/webhook" +) + +type readyRunnable struct { + Runnable + ReadyCheck readyCheck +} + +type readyCheck func(ctx context.Context) bool + +type runnables struct { + startOnce sync.Once + stopOnce sync.Once + + webhooks *runnableGroup + caches *runnableGroup + leaderElection *runnableGroup + others *runnableGroup +} + +func newRunnables() *runnables { + return &runnables{ + webhooks: newRunnableGroup(), + caches: newRunnableGroup(), + leaderElection: newRunnableGroup(), + others: newRunnableGroup(), + } +} + +func (r *runnables) Add(fn Runnable, ready readyCheck) error { + switch runnable := fn.(type) { + case hasCache: + return r.caches.Add(fn, ready) + case *webhook.Server: + return r.webhooks.Add(fn, ready) + case LeaderElectionRunnable: + if !runnable.NeedLeaderElection() { + return r.others.Add(fn, ready) + } + return r.leaderElection.Add(fn, ready) + default: + return r.others.Add(fn, ready) + } +} + +func (r *runnables) Start(ctx context.Context, errCh chan error) { + r.webhooks.Start(ctx, errCh) + r.caches.Start(ctx, errCh) + r.leaderElection.Start(ctx, errCh) + r.others.Start(ctx, errCh) +} + +type runnableGroup struct { + internalCtx context.Context + errCh chan error + + start sync.Mutex + startOnce sync.Once + startBuffer []*readyRunnable + started bool + + stop sync.RWMutex + stopOnce sync.Once + stopped bool + + ch chan *readyRunnable + wg *sync.WaitGroup + c *int64 + ready *int64 +} + +func newRunnableGroup() *runnableGroup { + r := &runnableGroup{ + startBuffer: []*readyRunnable{}, + ch: make(chan *readyRunnable, 10), + wg: new(sync.WaitGroup), + c: pointer.Int64(0), + ready: pointer.Int64(0), + } + + go func() { + for runnable := range r.ch { + // Handle start. + // If the overall runnable group isn't started yet + // we want to buffer the runnables and let Start() + // queue them up again later. + { + r.start.Lock() + if !r.started { + r.startBuffer = append(r.startBuffer, runnable) + r.start.Unlock() + continue + } + r.start.Unlock() + } + + // Handle stop. + // If the shutdown has been called we want to avoid + // adding new goroutines to the WaitGroup because Wait() + // panics if Add() is called after it. + { + r.stop.RLock() + if r.stopped { + // Drop any runnables if we're stopped. + r.stop.RUnlock() + continue + } + + // Why is this here? + // When StopAndWait is called, if a runnable is in the process + // of being added, we could end up in a situation where + // the WaitGroup is incremented while StopAndWait has called Wait(), + // which would result in a panic. + r.wg.Add(1) + r.stop.RUnlock() + } + + // Start the runnable. + go func(rn *readyRunnable) { + ctx, cancel := context.WithCancel(r.internalCtx) + go func() { + // Run the ready check a fixed number of times + // backing off a bit; this is to give time to the runnables + // to start up before their health check returns true. + ready := false + for i := 0; i < 10; i++ { + <-time.After(10 * time.Millisecond) + if ready = rn.ReadyCheck(ctx); !ready { + continue + } + break + } + if ready { + atomic.AddInt64(r.ready, 1) + defer atomic.AddInt64(r.ready, -1) + <-ctx.Done() + } + }() + + defer r.wg.Done() + defer atomic.AddInt64(r.c, -1) + defer cancel() + if err := rn.Start(ctx); err != nil { + r.errCh <- err + } + }(runnable) + } + }() + + return r +} + +func (r *runnableGroup) Started() bool { + r.start.Lock() + defer r.start.Unlock() + return r.started +} + +func (r *runnableGroup) Start(ctx context.Context, errCh chan error) { + r.startOnce.Do(func() { + r.start.Lock() + r.internalCtx = ctx + r.errCh = errCh + r.started = true + r.start.Unlock() + for _, fn := range r.startBuffer { + r.stop.RLock() + if !r.stopped { + r.ch <- fn + } + r.stop.RUnlock() + } + }) +} + +// WaitReady polls until the group is ready until the context is cancelled. +func (r *runnableGroup) WaitReady(ctx context.Context) error { + return wait.ExponentialBackoffWithContext(ctx, + wait.Backoff{ + Steps: 10, + Duration: 10 * time.Millisecond, + Factor: 5.0, + Jitter: 1.0, + }, + func() (bool, error) { + if !r.Started() { + return false, nil + } + return atomic.LoadInt64(r.c) == atomic.LoadInt64(r.ready), nil + }, + ) +} + +// Add should be able to be called before and after Start, but not after shutdown. +// Add should return an error when called during shutdown. +func (r *runnableGroup) Add(rn Runnable, ready readyCheck) error { + r.stop.RLock() + defer r.stop.RUnlock() + if r.stopped { + return errors.New("can't accept new runnable as stop procedure is already engaged") + } + + // Increment the internal counter of runnables. + atomic.AddInt64(r.c, 1) + + // If we don't have a readiness check, always return true. + if ready == nil { + ready = func(_ context.Context) bool { return true } + } + + // Enqueue the runnable. + r.ch <- &readyRunnable{ + Runnable: rn, + ReadyCheck: ready, + } + return nil +} + +// StopAndWait waits for all the runnables to finish before returning. +func (r *runnableGroup) StopAndWait() { + r.stopOnce.Do(func() { + if r.Started() { + spew.Dump("STOP WAITING") + r.WaitReady(context.TODO()) + } + + r.stop.Lock() + // Store the stopped variable so we don't accept any new + // runnables for the time being. + spew.Dump("STOPPED") + r.stopped = true + r.stop.Unlock() + + // Wait for all the runnables to finish. + spew.Dump("STOP WG") + r.wg.Wait() + close(r.ch) + spew.Dump("STOP RETURNED") + }) +} diff --git a/pkg/manager/runnable_group_test.go b/pkg/manager/runnable_group_test.go new file mode 100644 index 0000000000..7db56a371e --- /dev/null +++ b/pkg/manager/runnable_group_test.go @@ -0,0 +1,154 @@ +package manager + +import ( + "context" + "sync/atomic" + "time" + + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + "k8s.io/utils/pointer" +) + +var _ = Describe("runnableGroup", func() { + errCh := make(chan error) + + Describe("new", func() { + It("should be able to add new runnables before it starts", func() { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + rg := newRunnableGroup() + rg.Add(RunnableFunc(func(c context.Context) error { + <-ctx.Done() + return nil + }), nil) + + Expect(rg.Started()).To(BeFalse()) + }) + + It("should be able to add new runnables before and after start", func() { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + rg := newRunnableGroup() + rg.Add(RunnableFunc(func(c context.Context) error { + <-ctx.Done() + return nil + }), nil) + rg.Start(ctx, errCh) + Expect(rg.Started()).To(BeTrue()) + rg.Add(RunnableFunc(func(c context.Context) error { + <-ctx.Done() + return nil + }), nil) + Expect(rg.WaitReady(ctx)).To(Succeed()) + }) + + It("should be able to add new runnables before and after start concurrently", func() { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + rg := newRunnableGroup() + + go func() { + <-time.After(50 * time.Millisecond) + rg.Start(ctx, errCh) + }() + + for i := 0; i < 20; i++ { + go func(i int) { + <-time.After(time.Duration(i) * 10 * time.Millisecond) + rg.Add(RunnableFunc(func(c context.Context) error { + <-ctx.Done() + return nil + }), nil) + }(i) + } + Expect(rg.WaitReady(ctx)).To(Succeed()) + Expect(*rg.ready).To(BeNumerically("==", 20)) + Expect(*rg.c).To(BeNumerically("==", 20)) + }) + + It("should be able to close the group and wait for all runnables to finish", func() { + ctx, cancel := context.WithCancel(context.Background()) + + exited := pointer.Int64(0) + rg := newRunnableGroup() + for i := 0; i < 10; i++ { + rg.Add(RunnableFunc(func(c context.Context) error { + defer atomic.AddInt64(exited, 1) + <-ctx.Done() + <-time.After(time.Duration(i) * 10 * time.Millisecond) + return nil + }), nil) + } + rg.Start(ctx, errCh) + Expect(rg.WaitReady(ctx)).To(Succeed()) + + // Cancel the context, asking the runnables to exit. + cancel() + + // Watch for stop sign. + stopped := make(chan struct{}) + go func() { + rg.StopAndWait() + close(stopped) + }() + <-stopped + + Expect(rg.Add(RunnableFunc(func(c context.Context) error { + return nil + }), nil)).ToNot(Succeed()) + + Expect(*exited).To(BeNumerically("==", 10)) + }) + + It("should be able to wait for all runnables to be ready at different intervals", func() { + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) + defer cancel() + rg := newRunnableGroup() + + go func() { + <-time.After(50 * time.Millisecond) + rg.Start(ctx, errCh) + }() + + for i := 0; i < 20; i++ { + go func(i int) { + rg.Add(RunnableFunc(func(c context.Context) error { + <-ctx.Done() + return nil + }), func(_ context.Context) bool { + <-time.After(time.Duration(i) * 10 * time.Millisecond) + return true + }) + }(i) + } + Expect(rg.WaitReady(ctx)).To(Succeed()) + Expect(*rg.ready).To(BeNumerically("==", 20)) + Expect(*rg.c).To(BeNumerically("==", 20)) + }) + + It("should not turn ready if some readiness check fail", func() { + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + rg := newRunnableGroup() + + go func() { + <-time.After(50 * time.Millisecond) + rg.Start(ctx, errCh) + }() + + for i := 0; i < 20; i++ { + go func(i int) { + rg.Add(RunnableFunc(func(c context.Context) error { + <-ctx.Done() + return nil + }), func(_ context.Context) bool { + <-time.After(time.Duration(i) * 10 * time.Millisecond) + return i%2 == 0 // Return false readiness all uneven indexes. + }) + }(i) + } + Expect(rg.WaitReady(ctx)).ToNot(Succeed()) + }) + }) +})