Skip to content
This repository has been archived by the owner on Dec 20, 2023. It is now read-only.

Commit

Permalink
Add a generic labeled gauge for K8s plugin resources (#137)
Browse files Browse the repository at this point in the history
# TL;DR
Adds a generic labeled gauge for K8s plugin resources

## Type
 - [ ] Bug Fix
 - [x] Feature
 - [ ] Plugin

## Are all requirements met?

 - [x] Code completed
 - [x] Smoke tested
 - [x] Unit tests added
 - [x] Code documentation added
 - [x] Any pending items have an associated Issue

## Complete description
* We are adding a generic utility to emit gauge metrics in the `plugin_manager` using the labeled gauges now found in flytestdlib.
* Henceforth, a goroutine will be spun up that will create a `ResourceLevelMonitor` for each type that a plugin registers.
* Changing the existing gauge collecting utilty for FlyteWorkflow CRD objects to use the same labeled gauge.
* We may wish to combine these in the future but as their aggregations are different, we are keeping them separate for now.

## Tracking Issue
flyteorg/flyte#311

## Follow-up issue
NA
  • Loading branch information
wild-endeavor authored Jun 24, 2020
1 parent b327495 commit 562969e
Show file tree
Hide file tree
Showing 9 changed files with 395 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
package tools

import (
_ "github.com/alvaroloes/enumer"
_ "github.com/golangci/golangci-lint/cmd/golangci-lint"
_ "github.com/lyft/flytestdlib/cli/pflags"
_ "github.com/vektra/mockery/cmd/mockery"
_ "github.com/alvaroloes/enumer"
)
18 changes: 4 additions & 14 deletions flytepropeller/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -391,25 +391,12 @@ github.com/lyft/apimachinery v0.0.0-20191031200210-047e3ea32d7f/go.mod h1:llRdnz
github.com/lyft/datacatalog v0.2.1 h1:W7LsAjaS297iLCtSH9ZaAAG3YPofwkbbgIaqkfdeM0o=
github.com/lyft/datacatalog v0.2.1/go.mod h1:ktrPvzTDUwHO5Lv0hLH38zLHnOJ++rGoAO0iQ/sIPJ4=
github.com/lyft/flyteidl v0.17.0/go.mod h1:/zQXxuHO11u/saxTTZc8oYExIGEShXB+xCB1/F1Cu20=
github.com/lyft/flyteidl v0.17.9 h1:JXT9PovHqS9V3YN74x9zWT0kvIEL48c2uNoujF1KMes=
github.com/lyft/flyteidl v0.17.9/go.mod h1:/zQXxuHO11u/saxTTZc8oYExIGEShXB+xCB1/F1Cu20=
github.com/lyft/flyteidl v0.17.24 h1:N5mmk2/0062VjbIeUXLHWVZwkxGW20RdZtshaea2nL0=
github.com/lyft/flyteidl v0.17.24/go.mod h1:/zQXxuHO11u/saxTTZc8oYExIGEShXB+xCB1/F1Cu20=
github.com/lyft/flyteidl v0.17.32/go.mod h1:/zQXxuHO11u/saxTTZc8oYExIGEShXB+xCB1/F1Cu20=
github.com/lyft/flyteidl v0.17.34 h1:8ERT/8vY40dOPPJrdD8ossBb30WkvzUx/IAFMR/7+9U=
github.com/lyft/flyteidl v0.17.34/go.mod h1:/zQXxuHO11u/saxTTZc8oYExIGEShXB+xCB1/F1Cu20=
github.com/lyft/flyteplugins v0.3.23 h1:cN6d6f1ZkoHw+HD4wFCSVFVv+sCSeyx13E+hXIYEDzo=
github.com/lyft/flyteplugins v0.3.23/go.mod h1:NDhdkOAn2q6p7YLq9a0/lxyS0dburoAEgipOY5TiO8A=
github.com/lyft/flyteplugins v0.3.28 h1:4YSjJyQUHFtVoQio4X3wYtS7WRIGdJQf9Wtcl75e+1w=
github.com/lyft/flyteplugins v0.3.28/go.mod h1:NDhdkOAn2q6p7YLq9a0/lxyS0dburoAEgipOY5TiO8A=
github.com/lyft/flyteplugins v0.3.29 h1:NN88yXv6sTouMVwQEgbP0A6k+uznGr00ZcKP6ZFUPrU=
github.com/lyft/flyteplugins v0.3.29/go.mod h1:HHO6KC/2z77n9o9KM697YvSP85IWDe6jl6tAIrMLqWU=
github.com/lyft/flyteplugins v0.3.34/go.mod h1:HHO6KC/2z77n9o9KM697YvSP85IWDe6jl6tAIrMLqWU=
github.com/lyft/flyteplugins v0.3.35 h1:9s2BrJ82RoTJa1Cy02vqQy+ajxS+d4MQAkuUFoaiCuQ=
github.com/lyft/flyteplugins v0.3.35/go.mod h1:Dk9rnPCbgR7rC9dNM49260TQ51TvRsljDBJ6uBjZ9ys=
github.com/lyft/flytestdlib v0.3.0/go.mod h1:LJPPJlkFj+wwVWMrQT3K5JZgNhZi2mULsCG4ZYhinhU=
github.com/lyft/flytestdlib v0.3.3 h1:MkWXPkwQinh6MR3Yf5siZhmRSt9r4YmsF+5kvVVVedE=
github.com/lyft/flytestdlib v0.3.3/go.mod h1:LJPPJlkFj+wwVWMrQT3K5JZgNhZi2mULsCG4ZYhinhU=
github.com/lyft/flytestdlib v0.3.7/go.mod h1:LJPPJlkFj+wwVWMrQT3K5JZgNhZi2mULsCG4ZYhinhU=
github.com/lyft/flytestdlib v0.3.9 h1:NaKp9xkeWWwhVvqTOcR/FqlASy1N2gu/kN7PVe4S7YI=
github.com/lyft/flytestdlib v0.3.9/go.mod h1:LJPPJlkFj+wwVWMrQT3K5JZgNhZi2mULsCG4ZYhinhU=
github.com/lyft/spark-on-k8s-operator v0.1.3 h1:rmke8lR2Oy8mvKXRhloKuEu7fgGuXepDxiBNiorVUFI=
Expand Down Expand Up @@ -582,6 +569,7 @@ github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJy
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/stretchr/testify v1.5.1 h1:nOGnQDM7FYENwehXlg/kFVnos3rEvtKTjRvOWSzb6H4=
github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0=
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/subosito/gotenv v1.2.0 h1:Slr1R9HxAlEKefgq5jn9U+DnETlIUa6HfgEzj0g5d7s=
github.com/subosito/gotenv v1.2.0/go.mod h1:N0PQaV/YGNqwC0u51sEeR/aUtSLEXKX9iv69rRypqCw=
Expand Down Expand Up @@ -915,7 +903,9 @@ gopkg.in/yaml.v2 v2.2.7/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.8 h1:obN1ZagJSUGI0Ek/LBmuj4SNLPfIny3KsKFopxRdj10=
gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.0-20200605160147-a5ece683394c h1:grhR+C34yXImVGp7EzNk+DTIk+323eIUWOmEevy6bDo=
gopkg.in/yaml.v3 v3.0.0-20200605160147-a5ece683394c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gotest.tools v2.2.0+incompatible h1:VsBPFP1AI068pPrMxtb/S8Zkgf9xEmTLJjfM+P5UIEo=
gotest.tools v2.2.0+incompatible/go.mod h1:DsYFclhRJ6vuDpmuTbkuFWG+y2sxOXAzmJt81HFBacw=
honnef.co/go/tools v0.0.0-20180728063816-88497007e858/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
Expand Down
16 changes: 6 additions & 10 deletions flytepropeller/pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"runtime/pprof"
"time"

"github.com/lyft/flytestdlib/promutils/labeled"

"github.com/lyft/flytestdlib/contextutils"
"k8s.io/apimachinery/pkg/labels"

Expand Down Expand Up @@ -193,7 +195,7 @@ type ResourceLevelMonitor struct {

// System Observability: This is a labeled gauge that emits the current number of FlyteWorkflow objects in the informer. It is used
// to monitor current levels. It currently only splits by project/domain, not workflow status.
levels *prometheus.GaugeVec
levels labeled.Gauge

// The thing that we want to measure the current levels of
lister lister.FlyteWorkflowLister
Expand Down Expand Up @@ -235,15 +237,10 @@ func (r *ResourceLevelMonitor) collect(ctx context.Context) {
counts := r.countList(ctx, workflows)

// Emit labeled metrics, for each project/domain combination. This can be aggregated later with Prometheus queries.
metricKeys := []contextutils.Key{contextutils.ProjectKey, contextutils.DomainKey}
for project, val := range counts {
for domain, num := range val {
tempContext := contextutils.WithProjectDomain(ctx, project, domain)
gauge, err := r.levels.GetMetricWith(contextutils.Values(tempContext, metricKeys...))
if err != nil {
panic(err)
}
gauge.Set(float64(num))
r.levels.Set(tempContext, float64(num))
}
}
}
Expand Down Expand Up @@ -271,9 +268,8 @@ func NewResourceLevelMonitor(scope promutils.Scope, lister lister.FlyteWorkflowL
return &ResourceLevelMonitor{
Scope: scope,
CollectorTimer: scope.MustNewStopWatch("collection_cycle", "Measures how long it takes to run a collection", time.Millisecond),
levels: scope.MustNewGaugeVec("flyteworkflow", "Current FlyteWorkflow levels",
contextutils.ProjectKey.String(), contextutils.DomainKey.String()),
lister: lister,
levels: labeled.NewGauge("flyteworkflow", "Current FlyteWorkflow levels per instance of propeller", scope),
lister: lister,
}
}

Expand Down
10 changes: 6 additions & 4 deletions flytepropeller/pkg/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"testing"
"time"

"github.com/lyft/flytestdlib/promutils/labeled"

"github.com/lyft/flyteidl/gen/pb-go/flyteidl/core"
"github.com/lyft/flytepropeller/pkg/apis/flyteworkflow/v1alpha1"
listers "github.com/lyft/flytepropeller/pkg/client/listers/flyteworkflow/v1alpha1"
Expand Down Expand Up @@ -62,7 +64,7 @@ func (m mockWFLister) List(_ labels.Selector) (ret []*v1alpha1.FlyteWorkflow, er

func TestResourceLevelMonitor_collect(t *testing.T) {
scope := promutils.NewScope("testscope")
g := scope.MustNewGaugeVec("unittest", "testing", "project", "domain")
g := labeled.NewGauge("unittest", "testing", scope)
lm := &ResourceLevelMonitor{
Scope: scope,
CollectorTimer: scope.MustNewStopWatch("collection_cycle", "Measures how long it takes to run a collection", time.Millisecond),
Expand All @@ -74,10 +76,10 @@ func TestResourceLevelMonitor_collect(t *testing.T) {
var expected = `
# HELP testscope:unittest testing
# TYPE testscope:unittest gauge
testscope:unittest{domain="dev",project="proj"} 2
testscope:unittest{domain="dev",project="proj2"} 1
testscope:unittest{domain="dev",project="proj", task="",wf=""} 2
testscope:unittest{domain="dev",project="proj2", task="",wf=""} 1
`

err := testutil.CollectAndCompare(g, strings.NewReader(expected))
err := testutil.CollectAndCompare(g.GaugeVec, strings.NewReader(expected))
assert.NoError(t, err)
}
182 changes: 182 additions & 0 deletions flytepropeller/pkg/controller/nodes/task/k8s/plugin_collector.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
package k8s

import (
"context"
"runtime/pprof"
"strings"
"sync"
"time"

"github.com/lyft/flytestdlib/logger"
"github.com/lyft/flytestdlib/promutils/labeled"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/tools/cache"

"github.com/lyft/flytestdlib/contextutils"
"github.com/lyft/flytestdlib/promutils"
)

const resourceLevelMonitorCycleDuration = 10 * time.Second
const KindKey contextutils.Key = "kind"

// This object is responsible for emitting metrics that show the current number of a given K8s resource kind, cut by namespace.
// It needs to be kicked off. The periodicity is not currently configurable because it seems unnecessary. It will also
// a timer measuring how long it takes to run each measurement cycle.
type ResourceLevelMonitor struct {
Scope promutils.Scope

// Meta timer - this times each collection cycle to measure how long it takes to collect the levels GaugeVec below
CollectorTimer *labeled.StopWatch

// System Observability: This is a labeled gauge that emits the current number of objects in the informer. It is used
// to monitor current levels.
Levels *labeled.Gauge

// This informer will be used to get a list of the underlying objects that we want a tally of
sharedInformer cache.SharedIndexInformer

// The kind here will be used to differentiate all the metrics, we'll leave out group and version for now
gvk schema.GroupVersionKind

// We only want to create one of these ResourceLevelMonitor objects per K8s resource type. If one of these objects
// already exists because another plugin created it, we pass along the pointer to that older object. However, we don't
// want the collection goroutine to be kicked off multiple times.
once sync.Once
}

// The reason that we use namespace as the one and only thing to cut by is because it's the feature that we are sure that any
// K8s resource created by a plugin will have (as yet, Flyte doesn't have any plugins that create cluster level resources and
// it probably won't for a long time). We can't assume that all the operators and CRDs that Flyte will ever work with will have
// the exact same set of labels or annotations or owner references. The only thing we can really count on is namespace.
func (r *ResourceLevelMonitor) countList(ctx context.Context, objects []interface{}) map[string]int {
// Map of namespace to counts
counts := map[string]int{}

// Collect the object counts by namespace
for _, v := range objects {
metadata, err := meta.Accessor(v)
if err != nil {
logger.Errorf(ctx, "Error converting obj %v to an Accessor %s\n", v, err)
continue
}
counts[metadata.GetNamespace()]++
}

return counts
}

// The context here is expected to already have a value for the KindKey
func (r *ResourceLevelMonitor) collect(ctx context.Context) {
// Emit gauges at the namespace layer - since these are just K8s resources, we cannot be guaranteed to have the necessary
// information to derive project/domain
objects := r.sharedInformer.GetStore().List()
counts := r.countList(ctx, objects)

for ns, count := range counts {
withNamespaceCtx := contextutils.WithNamespace(ctx, ns)
r.Levels.Set(withNamespaceCtx, float64(count))
}
}

func (r *ResourceLevelMonitor) RunCollector(ctx context.Context) {
ticker := time.NewTicker(resourceLevelMonitorCycleDuration)
collectorCtx := contextutils.WithGoroutineLabel(ctx, "k8s-resource-level-monitor")
// Since the idea is that one of these objects is always only responsible for exactly one type of K8s resource, we
// can safely set the context here for that kind for all downstream usage
collectorCtx = context.WithValue(ctx, KindKey, strings.ToLower(r.gvk.Kind))

go func() {
defer ticker.Stop()
pprof.SetGoroutineLabels(collectorCtx)
r.sharedInformer.HasSynced()
logger.Infof(ctx, "K8s resource collector %s has synced", r.gvk.Kind)
for {
select {
case <-collectorCtx.Done():
return
case <-ticker.C:
t := r.CollectorTimer.Start(collectorCtx)
r.collect(collectorCtx)
t.Stop()
}
}
}()
}

func (r *ResourceLevelMonitor) RunCollectorOnce(ctx context.Context) {
r.once.Do(func() {
r.RunCollector(ctx)
})
}

// This struct is here to ensure that we do not create more than one of these monitors for a given GVK. It wouldn't necessarily break
// anything, but it's a waste of compute cycles to compute counts multiple times. This can happen if multiple plugins create the same
// underlying K8s resource type. If two plugins both created Pods (ie sidecar and container), without this we would launch two
// ResourceLevelMonitor's, have two goroutines spinning, etc.
type ResourceMonitorIndex struct {
lock *sync.Mutex
monitors map[schema.GroupVersionKind]*ResourceLevelMonitor

// These are declared here because this constructor will be called more than once, by different K8s resource types (Pods, SparkApps, OtherCrd, etc.)
// and metric names have to be unique. It felt more reasonable at time of writing to have one metric and have each resource type just be a label
// rather than one metric per type, but can revisit this down the road.
gauges map[promutils.Scope]*labeled.Gauge
stopwatches map[promutils.Scope]*labeled.StopWatch
}

func (r *ResourceMonitorIndex) GetOrCreateResourceLevelMonitor(ctx context.Context, scope promutils.Scope, si cache.SharedIndexInformer,
gvk schema.GroupVersionKind) *ResourceLevelMonitor {

logger.Infof(ctx, "Attempting to create K8s gauge emitter for kind %s/%s", gvk.Version, gvk.Kind)

r.lock.Lock()
defer r.lock.Unlock()

if r.monitors[gvk] != nil {
logger.Infof(ctx, "Monitor for resource type %s already created, skipping...", gvk.Kind)
return r.monitors[gvk]
}
logger.Infof(ctx, "Creating monitor for resource type %s...", gvk.Kind)

// Refer to the existing labels in main.go of this repo. For these guys, we need to add namespace and kind (the K8s resource name, pod, sparkapp, etc.)
additionalLabels := labeled.AdditionalLabelsOption{
Labels: []string{contextutils.NamespaceKey.String(), KindKey.String()},
}
if r.gauges[scope] == nil {
x := labeled.NewGauge("k8s_resources", "Current levels of K8s objects as seen from their informer caches", scope, additionalLabels)
r.gauges[scope] = &x
}
if r.stopwatches[scope] == nil {
x := labeled.NewStopWatch("k8s_collection_cycle", "Measures how long it takes to run a collection",
time.Millisecond, scope, additionalLabels)
r.stopwatches[scope] = &x
}

rm := &ResourceLevelMonitor{
Scope: scope,
CollectorTimer: r.stopwatches[scope],
Levels: r.gauges[scope],
sharedInformer: si,
gvk: gvk,
}
r.monitors[gvk] = rm

return rm
}

// This is a global variable to this file. At runtime, the NewResourceMonitorIndex() function should only be called once
// but can be called multiple times in unit tests.
var index *ResourceMonitorIndex

func NewResourceMonitorIndex() *ResourceMonitorIndex {
if index == nil {
index = &ResourceMonitorIndex{
lock: &sync.Mutex{},
monitors: make(map[schema.GroupVersionKind]*ResourceLevelMonitor),
gauges: make(map[promutils.Scope]*labeled.Gauge),
stopwatches: make(map[promutils.Scope]*labeled.StopWatch),
}
}
return index
}
Loading

0 comments on commit 562969e

Please sign in to comment.