Compression Updates #961

Merged
merged 13 commits on Sep 15, 2021
3 changes: 2 additions & 1 deletion CHANGELOG.md
@@ -9,8 +9,8 @@
* [ENHANCEMENT] Added traceid to `trace too large message`. [#888](https://github.com/grafana/tempo/pull/888) (@mritunjaysharma394)
* [ENHANCEMENT] Add support to tempo workloads to `overrides` from single configmap in microservice mode. [#896](https://github.com/grafana/tempo/pull/896) (@kavirajk)
* [ENHANCEMENT] Make `overrides_config` block name consistent with Loki and Cortex in microservice mode. [#906](https://github.com/grafana/tempo/pull/906) (@kavirajk)
* [ENHANCEMENT] Updated config defaults to better capture operational knowledge. [#913](https://github.com/grafana/tempo/pull/913) (@joe-elliott)
* [ENHANCEMENT] Changed the metric name from `cortex_runtime_config_last_reload_successful` to `tempo_runtime_config_last_reload_successful` [#945](https://github.com/grafana/tempo/pull/945) (@kavirajk)
* [ENHANCEMENT] Updated config defaults to better capture operational knowledge. [#913](https://github.com/grafana/tempo/pull/913) (@joe-elliott)
```
ingester:
trace_idle_period: 30s => 10s # reduce ingester memory requirements with little impact on querying
@@ -36,6 +36,7 @@
* [ENHANCEMENT] Improve zstd read throughput using zstd.Decoder [#948](https://github.com/grafana/tempo/pull/948) (@joe-elliott)
* [ENHANCEMENT] Dedupe search records while replaying WAL [#940](https://github.com/grafana/tempo/pull/940) (@annanay25)
* [ENHANCEMENT] Add status endpoint to list the available endpoints [#938](https://github.com/grafana/tempo/pull/938) (@zalegrala)
* [ENHANCEMENT] Compression updates: Added s2, improved snappy performance [#961](https://github.com/grafana/tempo/pull/961) (@joe-elliott)
* [ENHANCEMENT] Add search block headers [#943](https://github.com/grafana/tempo/pull/943) (@mdisibio)
* [CHANGE] Renamed CLI flag from `--storage.trace.maintenance-cycle` to `--storage.trace.blocklist_poll`. This is a **breaking change** [#897](https://github.com/grafana/tempo/pull/897) (@mritunjaysharma394)
* [CHANGE] update jsonnet alerts and recording rules to use `job_selectors` and `cluster_selectors` for configurable unique identifier labels [#935](https://github.com/grafana/tempo/pull/935) (@kevinschoonover)
4 changes: 2 additions & 2 deletions docs/tempo/website/configuration/_index.md
@@ -523,7 +523,7 @@ storage:
[path: <string>]

# wal encoding/compression.
# options: none, gzip, lz4-64k, lz4-256k, lz4-1M, lz4, snappy, zstd
# options: none, gzip, lz4-64k, lz4-256k, lz4-1M, lz4, snappy, zstd, s2
# (default: snappy)
[encoding: <string>]

@@ -542,7 +542,7 @@
# (default: 1MiB)
[index_downsample_bytes: <uint64>]

# block encoding/compression. options: none, gzip, lz4-64k, lz4-256k, lz4-1M, lz4, snappy, zstd
# block encoding/compression. options: none, gzip, lz4-64k, lz4-256k, lz4-1M, lz4, snappy, zstd, s2
[encoding: <string>]
```
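For illustration, a storage block that opts into the new `s2` codec might look like the following sketch. The nesting under `storage.trace` and the surrounding values mirror the reference above; treat the exact layout as an assumption, not authoritative configuration.

```yaml
storage:
  trace:
    wal:
      encoding: s2    # wal encoding/compression, one of the options listed above
    block:
      encoding: s2    # block encoding/compression
```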

1 change: 1 addition & 0 deletions docs/tempo/website/configuration/compression.md
@@ -29,6 +29,7 @@ The following options are supported:
- lz4
- snappy
- zstd
- s2

It is important to note that although all of these compression formats are supported in Tempo, at Grafana
we use zstd, and the other compression algorithms may have issues at scale. Please
4 changes: 2 additions & 2 deletions example/docker-compose/azure/tempo-azure.yaml
@@ -34,10 +34,10 @@ storage:
block:
bloom_filter_false_positive: .05 # bloom filter false positive rate. lower values create larger filters but fewer false positives
index_downsample_bytes: 1000 # number of bytes per index record
encoding: zstd # block encoding/compression. options: none, gzip, lz4-64k, lz4-256k, lz4-1M, lz4, snappy, zstd
encoding: zstd # block encoding/compression. options: none, gzip, lz4-64k, lz4-256k, lz4-1M, lz4, snappy, zstd, s2
wal:
path: /tmp/tempo/wal # where to store the wal locally
encoding: snappy # wal encoding/compression. options: none, gzip, lz4-64k, lz4-256k, lz4-1M, lz4, snappy, zstd
encoding: snappy # wal encoding/compression. options: none, gzip, lz4-64k, lz4-256k, lz4-1M, lz4, snappy, zstd, s2
azure:
container-name: tempo # how to store data in azure
endpoint-suffix: azurite:10000
@@ -55,7 +55,7 @@ storage:
insecure: true
wal:
path: /tmp/tempo/wal # where to store the wal locally
encoding: snappy # wal encoding/compression. options: none, gzip, lz4-64k, lz4-256k, lz4-1M, lz4, snappy, zstd
encoding: snappy # wal encoding/compression. options: none, gzip, lz4-64k, lz4-256k, lz4-1M, lz4, snappy, zstd, s2
local:
path: /tmp/tempo/blocks
pool:
4 changes: 2 additions & 2 deletions example/docker-compose/gcs/tempo-gcs.yaml
@@ -35,10 +35,10 @@ storage:
block:
bloom_filter_false_positive: .05 # bloom filter false positive rate. lower values create larger filters but fewer false positives
index_downsample_bytes: 1000 # number of bytes per index record
encoding: zstd # block encoding/compression. options: none, gzip, lz4-64k, lz4-256k, lz4-1M, lz4, snappy, zstd
encoding: zstd # block encoding/compression. options: none, gzip, lz4-64k, lz4-256k, lz4-1M, lz4, snappy, zstd, s2
wal:
path: /tmp/tempo/wal # where to store the wal locally
encoding: snappy # wal encoding/compression. options: none, gzip, lz4-64k, lz4-256k, lz4-1M, lz4, snappy, zstd
encoding: snappy # wal encoding/compression. options: none, gzip, lz4-64k, lz4-256k, lz4-1M, lz4, snappy, zstd, s2
gcs:
bucket_name: tempo
endpoint: https://gcs:4443/storage/v1/
4 changes: 2 additions & 2 deletions example/docker-compose/local/tempo-local.yaml
@@ -34,10 +34,10 @@ storage:
block:
bloom_filter_false_positive: .05 # bloom filter false positive rate. lower values create larger filters but fewer false positives
index_downsample_bytes: 1000 # number of bytes per index record
encoding: zstd # block encoding/compression. options: none, gzip, lz4-64k, lz4-256k, lz4-1M, lz4, snappy, zstd
encoding: zstd # block encoding/compression. options: none, gzip, lz4-64k, lz4-256k, lz4-1M, lz4, snappy, zstd, s2
wal:
path: /tmp/tempo/wal # where to store the wal locally
encoding: snappy # wal encoding/compression. options: none, gzip, lz4-64k, lz4-256k, lz4-1M, lz4, snappy, zstd
encoding: snappy # wal encoding/compression. options: none, gzip, lz4-64k, lz4-256k, lz4-1M, lz4, snappy, zstd, s2
local:
path: /tmp/tempo/blocks
pool:
4 changes: 2 additions & 2 deletions example/docker-compose/s3/tempo-s3.yaml
@@ -35,10 +35,10 @@ storage:
block:
bloom_filter_false_positive: .05 # bloom filter false positive rate. lower values create larger filters but fewer false positives
index_downsample_bytes: 1000 # number of bytes per index record
encoding: zstd # block encoding/compression. options: none, gzip, lz4-64k, lz4-256k, lz4-1M, lz4, snappy, zstd
encoding: zstd # block encoding/compression. options: none, gzip, lz4-64k, lz4-256k, lz4-1M, lz4, snappy, zstd, s2
wal:
path: /tmp/tempo/wal # where to store the wal locally
encoding: snappy # wal encoding/compression. options: none, gzip, lz4-64k, lz4-256k, lz4-1M, lz4, snappy, zstd
encoding: snappy # wal encoding/compression. options: none, gzip, lz4-64k, lz4-256k, lz4-1M, lz4, snappy, zstd, s2
s3:
bucket: tempo # how to store data in s3
endpoint: minio:9000
2 changes: 1 addition & 1 deletion go.mod
@@ -19,7 +19,7 @@ require (
github.com/gogo/protobuf v1.3.2
github.com/gogo/status v1.1.0
github.com/golang/protobuf v1.5.2
github.com/golang/snappy v0.0.4
github.com/golang/snappy v0.0.4 // indirect
github.com/google/flatbuffers v2.0.0+incompatible
github.com/google/go-cmp v0.5.6
github.com/google/uuid v1.2.0
4 changes: 4 additions & 0 deletions tempodb/backend/encoding.go
@@ -21,6 +21,7 @@ const (
EncLZ4_4M
EncSnappy
EncZstd
EncS2
)

// SupportedEncoding is a slice of all supported encodings
@@ -33,6 +34,7 @@ var SupportedEncoding = []Encoding{
EncLZ4_4M,
EncSnappy,
EncZstd,
EncS2,
}

func (e Encoding) String() string {
@@ -53,6 +55,8 @@ func (e Encoding) String() string {
return "snappy"
case EncZstd:
return "zstd"
case EncS2:
return "s2"
default:
return "unsupported"
}
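In isolation, the iota-enum plus `String()` pattern used by `encoding.go` can be sketched as follows. This is a minimal stand-in with illustrative values, not Tempo's actual package:

```go
package main

import "fmt"

// Encoding mirrors the iota-based enum pattern above (subset of values, for illustration).
type Encoding byte

const (
	EncNone Encoding = iota
	EncGZIP
	EncSnappy
	EncZstd
	EncS2 // new in this PR
)

// String maps each encoding constant to its config-file name,
// falling back to "unsupported" for unknown values.
func (e Encoding) String() string {
	switch e {
	case EncNone:
		return "none"
	case EncGZIP:
		return "gzip"
	case EncSnappy:
		return "snappy"
	case EncZstd:
		return "zstd"
	case EncS2:
		return "s2"
	default:
		return "unsupported"
	}
}

func main() {
	// prints none, gzip, snappy, zstd, s2 on separate lines
	for _, e := range []Encoding{EncNone, EncGZIP, EncSnappy, EncZstd, EncS2} {
		fmt.Println(e)
	}
}
```

Because `fmt.Println` uses the `fmt.Stringer` interface, adding a new encoding only requires a new constant plus one `case` in the switch, which is exactly the shape of the `EncS2` change above.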
72 changes: 38 additions & 34 deletions tempodb/encoding/streaming_block_test.go
@@ -9,6 +9,7 @@ import (
"io/ioutil"
"math/rand"
"os"
"path"
"sort"
"testing"
"time"
@@ -301,7 +302,7 @@ func streamingBlock(t *testing.T, cfg *BlockConfig, w backend.Writer) (*Streamin
return block, ids, reqs
}

const benchDownsample = 200
const benchDownsample = 1024 * 1024

func BenchmarkWriteGzip(b *testing.B) {
benchmarkCompressBlock(b, backend.EncGZIP, benchDownsample, false)
@@ -319,10 +320,12 @@ func BenchmarkWriteLZ41M(b *testing.B) {
func BenchmarkWriteNone(b *testing.B) {
benchmarkCompressBlock(b, backend.EncNone, benchDownsample, false)
}

func BenchmarkWriteZstd(b *testing.B) {
benchmarkCompressBlock(b, backend.EncZstd, benchDownsample, false)
}
func BenchmarkWriteS2(b *testing.B) {
benchmarkCompressBlock(b, backend.EncS2, benchDownsample, false)
}

func BenchmarkReadGzip(b *testing.B) {
benchmarkCompressBlock(b, backend.EncGZIP, benchDownsample, true)
@@ -339,10 +342,12 @@ func BenchmarkReadLZ41M(b *testing.B) {
func BenchmarkReadNone(b *testing.B) {
benchmarkCompressBlock(b, backend.EncNone, benchDownsample, true)
}

func BenchmarkReadZstd(b *testing.B) {
benchmarkCompressBlock(b, backend.EncZstd, benchDownsample, true)
}
func BenchmarkReadS2(b *testing.B) {
benchmarkCompressBlock(b, backend.EncS2, benchDownsample, true)
}

// Download a block from your backend and place in ./benchmark_block/<tenant id>/<guid>
//nolint:unparam
@@ -357,7 +362,7 @@ func benchmarkCompressBlock(b *testing.B, encoding backend.Encoding, indexDownsa
require.NoError(b, err, "error creating backend")

r := backend.NewReader(rawR)
meta, err := r.BlockMeta(context.Background(), uuid.MustParse("00006e9d-94f0-4487-8e62-99f951be9349"), "1")
meta, err := r.BlockMeta(context.Background(), uuid.MustParse("20a614f8-8cda-4b9d-9789-cb626f9fab28"), "1")
require.NoError(b, err)

backendBlock, err := NewBackendBlock(meta, r)
@@ -380,13 +385,13 @@
b.ResetTimer()
}

originatingMeta := backend.NewBlockMeta(testTenantID, uuid.New(), "should_be_ignored", encoding, "")
block, err := NewStreamingBlock(&BlockConfig{
IndexDownsampleBytes: indexDownsample,
BloomFP: .05,
Encoding: encoding,
IndexPageSizeBytes: 10 * 1024 * 1024,
}, originatingMeta.BlockID, originatingMeta.TenantID, []*backend.BlockMeta{originatingMeta}, originatingMeta.TotalObjects)
BloomShardSizeBytes: 100000,
}, uuid.New(), meta.TenantID, []*backend.BlockMeta{meta}, meta.TotalObjects)
require.NoError(b, err, "unexpected error completing block")

ctx := context.Background()
@@ -395,8 +400,7 @@
if err != io.EOF {
require.NoError(b, err)
}

if id == nil {
if err == io.EOF {
break
}

@@ -416,30 +420,30 @@
return
}

// todo: restore read benchmarks
// b.ResetTimer()

// file, err := os.Open(block.fullFilename())
// require.NoError(b, err)
// pr, err := v2.NewDataReader(backend.NewContextReaderWithAllReader(file), encoding)
// require.NoError(b, err)

// var tempBuffer []byte
// o := v2.NewObjectReaderWriter()
// for {
// tempBuffer, _, err = pr.NextPage(tempBuffer)
// if err == io.EOF {
// break
// }
// require.NoError(b, err)

// bufferReader := bytes.NewReader(tempBuffer)

// for {
// _, _, err = o.UnmarshalObjectFromReader(bufferReader)
// if err == io.EOF {
// break
// }
// }
// }
b.ResetTimer()

fullFilename := path.Join(backendTmpDir, block.compactedMeta.TenantID, block.compactedMeta.BlockID.String(), "data")
file, err := os.Open(fullFilename)
require.NoError(b, err)
pr, err := v2.NewDataReader(backend.NewContextReaderWithAllReader(file), encoding)
require.NoError(b, err)

var tempBuffer []byte
o := v2.NewObjectReaderWriter()
for {
tempBuffer, _, err = pr.NextPage(tempBuffer)
if err == io.EOF {
break
}
require.NoError(b, err)

bufferReader := bytes.NewReader(tempBuffer)

for {
_, _, err = o.UnmarshalObjectFromReader(bufferReader)
if err == io.EOF {
break
}
}
}
}
70 changes: 65 additions & 5 deletions tempodb/encoding/v2/pool.go
@@ -5,15 +5,16 @@ import (
"io"
"sync"

"github.com/golang/snappy"
"github.com/grafana/tempo/tempodb/backend"
"github.com/klauspost/compress/gzip"
"github.com/klauspost/compress/s2"
"github.com/klauspost/compress/snappy"
"github.com/klauspost/compress/zstd"
"github.com/pierrec/lz4/v4"
"github.com/prometheus/prometheus/pkg/pool"
)

const maxEncoding = backend.EncZstd
const maxEncoding = backend.EncS2

// WriterPool is a pool of io.Writer
// This is used by every chunk to avoid unnecessary allocations.
@@ -49,6 +50,8 @@ var (
Noop NoopPool
// Zstd Pool
Zstd = ZstdPool{}
// S2 Pool
S2 = S2Pool{}

// BytesBufferPool is a bytes buffer used for lines decompressed.
// Buckets [0.5KB,1KB,2KB,4KB,8KB]
@@ -82,6 +85,8 @@ func getReaderPool(enc backend.Encoding) (ReaderPool, error) {
return &Snappy, nil
case backend.EncZstd:
return &Zstd, nil
case backend.EncS2:
return &S2, nil
default:
return nil, fmt.Errorf("Unknown pool encoding %d", enc)
}
@@ -257,7 +262,7 @@ func (pool *SnappyPool) Encoding() backend.Encoding {
// GetReader gets or creates a new CompressionReader and reset it to read from src
func (pool *SnappyPool) GetReader(src io.Reader) (io.Reader, error) {
if r := pool.readers.Get(); r != nil {
reader := r.(*snappy.Reader)
reader := r.(*s2.Reader)
reader.Reset(src)
return reader, nil
}
@@ -271,15 +276,15 @@ func (pool *SnappyPool) PutReader(reader io.Reader) {

// ResetReader implements ReaderPool
func (pool *SnappyPool) ResetReader(src io.Reader, resetReader io.Reader) (io.Reader, error) {
reader := resetReader.(*snappy.Reader)
reader := resetReader.(*s2.Reader)
reader.Reset(src)
return reader, nil
}

// GetWriter gets or creates a new CompressionWriter and reset it to write to dst
func (pool *SnappyPool) GetWriter(dst io.Writer) (io.WriteCloser, error) {
if w := pool.writers.Get(); w != nil {
writer := w.(*snappy.Writer)
writer := w.(*s2.Writer)
writer.Reset(dst)
return writer, nil
}
@@ -394,3 +399,58 @@ func (pool *ZstdPool) ResetWriter(dst io.Writer, resetWriter io.WriteCloser) (io
writer.Reset(dst)
return writer, nil
}

// S2Pool is one s short of s3
type S2Pool struct {
readers sync.Pool
writers sync.Pool
}

// Encoding implements WriterPool and ReaderPool
func (pool *S2Pool) Encoding() backend.Encoding {
return backend.EncS2
}

// GetReader gets or creates a new CompressionReader and reset it to read from src
func (pool *S2Pool) GetReader(src io.Reader) (io.Reader, error) {
if r := pool.readers.Get(); r != nil {
reader := r.(*s2.Reader)
reader.Reset(src)
return reader, nil
}
return s2.NewReader(src), nil
}

// PutReader places back in the pool a CompressionReader
func (pool *S2Pool) PutReader(reader io.Reader) {
pool.readers.Put(reader)
}

// ResetReader implements ReaderPool
func (pool *S2Pool) ResetReader(src io.Reader, resetReader io.Reader) (io.Reader, error) {
reader := resetReader.(*s2.Reader)
reader.Reset(src)
return reader, nil
}

// GetWriter gets or creates a new CompressionWriter and reset it to write to dst
func (pool *S2Pool) GetWriter(dst io.Writer) (io.WriteCloser, error) {
if w := pool.writers.Get(); w != nil {
writer := w.(*s2.Writer)
writer.Reset(dst)
return writer, nil
}
return s2.NewWriter(dst), nil
}

// PutWriter places back in the pool a CompressionWriter
func (pool *S2Pool) PutWriter(writer io.WriteCloser) {
pool.writers.Put(writer)

	// review comment: Maybe add a writer.Close() to ensure it is flushed.

}

// ResetWriter implements WriterPool
func (pool *S2Pool) ResetWriter(dst io.Writer, resetWriter io.WriteCloser) (io.WriteCloser, error) {
writer := resetWriter.(*s2.Writer)
writer.Reset(dst)
return writer, nil
}