Skip to content

Commit

Permalink
Compact more than 2 blocks at a time (#348)
Browse files Browse the repository at this point in the history
* Update timeWindowBlockSelector to compact more than 2 blocks at a time, configurable min/max limits

* Update changelog

* Fix block selector to return any number of blocks between min and max

* Fix block selector to evaluate all window blocks, and fix the case where the minimum block count was not honored
  • Loading branch information
mdisibio authored Nov 19, 2020
1 parent 2b34ea3 commit 143bd10
Show file tree
Hide file tree
Showing 5 changed files with 250 additions and 19 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

* [CHANGE] From path.Join to filepath.Join [#338](https://github.com/grafana/tempo/pull/338)
* [CHANGE] Upgrade Cortex from v1.3.0 to v1.4.0 [#341](https://github.com/grafana/tempo/pull/341)
* [CHANGE] Compact more than 2 blocks at a time [#348](https://github.com/grafana/tempo/pull/348)
* [ENHANCEMENT] Add tempodb_compaction_objects_combined metric. [#339](https://github.com/grafana/tempo/pull/339)
* [BUGFIX] Frequent errors logged by compactor regarding meta not found [#327](https://github.com/grafana/tempo/pull/327)
* [BUGFIX] Fix distributors panicking on rollout [#343](https://github.com/grafana/tempo/pull/343)
Expand Down
44 changes: 30 additions & 14 deletions tempodb/compaction_block_selector.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@ type CompactionBlockSelector interface {
}

const (
activeWindowDuration = 24 * time.Hour
activeWindowDuration = 24 * time.Hour
defaultMinInputBlocks = 2
defaultMaxInputBlocks = 8
)

/*************************** Simple Block Selector **************************/
Expand Down Expand Up @@ -57,15 +59,19 @@ func (sbs *simpleBlockSelector) BlocksToCompact() ([]*encoding.BlockMeta, string

// timeWindowBlockSelector picks groups of blocks to compact out of a blocklist.
// Selection is bounded by a minimum and maximum number of input blocks and by
// a cap on the combined object count of the chosen blocks.
type timeWindowBlockSelector struct {
blocklist []*encoding.BlockMeta // candidate blocks; the constructor stores its own copy of the caller's slice
MinInputBlocks int // minimum number of blocks a selection must contain to be compacted
MaxInputBlocks int // maximum number of blocks gathered into a single selection
MaxCompactionRange time.Duration // Size of the time window - say 6 hours
MaxCompactionObjects int // maximum combined TotalObjects of the blocks selected for one compaction
}

var _ (CompactionBlockSelector) = (*timeWindowBlockSelector)(nil)

func newTimeWindowBlockSelector(blocklist []*encoding.BlockMeta, maxCompactionRange time.Duration, maxCompactionObjects int) CompactionBlockSelector {
func newTimeWindowBlockSelector(blocklist []*encoding.BlockMeta, maxCompactionRange time.Duration, maxCompactionObjects int, minInputBlocks int, maxInputBlocks int) CompactionBlockSelector {
twbs := &timeWindowBlockSelector{
blocklist: append([]*encoding.BlockMeta(nil), blocklist...),
MinInputBlocks: minInputBlocks,
MaxInputBlocks: maxInputBlocks,
MaxCompactionRange: maxCompactionRange,
MaxCompactionObjects: maxCompactionObjects,
}
Expand Down Expand Up @@ -126,7 +132,7 @@ func (twbs *timeWindowBlockSelector) BlocksToCompact() ([]*encoding.BlockMeta, s
}

// did we find enough blocks?
if len(windowBlocks) >= inputBlocks {
if len(windowBlocks) >= twbs.MinInputBlocks {
var compactBlocks []*encoding.BlockMeta

// blocks in the currently active window
Expand All @@ -140,29 +146,31 @@ func (twbs *timeWindowBlockSelector) BlocksToCompact() ([]*encoding.BlockMeta, s
// the active window should be compacted by level
if activeWindow <= blockWindow {
// search forward for inputBlocks in a row that have the same compaction level
for i := 0; i+inputBlocks-1 < len(windowBlocks); i++ {
if windowBlocks[i].CompactionLevel == windowBlocks[i+inputBlocks-1].CompactionLevel {
compactBlocks = windowBlocks[i : i+inputBlocks]
// Gather as many as possible while staying within limits
for i := 0; i <= len(windowBlocks)-twbs.MinInputBlocks+1; i++ {
for j := i + 1; j <= len(windowBlocks)-1 &&
windowBlocks[i].CompactionLevel == windowBlocks[j].CompactionLevel &&
len(compactBlocks)+1 <= twbs.MaxInputBlocks &&
totalObjects(compactBlocks)+windowBlocks[j].TotalObjects <= twbs.MaxCompactionObjects; j++ {
compactBlocks = windowBlocks[i : j+1]
}
if len(compactBlocks) > 0 {
// Found a stripe of blocks
break
}
}

compact = false
if len(compactBlocks) > 0 {
if len(compactBlocks) >= twbs.MinInputBlocks {
compact = true
hashString = fmt.Sprintf("%v-%v-%v", compactBlocks[0].TenantID, compactBlocks[0].CompactionLevel, currentWindow)
}
} else { // all other windows will be compacted using their two smallest blocks
compactBlocks = windowBlocks[:inputBlocks]
compactBlocks = windowBlocks[:twbs.MinInputBlocks]
hashString = fmt.Sprintf("%v-%v", compactBlocks[0].TenantID, currentWindow)
}

// are they small enough
totalObjects := 0
for _, block := range compactBlocks {
totalObjects += block.TotalObjects
}
if totalObjects > twbs.MaxCompactionObjects {
if totalObjects(compactBlocks) > twbs.MaxCompactionObjects {
compact = false
}

Expand Down Expand Up @@ -191,6 +199,14 @@ func (twbs *timeWindowBlockSelector) BlocksToCompact() ([]*encoding.BlockMeta, s
return nil, ""
}

// totalObjects returns the sum of TotalObjects over all of the given blocks.
// A nil or empty slice yields 0.
func totalObjects(blocks []*encoding.BlockMeta) int {
	sum := 0
	for i := range blocks {
		sum += blocks[i].TotalObjects
	}
	return sum
}

// windowForBlock returns the window a block belongs to, which is the window
// computed by windowForTime for the block's EndTime.
func (twbs *timeWindowBlockSelector) windowForBlock(meta *encoding.BlockMeta) int64 {
	endTime := meta.EndTime
	return twbs.windowForTime(endTime)
}
Expand Down
218 changes: 216 additions & 2 deletions tempodb/compaction_block_selector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ func TestTimeWindowBlockSelectorBlocksToCompact(t *testing.T) {
tests := []struct {
name string
blocklist []*encoding.BlockMeta
minInputBlocks int // optional, defaults to global const
maxInputBlocks int // optional, defaults to global const
expected []*encoding.BlockMeta
expectedHash string
expectedSecond []*encoding.BlockMeta
Expand Down Expand Up @@ -76,6 +78,7 @@ func TestTimeWindowBlockSelectorBlocksToCompact(t *testing.T) {
EndTime: now,
},
},
maxInputBlocks: 2,
expected: []*encoding.BlockMeta{
{
BlockID: uuid.MustParse("00000000-0000-0000-0000-000000000002"),
Expand Down Expand Up @@ -157,6 +160,7 @@ func TestTimeWindowBlockSelectorBlocksToCompact(t *testing.T) {
TotalObjects: 12,
},
},
maxInputBlocks: 2,
expected: []*encoding.BlockMeta{
{
BlockID: uuid.MustParse("00000000-0000-0000-0000-000000000002"),
Expand Down Expand Up @@ -336,11 +340,221 @@ func TestTimeWindowBlockSelectorBlocksToCompact(t *testing.T) {
},
expectedHash2: fmt.Sprintf("%v-%v-%v", tenantID, 0, now.Add(-timeWindow).Unix()),
},
{
name: "doesn't choose across time windows",
blocklist: []*encoding.BlockMeta{
{
BlockID: uuid.MustParse("00000000-0000-0000-0000-000000000001"),
EndTime: now,
},
{
BlockID: uuid.MustParse("00000000-0000-0000-0000-000000000002"),
EndTime: now.Add(-timeWindow),
},
},
expected: nil,
expectedHash: "",
expectedSecond: nil,
expectedHash2: "",
},
{
name: "doesn't exceed max compaction objects",
blocklist: []*encoding.BlockMeta{
{
BlockID: uuid.MustParse("00000000-0000-0000-0000-000000000001"),
TotalObjects: 99,
EndTime: now,
},
{
BlockID: uuid.MustParse("00000000-0000-0000-0000-000000000002"),
TotalObjects: 2,
EndTime: now,
},
},
expected: nil,
expectedHash: "",
expectedSecond: nil,
expectedHash2: "",
},
{
name: "Returns as many blocks as possible without exceeding max compaction objects",
blocklist: []*encoding.BlockMeta{
{
BlockID: uuid.MustParse("00000000-0000-0000-0000-000000000001"),
TotalObjects: 50,
EndTime: now,
},
{
BlockID: uuid.MustParse("00000000-0000-0000-0000-000000000002"),
TotalObjects: 50,
EndTime: now,
},
{
BlockID: uuid.MustParse("00000000-0000-0000-0000-000000000003"),
TotalObjects: 50,
EndTime: now,
}},
expected: []*encoding.BlockMeta{
{
BlockID: uuid.MustParse("00000000-0000-0000-0000-000000000001"),
TotalObjects: 50,
EndTime: now,
},
{
BlockID: uuid.MustParse("00000000-0000-0000-0000-000000000002"),
TotalObjects: 50,
EndTime: now,
},
},
expectedHash: fmt.Sprintf("%v-%v-%v", tenantID, 0, now.Unix()),
expectedSecond: nil,
expectedHash2: "",
},
{
// First compaction gets 3 blocks, second compaction gets 2 more
name: "choose more than 2 blocks",
maxInputBlocks: 3,
blocklist: []*encoding.BlockMeta{
{
BlockID: uuid.MustParse("00000000-0000-0000-0000-000000000001"),
EndTime: now,
TotalObjects: 1,
},
{
BlockID: uuid.MustParse("00000000-0000-0000-0000-000000000002"),
EndTime: now,
TotalObjects: 2,
},
{
BlockID: uuid.MustParse("00000000-0000-0000-0000-000000000003"),
EndTime: now,
TotalObjects: 3,
},
{
BlockID: uuid.MustParse("00000000-0000-0000-0000-000000000004"),
EndTime: now,
TotalObjects: 4,
},
{
BlockID: uuid.MustParse("00000000-0000-0000-0000-000000000005"),
EndTime: now,
TotalObjects: 5,
},
},
expected: []*encoding.BlockMeta{
{
BlockID: uuid.MustParse("00000000-0000-0000-0000-000000000001"),
EndTime: now,
TotalObjects: 1,
},
{
BlockID: uuid.MustParse("00000000-0000-0000-0000-000000000002"),
EndTime: now,
TotalObjects: 2,
},
{
BlockID: uuid.MustParse("00000000-0000-0000-0000-000000000003"),
EndTime: now,
TotalObjects: 3,
},
},
expectedHash: fmt.Sprintf("%v-%v-%v", tenantID, 0, now.Unix()),
expectedSecond: []*encoding.BlockMeta{
{
BlockID: uuid.MustParse("00000000-0000-0000-0000-000000000004"),
EndTime: now,
TotalObjects: 4,
},
{
BlockID: uuid.MustParse("00000000-0000-0000-0000-000000000005"),
EndTime: now,
TotalObjects: 5,
},
},
expectedHash2: fmt.Sprintf("%v-%v-%v", tenantID, 0, now.Unix()),
},
{
name: "honors minimum block count",
blocklist: []*encoding.BlockMeta{
{
BlockID: uuid.MustParse("00000000-0000-0000-0000-000000000001"),
EndTime: now,
},
{
BlockID: uuid.MustParse("00000000-0000-0000-0000-000000000002"),
EndTime: now,
},
},
minInputBlocks: 3,
maxInputBlocks: 3,
expected: nil,
expectedHash: "",
expectedSecond: nil,
expectedHash2: "",
},
{
name: "can choose blocks not at the lowest compaction level",
blocklist: []*encoding.BlockMeta{
{
BlockID: uuid.MustParse("00000000-0000-0000-0000-000000000001"),
EndTime: now,
CompactionLevel: 0,
},
{
BlockID: uuid.MustParse("00000000-0000-0000-0000-000000000002"),
EndTime: now,
CompactionLevel: 1,
},
{
BlockID: uuid.MustParse("00000000-0000-0000-0000-000000000003"),
EndTime: now,
CompactionLevel: 1,
},
{
BlockID: uuid.MustParse("00000000-0000-0000-0000-000000000004"),
EndTime: now,
CompactionLevel: 1,
},
},
minInputBlocks: 3,
maxInputBlocks: 3,
expected: []*encoding.BlockMeta{
{
BlockID: uuid.MustParse("00000000-0000-0000-0000-000000000002"),
EndTime: now,
CompactionLevel: 1,
},
{
BlockID: uuid.MustParse("00000000-0000-0000-0000-000000000003"),
EndTime: now,
CompactionLevel: 1,
},
{
BlockID: uuid.MustParse("00000000-0000-0000-0000-000000000004"),
EndTime: now,
CompactionLevel: 1,
},
},
expectedHash: fmt.Sprintf("%v-%v-%v", tenantID, 1, now.Unix()),
expectedSecond: nil,
expectedHash2: "",
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
selector := newTimeWindowBlockSelector(tt.blocklist, time.Second, 100)

min := defaultMinInputBlocks
if tt.minInputBlocks > 0 {
min = tt.minInputBlocks
}

max := defaultMaxInputBlocks
if tt.maxInputBlocks > 0 {
max = tt.maxInputBlocks
}

selector := newTimeWindowBlockSelector(tt.blocklist, time.Second, 100, min, max)

actual, hash := selector.BlocksToCompact()
assert.Equal(t, tt.expected, actual)
Expand Down Expand Up @@ -530,7 +744,7 @@ func TestTimeWindowBlockSelectorSort(t *testing.T) {

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
selector := newTimeWindowBlockSelector(tt.blocklist, timeWindow, 100)
selector := newTimeWindowBlockSelector(tt.blocklist, timeWindow, 100, defaultMinInputBlocks, defaultMaxInputBlocks)
actual := selector.(*timeWindowBlockSelector).blocklist
assert.Equal(t, tt.expected, actual)
})
Expand Down
2 changes: 1 addition & 1 deletion tempodb/compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func (rw *readerWriter) doCompaction() {
tenantID := tenants[rand.Intn(len(tenants))].(string)
blocklist := rw.blocklist(tenantID)

blockSelector := newTimeWindowBlockSelector(blocklist, rw.compactorCfg.MaxCompactionRange, rw.compactorCfg.MaxCompactionObjects)
blockSelector := newTimeWindowBlockSelector(blocklist, rw.compactorCfg.MaxCompactionRange, rw.compactorCfg.MaxCompactionObjects, defaultMinInputBlocks, defaultMaxInputBlocks)

start := time.Now()

Expand Down
4 changes: 2 additions & 2 deletions tempodb/compactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ func TestCompaction(t *testing.T) {
rw.pollBlocklist()

blocklist := rw.blocklist(testTenantID)
blockSelector := newTimeWindowBlockSelector(blocklist, rw.compactorCfg.MaxCompactionRange, 10000)
blockSelector := newTimeWindowBlockSelector(blocklist, rw.compactorCfg.MaxCompactionRange, 10000, defaultMinInputBlocks, 2)

expectedCompactions := len(blocklist) / inputBlocks
compactions := 0
Expand Down Expand Up @@ -230,7 +230,7 @@ func TestSameIDCompaction(t *testing.T) {

var blocks []*encoding.BlockMeta
blocklist := rw.blocklist(testTenantID)
blockSelector := newTimeWindowBlockSelector(blocklist, rw.compactorCfg.MaxCompactionRange, 10000)
blockSelector := newTimeWindowBlockSelector(blocklist, rw.compactorCfg.MaxCompactionRange, 10000, defaultMinInputBlocks, 2)
blocks, _ = blockSelector.BlocksToCompact()
assert.Len(t, blocks, inputBlocks)

Expand Down

0 comments on commit 143bd10

Please sign in to comment.