Skip to content

Commit

Permalink
Parquet: Make FindTraceByID honor buffer and caching settings (#1697)
Browse files Browse the repository at this point in the history
* querier use search options

Signed-off-by: Joe Elliott <[email protected]>

* fix race

Signed-off-by: Joe Elliott <[email protected]>

* changelog

Signed-off-by: Joe Elliott <[email protected]>

* remove todone todo

Signed-off-by: Joe Elliott <[email protected]>

* removed innocuous change

Signed-off-by: Joe Elliott <[email protected]>

Signed-off-by: Joe Elliott <[email protected]>
  • Loading branch information
joe-elliott authored Aug 29, 2022
1 parent f7762e5 commit e177ad0
Show file tree
Hide file tree
Showing 12 changed files with 43 additions and 24 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
* [ENHANCEMENT] metrics-generator: expose span size as a metric [#1662](https://github.com/grafana/tempo/pull/1662) (@ie-pham)
* [ENHANCEMENT] Set Max Idle connections to 100 for Azure, should reduce DNS errors in Azure [#1632](https://github.com/grafana/tempo/pull/1632) (@electron0zero)
* [ENHANCEMENT] Add PodDisruptionBudget to ingesters in jsonnet [#1691](https://github.com/grafana/tempo/pull/1691) (@joe-elliott)
* [BUGFIX] Honor caching and buffering settings when finding traces by id [#1697](https://github.com/grafana/tempo/pull/1697) (@joe-elliott)

## v1.5.0 / 2022-08-17

Expand Down
6 changes: 5 additions & 1 deletion cmd/tempo-cli/cmd-query-blocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/grafana/tempo/pkg/model/trace"
"github.com/grafana/tempo/pkg/tempopb"
"github.com/grafana/tempo/pkg/util"
"github.com/grafana/tempo/tempodb"
"github.com/grafana/tempo/tempodb/backend"
"github.com/grafana/tempo/tempodb/encoding"
"github.com/grafana/tempo/tempodb/encoding/common"
Expand Down Expand Up @@ -148,7 +149,10 @@ func queryBlock(ctx context.Context, r backend.Reader, c backend.Compactor, bloc
return nil, err
}

trace, err := block.FindTraceByID(ctx, traceID)
searchOpts := common.SearchOptions{}
tempodb.SearchConfig{}.ApplyToOptions(&searchOpts)

trace, err := block.FindTraceByID(ctx, traceID, searchOpts)
if err != nil {
return nil, err
}
Expand Down
5 changes: 3 additions & 2 deletions modules/ingester/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -452,7 +452,7 @@ func (i *instance) FindTraceByID(ctx context.Context, id []byte) (*tempopb.Trace
combiner := trace.NewCombiner()
combiner.Consume(completeTrace)
for _, c := range i.completeBlocks {
found, err := c.FindTraceByID(ctx, id)
found, err := c.FindTraceByID(ctx, id, common.SearchOptions{})
if err != nil {
return nil, fmt.Errorf("completeBlock.FindTraceByID failed: %w", err)
}
Expand Down Expand Up @@ -480,7 +480,8 @@ func (i *instance) AddCompletingBlock(b *wal.AppendBlock, s *search.StreamingSea
}

// getOrCreateTrace will return a new trace object for the given request
// It must be called under the i.tracesMtx lock
//
// It must be called under the i.tracesMtx lock
func (i *instance) getOrCreateTrace(traceID []byte) *liveTrace {
fp := i.tokenForTraceID(traceID)
trace, ok := i.traces[fp]
Expand Down
2 changes: 1 addition & 1 deletion tempodb/encoding/common/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
)

type Finder interface {
FindTraceByID(ctx context.Context, id ID) (*tempopb.Trace, error)
FindTraceByID(ctx context.Context, id ID, opts SearchOptions) (*tempopb.Trace, error)
}

type Searcher interface {
Expand Down
2 changes: 1 addition & 1 deletion tempodb/encoding/v2/backend_block.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ func (b *BackendBlock) BlockMeta() *backend.BlockMeta {
return b.meta
}

func (b *BackendBlock) FindTraceByID(ctx context.Context, id common.ID) (*tempopb.Trace, error) {
func (b *BackendBlock) FindTraceByID(ctx context.Context, id common.ID, _ common.SearchOptions) (*tempopb.Trace, error) {
obj, err := b.find(ctx, id)
if err != nil {
return nil, err
Expand Down
17 changes: 11 additions & 6 deletions tempodb/encoding/vparquet/block_findtracebyid.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ func (b *backendBlock) checkBloom(ctx context.Context, id common.ID) (found bool
return filter.Test(id), nil
}

func (b *backendBlock) FindTraceByID(ctx context.Context, traceID common.ID) (_ *tempopb.Trace, err error) {
func (b *backendBlock) FindTraceByID(ctx context.Context, traceID common.ID, opts common.SearchOptions) (_ *tempopb.Trace, err error) {
span, derivedCtx := opentracing.StartSpanFromContext(ctx, "parquet.backendBlock.FindTraceByID",
opentracing.Tags{
"blockID": b.meta.BlockID,
Expand All @@ -173,15 +173,20 @@ func (b *backendBlock) FindTraceByID(ctx context.Context, traceID common.ID) (_
return nil, nil
}

// todo: combine with open logic from the other search functions
var readerAt io.ReaderAt
rr := NewBackendReaderAt(derivedCtx, b.r, DataFileName, b.meta.BlockID, b.meta.TenantID)
defer func() { span.SetTag("inspectedBytes", rr.TotalBytesRead) }()
defer func() { span.SetTag("inspectedBytes", rr.TotalBytesRead.Load()) }()

br := tempo_io.NewBufferedReaderAt(rr, int64(b.meta.Size), 512*1024, 32)
readerAt = rr
if opts.ReadBufferCount > 0 {
br := tempo_io.NewBufferedReaderAt(rr, int64(b.meta.Size), opts.ReadBufferSize, opts.ReadBufferCount)

// todo: disabling by default but we should make cache settings configurable here
or := newParquetOptimizedReaderAt(br, rr, int64(b.meta.Size), b.meta.FooterSize, common.CacheControl{Footer: false, ColumnIndex: false, OffsetIndex: false})
or := newParquetOptimizedReaderAt(br, rr, int64(b.meta.Size), b.meta.FooterSize, opts.CacheControl)
readerAt = or
}

pf, err := parquet.OpenFile(or, int64(b.meta.Size))
pf, err := parquet.OpenFile(readerAt, int64(b.meta.Size))
if err != nil {
return nil, errors.Wrap(err, "error opening file in FindTraceByID")
}
Expand Down
4 changes: 2 additions & 2 deletions tempodb/encoding/vparquet/block_findtracebyid_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func TestBackendBlockFindTraceByID(t *testing.T) {
wantProto, err := parquetTraceToTempopbTrace(tr)
require.NoError(t, err)

gotProto, err := b.FindTraceByID(ctx, tr.TraceID)
gotProto, err := b.FindTraceByID(ctx, tr.TraceID, common.SearchOptions{})
require.NoError(t, err)

require.Equal(t, wantProto, gotProto)
Expand Down Expand Up @@ -134,7 +134,7 @@ func TestBackendBlockFindTraceByID_TestData(t *testing.T) {
// fmt.Println(tr)
// fmt.Println("going to search for traceID", util.TraceIDToHexString(tr.TraceID))

protoTr, err := b.FindTraceByID(ctx, tr.TraceID)
protoTr, err := b.FindTraceByID(ctx, tr.TraceID, common.SearchOptions{})
require.NoError(t, err)
require.NotNil(t, protoTr)
}
Expand Down
4 changes: 2 additions & 2 deletions tempodb/encoding/vparquet/block_search.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func (b *backendBlock) Search(ctx context.Context, req *tempopb.SearchRequest, o
defer span.Finish()

rr := NewBackendReaderAt(derivedCtx, b.r, DataFileName, b.meta.BlockID, b.meta.TenantID)
defer func() { span.SetTag("inspectedBytes", rr.TotalBytesRead) }()
defer func() { span.SetTag("inspectedBytes", rr.TotalBytesRead.Load()) }()

br := tempo_io.NewBufferedReaderAt(rr, int64(b.meta.Size), opts.ReadBufferSize, opts.ReadBufferCount)

Expand Down Expand Up @@ -73,7 +73,7 @@ func (b *backendBlock) Search(ctx context.Context, req *tempopb.SearchRequest, o
// TODO: error handling
results := searchParquetFile(derivedCtx, pf, req, rgs)
results.Metrics.InspectedBlocks++
results.Metrics.InspectedBytes += rr.TotalBytesRead
results.Metrics.InspectedBytes += rr.TotalBytesRead.Load()

return results, nil
}
Expand Down
7 changes: 4 additions & 3 deletions tempodb/encoding/vparquet/readers.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"io"

"github.com/google/uuid"
"go.uber.org/atomic"

"github.com/grafana/tempo/tempodb/backend"
"github.com/grafana/tempo/tempodb/encoding/common"
Expand All @@ -18,17 +19,17 @@ type BackendReaderAt struct {
blockID uuid.UUID
tenantID string

TotalBytesRead uint64
TotalBytesRead atomic.Uint64
}

var _ io.ReaderAt = (*BackendReaderAt)(nil)

func NewBackendReaderAt(ctx context.Context, r backend.Reader, name string, blockID uuid.UUID, tenantID string) *BackendReaderAt {
return &BackendReaderAt{ctx, r, name, blockID, tenantID, 0}
return &BackendReaderAt{ctx, r, name, blockID, tenantID, atomic.Uint64{}}
}

func (b *BackendReaderAt) ReadAt(p []byte, off int64) (int, error) {
b.TotalBytesRead += uint64(len(p))
b.TotalBytesRead.Add(uint64(len(p)))
err := b.r.ReadRange(b.ctx, b.name, b.blockID, b.tenantID, uint64(off), p, false)
return len(p), err
}
Expand Down
10 changes: 8 additions & 2 deletions tempodb/tempodb.go
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,11 @@ func (rw *readerWriter) Find(ctx context.Context, tenantID string, id common.ID,
return nil, nil, nil
}

opts := common.SearchOptions{}
if rw.cfg != nil && rw.cfg.Search != nil {
rw.cfg.Search.ApplyToOptions(&opts)
}

curTime := time.Now()
partialTraces, funcErrs, err := rw.pool.RunJobs(ctx, copiedBlocklist, func(ctx context.Context, payload interface{}) (interface{}, error) {
meta := payload.(*backend.BlockMeta)
Expand All @@ -325,7 +330,7 @@ func (rw *readerWriter) Find(ctx context.Context, tenantID string, id common.ID,
return nil, errors.Wrap(err, fmt.Sprintf("error opening block for reading, blockID: %s", meta.BlockID.String()))
}

foundObject, err := block.FindTraceByID(ctx, id)
foundObject, err := block.FindTraceByID(ctx, id, opts)
if err != nil {
return nil, errors.Wrap(err, fmt.Sprintf("error finding trace by id, blockID: %s", meta.BlockID.String()))
}
Expand Down Expand Up @@ -390,7 +395,8 @@ func (rw *readerWriter) EnableCompaction(cfg *CompactorConfig, c CompactorSharde
}

// EnablePolling activates the polling loop. Pass nil if this component
// should never be a tenant index builder.
//
// should never be a tenant index builder.
func (rw *readerWriter) EnablePolling(sharder blocklist.JobSharder) {
if sharder == nil {
sharder = blocklist.OwnsNothingSharder
Expand Down
2 changes: 1 addition & 1 deletion tempodb/tempodb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -592,7 +592,7 @@ func TestCompleteBlock(t *testing.T) {
require.NoError(t, err, "unexpected error completing block")

for i, id := range ids {
found, err := complete.FindTraceByID(context.TODO(), id)
found, err := complete.FindTraceByID(context.TODO(), id, common.SearchOptions{})
require.NoError(t, err)
require.True(t, proto.Equal(found, reqs[i]))
}
Expand Down
7 changes: 4 additions & 3 deletions tempodb/wal/local_block.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,13 @@ func NewLocalBlock(ctx context.Context, existingBlock common.BackendBlock, l *lo
return c, nil
}

func (c *LocalBlock) FindTraceByID(ctx context.Context, id common.ID) (*tempopb.Trace, error) {
return c.BackendBlock.FindTraceByID(ctx, id)
func (c *LocalBlock) FindTraceByID(ctx context.Context, id common.ID, opts common.SearchOptions) (*tempopb.Trace, error) {
return c.BackendBlock.FindTraceByID(ctx, id, opts)
}

// FlushedTime returns the time the block was flushed. Will return 0
// if the block was never flushed
//
// if the block was never flushed
func (c *LocalBlock) FlushedTime() time.Time {
unixTime := c.flushedTime.Load()
if unixTime == 0 {
Expand Down

0 comments on commit e177ad0

Please sign in to comment.