Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(controller): leader election preventing two controllers running and gracefully shutting down #2291

Merged
merged 29 commits into from
Oct 13, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
7a7355b
WIP on fixing leader election fix
zachaller Sep 30, 2022
5ae8779
Start and stop informers as well
zachaller Sep 30, 2022
5c0a54d
lint
zachaller Oct 1, 2022
306661a
Remove tests that do not test anything
zachaller Oct 3, 2022
bc6c327
fix lint
zachaller Oct 3, 2022
b2cfe86
github trigger re-run
zachaller Oct 3, 2022
00b7699
Cleanup
zachaller Oct 3, 2022
28fba55
cleanup
zachaller Oct 3, 2022
809b652
Add back one test
zachaller Oct 3, 2022
8d516fe
remove secondary metric server
zachaller Oct 4, 2022
541123d
Remove secondary metric test
zachaller Oct 4, 2022
593af56
Add single instance test to catch log lines
zachaller Oct 5, 2022
76d5134
We should shutdown if we can not sync
zachaller Oct 5, 2022
5667c3f
Merge branch 'master' of https://github.com/argoproj/argo-rollouts in…
zachaller Oct 5, 2022
72c4697
fix lint
zachaller Oct 5, 2022
0838b65
Redo for loop will have another pr that stops via context
zachaller Oct 5, 2022
c369d1c
Fix comment
zachaller Oct 5, 2022
317bdf2
Add context and graceful shutdown
zachaller Oct 6, 2022
6a84f57
lint
zachaller Oct 6, 2022
1682cc8
Fix test
zachaller Oct 6, 2022
043c10b
github trigger re-run
zachaller Oct 6, 2022
5410287
add more time for startup
zachaller Oct 6, 2022
2b0bf62
add individual controller startup tests
zachaller Oct 7, 2022
b34ebf6
standardize shutdown
zachaller Oct 7, 2022
4bd8150
Standardize leader test
zachaller Oct 7, 2022
b12cedc
Merge branch 'master' of https://github.com/argoproj/argo-rollouts in…
zachaller Oct 7, 2022
11a1ce3
fix test
zachaller Oct 7, 2022
73f9ae4
We can not turn on release on cancel
zachaller Oct 7, 2022
273ac79
fix release on cancel
zachaller Oct 7, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 13 additions & 6 deletions analysis/controller.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package analysis

import (
"context"
"sync"
"time"

unstructuredutil "github.com/argoproj/argo-rollouts/utils/unstructured"
Expand Down Expand Up @@ -131,21 +133,26 @@ func NewController(cfg ControllerConfig) *Controller {
return controller
}

func (c *Controller) Run(threadiness int, stopCh <-chan struct{}) error {
func (c *Controller) Run(ctx context.Context, threadiness int) error {
log.Info("Starting analysis workers")
wg := sync.WaitGroup{}
for i := 0; i < threadiness; i++ {
wg.Add(1)
go wait.Until(func() {
controllerutil.RunWorker(c.analysisRunWorkQueue, logutil.AnalysisRunKey, c.syncHandler, c.metricsServer)
}, time.Second, stopCh)
controllerutil.RunWorker(ctx, c.analysisRunWorkQueue, logutil.AnalysisRunKey, c.syncHandler, c.metricsServer)
log.Debug("Analysis worker has stopped")
wg.Done()
}, time.Second, ctx.Done())
}
log.Infof("Started %d analysis workers", threadiness)
<-stopCh
log.Info("Shutting down analysis workers")
<-ctx.Done()
wg.Wait()
log.Info("All analysis workers have stopped")

return nil
}

func (c *Controller) syncHandler(key string) error {
func (c *Controller) syncHandler(ctx context.Context, key string) error {
startTime := timeutil.Now()
namespace, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
Expand Down
21 changes: 19 additions & 2 deletions analysis/controller_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package analysis

import (
"context"
"encoding/json"
"reflect"
"testing"
Expand Down Expand Up @@ -97,7 +98,7 @@ func (f *fixture) newController(resync resyncFunc) (*Controller, informers.Share
metricsServer := metrics.NewMetricsServer(metrics.ServerConfig{
Addr: "localhost:8080",
K8SRequestProvider: &metrics.K8sRequestsCountProvider{},
}, true)
})

c := NewController(ControllerConfig{
KubeClientSet: f.kubeclient,
Expand Down Expand Up @@ -159,7 +160,7 @@ func (f *fixture) runController(analysisRunName string, startInformers bool, exp
assert.True(f.t, cache.WaitForCacheSync(stopCh, c.analysisRunSynced))
}

err := c.syncHandler(analysisRunName)
err := c.syncHandler(context.Background(), analysisRunName)
if !expectError && err != nil {
f.t.Errorf("error syncing experiment: %v", err)
} else if expectError && err == nil {
Expand Down Expand Up @@ -314,3 +315,19 @@ func TestNoReconcileForAnalysisRunWithDeletionTimestamp(t *testing.T) {

f.run(getKey(ar, t))
}

func TestRun(t *testing.T) {
f := newFixture(t)
defer f.Close()

// make sure we can start and top the controller
c, _, _ := f.newController(noResyncPeriodFunc)
ctx, cancel := context.WithCancel(context.TODO())
defer cancel()
go func() {
time.Sleep(1000 * time.Millisecond)
c.analysisRunWorkQueue.ShutDownWithDrain()
cancel()
}()
c.Run(ctx, 1)
}
27 changes: 10 additions & 17 deletions cmd/rollouts-controller/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func newCommand() *cobra.Command {
log.WithField("version", version.GetVersion()).Info("Argo Rollouts starting")

// set up signals so we handle the first shutdown signal gracefully
stopCh := signals.SetupSignalHandler()
ctx := signals.SetupSignalHandlerContext()

defaults.SetVerifyTargetGroup(awsVerifyTargetGroup)
defaults.SetIstioAPIVersion(istioVersion)
Expand Down Expand Up @@ -190,23 +190,16 @@ func newCommand() *cobra.Command {
healthzPort,
k8sRequestProvider,
nginxIngressClasses,
albIngressClasses)
// notice that there is no need to run Start methods in a separate goroutine. (i.e. go kubeInformerFactory.Start(stopCh)
// Start method is non-blocking and runs all registered informers in a dedicated goroutine.
dynamicInformerFactory.Start(stopCh)
if !namespaced {
clusterDynamicInformerFactory.Start(stopCh)
}
kubeInformerFactory.Start(stopCh)
controllerNamespaceInformerFactory.Start(stopCh)
jobInformerFactory.Start(stopCh)

// Check if Istio installed on cluster before starting dynamicInformerFactory
if istioutil.DoesIstioExist(istioPrimaryDynamicClient, namespace) {
istioDynamicInformerFactory.Start(stopCh)
}
albIngressClasses,
dynamicInformerFactory,
clusterDynamicInformerFactory,
istioDynamicInformerFactory,
namespaced,
kubeInformerFactory,
controllerNamespaceInformerFactory,
jobInformerFactory)

if err = cm.Run(rolloutThreads, serviceThreads, ingressThreads, experimentThreads, analysisThreads, electOpts, stopCh); err != nil {
if err = cm.Run(ctx, rolloutThreads, serviceThreads, ingressThreads, experimentThreads, analysisThreads, electOpts); err != nil {
log.Fatalf("Error running controller: %s", err.Error())
}
return nil
Expand Down
Loading