Skip to content

Commit

Permalink
[vParquet2] The next version of Tempo's parquet based block format (#…
Browse files Browse the repository at this point in the history
…2244)

* [vParquet2] Create new block encoding by copying vparquet

* [vParquet2] Rename columns containing timestamps and use suffix *Nano

Make column names using timestamps more consistent and more
similar to the names used by OTEL

* [vParquet2] Rename InstrumentationLibrary/il to Scope

Rename columns to match the latest OTEL standard

* Fix capitalization in Span interface function EndTimeUnixNanos

* [vParquet2] Remove span column EndTimeUnixNano and add DurationNano

Increases performance of queries for span duration, as it is no longer
required to search two columns to get the span duration

* [vParquet2] Add columns for parent relation and nested set model

The columns are a prerequisite to improve structural TraceQL queries
The new columns are not populated yet

* [vParquet2] Rename Span.ID to Span.SpanID

This makes the schema more similar to the OTEL standard and makes it
possible to add a numeric span ID later (see ParentID vs ParentSpanID)

* [vParquet2] Annotate repeated groups as nested list types

This improves the compatibility with other tooling using the parquet
format such as parquet-mr/parquet-cli

* [vParquet2] Convert block in vparquet2/test-data to new schema

* [vParquet2] tempo-cli sub command to convert from vParquet to vParquet2

* [vParquet2] changelog entry

* Add vParquet2 to AllEncodings() and use it consistently in tests

* [vParquet2] mention the new block format in the configuration docs

* Change return type of createIntPredicate to predicate interface

This prevents potential bugs with nil predicates where '== nil' is false
because the value pointer is nil but the type pointer is not

* [vParquet2] Optimize duration queries and remove extra duration logic

Since there is now a dedicated duration column, queries for duration can
be treated more like any other integer column

* [vParquet2] Fix duration bug in vparquet 1 to 2 converter
  • Loading branch information
stoewer authored Apr 4, 2023
1 parent 76bf7c7 commit b990eeb
Show file tree
Hide file tree
Showing 48 changed files with 8,787 additions and 85 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

* [CHANGE] tempo-mixin: disable auto refresh every 10 seconds [#2290](https://github.com/grafana/tempo/pull/2290) (@electron0zero)
* [CHANGE] Update tempo-mixin to show request in Resources dashboard [#2281](https://github.com/grafana/tempo/pull/2281) (@electron0zero)
* [FEATURE] New parquet based block format vParquet2 [#2244](https://github.com/grafana/tempo/pull/2244) (@stoewer)
* [FEATURE] Add support for Azure Workload Identity authentication [#2195](https://github.com/grafana/tempo/pull/2195) (@LambArchie)
* [ENHANCEMENT] Add Throughput and SLO Metrics with SLOConfig in Query Frontend [#2008](https://github.com/grafana/tempo/pull/2008) (@electron0zero)
- **BREAKING CHANGE** `query_frontend_result_metrics_inspected_bytes` metric removed in favour of `query_frontend_bytes_processed_per_second`
Expand Down
228 changes: 228 additions & 0 deletions cmd/tempo-cli/cmd-convert-parquet-1to2.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,228 @@
package main

import (
"encoding/binary"
"fmt"
"io"
"os"

"github.com/pkg/errors"
"github.com/segmentio/parquet-go"

"github.com/grafana/tempo/tempodb/encoding/vparquet"
"github.com/grafana/tempo/tempodb/encoding/vparquet2"
)

// convertParquet1to2 holds the arguments of the tempo-cli sub command that
// converts an existing vParquet file to the vParquet2 schema.
type convertParquet1to2 struct {
// In is the path of the vParquet file that is opened for reading.
In string `arg:"" help:"The input parquet file to read from"`
// Out is the path of the vParquet2 file to write. The file is created if
// it does not exist and truncated if it does.
Out string `arg:"" help:"The output parquet file to write to"`
}

// Run performs the conversion and, when it succeeds, prints a short summary
// of the generated file. The first error encountered is returned.
func (cmd *convertParquet1to2) Run() error {
	if err := cmd.convert(); err != nil {
		return err
	}
	return cmd.printFileInfo()
}

// printFileInfo prints the path and size of the converted file, followed by
// the footer size recorded at the end of that file.
//
// A parquet file ends with a 4-byte little-endian footer length followed by a
// 4-byte magic number. Only the footer length is decoded here; the magic
// number is not verified.
//
// An error is returned if the file cannot be opened, stat'ed or read, or if it
// is too short to contain the 8-byte trailer.
func (cmd *convertParquet1to2) printFileInfo() error {
	fmt.Printf("Converted file: %s\n", cmd.Out)
	out, err := os.Open(cmd.Out)
	if err != nil {
		return err
	}
	defer out.Close()

	stat, err := out.Stat()
	if err != nil {
		return err
	}
	fmt.Printf("File size: %d\n", stat.Size())

	// Footer length (4 bytes) plus magic number (4 bytes).
	const trailerLen = 8

	// Guard against short files: without this check ReadAt would be called
	// with a negative offset and fail with an unhelpful error.
	if stat.Size() < trailerLen {
		return fmt.Errorf("file %q is too small (%d bytes) to contain a parquet footer", cmd.Out, stat.Size())
	}

	buf := make([]byte, trailerLen)
	n, err := out.ReadAt(buf, stat.Size()-trailerLen)
	// ReadAt may report io.EOF when the read ends exactly at the end of the
	// file; that is expected here.
	if err != nil && !errors.Is(err, io.EOF) {
		return err
	}
	if n < 4 {
		return errors.New("not enough bytes read to determine footer size")
	}
	fmt.Printf("Footer size: %d\n", binary.LittleEndian.Uint32(buf[0:4]))

	return nil
}

// convert reads the vParquet file at cmd.In row group by row group, converts
// every trace to the vParquet2 schema and writes the result to cmd.Out.
//
// The output file is created (or truncated if it exists) with the file mode of
// the input file. Traces are processed in batches of up to 500 rows. Once an
// input row group has been read completely, the writer is flushed before the
// next row group is started.
//
// Errors returned while closing the parquet writer or the output file are
// propagated to the caller when no earlier error occurred: closing the writer
// finalizes the output, so a failure there means the converted file must not
// be reported as successfully written.
func (cmd *convertParquet1to2) convert() (err error) {
	in, err := os.Open(cmd.In)
	if err != nil {
		return err
	}
	// The input is only read, so an error on close can be ignored safely.
	defer in.Close()

	inStat, err := in.Stat()
	if err != nil {
		return err
	}

	pf, err := parquet.OpenFile(in, inStat.Size())
	if err != nil {
		return err
	}

	out, err := os.OpenFile(cmd.Out, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, inStat.Mode())
	if err != nil {
		return err
	}
	defer func() {
		if closeErr := out.Close(); closeErr != nil && err == nil {
			err = fmt.Errorf("closing output file: %w", closeErr)
		}
	}()

	writer := parquet.NewGenericWriter[vparquet2.Trace](out)
	// Deferred functions run in reverse order: the writer is closed before
	// the underlying output file.
	defer func() {
		if closeErr := writer.Close(); closeErr != nil && err == nil {
			err = fmt.Errorf("closing parquet writer: %w", closeErr)
		}
	}()

	readBuffer := make([]vparquet.Trace, 500)
	writeBuffer := make([]vparquet2.Trace, 500)

	rowGroups := pf.RowGroups()
	fmt.Printf("Total rowgroups: %d\n", len(rowGroups))

	for i, rowGroup := range rowGroups {
		fmt.Printf("Converting rowgroup: %d\n", i+1)
		reader := parquet.NewGenericRowGroupReader[vparquet.Trace](rowGroup)

		for {
			readCount, readErr := reader.Read(readBuffer)
			// io.EOF may be returned together with the last rows, so it is
			// not treated as an error; the loop ends once no rows are read.
			if readErr != nil && !errors.Is(readErr, io.EOF) {
				return readErr
			}
			if readCount == 0 {
				if flushErr := writer.Flush(); flushErr != nil {
					return flushErr
				}
				break
			}

			vparquet1to2(readBuffer[:readCount], writeBuffer)

			// Write may accept fewer rows than requested; keep writing
			// until the whole batch has been handed to the writer.
			writeCount := 0
			for writeCount < readCount {
				n, writeErr := writer.Write(writeBuffer[writeCount:readCount])
				if writeErr != nil {
					return writeErr
				}
				writeCount += n
			}
		}
	}

	return nil
}

// vparquet1to2 converts every trace of in to the vParquet2 schema and stores
// the result at the same index of out. out must be at least as long as in.
func vparquet1to2(in []vparquet.Trace, out []vparquet2.Trace) {
	for idx := range in {
		vparquetTrace1to2(&in[idx], &out[idx])
	}
}

// vparquetTrace1to2 copies the trace level fields of trace into v2trace and
// converts all of its resource spans. The ResourceSpans slice of v2trace is
// replaced by a newly allocated one.
func vparquetTrace1to2(trace *vparquet.Trace, v2trace *vparquet2.Trace) {
	// Identity and root information.
	v2trace.TraceID = trace.TraceID
	v2trace.TraceIDText = trace.TraceIDText
	v2trace.RootServiceName = trace.RootServiceName
	v2trace.RootSpanName = trace.RootSpanName

	// Timing; note that the duration field is named DurationNanos in the
	// old schema and DurationNano in the new one.
	v2trace.StartTimeUnixNano = trace.StartTimeUnixNano
	v2trace.EndTimeUnixNano = trace.EndTimeUnixNano
	v2trace.DurationNano = trace.DurationNanos

	v2trace.ResourceSpans = make([]vparquet2.ResourceSpans, len(trace.ResourceSpans))
	for idx := range trace.ResourceSpans {
		vparquetResourceSpans1to2(&trace.ResourceSpans[idx], &v2trace.ResourceSpans[idx])
	}
}

// vparquetResourceSpans1to2 converts a single resource spans entry: it copies
// the dedicated resource columns, converts the generic resource attributes and
// converts every scope spans entry together with the spans it contains. The
// Attrs, ScopeSpans and Spans slices of the target are newly allocated.
func vparquetResourceSpans1to2(rspan *vparquet.ResourceSpans, v2rspan *vparquet2.ResourceSpans) {
	// Dedicated resource columns.
	v2rspan.Resource.ServiceName = rspan.Resource.ServiceName
	v2rspan.Resource.Cluster = rspan.Resource.Cluster
	v2rspan.Resource.Namespace = rspan.Resource.Namespace
	v2rspan.Resource.Pod = rspan.Resource.Pod
	v2rspan.Resource.Container = rspan.Resource.Container

	// Dedicated kubernetes columns.
	v2rspan.Resource.K8sClusterName = rspan.Resource.K8sClusterName
	v2rspan.Resource.K8sNamespaceName = rspan.Resource.K8sNamespaceName
	v2rspan.Resource.K8sPodName = rspan.Resource.K8sPodName
	v2rspan.Resource.K8sContainerName = rspan.Resource.K8sContainerName

	// NOTE(review): the purpose of Test is not visible here; it is copied as-is.
	v2rspan.Resource.Test = rspan.Resource.Test

	// Generic resource attributes.
	v2rspan.Resource.Attrs = make([]vparquet2.Attribute, len(rspan.Resource.Attrs))
	for a := range rspan.Resource.Attrs {
		vparquetAttribute1to2(&rspan.Resource.Attrs[a], &v2rspan.Resource.Attrs[a])
	}

	// Scope spans and the spans nested below them.
	v2rspan.ScopeSpans = make([]vparquet2.ScopeSpans, len(rspan.ScopeSpans))
	for s := range rspan.ScopeSpans {
		from, to := &rspan.ScopeSpans[s], &v2rspan.ScopeSpans[s]

		to.Scope.Name = from.Scope.Name
		to.Scope.Version = from.Scope.Version

		to.Spans = make([]vparquet2.Span, len(from.Spans))
		for k := range from.Spans {
			vparquetSpan1to2(&from.Spans[k], &to.Spans[k])
		}
	}
}

// vparquetSpan1to2 converts a single span. Most fields are copied unchanged.
// The span ID is stored in SpanID instead of ID, and the end timestamp is
// replaced by a duration. The Events and Attrs slices of v2span are newly
// allocated.
func vparquetSpan1to2(span *vparquet.Span, v2span *vparquet2.Span) {
	// Identity.
	v2span.SpanID = span.ID
	v2span.ParentSpanID = span.ParentSpanID
	v2span.Name = span.Name
	v2span.Kind = span.Kind
	v2span.TraceState = span.TraceState

	// Timing: the new schema stores the duration instead of the end time.
	// NOTE(review): if these are unsigned and EndUnixNanos is smaller than
	// StartUnixNanos the subtraction wraps around; confirm inputs are
	// well-formed.
	v2span.StartTimeUnixNano = span.StartUnixNanos
	v2span.DurationNano = span.EndUnixNanos - span.StartUnixNanos

	// Status.
	v2span.StatusCode = span.StatusCode
	v2span.StatusMessage = span.StatusMessage

	// Links and dropped counters.
	v2span.Links = span.Links
	v2span.DroppedAttributesCount = span.DroppedAttributesCount
	v2span.DroppedEventsCount = span.DroppedEventsCount
	v2span.DroppedLinksCount = span.DroppedLinksCount

	// Dedicated HTTP columns.
	v2span.HttpMethod = span.HttpMethod
	v2span.HttpUrl = span.HttpUrl
	v2span.HttpStatusCode = span.HttpStatusCode

	v2span.Events = make([]vparquet2.Event, len(span.Events))
	for e := range span.Events {
		vparquetEvent1to2(&span.Events[e], &v2span.Events[e])
	}

	v2span.Attrs = make([]vparquet2.Attribute, len(span.Attrs))
	for a := range span.Attrs {
		vparquetAttribute1to2(&span.Attrs[a], &v2span.Attrs[a])
	}
}

// vparquetAttribute1to2 copies the key and all value representations of attr
// into v2attr. Fields of v2attr that are not listed here are left untouched.
func vparquetAttribute1to2(attr *vparquet.Attribute, v2attr *vparquet2.Attribute) {
	v2attr.Key = attr.Key

	// Scalar value representations.
	v2attr.Value = attr.Value
	v2attr.ValueBool = attr.ValueBool
	v2attr.ValueInt = attr.ValueInt
	v2attr.ValueDouble = attr.ValueDouble

	// Composite value representations.
	v2attr.ValueArray = attr.ValueArray
	v2attr.ValueKVList = attr.ValueKVList
}

// vparquetEvent1to2 converts a single span event including its attributes.
// The Attrs slice of v2event is newly allocated with one entry per source
// attribute.
func vparquetEvent1to2(event *vparquet.Event, v2event *vparquet2.Event) {
	v2event.Name = event.Name
	v2event.TimeUnixNano = event.TimeUnixNano
	v2event.DroppedAttributesCount = event.DroppedAttributesCount

	// NOTE(review): the purpose of Test is not visible here; it is copied as-is.
	v2event.Test = event.Test

	v2event.Attrs = make([]vparquet2.EventAttribute, len(event.Attrs))
	for idx := range event.Attrs {
		v2event.Attrs[idx].Key = event.Attrs[idx].Key
		v2event.Attrs[idx].Value = event.Attrs[idx].Value
	}
}
3 changes: 2 additions & 1 deletion cmd/tempo-cli/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,8 @@ var cli struct {
} `cmd:""`

Parquet struct {
Convert convertParquet `cmd:"" help:"convert from an existing file to tempodb parquet schema"`
Convert convertParquet `cmd:"" help:"convert from an existing file to tempodb parquet schema"`
Convert1to2 convertParquet1to2 `cmd:"" help:"convert an exiting vParquet file to vParquet2 schema"`
} `cmd:""`

Migrate struct {
Expand Down
2 changes: 1 addition & 1 deletion docs/sources/tempo/configuration/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -987,7 +987,7 @@ storage:

# block configuration
block:
# block format version. options: v2, vParquet
# block format version. options: v2, vParquet, vParquet2
[version: <string> | default = vParquet]

# bloom filter false positive rate. lower values create larger filters but fewer false positives
Expand Down
21 changes: 14 additions & 7 deletions docs/sources/tempo/configuration/parquet.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,23 +17,30 @@ If you install using the new Helm charts, then Parquet is enabled by default.

## Considerations

The new Parquet block is enabled by default in Tempo 2.0. No data conversion or upgrade process is necessary. As soon as the Parquet format is enabled, Tempo starts writing data in that format, leaving existing data as-is.
The Parquet block format is enabled by default in Tempo 2.0. No data conversion or upgrade process is necessary. As soon as the format is enabled, Tempo starts writing data in that format, leaving existing data as-is.

The new Parquet block format requires more CPU and memory resources than the previous v2 format but provides search and TraceQL functionality.
Block formats based on Parquet require more CPU and memory resources than the previous `v2` format but provide search and TraceQL functionality.

## Disable Parquet
## Choose a different block format

It is possible to disable Parquet and use the previous v2 block format. This disables all forms of search, but also reduces resource consumption, and may be desired for a high-throughput cluster that does not need these capabilities. Set the block format option to v2 in the Storage section of the configuration file.
It is possible to disable Parquet and use the previous `v2` block format. This disables all forms of search, but also reduces resource consumption, and may be desired for a high-throughput cluster that does not need these capabilities. Set the block version option to `v2` in the Storage section of the configuration file.

```yaml
# block format version. options: v2, vParquet
# block format version. options: v2, vParquet, vParquet2
[version: v2]
```
To re-enable Parquet, set the block format option to `vParquet` in the Storage section of the configuration file.
There is also a revised version of the Parquet-based block format, `vParquet2`. This version improves interoperability with other tools based on Parquet. `vParquet2` is still experimental and not yet enabled by default. To enable it, set the block format version to `vParquet2` in the Storage section of the configuration file.

```yaml
# block format version. options: v2, vParquet
# block format version. options: v2, vParquet, vParquet2
[version: vParquet2]
```

To re-enable Parquet, set the block version option to `vParquet` in the Storage section of the configuration file.

```yaml
# block format version. options: v2, vParquet, vParquet2
[version: vParquet]
```

Expand Down
6 changes: 3 additions & 3 deletions pkg/model/trace/sort.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ func SortTrace(t *tempopb.Trace) {
})
}
sort.Slice(b.ScopeSpans, func(i, j int) bool {
return compareIls(b.ScopeSpans[i], b.ScopeSpans[j])
return compareScopeSpans(b.ScopeSpans[i], b.ScopeSpans[j])
})
}
sort.Slice(t.Batches, func(i, j int) bool {
Expand All @@ -28,12 +28,12 @@ func SortTrace(t *tempopb.Trace) {

func compareBatches(a *v1.ResourceSpans, b *v1.ResourceSpans) bool {
if len(a.ScopeSpans) > 0 && len(b.ScopeSpans) > 0 {
return compareIls(a.ScopeSpans[0], b.ScopeSpans[0])
return compareScopeSpans(a.ScopeSpans[0], b.ScopeSpans[0])
}
return false
}

func compareIls(a *v1.ScopeSpans, b *v1.ScopeSpans) bool {
func compareScopeSpans(a *v1.ScopeSpans, b *v1.ScopeSpans) bool {
if len(a.Spans) > 0 && len(b.Spans) > 0 {
return compareSpans(a.Spans[0], b.Spans[0])
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/traceql/ast_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ var _ Span = (*mockSpan)(nil)
type mockSpan struct {
id []byte
startTimeUnixNanos uint64
endTimeUnixNanos uint64
durationNanos uint64
attributes map[Attribute]Static
}

Expand All @@ -249,6 +249,6 @@ func (m *mockSpan) ID() []byte {
func (m *mockSpan) StartTimeUnixNanos() uint64 {
return m.startTimeUnixNanos
}
func (m *mockSpan) EndtimeUnixNanos() uint64 {
return m.endTimeUnixNanos
func (m *mockSpan) DurationNanos() uint64 {
return m.durationNanos
}
2 changes: 1 addition & 1 deletion pkg/traceql/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ func (e *Engine) asTraceSearchMetadata(spanset *Spanset) *tempopb.TraceSearchMet
tempopbSpan := &tempopb.Span{
SpanID: util.SpanIDToHexString(span.ID()),
StartTimeUnixNano: span.StartTimeUnixNanos(),
DurationNanos: span.EndtimeUnixNanos() - span.StartTimeUnixNanos(),
DurationNanos: span.DurationNanos(),
Attributes: nil,
}

Expand Down
8 changes: 4 additions & 4 deletions pkg/traceql/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func TestEngine_Execute(t *testing.T) {
&mockSpan{
id: []byte{2},
startTimeUnixNanos: uint64(now.UnixNano()),
endTimeUnixNanos: uint64(now.Add(100 * time.Millisecond).UnixNano()),
durationNanos: uint64((100 * time.Millisecond).Nanoseconds()),
attributes: map[Attribute]Static{
NewAttribute("foo"): NewStaticString("value"),
NewAttribute("bar"): NewStaticString("value"),
Expand Down Expand Up @@ -154,7 +154,7 @@ func TestEngine_asTraceSearchMetadata(t *testing.T) {
&mockSpan{
id: spanID1,
startTimeUnixNanos: uint64(now.UnixNano()),
endTimeUnixNanos: uint64(now.Add(10 * time.Second).UnixNano()),
durationNanos: uint64((10 * time.Second).Nanoseconds()),
attributes: map[Attribute]Static{
NewIntrinsic(IntrinsicName): NewStaticString("HTTP GET"),
NewIntrinsic(IntrinsicStatus): NewStaticStatus(StatusOk),
Expand All @@ -169,7 +169,7 @@ func TestEngine_asTraceSearchMetadata(t *testing.T) {
&mockSpan{
id: spanID2,
startTimeUnixNanos: uint64(now.Add(2 * time.Second).UnixNano()),
endTimeUnixNanos: uint64(now.Add(20 * time.Second).UnixNano()),
durationNanos: uint64((20 * time.Second).Nanoseconds()),
attributes: map[Attribute]Static{},
},
},
Expand Down Expand Up @@ -247,7 +247,7 @@ func TestEngine_asTraceSearchMetadata(t *testing.T) {
{
SpanID: util.SpanIDToHexString(spanID2),
StartTimeUnixNano: uint64(now.Add(2 * time.Second).UnixNano()),
DurationNanos: 18_000_000_000,
DurationNanos: 20_000_000_000,
Attributes: nil,
},
},
Expand Down
2 changes: 1 addition & 1 deletion pkg/traceql/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ type Span interface {

ID() []byte
StartTimeUnixNanos() uint64
EndtimeUnixNanos() uint64
DurationNanos() uint64
}

type Spanset struct {
Expand Down
Loading

0 comments on commit b990eeb

Please sign in to comment.