Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

V1.2 compaction error fix #1113

Merged
merged 4 commits into from
Nov 15, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
* [ENHANCEMENT] Jsonnet: add `$._config.namespace` to filter by namespace in cortex metrics [#1098](https://github.com/grafana/tempo/pull/1098) (@mapno)
* [ENHANCEMENT] Add middleware to compress frontend HTTP responses with gzip if requested [#1080](https://github.com/grafana/tempo/pull/1080) (@kvrhdn, @zalegrala)
* [BUGFIX] Fix defaults for MaxBytesPerTrace (ingester.max-bytes-per-trace) and MaxSearchBytesPerTrace (ingester.max-search-bytes-per-trace) (@bitprocessor)
* [BUGFIX] Ignore empty objects during compaction [#1113](https://github.com/grafana/tempo/pull/1113) (@mdisibio)

## v1.2.0 / 2021-11-05
* [CHANGE] **BREAKING CHANGE** Drop support for v0 and v1 blocks. See [1.1 changelog](https://github.com/grafana/tempo/releases/tag/v1.1.0) for details [#919](https://github.com/grafana/tempo/pull/919) (@joe-elliott)
Expand Down
2 changes: 1 addition & 1 deletion tempodb/compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ func (rw *readerWriter) compact(blockMetas []*backend.BlockMeta, tenantID string
compactionLevelLabel: compactionLevelLabel,
}

iter := encoding.NewMultiblockIterator(ctx, iters, rw.compactorCfg.IteratorBufferSize, combiner, dataEncoding)
iter := encoding.NewMultiblockIterator(ctx, iters, rw.compactorCfg.IteratorBufferSize, combiner, dataEncoding, rw.logger)
defer iter.Close()

for {
Expand Down
37 changes: 33 additions & 4 deletions tempodb/encoding/iterator_multiblock.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,12 @@ package encoding
import (
"bytes"
"context"
"errors"
"encoding/hex"
"io"

"github.com/go-kit/log"
"github.com/go-kit/log/level"

"github.com/grafana/tempo/tempodb/encoding/common"

"github.com/uber-go/atomic"
Expand All @@ -18,6 +21,7 @@ type multiblockIterator struct {
resultsCh chan iteratorResult
quitCh chan struct{}
err atomic.Error
logger log.Logger
}

var _ Iterator = (*multiblockIterator)(nil)
Expand All @@ -29,12 +33,13 @@ type iteratorResult struct {

// NewMultiblockIterator Creates a new multiblock iterator. Iterates concurrently in a separate goroutine and results are buffered.
// Traces are deduped and combined using the object combiner.
func NewMultiblockIterator(ctx context.Context, inputs []Iterator, bufferSize int, combiner common.ObjectCombiner, dataEncoding string) Iterator {
func NewMultiblockIterator(ctx context.Context, inputs []Iterator, bufferSize int, combiner common.ObjectCombiner, dataEncoding string, logger log.Logger) Iterator {
i := multiblockIterator{
combiner: combiner,
dataEncoding: dataEncoding,
resultsCh: make(chan iteratorResult, bufferSize),
quitCh: make(chan struct{}, 1),
logger: logger,
}

for _, iter := range inputs {
Expand Down Expand Up @@ -120,8 +125,18 @@ func (i *multiblockIterator) iterate(ctx context.Context) {
}

if len(lowestID) == 0 || len(lowestObject) == 0 || lowestBookmark == nil {
i.err.Store(errors.New("failed to find a lowest object in compaction"))
return
// Skip empty objects or when the bookmarks failed to return an object.
// This intentional here because we concluded that the bookmarks have already
// been skipping most empties (but not all) and there is no reason to treat the
// unskipped edge cases differently. Edge cases:
// * Two empties in a row: the bookmark won't skip the second empty. Since
// we already skipped the first, go ahead and skip the second.
// * Last trace across all blocks is empty. In that case there is no next record
// for the bookmarks to skip to, and lowestBookmark remains nil. Since we
// already skipped every other empty, skip the last (but not least) entry.
// (todo: research needed to determine how empties get in the block)
level.Warn(i.logger).Log("msg", "multiblock iterator skipping empty object", "id", hex.EncodeToString(lowestID), "obj", lowestObject, "bookmark", lowestBookmark)
continue
}

// Copy slices allows data to escape the iterators
Expand Down Expand Up @@ -164,6 +179,20 @@ func newBookmark(iter Iterator) *bookmark {
}

func (b *bookmark) current(ctx context.Context) ([]byte, []byte, error) {
// This check is how the bookmark knows to iterate after being cleared,
// but it also unintentionally skips empty objects that somehow got in
// the block (b.currentObject is empty slice). Normal usage of the bookmark
// is to call done() and then current(). done() calls current() which reads iter.Next()
// and saves empty, it is then iterated again by a direct call to current(),
// which interprets the empty object as a cleared state and iterates again.
// This is mostly harmless and has been true historically for some time,
// which isn't great because it masks empty objects present in a block
// (todo: research needed to determine how they get there), but it's made worse
// in that the skip fails in some edge cases:
// * Two empty objects in a row: done()/current() will return the
// second empty object and not skip it, and this fails up the call chain.
// * Last object is empty: This is an issue in multiblock-iterator, see
// notes there.
if len(b.currentID) != 0 && len(b.currentObject) != 0 {
return b.currentID, b.currentObject, nil
}
Expand Down
48 changes: 44 additions & 4 deletions tempodb/encoding/iterator_multiblock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,19 @@ package encoding
import (
"context"
"io"
"os"
"testing"
"time"

"github.com/go-kit/log"

"github.com/grafana/tempo/tempodb/encoding/common"
"github.com/stretchr/testify/require"
"go.uber.org/atomic"
)

var testLogger log.Logger = log.NewLogfmtLogger(log.NewSyncWriter(os.Stdout))

var _ Iterator = (*testIterator)(nil)

// testIterator iterates over in-memory contents. Doesn't require tempodb or a block
Expand Down Expand Up @@ -81,7 +86,7 @@ func TestMultiblockSorts(t *testing.T) {
iterOdds.Add([]byte{3}, []byte{3}, nil)
iterOdds.Add([]byte{5}, []byte{5}, nil)

iter := NewMultiblockIterator(context.TODO(), []Iterator{iterEvens, iterOdds}, 10, nil, "")
iter := NewMultiblockIterator(context.TODO(), []Iterator{iterEvens, iterOdds}, 10, nil, "", testLogger)

count := 0
lastID := -1
Expand Down Expand Up @@ -132,7 +137,7 @@ func TestMultiblockIteratorCanBeCancelled(t *testing.T) {
ctx, cancel := context.WithCancel(ctx)

// Create iterator and cancel/close it after 100ms
iter := NewMultiblockIterator(ctx, []Iterator{inner}, recordCount/2, nil, "")
iter := NewMultiblockIterator(ctx, []Iterator{inner}, recordCount/2, nil, "", testLogger)
time.Sleep(100 * time.Millisecond)
if tc.close {
iter.Close()
Expand Down Expand Up @@ -160,7 +165,7 @@ func TestMultiblockIteratorCanBeCancelled(t *testing.T) {
func TestMultiblockIteratorCanBeCancelledMultipleTimes(t *testing.T) {
inner := &testIterator{}

iter := NewMultiblockIterator(context.TODO(), []Iterator{inner}, 1, nil, "")
iter := NewMultiblockIterator(context.TODO(), []Iterator{inner}, 1, nil, "", testLogger)

iter.Close()
iter.Close()
Expand All @@ -178,7 +183,7 @@ func TestMultiblockIteratorPropogatesErrors(t *testing.T) {
inner2.Add([]byte{2}, []byte{2}, nil)
inner2.Add([]byte{3}, []byte{3}, nil)

iter := NewMultiblockIterator(ctx, []Iterator{inner, inner2}, 10, nil, "")
iter := NewMultiblockIterator(ctx, []Iterator{inner, inner2}, 10, nil, "", testLogger)

_, _, err := iter.Next(ctx)
require.NoError(t, err)
Expand All @@ -187,3 +192,38 @@ func TestMultiblockIteratorPropogatesErrors(t *testing.T) {

require.Equal(t, io.ErrClosedPipe, err)
}

func TestMultiblockIteratorSkipsEmptyObjects(t *testing.T) {
ctx := context.TODO()

// Empty objects a beginning, middle, and end.
inner := &testIterator{}
inner.Add([]byte{1}, []byte{}, nil)
inner.Add([]byte{2}, []byte{2}, nil)
inner.Add([]byte{3}, []byte{3}, nil)
inner.Add([]byte{4}, []byte{}, nil) // Two empties in a row
inner.Add([]byte{5}, []byte{}, nil)
inner.Add([]byte{6}, []byte{6}, nil)
inner.Add([]byte{7}, []byte{7}, nil)
inner.Add([]byte{8}, []byte{}, nil)

expected := []struct {
id common.ID
obj []byte
err error
}{
{[]byte{2}, []byte{2}, nil},
{[]byte{3}, []byte{3}, nil},
{[]byte{6}, []byte{6}, nil},
{[]byte{7}, []byte{7}, nil},
{nil, nil, io.EOF},
}

iter := NewMultiblockIterator(ctx, []Iterator{inner}, 10, nil, "", testLogger)
for i := 0; i < len(expected); i++ {
id, obj, err := iter.Next(ctx)
require.Equal(t, expected[i].err, err)
require.Equal(t, expected[i].id, id)
require.Equal(t, expected[i].obj, obj)
}
}