Skip to content

Commit

Permalink
cache: capture metrics related to cache records and pruning
Browse files Browse the repository at this point in the history
Signed-off-by: Jonathan A. Sternberg <[email protected]>
  • Loading branch information
jsternberg committed Dec 8, 2023
1 parent c52719b commit 16ac232
Show file tree
Hide file tree
Showing 7 changed files with 131 additions and 15 deletions.
32 changes: 25 additions & 7 deletions cache/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
imagespecidentity "github.com/opencontainers/image-spec/identity"
ocispecs "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/pkg/errors"
"go.opentelemetry.io/otel/metric"
"golang.org/x/sync/errgroup"
)

Expand All @@ -49,6 +50,7 @@ type ManagerOpt struct {
Differ diff.Comparer
MetadataStore *metadata.Store
MountPoolRoot string
MeterProvider metric.MeterProvider
}

type Accessor interface {
Expand Down Expand Up @@ -97,6 +99,7 @@ type cacheManager struct {

muPrune sync.Mutex // make sure parallel prune is not allowed so there will not be inconsistent results
unlazyG flightcontrol.Group[struct{}]
metrics *metrics
}

func NewManager(opt ManagerOpt) (Manager, error) {
Expand Down Expand Up @@ -124,6 +127,13 @@ func NewManager(opt ManagerOpt) (Manager, error) {

// cm.scheduleGC(5 * time.Minute)

if opt.MeterProvider != nil {
cm.metrics, err = newMetrics(cm, opt.MeterProvider)
if err != nil {
return nil, err
}
}

return cm, nil
}

Expand Down Expand Up @@ -339,6 +349,7 @@ func (cm *cacheManager) IdentityMapping() *idtools.IdentityMapping {
// method should be called after Close.
func (cm *cacheManager) Close() error {
	// TODO: allocate internal context and cancel it here

	// Metrics teardown is best-effort: its result is discarded so that it can
	// never stop the metadata store from being closed.
	_ = cm.metrics.Close()

	storeErr := cm.MetadataStore.Close()
	return storeErr
}

Expand Down Expand Up @@ -1000,17 +1011,24 @@ func (cm *cacheManager) createDiffRef(ctx context.Context, parents parentRefs, d
}

func (cm *cacheManager) Prune(ctx context.Context, ch chan client.UsageInfo, opts ...client.PruneInfo) error {
cm.muPrune.Lock()
if err := func() error {
record := cm.metrics.MeasurePrune()
defer record(ctx)

for _, opt := range opts {
if err := cm.pruneOnce(ctx, ch, opt); err != nil {
cm.muPrune.Unlock()
return err
cm.muPrune.Lock()
defer cm.muPrune.Unlock()

for _, opt := range opts {
if err := cm.pruneOnce(ctx, ch, opt); err != nil {
cm.muPrune.Unlock()
return err
}
}
return nil
}(); err != nil {
return err
}

cm.muPrune.Unlock()

if cm.GarbageCollect != nil {
if _, err := cm.GarbageCollect(ctx); err != nil {
return err
Expand Down
92 changes: 92 additions & 0 deletions cache/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
package cache

import (
"context"
"time"

"go.opentelemetry.io/otel/metric"
)

// instrumentationName is the name passed to the MeterProvider when obtaining
// the meter used for the cache instruments.
const instrumentationName = "github.com/moby/buildkit/cache"

// Names of the instruments created by newMetrics.
const (
	// metricCacheRecords names the gauge that reports the number of cache records.
	metricCacheRecords = "cache.records.count"
	// metricCachePruneDuration names the histogram of prune durations.
	metricCachePruneDuration = "cache.prune.duration"
)

// metrics bundles the instruments used to report cache manager activity
// together with the bookkeeping needed to tear them down again.
//
// The MeasurePrune and Close methods accept a nil *metrics, so callers can
// invoke them unconditionally when metrics are disabled.
type metrics struct {
	// meter is the meter the instruments below were created on.
	meter metric.Meter
	// regs holds the callback registrations that Close unregisters.
	regs []metric.Registration

	// CacheRecords is an observable gauge reporting the number of cache
	// records; it is populated from a registered callback.
	CacheRecords metric.Int64ObservableGauge
	// CachePruneDuration is a histogram of prune durations in milliseconds.
	CachePruneDuration metric.Int64Histogram
}

// newMetrics creates the cache instruments on a meter obtained from mp and
// registers cm.collectMetrics as the callback that reports the record count.
//
// It returns an error if either instrument cannot be created or if the
// callback cannot be registered. The registration held by the returned value
// is released by its Close method.
func newMetrics(cm *cacheManager, mp metric.MeterProvider) (*metrics, error) {
	meter := mp.Meter(instrumentationName)

	records, err := meter.Int64ObservableGauge(
		metricCacheRecords,
		metric.WithDescription("Number of cache records."),
	)
	if err != nil {
		return nil, err
	}

	pruneDuration, err := meter.Int64Histogram(
		metricCachePruneDuration,
		metric.WithDescription("Measures the duration of cache prune operations."),
		metric.WithUnit("ms"),
	)
	if err != nil {
		return nil, err
	}

	// The gauge is observable, so its value is supplied through a callback
	// rather than recorded directly.
	registration, err := meter.RegisterCallback(cm.collectMetrics, records)
	if err != nil {
		return nil, err
	}

	return &metrics{
		meter:              meter,
		regs:               []metric.Registration{registration},
		CacheRecords:       records,
		CachePruneDuration: pruneDuration,
	}, nil
}

// MeasurePrune starts timing a prune operation and returns a function that,
// when called, records the elapsed time in whole milliseconds on the
// CachePruneDuration histogram.
//
// A nil receiver is allowed: the returned function then does nothing.
func (m *metrics) MeasurePrune() (record func(ctx context.Context)) {
	if m == nil {
		return func(context.Context) {}
	}

	begin := time.Now()
	return func(ctx context.Context) {
		elapsedMs := time.Since(begin).Milliseconds()
		m.CachePruneDuration.Record(ctx, elapsedMs)
	}
}

// Close unregisters every callback registration held by m.
//
// A nil receiver is allowed and is a no-op. All registrations are always
// attempted, even if one of them fails; the first error encountered is
// returned so that callers who care can surface it, and callers who do not
// can keep discarding the result.
func (m *metrics) Close() error {
	if m == nil {
		return nil
	}

	var firstErr error
	for _, reg := range m.regs {
		// Keep going after a failure so one bad registration does not leave
		// the remaining callbacks registered.
		if err := reg.Unregister(); err != nil && firstErr == nil {
			firstErr = err
		}
	}
	return firstErr
}

// cacheStats is a point-in-time snapshot of cache manager statistics, as
// produced by readStats.
type cacheStats struct {
	// NumRecords is the number of entries in the manager's record set.
	NumRecords int64
}

// readStats returns a snapshot of the cache statistics. The record count is
// read while holding cm.mu.
func (cm *cacheManager) readStats() (stats cacheStats) {
	cm.mu.Lock()
	count := len(cm.records)
	cm.mu.Unlock()

	return cacheStats{NumRecords: int64(count)}
}

// collectMetrics is the callback that reports the current number of cache
// records on the CacheRecords gauge. It always returns nil.
//
// newMetrics registers this callback before its caller has stored the
// resulting *metrics in cm.metrics, so a collection can run while cm.metrics
// is still nil. In that case there is no instrument to observe and the
// callback reports nothing instead of dereferencing a nil pointer.
func (cm *cacheManager) collectMetrics(ctx context.Context, o metric.Observer) error {
	m := cm.metrics
	if m == nil {
		return nil
	}

	stats := cm.readStats()
	o.ObserveInt64(m.CacheRecords, stats.NumRecords)
	return nil
}
9 changes: 5 additions & 4 deletions cmd/buildkitd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ type workerInitializerOpt struct {
config *config.Config
sessionManager *session.Manager
traceSocket string
meterProvider metric.MeterProvider
}

type workerInitializer struct {
Expand Down Expand Up @@ -316,7 +317,7 @@ func main() {
os.RemoveAll(lockPath)
}()

controller, err := newController(c, &cfg)
controller, err := newController(c, &cfg, mp)
if err != nil {
return err
}
Expand Down Expand Up @@ -711,7 +712,7 @@ func serverCredentials(cfg config.TLSConfig) (*tls.Config, error) {
return tlsConf, nil
}

func newController(c *cli.Context, cfg *config.Config) (*control.Controller, error) {
func newController(c *cli.Context, cfg *config.Config, mp metric.MeterProvider) (*control.Controller, error) {
sessionManager, err := session.NewManager()
if err != nil {
return nil, err
Expand All @@ -734,6 +735,7 @@ func newController(c *cli.Context, cfg *config.Config) (*control.Controller, err
config: cfg,
sessionManager: sessionManager,
traceSocket: traceSocket,
meterProvider: mp,
})
if err != nil {
return nil, err
Expand Down Expand Up @@ -922,8 +924,7 @@ type traceCollector struct {
}

func (t *traceCollector) Export(ctx context.Context, req *tracev1.ExportTraceServiceRequest) (*tracev1.ExportTraceServiceResponse, error) {
err := t.exporter.ExportSpans(ctx, transform.Spans(req.GetResourceSpans()))
if err != nil {
if err := t.exporter.ExportSpans(ctx, transform.Spans(req.GetResourceSpans())); err != nil {
return nil, err
}
return &tracev1.ExportTraceServiceResponse{}, nil
Expand Down
1 change: 1 addition & 0 deletions cmd/buildkitd/main_containerd_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,7 @@ func containerdWorkerInitializer(c *cli.Context, common workerInitializerOpt) ([
opt.GCPolicy = getGCPolicy(cfg.GCConfig, common.config.Root)
opt.BuildkitVersion = getBuildkitVersion()
opt.RegistryHosts = resolverFunc(common.config)
opt.MeterProvider = common.meterProvider

if platformsStr := cfg.Platforms; len(platformsStr) != 0 {
platforms, err := parsePlatforms(platformsStr)
Expand Down
1 change: 1 addition & 0 deletions cmd/buildkitd/main_oci_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,7 @@ func ociWorkerInitializer(c *cli.Context, common workerInitializerOpt) ([]worker
opt.GCPolicy = getGCPolicy(cfg.GCConfig, common.config.Root)
opt.BuildkitVersion = getBuildkitVersion()
opt.RegistryHosts = hosts
opt.MeterProvider = common.meterProvider

if platformsStr := cfg.Platforms; len(platformsStr) != 0 {
platforms, err := parsePlatforms(platformsStr)
Expand Down
8 changes: 4 additions & 4 deletions docker-bake.hcl
Original file line number Diff line number Diff line change
Expand Up @@ -137,10 +137,10 @@ target "lint" {
matrix = {
buildtags = [
{ name = "default", tags = "", target = "golangci-lint" },
{ name = "labs", tags = "dfrunsecurity dfparents", target = "golangci-lint" },
{ name = "nydus", tags = "nydus", target = "golangci-lint" },
{ name = "yaml", tags = "", target = "yamllint" },
{ name = "proto", tags = "", target = "protolint" },
//{ name = "labs", tags = "dfrunsecurity dfparents", target = "golangci-lint" },
//{ name = "nydus", tags = "nydus", target = "golangci-lint" },
//{ name = "yaml", tags = "", target = "yamllint" },
//{ name = "proto", tags = "", target = "protolint" },
]
}
}
Expand Down
3 changes: 3 additions & 0 deletions worker/base/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ import (
digest "github.com/opencontainers/go-digest"
ocispecs "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/pkg/errors"
"go.opentelemetry.io/otel/metric"
"golang.org/x/sync/errgroup"
"golang.org/x/sync/semaphore"
)
Expand Down Expand Up @@ -81,6 +82,7 @@ type WorkerOpt struct {
MetadataStore *metadata.Store
MountPoolRoot string
ResourceMonitor *resources.Monitor
MeterProvider metric.MeterProvider
}

// Worker is a local worker instance with dedicated snapshotter, cache, and so on.
Expand Down Expand Up @@ -111,6 +113,7 @@ func NewWorker(ctx context.Context, opt WorkerOpt) (*Worker, error) {
Differ: opt.Differ,
MetadataStore: opt.MetadataStore,
MountPoolRoot: opt.MountPoolRoot,
MeterProvider: opt.MeterProvider,
})
if err != nil {
return nil, err
Expand Down

0 comments on commit 16ac232

Please sign in to comment.