Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Traceql Metrics] PR 3 - Trace ID sharding #3258

Merged
merged 8 commits into from
Jan 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
## main / unreleased

* [FEATURE] TraceQL metrics queries [#3227](https://github.com/grafana/tempo/pull/3227) [#3252](https://github.com/grafana/tempo/pull/3252) (@mdisibio @zalegrala)
* [FEATURE] TraceQL metrics queries [#3227](https://github.com/grafana/tempo/pull/3227) [#3252](https://github.com/grafana/tempo/pull/3252) [#3258](https://github.com/grafana/tempo/pull/3258) (@mdisibio @zalegrala)
* [FEATURE] Add support for multi-tenant queries. [#3087](https://github.com/grafana/tempo/pull/3087) (@electron0zero)
* [BUGFIX] Change exit code if config is successfully verified [#3174](https://github.com/grafana/tempo/pull/3174) (@am3o @agrib-01)
* [BUGFIX] The tempo-cli analyse blocks command no longer fails on compacted blocks [#3183](https://github.com/grafana/tempo/pull/3183) (@stoewer)
Expand Down
91 changes: 91 additions & 0 deletions pkg/util/traceidboundary/traceidboundary.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
package traceidboundary

import (
"bytes"

"github.com/grafana/tempo/pkg/blockboundary"
)

// Boundary is one contiguous range of trace IDs, expressed as raw big-endian
// bytes. Ranges are lower-inclusive and (except for the very last shard's
// final range) upper-exclusive; see Pairs for details.
type Boundary struct {
	Min, Max []byte
}

// Pairs returns the boundaries that match trace IDs in that shard. Internally this is
// similar to how queriers divide the block ID-space, but here it's trace IDs instead.
// The inputs are 1-based because it seems more readable: shard 1 of 10. Most boundaries
// are [,) lower inclusive, upper exclusive. However the last boundary that ends in the
// max value 0xFFFF... is [,] inclusive/inclusive and indicated when the return value
// upperInclusive is set.
// Of course there are some caveats:
// - Trace IDs can be 16 or 8 bytes. If we naively sharded only in 16-byte space it would
// be unbalanced because all 8-byte IDs would land in the first shard. Therefore we
// divide in both 16- and 8-byte spaces and a single shard covers a range in each.
// - Technically 8-byte IDs are only 63 bits, so we account for this
// - The boundaries are inclusive/exclusive: [min, max), except the max of the last shard
// is the valid ID FFFFF... and inclusive/inclusive.
func Pairs(shard, of uint32) (boundaries []Boundary, upperInclusive bool) {
	// First pair is 63-bit IDs left-padded with zeroes to make 16-byte divisions
	// that matches the 16-byte layout in the block.
	// To create 63-bit boundaries we create twice as many as needed,
	// then only use the first half. i.e. shaving off the top-most bit.
	// NOTE(review): assumes CreateBlockBoundaries(n) returns n+1 evenly spaced
	// 16-byte boundaries from 0x00... to 0xFF... — confirm against the
	// blockboundary package.
	int63bounds := blockboundary.CreateBlockBoundaries(int(of * 2))

	// Adjust last boundary to be inclusive so it matches the other pair.
	// Index `of` is the midpoint of the doubled boundary set, i.e. the upper
	// edge of the 63-bit half. It is replaced with the max 63-bit value
	// (8 bytes, which is all that is sliced out below).
	int63bounds[of] = []byte{0x7F, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF}

	// 8 zero bytes prefix + the low 8 bytes of the boundary = a 16-byte ID
	// matching how 8-byte trace IDs are stored left-padded in the block.
	boundaries = append(boundaries, Boundary{
		Min: append([]byte{0, 0, 0, 0, 0, 0, 0, 0}, int63bounds[shard-1][0:8]...),
		Max: append([]byte{0, 0, 0, 0, 0, 0, 0, 0}, int63bounds[shard][0:8]...),
	})

	// Second pair is normal full precision 16-byte IDs.
	int128bounds := blockboundary.CreateBlockBoundaries(int(of))

	// However there is one caveat - We adjust the very first boundary to ensure it doesn't
	// overlap with the 63-bit precision ones. I.e. a minimum of 0x0000.... would
	// unintentionally include all 63-bit IDs.
	// The first 64-bit ID starts here:
	int128bounds[0] = []byte{0, 0, 0, 0, 0, 0, 0, 0, 0x80, 0, 0, 0, 0, 0, 0, 0}

	boundaries = append(boundaries, Boundary{
		Min: int128bounds[shard-1],
		Max: int128bounds[shard],
	})

	// Top most 0xFFFFF... boundary is inclusive; only the final shard owns it.
	upperInclusive = shard == of

	return
}

// Funcs returns helper functions that match trace IDs in the given shard.
// Funcs returns helper functions that match trace IDs in the given shard:
// testSingle reports whether a single trace ID falls within the shard, and
// testRange reports whether the ID range [min, max] overlaps any part of
// the shard.
func Funcs(shard, of uint32) (testSingle func([]byte) bool, testRange func([]byte, []byte) bool) {
	pairs, upperInclusive := Pairs(shard, of)

	// Boundaries are [Min, Max) except when the last shard ends in the
	// absolute maximum ID, which is [Min, Max]. Encode that as the maximum
	// allowed result of bytes.Compare against Max: -1 (strictly less)
	// normally, 0 (less or equal) when inclusive.
	upper := -1
	if upperInclusive {
		upper = 0
	}

	isMatch := func(id []byte) bool {
		for _, p := range pairs {
			if bytes.Compare(p.Min, id) <= 0 && bytes.Compare(id, p.Max) <= upper {
				return true
			}
		}
		return false
	}

	withinRange := func(min []byte, max []byte) bool {
		for _, p := range pairs {
			// Overlap test: the boundary starts at or before the range's end,
			// and the range starts before the boundary's (possibly inclusive) end.
			if bytes.Compare(p.Min, max) <= 0 && bytes.Compare(min, p.Max) <= upper {
				return true
			}
		}
		return false
	}

	return isMatch, withinRange
}
51 changes: 51 additions & 0 deletions pkg/util/traceidboundary/traceidboundary_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package traceidboundary

import (
"fmt"
"testing"

"github.com/stretchr/testify/require"
)

// TestPairs verifies the 63-bit and 128-bit boundary pairs produced for a
// 2-way shard split, including the inclusive upper bound on the final shard.
func TestPairs(t *testing.T) {
	testCases := []struct {
		shard, of     uint32
		expectedPairs []Boundary
		expectedUpper bool
	}{
		{
			1, 2,
			[]Boundary{
				{
					[]byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0},    // Min 63-bit value
					[]byte{0, 0, 0, 0, 0, 0, 0, 0, 0x40, 0, 0, 0, 0, 0, 0, 0}, // Half of 63-bit space (exclusive)
				},
				{
					[]byte{0, 0, 0, 0, 0, 0, 0, 0, 0x80, 0, 0, 0, 0, 0, 0, 0}, // Min 64-bit value (not overlapping with max 63-bit value)
					[]byte{0x80, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, // Half of 128-bit space (exclusive)
				},
			},
			false,
		},
		{
			2, 2, []Boundary{
				{
					[]byte{0, 0, 0, 0, 0, 0, 0, 0, 0x40, 0, 0, 0, 0, 0, 0, 0},                      // Half of 63-bit space
					[]byte{0, 0, 0, 0, 0, 0, 0, 0, 0x7F, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF}, // Max 63-bit space (inclusive)
				},
				{
					[]byte{0x80, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0},                                     // Half of 128-bit space (not overlapping with max 63-bit value)
					[]byte{0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF}, // Max 128-bit value (inclusive)
				},
			}, true,
		},
	}

	for _, tc := range testCases {
		t.Run(fmt.Sprintf("%d of %d", tc.shard, tc.of), func(t *testing.T) {
			pairs, upper := Pairs(tc.shard, tc.of)
			require.Equal(t, tc.expectedPairs, pairs)
			require.Equal(t, tc.expectedUpper, upper)
		})
	}
}
4 changes: 4 additions & 0 deletions tempodb/encoding/vparquet/block_traceql.go
Original file line number Diff line number Diff line change
Expand Up @@ -686,6 +686,10 @@ func (i *mergeSpansetIterator) Close() {
// V

func fetch(ctx context.Context, req traceql.FetchSpansRequest, pf *parquet.File, opts common.SearchOptions) (*spansetIterator, error) {
if req.ShardCount > 0 {
return nil, errors.New("traceql sharding not supported")
}

iter, err := createAllIterator(ctx, nil, req.Conditions, req.AllConditions, req.StartTimeUnixNanos, req.EndTimeUnixNanos, pf, opts)
if err != nil {
return nil, fmt.Errorf("error creating iterator: %w", err)
Expand Down
35 changes: 26 additions & 9 deletions tempodb/encoding/vparquet2/block_traceql.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
v1 "github.com/grafana/tempo/pkg/tempopb/trace/v1"
"github.com/grafana/tempo/pkg/traceql"
"github.com/grafana/tempo/pkg/util"
"github.com/grafana/tempo/pkg/util/traceidboundary"
"github.com/grafana/tempo/tempodb/encoding/common"
)

Expand Down Expand Up @@ -435,7 +436,9 @@ func (b *backendBlock) Fetch(ctx context.Context, req traceql.FetchSpansRequest,
return traceql.FetchSpansResponse{}, err
}

iter, err := fetch(ctx, req, pf, opts)
rgs := rowGroupsFromFile(pf, opts)

iter, err := fetch(ctx, req, pf, rgs)
if err != nil {
return traceql.FetchSpansResponse{}, fmt.Errorf("creating fetch iter: %w", err)
}
Expand Down Expand Up @@ -868,16 +871,16 @@ func (i *mergeSpansetIterator) Close() {
// |
// V

func fetch(ctx context.Context, req traceql.FetchSpansRequest, pf *parquet.File, opts common.SearchOptions) (*spansetIterator, error) {
iter, err := createAllIterator(ctx, nil, req.Conditions, req.AllConditions, req.StartTimeUnixNanos, req.EndTimeUnixNanos, pf, opts)
func fetch(ctx context.Context, req traceql.FetchSpansRequest, pf *parquet.File, rowGroups []parquet.RowGroup) (*spansetIterator, error) {
iter, err := createAllIterator(ctx, nil, req.Conditions, req.AllConditions, req.StartTimeUnixNanos, req.EndTimeUnixNanos, req.ShardID, req.ShardCount, rowGroups, pf)
if err != nil {
return nil, fmt.Errorf("error creating iterator: %w", err)
}

if req.SecondPass != nil {
iter = newBridgeIterator(newRebatchIterator(iter), req.SecondPass)

iter, err = createAllIterator(ctx, iter, req.SecondPassConditions, false, 0, 0, pf, opts)
iter, err = createAllIterator(ctx, iter, req.SecondPassConditions, false, 0, 0, req.ShardID, req.ShardCount, rowGroups, pf)
if err != nil {
return nil, fmt.Errorf("error creating second pass iterator: %w", err)
}
Expand All @@ -886,7 +889,9 @@ func fetch(ctx context.Context, req traceql.FetchSpansRequest, pf *parquet.File,
return newSpansetIterator(newRebatchIterator(iter)), nil
}

func createAllIterator(ctx context.Context, primaryIter parquetquery.Iterator, conds []traceql.Condition, allConditions bool, start, end uint64, pf *parquet.File, opts common.SearchOptions) (parquetquery.Iterator, error) {
func createAllIterator(ctx context.Context, primaryIter parquetquery.Iterator, conds []traceql.Condition, allConditions bool, start, end uint64,
shardID, shardCount uint32, rgs []parquet.RowGroup, pf *parquet.File,
) (parquetquery.Iterator, error) {
// Categorize conditions into span-level or resource-level
var (
mingledConditions bool
Expand Down Expand Up @@ -928,7 +933,6 @@ func createAllIterator(ctx context.Context, primaryIter parquetquery.Iterator, c
}
}

rgs := rowGroupsFromFile(pf, opts)
makeIter := makeIterFunc(ctx, rgs, pf)

// Global state
Expand Down Expand Up @@ -969,7 +973,7 @@ func createAllIterator(ctx context.Context, primaryIter parquetquery.Iterator, c
return nil, fmt.Errorf("creating resource iterator: %w", err)
}

return createTraceIterator(makeIter, resourceIter, traceConditions, start, end, allConditions)
return createTraceIterator(makeIter, resourceIter, traceConditions, start, end, shardID, shardCount, allConditions)
}

// createSpanIterator iterates through all span-level columns, groups them into rows representing
Expand Down Expand Up @@ -1262,7 +1266,7 @@ func createResourceIterator(makeIter makeIterFn, spanIterator parquetquery.Itera
required, iters, batchCol)
}

func createTraceIterator(makeIter makeIterFn, resourceIter parquetquery.Iterator, conds []traceql.Condition, start, end uint64, allConditions bool) (parquetquery.Iterator, error) {
func createTraceIterator(makeIter makeIterFn, resourceIter parquetquery.Iterator, conds []traceql.Condition, start, end uint64, shardID, shardCount uint32, allConditions bool) (parquetquery.Iterator, error) {
traceIters := make([]parquetquery.Iterator, 0, 3)

var err error
Expand All @@ -1273,7 +1277,7 @@ func createTraceIterator(makeIter makeIterFn, resourceIter parquetquery.Iterator
for _, cond := range conds {
switch cond.Attribute.Intrinsic {
case traceql.IntrinsicTraceID:
traceIters = append(traceIters, makeIter(columnPathTraceID, nil, columnPathTraceID))
traceIters = append(traceIters, makeIter(columnPathTraceID, NewTraceIDShardingPredicate(shardID, shardCount), columnPathTraceID))
case traceql.IntrinsicTraceDuration:
var pred parquetquery.Predicate
if allConditions {
Expand Down Expand Up @@ -2019,3 +2023,16 @@ func orIfNeeded(preds []parquetquery.Predicate) parquetquery.Predicate {
return parquetquery.NewOrPredicate(preds...)
}
}

// NewTraceIDShardingPredicate creates a predicate for the TraceID column that
// matches only IDs within the given shard. When sharding isn't in effect
// (shardCount <= 1 or shardID <= 0) it returns nil, meaning no predicate.
func NewTraceIDShardingPredicate(shardID, shardCount uint32) parquetquery.Predicate {
	if shardCount > 1 && shardID > 0 {
		checkID, checkRange := traceidboundary.Funcs(shardID, shardCount)
		return parquetquery.NewGenericPredicate(
			checkID,
			checkRange,
			func(v parquet.Value) []byte { return v.ByteArray() },
		)
	}
	return nil
}
2 changes: 1 addition & 1 deletion tempodb/encoding/vparquet2/wal_block.go
Original file line number Diff line number Diff line change
Expand Up @@ -644,7 +644,7 @@ func (b *walBlock) Fetch(ctx context.Context, req traceql.FetchSpansRequest, opt

pf := file.parquetFile

iter, err := fetch(ctx, req, pf, opts)
iter, err := fetch(ctx, req, pf, pf.RowGroups())
if err != nil {
return traceql.FetchSpansResponse{}, fmt.Errorf("creating fetch iter: %w", err)
}
Expand Down
Loading
Loading