Skip to content

Commit

Permalink
feat: richer prometheus stats and Kubernetes events (#1115)
Browse files Browse the repository at this point in the history
Signed-off-by: Jesse Suen <[email protected]>
  • Loading branch information
jessesuen authored May 13, 2021
1 parent 698beb5 commit f5e2d82
Show file tree
Hide file tree
Showing 71 changed files with 1,479 additions and 1,306 deletions.
11 changes: 11 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,17 @@ More information:
To avoid the Argo CD OutOfSync conditions, you can remove `spec.preserveUnknownFields` from the manifests
entirely *after upgrading from v0.10*.

Alternatively, you can instruct Argo CD to ignore differences using ignoreDifferences in the Application spec:

```yaml
spec:
  ignoreDifferences:
  - group: apiextensions.k8s.io
    kind: CustomResourceDefinition
    jsonPointers:
    - /spec/preserveUnknownFields
```
## Changes since v0.10
### Controller
Expand Down
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ plugin-image:
if [ "$(DOCKER_PUSH)" = "true" ] ; then docker push $(IMAGE_PREFIX)kubectl-argo-rollouts:$(IMAGE_TAG) ; fi

.PHONY: lint
lint:
lint: go-mod-vendor
golangci-lint run --fix

.PHONY: test
Expand Down
41 changes: 19 additions & 22 deletions analysis/analysis.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
analysisutil "github.com/argoproj/argo-rollouts/utils/analysis"
"github.com/argoproj/argo-rollouts/utils/defaults"
logutil "github.com/argoproj/argo-rollouts/utils/log"
"github.com/argoproj/argo-rollouts/utils/record"
)

const (
Expand All @@ -26,12 +27,6 @@ const (
DefaultErrorRetryInterval time.Duration = 10 * time.Second
)

// Event reasons for analysis events
const (
EventReasonStatusFailed = "Failed"
EventReasonStatusCompleted = "Complete"
)

// metricTask holds the metric which need to be measured during this reconciliation along with
// an in-progress measurement
type metricTask struct {
Expand All @@ -52,7 +47,7 @@ func (c *Controller) reconcileAnalysisRun(origRun *v1alpha1.AnalysisRun) *v1alph
log.Warn(message)
run.Status.Phase = v1alpha1.AnalysisPhaseError
run.Status.Message = message
c.recorder.Eventf(run, corev1.EventTypeWarning, EventReasonStatusFailed, "analysis completed %s", run.Status.Phase)
c.recordAnalysisRunCompletionEvent(run)
return run
}
run.Spec.Metrics = metrics
Expand All @@ -65,7 +60,7 @@ func (c *Controller) reconcileAnalysisRun(origRun *v1alpha1.AnalysisRun) *v1alph
log.Warn(message)
run.Status.Phase = v1alpha1.AnalysisPhaseError
run.Status.Message = message
c.recorder.Eventf(run, corev1.EventTypeWarning, EventReasonStatusFailed, "analysis completed %s", run.Status.Phase)
c.recordAnalysisRunCompletionEvent(run)
return run
}
}
Expand All @@ -78,24 +73,17 @@ func (c *Controller) reconcileAnalysisRun(origRun *v1alpha1.AnalysisRun) *v1alph
log.Warn(message)
run.Status.Phase = v1alpha1.AnalysisPhaseError
run.Status.Message = message
c.recorder.Eventf(run, corev1.EventTypeWarning, EventReasonStatusFailed, "analysis completed %s", run.Status.Phase)
c.recordAnalysisRunCompletionEvent(run)
return run
}

newStatus, newMessage := c.assessRunStatus(run)
if newStatus != run.Status.Phase {
message := fmt.Sprintf("analysis transitioned from %s -> %s", run.Status.Phase, newStatus)
if newStatus.Completed() {
switch newStatus {
case v1alpha1.AnalysisPhaseError, v1alpha1.AnalysisPhaseFailed:
c.recorder.Eventf(run, corev1.EventTypeWarning, EventReasonStatusFailed, "analysis completed %s", newStatus)
default:
c.recorder.Eventf(run, corev1.EventTypeNormal, EventReasonStatusCompleted, "analysis completed %s", newStatus)
}
}
log.Info(message)
run.Status.Phase = newStatus
run.Status.Message = newMessage
if newStatus.Completed() {
c.recordAnalysisRunCompletionEvent(run)
}
}

err = c.garbageCollectMeasurements(run, DefaultMeasurementHistoryLimit)
Expand All @@ -116,6 +104,15 @@ func (c *Controller) reconcileAnalysisRun(origRun *v1alpha1.AnalysisRun) *v1alph
return run
}

// recordAnalysisRunCompletionEvent emits an event through the controller's recorder announcing
// the phase the given AnalysisRun has reached. Runs in the Error or Failed phase are reported
// with the Warning event type; every other phase is reported as Normal. The event reason is
// the literal prefix "AnalysisRun" followed by the phase name.
func (c *Controller) recordAnalysisRunCompletionEvent(run *v1alpha1.AnalysisRun) {
	phase := run.Status.Phase
	opts := record.EventOptions{
		EventType:   corev1.EventTypeNormal,
		EventReason: "AnalysisRun" + string(phase),
	}
	// Only unsuccessful terminal phases are escalated to a warning.
	if phase == v1alpha1.AnalysisPhaseError || phase == v1alpha1.AnalysisPhaseFailed {
		opts.EventType = corev1.EventTypeWarning
	}
	c.recorder.Eventf(run, opts, "analysis completed %s", phase)
}

// generateMetricTasks generates a list of metrics tasks needed to be measured as part of this
// sync, based on the last completion times that metric was measured (if ever). If the run is
// terminating (e.g. due to manual termination or failing metric), will not schedule further
Expand Down Expand Up @@ -381,12 +378,12 @@ func (c *Controller) assessRunStatus(run *v1alpha1.AnalysisRun) (v1alpha1.Analys
if result.Phase != metricStatus {
log.Infof("metric transitioned from %s -> %s", result.Phase, metricStatus)
if metricStatus.Completed() {
eventType := corev1.EventTypeNormal
switch metricStatus {
case v1alpha1.AnalysisPhaseError, v1alpha1.AnalysisPhaseFailed:
c.recorder.Eventf(run, corev1.EventTypeWarning, EventReasonStatusFailed, "metric '%s' completed %s", metric.Name, metricStatus)
default:
c.recorder.Eventf(run, corev1.EventTypeNormal, EventReasonStatusCompleted, "metric '%s' completed %s", metric.Name, metricStatus)
eventType = corev1.EventTypeWarning
}
c.recorder.Eventf(run, record.EventOptions{EventType: eventType, EventReason: "Metric" + string(metricStatus)}, "metric '%s' completed %s", metric.Name, metricStatus)
}
if lastMeasurement := analysisutil.LastMeasurement(run, metric.Name); lastMeasurement != nil {
result.Message = lastMeasurement.Message
Expand Down
2 changes: 1 addition & 1 deletion analysis/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
batchinformers "k8s.io/client-go/informers/batch/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"

"github.com/argoproj/argo-rollouts/controller/metrics"
Expand All @@ -22,6 +21,7 @@ import (
listers "github.com/argoproj/argo-rollouts/pkg/client/listers/rollouts/v1alpha1"
controllerutil "github.com/argoproj/argo-rollouts/utils/controller"
logutil "github.com/argoproj/argo-rollouts/utils/log"
"github.com/argoproj/argo-rollouts/utils/record"
)

// Controller is the controller implementation for Analysis resources
Expand Down
4 changes: 2 additions & 2 deletions analysis/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
k8sfake "k8s.io/client-go/kubernetes/fake"
core "k8s.io/client-go/testing"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"

"github.com/argoproj/argo-rollouts/controller/metrics"
Expand All @@ -27,6 +26,7 @@ import (
"github.com/argoproj/argo-rollouts/pkg/apis/rollouts/v1alpha1"
"github.com/argoproj/argo-rollouts/pkg/client/clientset/versioned/fake"
informers "github.com/argoproj/argo-rollouts/pkg/client/informers/externalversions"
"github.com/argoproj/argo-rollouts/utils/record"
)

var (
Expand Down Expand Up @@ -102,7 +102,7 @@ func (f *fixture) newController(resync resyncFunc) (*Controller, informers.Share
ResyncPeriod: resync(),
AnalysisRunWorkQueue: analysisRunWorkqueue,
MetricsServer: metricsServer,
Recorder: &record.FakeRecorder{},
Recorder: record.NewFakeEventRecorder(),
})

c.enqueueAnalysis = func(obj interface{}) {
Expand Down
2 changes: 1 addition & 1 deletion cmd/kubectl-argo-rollouts/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
_ "k8s.io/client-go/plugin/pkg/client/auth/azure"
_ "k8s.io/client-go/plugin/pkg/client/auth/gcp"
_ "k8s.io/client-go/plugin/pkg/client/auth/oidc"
"k8s.io/klog"
"k8s.io/klog/v2"

"github.com/argoproj/argo-rollouts/pkg/kubectl-argo-rollouts/cmd"
"github.com/argoproj/argo-rollouts/pkg/kubectl-argo-rollouts/options"
Expand Down
19 changes: 5 additions & 14 deletions cmd/rollouts-controller/main.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
package main

import (
"flag"
"fmt"
"os"
"strconv"
"time"

smiclientset "github.com/servicemeshinterface/smi-sdk-go/pkg/gen/client/split/clientset/versioned"
Expand All @@ -20,7 +18,6 @@ import (
_ "k8s.io/client-go/plugin/pkg/client/auth/gcp"
_ "k8s.io/client-go/plugin/pkg/client/auth/oidc"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/klog"

"github.com/argoproj/argo-rollouts/controller"
"github.com/argoproj/argo-rollouts/controller/metrics"
Expand All @@ -34,9 +31,10 @@ import (
"github.com/argoproj/argo-rollouts/utils/defaults"
"github.com/argoproj/argo-rollouts/utils/istio"
istioutil "github.com/argoproj/argo-rollouts/utils/istio"
kubeclientmetrics "github.com/argoproj/argo-rollouts/utils/kubeclientmetrics"
logutil "github.com/argoproj/argo-rollouts/utils/log"
"github.com/argoproj/argo-rollouts/utils/tolerantinformer"
"github.com/argoproj/argo-rollouts/utils/version"
"github.com/argoproj/pkg/kubeclientmetrics"
)

const (
Expand All @@ -49,7 +47,7 @@ func newCommand() *cobra.Command {
clientConfig clientcmd.ClientConfig
rolloutResyncPeriod int64
logLevel string
glogLevel int
klogLevel int
metricsPort int
instanceID string
rolloutThreads int
Expand Down Expand Up @@ -79,7 +77,7 @@ func newCommand() *cobra.Command {
FullTimestamp: true,
}
log.SetFormatter(formatter)
setGLogLevel(glogLevel)
logutil.SetKLogLevel(klogLevel)
log.WithField("version", version.GetVersion()).Info("Argo Rollouts starting")

// set up signals so we handle the first shutdown signal gracefully
Expand Down Expand Up @@ -189,7 +187,7 @@ func newCommand() *cobra.Command {
command.Flags().Int64Var(&rolloutResyncPeriod, "rollout-resync", controller.DefaultRolloutResyncPeriod, "Time period in seconds for rollouts resync.")
command.Flags().BoolVar(&namespaced, "namespaced", false, "runs controller in namespaced mode (does not require cluster RBAC)")
command.Flags().StringVar(&logLevel, "loglevel", "info", "Set the logging level. One of: debug|info|warn|error")
command.Flags().IntVar(&glogLevel, "gloglevel", 0, "Set the glog logging level")
command.Flags().IntVar(&klogLevel, "kloglevel", 0, "Set the klog logging level")
command.Flags().IntVar(&metricsPort, "metricsport", controller.DefaultMetricsPort, "Set the port the metrics endpoint should be exposed over")
command.Flags().StringVar(&instanceID, "instance-id", "", "Indicates which argo rollout objects the controller should operate on")
command.Flags().IntVar(&rolloutThreads, "rollout-threads", controller.DefaultRolloutThreads, "Set the number of worker threads for the Rollout controller")
Expand Down Expand Up @@ -233,13 +231,6 @@ func setLogLevel(logLevel string) {
log.SetLevel(level)
}

// setGLogLevel sets the glog/klog verbosity used by the k8s go-client.
// It registers klog's flags (nil is passed to InitFlags; per the klog docs this
// presumably targets the default command-line flag set — confirm), then forces
// logging to stderr and sets the "v" flag to glogLevel. Errors from flag.Set are
// deliberately ignored, as in the original best-effort behavior.
func setGLogLevel(glogLevel int) {
	klog.InitFlags(nil)
	verbosity := strconv.Itoa(glogLevel)
	_ = flag.Set("logtostderr", "true")
	_ = flag.Set("v", verbosity)
}

func checkError(err error) {
if err != nil {
log.Fatal(err)
Expand Down
29 changes: 11 additions & 18 deletions controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"github.com/pkg/errors"
smiclientset "github.com/servicemeshinterface/smi-sdk-go/pkg/gen/client/split/clientset/versioned"
log "github.com/sirupsen/logrus"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
Expand All @@ -20,9 +19,7 @@ import (
extensionsinformers "k8s.io/client-go/informers/extensions/v1beta1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"

"github.com/argoproj/argo-rollouts/analysis"
Expand All @@ -34,10 +31,9 @@ import (
informers "github.com/argoproj/argo-rollouts/pkg/client/informers/externalversions/rollouts/v1alpha1"
"github.com/argoproj/argo-rollouts/rollout"
"github.com/argoproj/argo-rollouts/service"
"github.com/argoproj/argo-rollouts/utils/record"
)

const controllerAgentName = "rollouts-controller"

const (
// DefaultRolloutResyncPeriod is the default time in seconds for rollout resync period
DefaultRolloutResyncPeriod = 15 * 60
Expand Down Expand Up @@ -123,20 +119,15 @@ func NewManager(
utilruntime.Must(rolloutscheme.AddToScheme(scheme.Scheme))
log.Info("Creating event broadcaster")

// Create event broadcaster
// Add argo-rollouts custom resources to the default Kubernetes Scheme so Events can be
// logged for argo-rollouts types.
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartLogging(log.Infof)
eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: kubeclientset.CoreV1().Events("")})
recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: controllerAgentName})
metricsAddr := fmt.Sprintf("0.0.0.0:%d", metricsPort)
metricsServer := metrics.NewMetricsServer(metrics.ServerConfig{
Addr: metricsAddr,
RolloutLister: rolloutsInformer.Lister(),
AnalysisRunLister: analysisRunInformer.Lister(),
ExperimentLister: experimentsInformer.Lister(),
K8SRequestProvider: k8sRequestProvider,
Addr: metricsAddr,
RolloutLister: rolloutsInformer.Lister(),
AnalysisRunLister: analysisRunInformer.Lister(),
AnalysisTemplateLister: analysisTemplateInformer.Lister(),
ClusterAnalysisTemplateLister: clusterAnalysisTemplateInformer.Lister(),
ExperimentLister: experimentsInformer.Lister(),
K8SRequestProvider: k8sRequestProvider,
})

rolloutWorkqueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Rollouts")
Expand All @@ -147,6 +138,8 @@ func NewManager(

refResolver := rollout.NewInformerBasedWorkloadRefResolver(namespace, dynamicclientset, discoveryClient, rolloutWorkqueue, rolloutsInformer.Informer())

recorder := record.NewEventRecorder(kubeclientset, metrics.MetricRolloutEventsTotal)

rolloutController := rollout.NewController(rollout.ControllerConfig{
Namespace: namespace,
KubeClientSet: kubeclientset,
Expand Down Expand Up @@ -290,7 +283,7 @@ func (c *Manager) Run(rolloutThreadiness, serviceThreadiness, ingressThreadiness
err := c.metricsServer.ListenAndServe()
if err != nil {
err = errors.Wrap(err, "Starting Metric Server")
log.Fatal(err)
log.Error(err)
}
}()
<-stopCh
Expand Down
Loading

0 comments on commit f5e2d82

Please sign in to comment.