Compression Updates (grafana#961)
* fixed write benchmark

Signed-off-by: Joe Elliott <[email protected]>

* benchmarks work

Signed-off-by: Joe Elliott <[email protected]>

* klauspost snappy

Signed-off-by: Joe Elliott <[email protected]>

* add s2

Signed-off-by: Joe Elliott <[email protected]>

* disable compression

Signed-off-by: Joe Elliott <[email protected]>

* Revert "disable compression"

This reverts commit b68b038.

* removed jpes

Signed-off-by: Joe Elliott <[email protected]>

* docs/changelog

Signed-off-by: Joe Elliott <[email protected]>

* vendor-check

Signed-off-by: Joe Elliott <[email protected]>

* fixed test

Signed-off-by: Joe Elliott <[email protected]>

* maxEncoding?

Signed-off-by: Joe Elliott <[email protected]>
joe-elliott authored Sep 15, 2021
1 parent e976a08 commit b89f2ae
Showing 38 changed files with 22,271 additions and 52 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.md
@@ -9,8 +9,8 @@
* [ENHANCEMENT] Added traceid to `trace too large message`. [#888](https://github.com/grafana/tempo/pull/888) (@mritunjaysharma394)
* [ENHANCEMENT] Add support to tempo workloads to `overrides` from single configmap in microservice mode. [#896](https://github.com/grafana/tempo/pull/896) (@kavirajk)
* [ENHANCEMENT] Make `overrides_config` block name consistent with Loki and Cortex in microservice mode. [#906](https://github.com/grafana/tempo/pull/906) (@kavirajk)
-* [ENHANCEMENT] Updated config defaults to better capture operational knowledge. [#913](https://github.com/grafana/tempo/pull/913) (@joe-elliott)
* [ENHANCEMENT] Changes the metrics name from `cortex_runtime_config_last_reload_successful` to `tempo_runtime_config_last_reload_successful` [#945](https://github.com/grafana/tempo/pull/945) (@kavirajk)
+* [ENHANCEMENT] Updated config defaults to better capture operational knowledge. [#913](https://github.com/grafana/tempo/pull/913) (@joe-elliott)
```
ingester:
  trace_idle_period: 30s => 10s # reduce ingester memory requirements with little impact on querying
@@ -36,6 +36,7 @@
* [ENHANCEMENT] Improve zstd read throughput using zstd.Decoder [#948](https://github.com/grafana/tempo/pull/948) (@joe-elliott)
* [ENHANCEMENT] Dedupe search records while replaying WAL [#940](https://github.com/grafana/tempo/pull/940) (@annanay25)
* [ENHANCEMENT] Add status endpoint to list the available endpoints [#938](https://github.com/grafana/tempo/pull/938) (@zalegrala)
+* [ENHANCEMENT] Compression updates: Added s2, improved snappy performance [#961](https://github.com/grafana/tempo/pull/961) (@joe-elliott)
* [ENHANCEMENT] Add search block headers [#943](https://github.com/grafana/tempo/pull/943) (@mdisibio)
* [CHANGE] Renamed CLI flag from `--storage.trace.maintenance-cycle` to `--storage.trace.blocklist_poll`. This is a **breaking change** [#897](https://github.com/grafana/tempo/pull/897) (@mritunjaysharma394)
* [CHANGE] update jsonnet alerts and recording rules to use `job_selectors` and `cluster_selectors` for configurable unique identifier labels [#935](https://github.com/grafana/tempo/pull/935) (@kevinschoonover)
4 changes: 2 additions & 2 deletions docs/tempo/website/configuration/_index.md
@@ -523,7 +523,7 @@ storage:
[path: <string>]
# wal encoding/compression.
-# options: none, gzip, lz4-64k, lz4-256k, lz4-1M, lz4, snappy, zstd
+# options: none, gzip, lz4-64k, lz4-256k, lz4-1M, lz4, snappy, zstd, s2
# (default: snappy)
[encoding: <string>]
@@ -542,7 +542,7 @@ storage:
# (default: 1MiB)
[index_downsample_bytes: <uint64>]
-# block encoding/compression. options: none, gzip, lz4-64k, lz4-256k, lz4-1M, lz4, snappy, zstd
+# block encoding/compression. options: none, gzip, lz4-64k, lz4-256k, lz4-1M, lz4, snappy, zstd, s2
[encoding: <string>]
```
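With this change, `s2` becomes a valid value for both the wal and block `encoding` settings; the documented wal default remains `snappy`.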

1 change: 1 addition & 0 deletions docs/tempo/website/configuration/compression.md
@@ -29,6 +29,7 @@ The following options are supported:
- lz4
- snappy
- zstd
- s2

It is important to note that although all of these compression formats are supported in Tempo, at Grafana
we use zstd and it's possible/probable that the other compression algorithms may have issues at scale. Please
4 changes: 2 additions & 2 deletions example/docker-compose/azure/tempo-azure.yaml
@@ -34,10 +34,10 @@ storage:
block:
  bloom_filter_false_positive: .05 # bloom filter false positive rate. lower values create larger filters but fewer false positives
  index_downsample_bytes: 1000 # number of bytes per index record
-  encoding: zstd # block encoding/compression. options: none, gzip, lz4-64k, lz4-256k, lz4-1M, lz4, snappy, zstd
+  encoding: zstd # block encoding/compression. options: none, gzip, lz4-64k, lz4-256k, lz4-1M, lz4, snappy, zstd, s2
wal:
  path: /tmp/tempo/wal # where to store the wal locally
-  encoding: snappy # wal encoding/compression. options: none, gzip, lz4-64k, lz4-256k, lz4-1M, lz4, snappy, zstd
+  encoding: snappy # wal encoding/compression. options: none, gzip, lz4-64k, lz4-256k, lz4-1M, lz4, snappy, zstd, s2
azure:
  container-name: tempo # how to store data in azure
  endpoint-suffix: azurite:10000
2 changes: 1 addition & 1 deletion example/docker-compose/distributed/tempo-distributed.yaml
@@ -55,7 +55,7 @@ storage:
insecure: true
wal:
  path: /tmp/tempo/wal # where to store the wal locally
-  encoding: snappy # wal encoding/compression. options: none, gzip, lz4-64k, lz4-256k, lz4-1M, lz4, snappy, zstd
+  encoding: snappy # wal encoding/compression. options: none, gzip, lz4-64k, lz4-256k, lz4-1M, lz4, snappy, zstd, s2
local:
  path: /tmp/tempo/blocks
pool:
4 changes: 2 additions & 2 deletions example/docker-compose/gcs/tempo-gcs.yaml
@@ -35,10 +35,10 @@ storage:
block:
  bloom_filter_false_positive: .05 # bloom filter false positive rate. lower values create larger filters but fewer false positives
  index_downsample_bytes: 1000 # number of bytes per index record
-  encoding: zstd # block encoding/compression. options: none, gzip, lz4-64k, lz4-256k, lz4-1M, lz4, snappy, zstd
+  encoding: zstd # block encoding/compression. options: none, gzip, lz4-64k, lz4-256k, lz4-1M, lz4, snappy, zstd, s2
wal:
  path: /tmp/tempo/wal # where to store the wal locally
-  encoding: snappy # wal encoding/compression. options: none, gzip, lz4-64k, lz4-256k, lz4-1M, lz4, snappy, zstd
+  encoding: snappy # wal encoding/compression. options: none, gzip, lz4-64k, lz4-256k, lz4-1M, lz4, snappy, zstd, s2
gcs:
  bucket_name: tempo
  endpoint: https://gcs:4443/storage/v1/
4 changes: 2 additions & 2 deletions example/docker-compose/local/tempo-local.yaml
@@ -34,10 +34,10 @@ storage:
block:
  bloom_filter_false_positive: .05 # bloom filter false positive rate. lower values create larger filters but fewer false positives
  index_downsample_bytes: 1000 # number of bytes per index record
-  encoding: zstd # block encoding/compression. options: none, gzip, lz4-64k, lz4-256k, lz4-1M, lz4, snappy, zstd
+  encoding: zstd # block encoding/compression. options: none, gzip, lz4-64k, lz4-256k, lz4-1M, lz4, snappy, zstd, s2
wal:
  path: /tmp/tempo/wal # where to store the wal locally
-  encoding: snappy # wal encoding/compression. options: none, gzip, lz4-64k, lz4-256k, lz4-1M, lz4, snappy, zstd
+  encoding: snappy # wal encoding/compression. options: none, gzip, lz4-64k, lz4-256k, lz4-1M, lz4, snappy, zstd, s2
local:
  path: /tmp/tempo/blocks
pool:
4 changes: 2 additions & 2 deletions example/docker-compose/s3/tempo-s3.yaml
@@ -35,10 +35,10 @@ storage:
block:
  bloom_filter_false_positive: .05 # bloom filter false positive rate. lower values create larger filters but fewer false positives
  index_downsample_bytes: 1000 # number of bytes per index record
-  encoding: zstd # block encoding/compression. options: none, gzip, lz4-64k, lz4-256k, lz4-1M, lz4, snappy, zstd
+  encoding: zstd # block encoding/compression. options: none, gzip, lz4-64k, lz4-256k, lz4-1M, lz4, snappy, zstd, s2
wal:
  path: /tmp/tempo/wal # where to store the wal locally
-  encoding: snappy # wal encoding/compression. options: none, gzip, lz4-64k, lz4-256k, lz4-1M, lz4, snappy, zstd
+  encoding: snappy # wal encoding/compression. options: none, gzip, lz4-64k, lz4-256k, lz4-1M, lz4, snappy, zstd, s2
s3:
  bucket: tempo # how to store data in s3
  endpoint: minio:9000
2 changes: 1 addition & 1 deletion go.mod
@@ -19,7 +19,7 @@ require (
    github.com/gogo/protobuf v1.3.2
    github.com/gogo/status v1.1.0
    github.com/golang/protobuf v1.5.2
-    github.com/golang/snappy v0.0.4
+    github.com/golang/snappy v0.0.4 // indirect
    github.com/google/flatbuffers v2.0.0+incompatible
    github.com/google/go-cmp v0.5.6
    github.com/google/uuid v1.2.0
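Note: the `// indirect` marker means golang/snappy is no longer imported directly by Tempo code. The direct dependency moves to `github.com/klauspost/compress` (see the pool.go changes below), while golang/snappy remains in the module graph as a transitive dependency.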
4 changes: 4 additions & 0 deletions tempodb/backend/encoding.go
@@ -21,6 +21,7 @@ const (
    EncLZ4_4M
    EncSnappy
    EncZstd
+    EncS2
)

// SupportedEncoding is a slice of all supported encodings
@@ -33,6 +34,7 @@ var SupportedEncoding = []Encoding{
    EncLZ4_4M,
    EncSnappy,
    EncZstd,
+    EncS2,
}

func (e Encoding) String() string {
@@ -53,6 +55,8 @@ func (e Encoding) String() string {
return "snappy"
case EncZstd:
return "zstd"
case EncS2:
return "s2"
default:
return "unsupported"
}
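Because the `Encoding` constants are declared with `iota`, appending `EncS2` after `EncZstd` preserves the numeric values of the existing encodings, so block metas serialized before this change still decode to the same encoding. An abbreviated, self-contained sketch of the pattern (the `parseEncoding` helper is hypothetical, included only to show the reverse mapping):

```go
package main

import "fmt"

// Encoding mirrors the iota-based enum in tempodb/backend (abbreviated).
type Encoding byte

const (
    EncNone Encoding = iota
    EncGZIP
    EncLZ4_64k
    EncLZ4_256k
    EncLZ4_1M
    EncLZ4_4M
    EncSnappy
    EncZstd
    EncS2 // appended last, so previously serialized values keep their meaning
)

func (e Encoding) String() string {
    switch e {
    case EncNone:
        return "none"
    case EncSnappy:
        return "snappy"
    case EncZstd:
        return "zstd"
    case EncS2:
        return "s2"
    default:
        return "unsupported" // remaining cases elided for brevity
    }
}

// parseEncoding is a hypothetical helper illustrating the reverse mapping.
func parseEncoding(s string) (Encoding, error) {
    for _, e := range []Encoding{EncNone, EncSnappy, EncZstd, EncS2} {
        if e.String() == s {
            return e, nil
        }
    }
    return EncNone, fmt.Errorf("unknown encoding %q", s)
}

func main() {
    fmt.Println(byte(EncS2), EncS2) // 8 s2
    e, err := parseEncoding("s2")
    fmt.Println(e == EncS2, err) // true <nil>
}
```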
72 changes: 38 additions & 34 deletions tempodb/encoding/streaming_block_test.go
@@ -9,6 +9,7 @@ import (
"io/ioutil"
"math/rand"
"os"
"path"
"sort"
"testing"
"time"
@@ -301,7 +302,7 @@ func streamingBlock(t *testing.T, cfg *BlockConfig, w backend.Writer) (*Streamin
    return block, ids, reqs
}

-const benchDownsample = 200
+const benchDownsample = 1024 * 1024

func BenchmarkWriteGzip(b *testing.B) {
    benchmarkCompressBlock(b, backend.EncGZIP, benchDownsample, false)
@@ -319,10 +320,12 @@ func BenchmarkWriteLZ41M(b *testing.B) {
func BenchmarkWriteNone(b *testing.B) {
    benchmarkCompressBlock(b, backend.EncNone, benchDownsample, false)
}
-
func BenchmarkWriteZstd(b *testing.B) {
    benchmarkCompressBlock(b, backend.EncZstd, benchDownsample, false)
}
+func BenchmarkWriteS2(b *testing.B) {
+    benchmarkCompressBlock(b, backend.EncS2, benchDownsample, false)
+}

func BenchmarkReadGzip(b *testing.B) {
    benchmarkCompressBlock(b, backend.EncGZIP, benchDownsample, true)
@@ -339,10 +342,12 @@ func BenchmarkReadLZ41M(b *testing.B) {
func BenchmarkReadNone(b *testing.B) {
    benchmarkCompressBlock(b, backend.EncNone, benchDownsample, true)
}
-
func BenchmarkReadZstd(b *testing.B) {
    benchmarkCompressBlock(b, backend.EncZstd, benchDownsample, true)
}
+func BenchmarkReadS2(b *testing.B) {
+    benchmarkCompressBlock(b, backend.EncS2, benchDownsample, true)
+}

// Download a block from your backend and place in ./benchmark_block/<tenant id>/<guid>
//nolint:unparam
@@ -357,7 +362,7 @@ func benchmarkCompressBlock(b *testing.B, encoding backend.Encoding, indexDownsa
require.NoError(b, err, "error creating backend")

r := backend.NewReader(rawR)
meta, err := r.BlockMeta(context.Background(), uuid.MustParse("00006e9d-94f0-4487-8e62-99f951be9349"), "1")
meta, err := r.BlockMeta(context.Background(), uuid.MustParse("20a614f8-8cda-4b9d-9789-cb626f9fab28"), "1")
require.NoError(b, err)

backendBlock, err := NewBackendBlock(meta, r)
@@ -380,13 +385,13 @@ func benchmarkCompressBlock(b *testing.B, encoding backend.Encoding, indexDownsa
        b.ResetTimer()
    }

-    originatingMeta := backend.NewBlockMeta(testTenantID, uuid.New(), "should_be_ignored", encoding, "")
    block, err := NewStreamingBlock(&BlockConfig{
        IndexDownsampleBytes: indexDownsample,
        BloomFP:              .05,
        Encoding:             encoding,
        IndexPageSizeBytes:   10 * 1024 * 1024,
-    }, originatingMeta.BlockID, originatingMeta.TenantID, []*backend.BlockMeta{originatingMeta}, originatingMeta.TotalObjects)
+        BloomShardSizeBytes:  100000,
+    }, uuid.New(), meta.TenantID, []*backend.BlockMeta{meta}, meta.TotalObjects)
    require.NoError(b, err, "unexpected error completing block")

    ctx := context.Background()
@@ -395,8 +400,7 @@ func benchmarkCompressBlock(b *testing.B, encoding backend.Encoding, indexDownsa
        if err != io.EOF {
            require.NoError(b, err)
        }
-
-        if id == nil {
+        if err == io.EOF {
            break
        }

@@ -416,30 +420,30 @@ func benchmarkCompressBlock(b *testing.B, encoding backend.Encoding, indexDownsa
        return
    }

-    // todo: restore read benchmarks
-    // b.ResetTimer()
-
-    // file, err := os.Open(block.fullFilename())
-    // require.NoError(b, err)
-    // pr, err := v2.NewDataReader(backend.NewContextReaderWithAllReader(file), encoding)
-    // require.NoError(b, err)
-
-    // var tempBuffer []byte
-    // o := v2.NewObjectReaderWriter()
-    // for {
-    //     tempBuffer, _, err = pr.NextPage(tempBuffer)
-    //     if err == io.EOF {
-    //         break
-    //     }
-    //     require.NoError(b, err)
-
-    //     bufferReader := bytes.NewReader(tempBuffer)
-
-    //     for {
-    //         _, _, err = o.UnmarshalObjectFromReader(bufferReader)
-    //         if err == io.EOF {
-    //             break
-    //         }
-    //     }
-    // }
+    b.ResetTimer()
+
+    fullFilename := path.Join(backendTmpDir, block.compactedMeta.TenantID, block.compactedMeta.BlockID.String(), "data")
+    file, err := os.Open(fullFilename)
+    require.NoError(b, err)
+    pr, err := v2.NewDataReader(backend.NewContextReaderWithAllReader(file), encoding)
+    require.NoError(b, err)
+
+    var tempBuffer []byte
+    o := v2.NewObjectReaderWriter()
+    for {
+        tempBuffer, _, err = pr.NextPage(tempBuffer)
+        if err == io.EOF {
+            break
+        }
+        require.NoError(b, err)
+
+        bufferReader := bytes.NewReader(tempBuffer)
+
+        for {
+            _, _, err = o.UnmarshalObjectFromReader(bufferReader)
+            if err == io.EOF {
+                break
+            }
+        }
+    }
}
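The restored read path measures decompression in isolation: `b.ResetTimer()` zeroes the elapsed time accumulated by the block-construction setup above it, so the timed loop covers only `NextPage` decompression and object unmarshalling. As the comment above `benchmarkCompressBlock` notes, these benchmarks expect a real block downloaded to `./benchmark_block/<tenant id>/<guid>`; with one in place, they can be run with something like `go test -bench='BenchmarkWrite|BenchmarkRead' ./tempodb/encoding/`.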
70 changes: 65 additions & 5 deletions tempodb/encoding/v2/pool.go
@@ -5,15 +5,16 @@ import (
"io"
"sync"

"github.com/golang/snappy"
"github.com/grafana/tempo/tempodb/backend"
"github.com/klauspost/compress/gzip"
"github.com/klauspost/compress/s2"
"github.com/klauspost/compress/snappy"
"github.com/klauspost/compress/zstd"
"github.com/pierrec/lz4/v4"
"github.com/prometheus/prometheus/pkg/pool"
)

const maxEncoding = backend.EncZstd
const maxEncoding = backend.EncS2

// WriterPool is a pool of io.Writer
// This is used by every chunk to avoid unnecessary allocations.
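In the hunk above, `maxEncoding` is bumped to `backend.EncS2`. The constant presumably marks the upper bound of valid encoding values for validation or iteration elsewhere in the package, so a newly appended encoding that is not reflected here would be silently treated as out of range.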
@@ -49,6 +50,8 @@
    Noop NoopPool
    // Zstd Pool
    Zstd = ZstdPool{}
+    // S2 Pool
+    S2 = S2Pool{}

    // BytesBufferPool is a bytes buffer used for lines decompressed.
    // Buckets [0.5KB,1KB,2KB,4KB,8KB]
@@ -82,6 +85,8 @@ func getReaderPool(enc backend.Encoding) (ReaderPool, error) {
        return &Snappy, nil
    case backend.EncZstd:
        return &Zstd, nil
+    case backend.EncS2:
+        return &S2, nil
    default:
        return nil, fmt.Errorf("Unknown pool encoding %d", enc)
    }
@@ -257,7 +262,7 @@ func (pool *SnappyPool) Encoding() backend.Encoding {
// GetReader gets or creates a new CompressionReader and reset it to read from src
func (pool *SnappyPool) GetReader(src io.Reader) (io.Reader, error) {
    if r := pool.readers.Get(); r != nil {
-        reader := r.(*snappy.Reader)
+        reader := r.(*s2.Reader)
        reader.Reset(src)
        return reader, nil
    }
@@ -271,15 +276,15 @@ func (pool *SnappyPool) PutReader(reader io.Reader) {

// ResetReader implements ReaderPool
func (pool *SnappyPool) ResetReader(src io.Reader, resetReader io.Reader) (io.Reader, error) {
reader := resetReader.(*snappy.Reader)
reader := resetReader.(*s2.Reader)
reader.Reset(src)
return reader, nil
}

// GetWriter gets or creates a new CompressionWriter and reset it to write to dst
func (pool *SnappyPool) GetWriter(dst io.Writer) (io.WriteCloser, error) {
if w := pool.writers.Get(); w != nil {
writer := w.(*snappy.Writer)
writer := w.(*s2.Writer)
writer.Reset(dst)
return writer, nil
}
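A subtlety in the `SnappyPool` changes: the visible hunks change only the type assertions to `*s2.Reader` and `*s2.Writer`, so the constructors presumably still come from the snappy package. This works because in klauspost/compress the snappy package is a thin wrapper over s2; assuming a version in which `snappy.Reader` and `snappy.Writer` are type aliases of the s2 types (which these assertions rely on), a value returned by `snappy.NewReader` satisfies a `*s2.Reader` assertion. A minimal check of that assumption:

```go
package main

import (
    "bytes"
    "fmt"

    "github.com/klauspost/compress/s2"
    "github.com/klauspost/compress/snappy"
)

func main() {
    // If snappy.Reader is an alias of s2.Reader, as assumed above,
    // this type assertion succeeds.
    var r interface{} = snappy.NewReader(bytes.NewReader(nil))
    _, ok := r.(*s2.Reader)
    fmt.Println(ok) // true under the alias assumption
}
```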
@@ -394,3 +399,58 @@ func (pool *ZstdPool) ResetWriter(dst io.Writer, resetWriter io.WriteCloser) (io
    writer.Reset(dst)
    return writer, nil
}

+// S2Pool is one s short of s3
+type S2Pool struct {
+    readers sync.Pool
+    writers sync.Pool
+}
+
+// Encoding implements WriterPool and ReaderPool
+func (pool *S2Pool) Encoding() backend.Encoding {
+    return backend.EncS2
+}
+
+// GetReader gets or creates a new CompressionReader and reset it to read from src
+func (pool *S2Pool) GetReader(src io.Reader) (io.Reader, error) {
+    if r := pool.readers.Get(); r != nil {
+        reader := r.(*s2.Reader)
+        reader.Reset(src)
+        return reader, nil
+    }
+    return s2.NewReader(src), nil
+}
+
+// PutReader places back in the pool a CompressionReader
+func (pool *S2Pool) PutReader(reader io.Reader) {
+    pool.readers.Put(reader)
+}
+
+// ResetReader implements ReaderPool
+func (pool *S2Pool) ResetReader(src io.Reader, resetReader io.Reader) (io.Reader, error) {
+    reader := resetReader.(*s2.Reader)
+    reader.Reset(src)
+    return reader, nil
+}
+
+// GetWriter gets or creates a new CompressionWriter and reset it to write to dst
+func (pool *S2Pool) GetWriter(dst io.Writer) (io.WriteCloser, error) {
+    if w := pool.writers.Get(); w != nil {
+        writer := w.(*s2.Writer)
+        writer.Reset(dst)
+        return writer, nil
+    }
+    return s2.NewWriter(dst), nil
+}
+
+// PutWriter places back in the pool a CompressionWriter
+func (pool *S2Pool) PutWriter(writer io.WriteCloser) {
+    pool.writers.Put(writer)
+}
+
+// ResetWriter implements WriterPool
+func (pool *S2Pool) ResetWriter(dst io.Writer, resetWriter io.WriteCloser) (io.WriteCloser, error) {
+    writer := resetWriter.(*s2.Writer)
+    writer.Reset(dst)
+    return writer, nil
+}
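For reference, here is a small round trip through the s2 streaming API that `S2Pool` wraps. `s2.NewWriter`, `s2.NewReader`, and their `Reset` methods are the same calls used above; `Close` flushes the final frame. s2 extends the snappy format for higher throughput (s2 readers can also decode snappy streams, though not the reverse), which is why it slots into the same pooled reader/writer interfaces. A minimal sketch:

```go
package main

import (
    "bytes"
    "fmt"
    "io"

    "github.com/klauspost/compress/s2"
)

func main() {
    var buf bytes.Buffer

    // Compress: Close is required to flush the final frame.
    w := s2.NewWriter(&buf)
    if _, err := w.Write([]byte("some span data, some span data")); err != nil {
        panic(err)
    }
    if err := w.Close(); err != nil {
        panic(err)
    }

    // Decompress.
    r := s2.NewReader(&buf)
    out, err := io.ReadAll(r)
    if err != nil {
        panic(err)
    }
    fmt.Printf("%s\n", out) // some span data, some span data
}
```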