Skip to content

Commit

Permalink
cache: capture metrics related to cache records and pruning
Browse files Browse the repository at this point in the history
Signed-off-by: Jonathan A. Sternberg <[email protected]>
  • Loading branch information
jsternberg committed Dec 27, 2023
1 parent 686c0ad commit 9b98442
Show file tree
Hide file tree
Showing 7 changed files with 130 additions and 11 deletions.
32 changes: 25 additions & 7 deletions cache/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ import (
imagespecidentity "github.com/opencontainers/image-spec/identity"
ocispecs "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/pkg/errors"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/metric/noop"
"golang.org/x/sync/errgroup"
)

Expand All @@ -49,6 +51,7 @@ type ManagerOpt struct {
Differ diff.Comparer
MetadataStore *metadata.Store
MountPoolRoot string
MeterProvider metric.MeterProvider
}

type Accessor interface {
Expand Down Expand Up @@ -97,6 +100,7 @@ type cacheManager struct {

muPrune sync.Mutex // make sure parallel prune is not allowed so there will not be inconsistent results
unlazyG flightcontrol.Group[struct{}]
metrics *metrics
}

func NewManager(opt ManagerOpt) (Manager, error) {
Expand Down Expand Up @@ -124,6 +128,12 @@ func NewManager(opt ManagerOpt) (Manager, error) {

// cm.scheduleGC(5 * time.Minute)

mp := opt.MeterProvider
if mp == nil {
mp = noop.NewMeterProvider()
}
cm.metrics = newMetrics(cm, mp)

return cm, nil
}

Expand Down Expand Up @@ -339,6 +349,7 @@ func (cm *cacheManager) IdentityMapping() *idtools.IdentityMapping {
// method should be called after Close.
func (cm *cacheManager) Close() error {
// TODO: allocate internal context and cancel it here
// Metrics are closed before the metadata store. The result of closing the
// metrics is discarded; only the metadata store's Close error is returned.
_ = cm.metrics.Close()
return cm.MetadataStore.Close()
}

Expand Down Expand Up @@ -1000,17 +1011,24 @@ func (cm *cacheManager) createDiffRef(ctx context.Context, parents parentRefs, d
}

func (cm *cacheManager) Prune(ctx context.Context, ch chan client.UsageInfo, opts ...client.PruneInfo) error {
cm.muPrune.Lock()
if err := func() error {
record := cm.metrics.MeasurePrune()
defer record(ctx)

for _, opt := range opts {
if err := cm.pruneOnce(ctx, ch, opt); err != nil {
cm.muPrune.Unlock()
return err
cm.muPrune.Lock()
defer cm.muPrune.Unlock()

for _, opt := range opts {
if err := cm.pruneOnce(ctx, ch, opt); err != nil {
cm.muPrune.Unlock()
return err
}
}
return nil
}(); err != nil {
return err
}

cm.muPrune.Unlock()

if cm.GarbageCollect != nil {
if _, err := cm.GarbageCollect(ctx); err != nil {
return err
Expand Down
85 changes: 85 additions & 0 deletions cache/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package cache

import (
"context"
"time"

"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/metric"
)

const (
// instrumentationName is the name passed to MeterProvider.Meter when
// newMetrics obtains the meter for this package.
instrumentationName = "github.com/moby/buildkit/cache"
// metricCacheRecords is the instrument name of the observable gauge that
// reports the number of cache records.
metricCacheRecords = "cache.records.count"
// metricCachePruneDuration is the instrument name of the histogram that
// records how long prune operations take (unit "ms").
metricCachePruneDuration = "cache.prune.duration"
)
// metrics groups the OpenTelemetry instruments created by newMetrics for a
// cacheManager, together with the callback registrations released by Close.
type metrics struct {
// CacheRecords is the observable gauge created under metricCacheRecords;
// collectMetrics observes the current number of cache records on it.
CacheRecords metric.Int64ObservableGauge
// CachePruneDuration is the histogram created under
// metricCachePruneDuration with unit "ms"; MeasurePrune records into it.
CachePruneDuration metric.Int64Histogram
// meter is the meter obtained from the provider for instrumentationName.
meter metric.Meter
// regs holds the callback registrations that Close unregisters.
regs []metric.Registration
}

// newMetrics creates the cache instruments on a meter obtained from mp and
// registers cm.collectMetrics as the callback for the cache records gauge.
//
// Errors from instrument creation or callback registration are reported
// through otel.Handle instead of being returned, so a non-nil *metrics is
// always returned.
func newMetrics(cm *cacheManager, mp metric.MeterProvider) *metrics {
	m := &metrics{}

	var err error
	m.meter = mp.Meter(instrumentationName)

	m.CacheRecords, err = m.meter.Int64ObservableGauge(metricCacheRecords,
		metric.WithDescription("Number of cache records."),
	)
	if err != nil {
		otel.Handle(err)
	}

	m.CachePruneDuration, err = m.meter.Int64Histogram(metricCachePruneDuration,
		metric.WithDescription("Measures the duration of cache prune operations."),
		metric.WithUnit("ms"),
	)
	if err != nil {
		otel.Handle(err)
	}

	reg, err := m.meter.RegisterCallback(cm.collectMetrics, m.CacheRecords)
	if err != nil {
		otel.Handle(err)
	}
	// A failed registration may yield a nil Registration. Storing it would make
	// Close call Unregister on a nil interface and panic, so only keep
	// registrations that actually exist.
	if reg != nil {
		m.regs = append(m.regs, reg)
	}

	return m
}

// MeasurePrune captures the current time and returns a function that, when
// called, records the whole milliseconds elapsed since then on the
// CachePruneDuration histogram using the supplied context.
func (m *metrics) MeasurePrune() (record func(ctx context.Context)) {
	began := time.Now()
	record = func(ctx context.Context) {
		elapsedMs := time.Since(began).Milliseconds()
		m.CachePruneDuration.Record(ctx, elapsedMs)
	}
	return record
}

// Close unregisters every callback registration held by m.
//
// Unregistering is best effort: errors from Unregister are discarded and Close
// always returns nil. Nil registrations are skipped so that a registration
// that failed during setup cannot cause a nil-interface panic here.
func (m *metrics) Close() error {
	for _, reg := range m.regs {
		if reg == nil {
			continue
		}
		// Best effort: a failure to unregister is not actionable at shutdown.
		_ = reg.Unregister()
	}
	return nil
}

// cacheStats is the snapshot of cache manager state returned by readStats.
type cacheStats struct {
// NumRecords is the number of entries in the cache manager's records
// collection at the time of the snapshot.
NumRecords int64
}

// readStats returns a snapshot of the cache manager's statistics. The record
// count is read while holding cm.mu.
func (cm *cacheManager) readStats() (stats cacheStats) {
	cm.mu.Lock()
	count := len(cm.records)
	cm.mu.Unlock()

	stats = cacheStats{NumRecords: int64(count)}
	return stats
}

// collectMetrics is the callback registered by newMetrics. It reads the
// current cache statistics and reports the record count on the CacheRecords
// gauge. It always returns nil.
//
// NOTE(review): this dereferences cm.metrics, which NewManager assigns only
// after newMetrics has already registered this callback; confirm that no
// collection can run in that window.
func (cm *cacheManager) collectMetrics(ctx context.Context, o metric.Observer) error {
	numRecords := cm.readStats().NumRecords
	o.ObserveInt64(cm.metrics.CacheRecords, numRecords)
	return nil
}
13 changes: 9 additions & 4 deletions cmd/buildkitd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ import (
"github.com/urfave/cli"
"go.etcd.io/bbolt"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/propagation"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
Expand All @@ -86,6 +87,9 @@ func init() {

// enable in memory recording for buildkitd traces
detect.Recorder = detect.NewTraceRecorder()

// register alternative handler for otel
otel.SetErrorHandler(bklog.OTELErrorHandler{})
}

var propagators = propagation.NewCompositeTextMapPropagator(propagation.TraceContext{}, propagation.Baggage{})
Expand All @@ -94,6 +98,7 @@ type workerInitializerOpt struct {
config *config.Config
sessionManager *session.Manager
traceSocket string
meterProvider metric.MeterProvider
}

type workerInitializer struct {
Expand Down Expand Up @@ -316,7 +321,7 @@ func main() {
os.RemoveAll(lockPath)
}()

controller, err := newController(c, &cfg)
controller, err := newController(c, &cfg, mp)
if err != nil {
return err
}
Expand Down Expand Up @@ -711,7 +716,7 @@ func serverCredentials(cfg config.TLSConfig) (*tls.Config, error) {
return tlsConf, nil
}

func newController(c *cli.Context, cfg *config.Config) (*control.Controller, error) {
func newController(c *cli.Context, cfg *config.Config, mp metric.MeterProvider) (*control.Controller, error) {
sessionManager, err := session.NewManager()
if err != nil {
return nil, err
Expand All @@ -734,6 +739,7 @@ func newController(c *cli.Context, cfg *config.Config) (*control.Controller, err
config: cfg,
sessionManager: sessionManager,
traceSocket: traceSocket,
meterProvider: mp,
})
if err != nil {
return nil, err
Expand Down Expand Up @@ -922,8 +928,7 @@ type traceCollector struct {
}

func (t *traceCollector) Export(ctx context.Context, req *tracev1.ExportTraceServiceRequest) (*tracev1.ExportTraceServiceResponse, error) {
err := t.exporter.ExportSpans(ctx, transform.Spans(req.GetResourceSpans()))
if err != nil {
if err := t.exporter.ExportSpans(ctx, transform.Spans(req.GetResourceSpans())); err != nil {
return nil, err
}
return &tracev1.ExportTraceServiceResponse{}, nil
Expand Down
1 change: 1 addition & 0 deletions cmd/buildkitd/main_containerd_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,7 @@ func containerdWorkerInitializer(c *cli.Context, common workerInitializerOpt) ([
opt.GCPolicy = getGCPolicy(cfg.GCConfig, common.config.Root)
opt.BuildkitVersion = getBuildkitVersion()
opt.RegistryHosts = resolverFunc(common.config)
opt.MeterProvider = common.meterProvider

if platformsStr := cfg.Platforms; len(platformsStr) != 0 {
platforms, err := parsePlatforms(platformsStr)
Expand Down
1 change: 1 addition & 0 deletions cmd/buildkitd/main_oci_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,7 @@ func ociWorkerInitializer(c *cli.Context, common workerInitializerOpt) ([]worker
opt.GCPolicy = getGCPolicy(cfg.GCConfig, common.config.Root)
opt.BuildkitVersion = getBuildkitVersion()
opt.RegistryHosts = hosts
opt.MeterProvider = common.meterProvider

if platformsStr := cfg.Platforms; len(platformsStr) != 0 {
platforms, err := parsePlatforms(platformsStr)
Expand Down
6 changes: 6 additions & 0 deletions util/bklog/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,3 +73,9 @@ func TraceLevelOnlyStack() string {
}
return ""
}

// OTELErrorHandler is a stateless error handler whose Handle method logs the
// error through the logger returned by G.
type OTELErrorHandler struct{}

// Handle reports err by calling Error on the logger that G returns for a
// background context.
func (OTELErrorHandler) Handle(err error) {
	logger := G(context.Background())
	logger.Error(err)
}
3 changes: 3 additions & 0 deletions worker/base/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ import (
digest "github.com/opencontainers/go-digest"
ocispecs "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/pkg/errors"
"go.opentelemetry.io/otel/metric"
"golang.org/x/sync/errgroup"
"golang.org/x/sync/semaphore"
)
Expand Down Expand Up @@ -81,6 +82,7 @@ type WorkerOpt struct {
MetadataStore *metadata.Store
MountPoolRoot string
ResourceMonitor *resources.Monitor
MeterProvider metric.MeterProvider
}

// Worker is a local worker instance with dedicated snapshotter, cache, and so on.
Expand Down Expand Up @@ -111,6 +113,7 @@ func NewWorker(ctx context.Context, opt WorkerOpt) (*Worker, error) {
Differ: opt.Differ,
MetadataStore: opt.MetadataStore,
MountPoolRoot: opt.MountPoolRoot,
MeterProvider: opt.MeterProvider,
})
if err != nil {
return nil, err
Expand Down

0 comments on commit 9b98442

Please sign in to comment.