Skip to content

Commit

Permalink
Take into account non-increasing retention
Browse files Browse the repository at this point in the history
Signed-off-by: Olivier Biesmans <[email protected]>
  • Loading branch information
Olivier Biesmans committed Apr 16, 2020
1 parent 4d635e5 commit 7da91fc
Show file tree
Hide file tree
Showing 3 changed files with 88 additions and 11 deletions.
8 changes: 4 additions & 4 deletions cmd/thanos/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ func registerCompact(m map[string]setupFunc, app *kingpin.Application) {
*acceptMalformedIndex,
*wait,
*generateMissingIndexCacheFiles,
map[compact.ResolutionLevel]time.Duration{
compact.RetentionPolicy{
compact.ResolutionLevelRaw: time.Duration(*retentionRaw),
compact.ResolutionLevel5m: time.Duration(*retention5m),
compact.ResolutionLevel1h: time.Duration(*retention1h),
Expand Down Expand Up @@ -192,7 +192,7 @@ func runCompact(
consistencyDelay time.Duration,
deleteDelay time.Duration,
haltOnError, acceptMalformedIndex, wait, generateMissingIndexCacheFiles bool,
retentionByResolution map[compact.ResolutionLevel]time.Duration,
retentionByResolution compact.RetentionPolicy,
component component.Component,
disableDownsampling bool,
maxCompactionLevel, blockSyncConcurrency int,
Expand Down Expand Up @@ -388,8 +388,8 @@ func runCompact(
}

compactMainFn := func() error {
// Remove blocks that are older than the retention policy and won't be needed in downsampling
if err := compact.ApplyRetentionPolicyByResolution(ctx, logger, bkt, retentionByResolution.InitialRetentionPolicy()); err != nil {
// Remove blocks that are older than the retention policy and won't be needed in downsampling.
if err := compact.ApplyRetentionPolicyByResolution(ctx, logger, bkt, sy.Metas(), retentionByResolution.InitialRetentionPolicy(), blocksMarkedForDeletion); err != nil {
return errors.Wrap(err, fmt.Sprintf("initial retention failed"))
}

Expand Down
24 changes: 24 additions & 0 deletions pkg/compact/retention.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package compact

import (
"context"
"math"
"time"

"github.com/go-kit/kit/log"
Expand All @@ -17,6 +18,8 @@ import (
"github.com/thanos-io/thanos/pkg/objstore"
)

type RetentionPolicy map[ResolutionLevel]time.Duration

// ApplyRetentionPolicyByResolution removes blocks depending on the specified retentionByResolution based on blocks MaxTime.
// A value of 0 disables the retention for its resolution.
func ApplyRetentionPolicyByResolution(
Expand Down Expand Up @@ -45,3 +48,24 @@ func ApplyRetentionPolicyByResolution(
level.Info(logger).Log("msg", "optional retention apply done")
return nil
}

// InitialRetentionPolicy calculates a RetentionPolicy that is safe to apply
// before compatction and downsampling take place.
func (rp RetentionPolicy) InitialRetentionPolicy() RetentionPolicy {
retention := time.Duration(0)

if rp[ResolutionLevelRaw] != 0 && rp[ResolutionLevel5m] != 0 && rp[ResolutionLevel1h] != 0 {
retention = time.Duration(
math.Max(
math.Max(
float64(rp[ResolutionLevelRaw]), float64(rp[ResolutionLevel5m])),
float64(rp[ResolutionLevel1h]),
),
)
}
return RetentionPolicy{
ResolutionLevelRaw: retention,
ResolutionLevel5m: retention,
ResolutionLevel1h: retention,
}
}
67 changes: 60 additions & 7 deletions pkg/compact/retention_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,14 @@ func TestApplyRetentionPolicyByResolution(t *testing.T) {
for _, tt := range []struct {
name string
blocks []testBlock
retentionByResolution map[compact.ResolutionLevel]time.Duration
retentionByResolution compact.RetentionPolicy
want []string
wantErr bool
}{
{
"empty bucket",
[]testBlock{},
map[compact.ResolutionLevel]time.Duration{
compact.RetentionPolicy{
compact.ResolutionLevelRaw: 24 * time.Hour,
compact.ResolutionLevel5m: 7 * 24 * time.Hour,
compact.ResolutionLevel1h: 14 * 24 * time.Hour,
Expand Down Expand Up @@ -82,7 +82,7 @@ func TestApplyRetentionPolicyByResolution(t *testing.T) {
compact.ResolutionLevelRaw,
},
},
map[compact.ResolutionLevel]time.Duration{
compact.RetentionPolicy{
compact.ResolutionLevelRaw: 24 * time.Hour,
compact.ResolutionLevel5m: 0,
compact.ResolutionLevel1h: 0,
Expand Down Expand Up @@ -122,7 +122,7 @@ func TestApplyRetentionPolicyByResolution(t *testing.T) {
compact.ResolutionLevelRaw,
},
},
map[compact.ResolutionLevel]time.Duration{
compact.RetentionPolicy{
compact.ResolutionLevelRaw: 0,
compact.ResolutionLevel5m: 0,
compact.ResolutionLevel1h: 0,
Expand Down Expand Up @@ -157,7 +157,7 @@ func TestApplyRetentionPolicyByResolution(t *testing.T) {
compact.ResolutionLevel1h,
},
},
map[compact.ResolutionLevel]time.Duration{
compact.RetentionPolicy{
compact.ResolutionLevelRaw: 0,
compact.ResolutionLevel5m: 0,
compact.ResolutionLevel1h: 0,
Expand All @@ -179,7 +179,7 @@ func TestApplyRetentionPolicyByResolution(t *testing.T) {
compact.ResolutionLevel(1),
},
},
map[compact.ResolutionLevel]time.Duration{},
compact.RetentionPolicy{},
[]string{
"01CPHBEX20729MJQZXE3W0BW48/",
},
Expand Down Expand Up @@ -225,7 +225,7 @@ func TestApplyRetentionPolicyByResolution(t *testing.T) {
compact.ResolutionLevel1h,
},
},
map[compact.ResolutionLevel]time.Duration{
compact.RetentionPolicy{
compact.ResolutionLevelRaw: 24 * time.Hour,
compact.ResolutionLevel5m: 7 * 24 * time.Hour,
compact.ResolutionLevel1h: 14 * 24 * time.Hour,
Expand Down Expand Up @@ -301,3 +301,56 @@ func uploadMockBlock(t *testing.T, bkt objstore.Bucket, id string, minTime, maxT
testutil.Ok(t, bkt.Upload(context.Background(), id+"/chunks/000002", strings.NewReader("@test-data@")))
testutil.Ok(t, bkt.Upload(context.Background(), id+"/chunks/000003", strings.NewReader("@test-data@")))
}

func TestInitialRetentionPolicy(t *testing.T) {
for _, tt := range []struct {
name string
retentionByResolution compact.RetentionPolicy
want compact.RetentionPolicy
}{
{
"increasing",
compact.RetentionPolicy{
compact.ResolutionLevelRaw: time.Duration(1000),
compact.ResolutionLevel5m: time.Duration(5000),
compact.ResolutionLevel1h: time.Duration(10000),
},
compact.RetentionPolicy{
compact.ResolutionLevelRaw: time.Duration(10000),
compact.ResolutionLevel5m: time.Duration(10000),
compact.ResolutionLevel1h: time.Duration(10000),
},
},
{
"non-increasing",
compact.RetentionPolicy{
compact.ResolutionLevelRaw: time.Duration(10000),
compact.ResolutionLevel5m: time.Duration(5000),
compact.ResolutionLevel1h: time.Duration(1000),
},
compact.RetentionPolicy{
compact.ResolutionLevelRaw: time.Duration(10000),
compact.ResolutionLevel5m: time.Duration(10000),
compact.ResolutionLevel1h: time.Duration(10000),
},
},
{
"disabled",
compact.RetentionPolicy{
compact.ResolutionLevelRaw: time.Duration(1000),
compact.ResolutionLevel5m: time.Duration(0),
compact.ResolutionLevel1h: time.Duration(10000),
},
compact.RetentionPolicy{
compact.ResolutionLevelRaw: time.Duration(0),
compact.ResolutionLevel5m: time.Duration(0),
compact.ResolutionLevel1h: time.Duration(0),
},
},
} {
t.Run(tt.name, func(t *testing.T) {
testutil.Equals(t, tt.retentionByResolution.InitialRetentionPolicy(), tt.want)
})
}

}

0 comments on commit 7da91fc

Please sign in to comment.