From d8cd9b9354dca3df0b10433a81d72007a630c8a9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Giedrius=20Statkevi=C4=8Dius?= Date: Wed, 25 Sep 2019 11:29:16 +0300 Subject: [PATCH] Refactor compactor constants, fix bucket column (#1561) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * compact: unify different time constants Use downsample.* constants where possible. Move the downsampling time ranges into constants and use them as well. Signed-off-by: Giedrius Statkevičius * bucket: refactor column calculation into compact Fix the column's name and name it UNTIL-DOWN because that is what it actually shows - time until the next downsampling. Move out the calculation into a separate function into the compact package. Ideally we could use the retention policies in this calculation as well but the `bucket` subcommand knows nothing about them :-( Signed-off-by: Giedrius Statkevičius * compact: fix issues with naming Reorder the constants and fix mistakes. Signed-off-by: Giedrius Statkevičius Signed-off-by: Ivan Kiselev --- cmd/thanos/bucket.go | 16 +++++++--------- cmd/thanos/downsample.go | 18 +++++++++--------- pkg/compact/compact.go | 17 ++++++++++++++++- pkg/compact/downsample/downsample.go | 6 ++++++ 4 files changed, 38 insertions(+), 19 deletions(-) diff --git a/cmd/thanos/bucket.go b/cmd/thanos/bucket.go index 9c68d9b2b2..6ab198fbc7 100644 --- a/cmd/thanos/bucket.go +++ b/cmd/thanos/bucket.go @@ -14,6 +14,7 @@ import ( "github.com/thanos-io/thanos/pkg/block" "github.com/thanos-io/thanos/pkg/block/metadata" + "github.com/thanos-io/thanos/pkg/compact" extpromhttp "github.com/thanos-io/thanos/pkg/extprom/http" "github.com/thanos-io/thanos/pkg/objstore" "github.com/thanos-io/thanos/pkg/objstore/client" @@ -50,7 +51,7 @@ var ( sort.Strings(s) return s } - inspectColumns = []string{"ULID", "FROM", "UNTIL", "RANGE", "UNTIL-COMP", "#SERIES", "#SAMPLES", "#CHUNKS", "COMP-LEVEL", "COMP-FAILED", "LABELS", "RESOLUTION", "SOURCE"} + 
inspectColumns = []string{"ULID", "FROM", "UNTIL", "RANGE", "UNTIL-DOWN", "#SERIES", "#SAMPLES", "#CHUNKS", "COMP-LEVEL", "COMP-FAILED", "LABELS", "RESOLUTION", "SOURCE"} ) func registerBucket(m map[string]setupFunc, app *kingpin.Application, name string) { @@ -420,13 +421,10 @@ func printTable(blockMetas []*metadata.Meta, selectorLabels labels.Labels, sortB } timeRange := time.Duration((blockMeta.MaxTime - blockMeta.MinTime) * int64(time.Millisecond)) - // Calculate how long it takes until the next compaction. - untilComp := "-" - if blockMeta.Thanos.Downsample.Resolution == 0 { // data currently raw, downsample if range >= 40 hours - untilComp = (time.Duration(40*60*60*1000*time.Millisecond) - timeRange).String() - } - if blockMeta.Thanos.Downsample.Resolution == 5*60*1000 { // data currently 5m resolution, downsample if range >= 10 days - untilComp = (time.Duration(10*24*60*60*1000*time.Millisecond) - timeRange).String() + + untilDown := "-" + if until, err := compact.UntilNextDownsampling(blockMeta); err == nil { + untilDown = until.String() } var labels []string for _, key := range getKeysAlphabetically(blockMeta.Thanos.Labels) { @@ -438,7 +436,7 @@ func printTable(blockMetas []*metadata.Meta, selectorLabels labels.Labels, sortB line = append(line, time.Unix(blockMeta.MinTime/1000, 0).Format("02-01-2006 15:04:05")) line = append(line, time.Unix(blockMeta.MaxTime/1000, 0).Format("02-01-2006 15:04:05")) line = append(line, timeRange.String()) - line = append(line, untilComp) + line = append(line, untilDown) line = append(line, p.Sprintf("%d", blockMeta.Stats.NumSeries)) line = append(line, p.Sprintf("%d", blockMeta.Stats.NumSamples)) line = append(line, p.Sprintf("%d", blockMeta.Stats.NumChunks)) diff --git a/cmd/thanos/downsample.go b/cmd/thanos/downsample.go index 2f90f368ac..cb5a676533 100644 --- a/cmd/thanos/downsample.go +++ b/cmd/thanos/downsample.go @@ -170,13 +170,13 @@ func downsampleBucket( for _, m := range metas { switch 
m.Thanos.Downsample.Resolution { - case 0: + case downsample.ResLevel0: continue - case 5 * 60 * 1000: + case downsample.ResLevel1: for _, id := range m.Compaction.Sources { sources5m[id] = struct{}{} } - case 60 * 60 * 1000: + case downsample.ResLevel2: for _, id := range m.Compaction.Sources { sources1h[id] = struct{}{} } @@ -187,7 +187,7 @@ func downsampleBucket( for _, m := range metas { switch m.Thanos.Downsample.Resolution { - case 0: + case downsample.ResLevel0: missing := false for _, id := range m.Compaction.Sources { if _, ok := sources5m[id]; !ok { @@ -201,16 +201,16 @@ func downsampleBucket( // Only downsample blocks once we are sure to get roughly 2 chunks out of it. // NOTE(fabxc): this must match with at which block size the compactor creates downsampled // blocks. Otherwise we may never downsample some data. - if m.MaxTime-m.MinTime < 40*60*60*1000 { + if m.MaxTime-m.MinTime < downsample.DownsampleRange0 { continue } - if err := processDownsampling(ctx, logger, bkt, m, dir, 5*60*1000); err != nil { + if err := processDownsampling(ctx, logger, bkt, m, dir, downsample.ResLevel1); err != nil { metrics.downsampleFailures.WithLabelValues(compact.GroupKey(*m)).Inc() return errors.Wrap(err, "downsampling to 5 min") } metrics.downsamples.WithLabelValues(compact.GroupKey(*m)).Inc() - case 5 * 60 * 1000: + case downsample.ResLevel1: missing := false for _, id := range m.Compaction.Sources { if _, ok := sources1h[id]; !ok { @@ -224,10 +224,10 @@ func downsampleBucket( // Only downsample blocks once we are sure to get roughly 2 chunks out of it. // NOTE(fabxc): this must match with at which block size the compactor creates downsampled // blocks. Otherwise we may never downsample some data. 
- if m.MaxTime-m.MinTime < 10*24*60*60*1000 { + if m.MaxTime-m.MinTime < downsample.DownsampleRange1 { continue } - if err := processDownsampling(ctx, logger, bkt, m, dir, 60*60*1000); err != nil { + if err := processDownsampling(ctx, logger, bkt, m, dir, downsample.ResLevel2); err != nil { metrics.downsampleFailures.WithLabelValues(compact.GroupKey(*m)) return errors.Wrap(err, "downsampling to 60 min") } diff --git a/pkg/compact/compact.go b/pkg/compact/compact.go index 506d2dada8..87f7dbd11b 100644 --- a/pkg/compact/compact.go +++ b/pkg/compact/compact.go @@ -162,10 +162,25 @@ func (c *Syncer) SyncMetas(ctx context.Context) error { } c.metrics.syncMetas.Inc() c.metrics.syncMetaDuration.Observe(time.Since(begin).Seconds()) - return err } +// UntilNextDownsampling calculates how long it will take until the next downsampling operation. +// Returns an error if there will be no downsampling. +func UntilNextDownsampling(m *metadata.Meta) (time.Duration, error) { + timeRange := time.Duration((m.MaxTime - m.MinTime) * int64(time.Millisecond)) + switch m.Thanos.Downsample.Resolution { + case downsample.ResLevel2: + return time.Duration(0), errors.New("no downsampling") + case downsample.ResLevel1: + return time.Duration(downsample.DownsampleRange1*time.Millisecond) - timeRange, nil + case downsample.ResLevel0: + return time.Duration(downsample.DownsampleRange0*time.Millisecond) - timeRange, nil + default: + panic(fmt.Errorf("invalid resolution %v", m.Thanos.Downsample.Resolution)) + } +} + func (c *Syncer) syncMetas(ctx context.Context) error { var wg sync.WaitGroup defer wg.Wait() diff --git a/pkg/compact/downsample/downsample.go b/pkg/compact/downsample/downsample.go index 3c1deb7ef2..7bd59b3e78 100644 --- a/pkg/compact/downsample/downsample.go +++ b/pkg/compact/downsample/downsample.go @@ -28,6 +28,12 @@ const ( ResLevel2 = int64(60 * 60 * 1000) // 1 hour in milliseconds ) +// Downsampling ranges, i.e. the block time range after which we start to downsample blocks (in milliseconds). 
+const ( + DownsampleRange0 = 40 * 60 * 60 * 1000 // 40 hours + DownsampleRange1 = 10 * 24 * 60 * 60 * 1000 // 10 days +) + // Downsample downsamples the given block. It writes a new block into dir and returns its ID. func Downsample( logger log.Logger,