[cli] Add command to generate index/bloom from a data file (#903)
* [cli] Add command to generate index from a data file

Signed-off-by: Annanay <[email protected]>

* make fmt

Signed-off-by: Annanay <[email protected]>

* Use an iterator over the page and use the last id in the index

Signed-off-by: Annanay <[email protected]>

* Use cmd.backendOptions.Bucket, rename command

Signed-off-by: Annanay <[email protected]>

* Add basic index verifier, lint

Signed-off-by: Annanay <[email protected]>

* refactor

Signed-off-by: Annanay <[email protected]>

* lint

Signed-off-by: Annanay <[email protected]>

* Improve error messages, edit docs about bad blocks

Signed-off-by: Annanay <[email protected]>

* Add gen bloom command

Signed-off-by: Annanay <[email protected]>

* Update docs

Signed-off-by: Annanay <[email protected]>

* Lint

Signed-off-by: Annanay <[email protected]>

* Clarify docs

Signed-off-by: Annanay <[email protected]>

* CHANGELOG

Signed-off-by: Annanay <[email protected]>
annanay25 authored Aug 30, 2021
1 parent b22e84b commit 6f99d44
Showing 19 changed files with 443 additions and 19 deletions.
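
The two new files below wire `gen index` and `gen bloom` subcommands into tempo-cli, with positional arguments derived from the kong `arg:""` tags on each command struct. Based on those tags, the commands are likely invoked as follows (the backend flags are an assumption carried over from tempo-cli's existing backend options, which are not shown in this diff):

    tempo-cli gen index <tenant-id> <block-id> --backend=local --bucket=<path-to-bucket>
    tempo-cli gen bloom <tenant-id> <block-id> <bloom-fp> <bloom-shard-size> --backend=local --bucket=<path-to-bucket>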
4 changes: 4 additions & 0 deletions CHANGELOG.md
@@ -1,7 +1,11 @@
## main / unreleased

* [BUGFIX] Update port spec for GCS docker-compose example [#869](https://github.com/grafana/tempo/pull/869) (@zalegrala)
* [BUGFIX] Cortex upgrade to fix an issue where unhealthy compactors can't be forgotten [#878](https://github.com/grafana/tempo/pull/878) (@joe-elliott)
* [ENHANCEMENT] Added "query blocks" cli option. [#876](https://github.com/grafana/tempo/pull/876) (@joe-elliott)
* [ENHANCEMENT] Make s3 backend readError logic more robust [#905](https://github.com/grafana/tempo/pull/905) (@wei840222)
* [ENHANCEMENT] Include additional detail when searching for traces [#916](https://github.com/grafana/tempo/pull/916) (@zalegrala)
* [ENHANCEMENT] Add `gen index` and `gen bloom` commands to tempo-cli. [#903](https://github.com/grafana/tempo/pull/903) (@annanay25)

## v1.1.0 / 2021-08-26

163 changes: 163 additions & 0 deletions cmd/tempo-cli/cmd-gen-bloom.go
@@ -0,0 +1,163 @@
package main

import (
	"bytes"
	"context"
	"fmt"
	"io"
	"os"
	"strconv"

	"github.com/google/uuid"
	willf_bloom "github.com/willf/bloom"

	"github.com/grafana/tempo/tempodb/backend"
	"github.com/grafana/tempo/tempodb/encoding"
	"github.com/grafana/tempo/tempodb/encoding/common"
)

type bloomCmd struct {
	TenantID       string  `arg:"" help:"tenant-id within the bucket"`
	BlockID        string  `arg:"" help:"block ID to list"`
	BloomFP        float64 `arg:"" help:"bloom filter false positive rate (use prod settings!)"`
	BloomShardSize int     `arg:"" help:"bloom filter shard size (use prod settings!)"`
	backendOptions
}

type forEachRecord func(id common.ID) error

func ReplayBlockAndDoForEachRecord(meta *backend.BlockMeta, filepath string, forEach forEachRecord) error {
	v, err := encoding.FromVersion(meta.Version)
	if err != nil {
		return err
	}

	// replay file to extract records
	f, err := os.OpenFile(filepath, os.O_RDONLY, 0644)
	if err != nil {
		return err
	}

	dataReader, err := v.NewDataReader(backend.NewContextReaderWithAllReader(f), meta.Encoding)
	if err != nil {
		return fmt.Errorf("error creating data reader: %w", err)
	}
	defer dataReader.Close()

	var buffer []byte
	objectRW := v.NewObjectReaderWriter()
	for {
		buffer, _, err := dataReader.NextPage(buffer)
		if err == io.EOF {
			break
		}
		if err != nil {
			return fmt.Errorf("error reading page from datareader: %w", err)
		}

		iter := encoding.NewIterator(bytes.NewReader(buffer), objectRW)
		var iterErr error
		for {
			var id common.ID
			id, _, iterErr = iter.Next(context.TODO())
			if iterErr != nil {
				break
			}
			err := forEach(id)
			if err != nil {
				return fmt.Errorf("error adding to bloom filter: %w", err)
			}
		}

		if iterErr != io.EOF {
			return iterErr
		}
	}

	return nil
}

func (cmd *bloomCmd) Run(ctx *globalOptions) error {
	blockID, err := uuid.Parse(cmd.BlockID)
	if err != nil {
		return err
	}

	r, w, _, err := loadBackend(&cmd.backendOptions, ctx)
	if err != nil {
		return err
	}

	meta, err := r.BlockMeta(context.TODO(), blockID, cmd.TenantID)
	if err != nil {
		return err
	}

	// replay file and add records to bloom filter
	bloom := common.NewBloom(cmd.BloomFP, uint(cmd.BloomShardSize), uint(meta.TotalObjects))
	if bloom.GetShardCount() != int(meta.BloomShardCount) {
		err := fmt.Errorf("shards in generated bloom filter do not match block meta, please use prod settings for bloom shard size and FP")
		fmt.Println(err.Error())
		return err
	}

	addToBloom := func(id common.ID) error {
		bloom.Add(id)
		return nil
	}

	err = ReplayBlockAndDoForEachRecord(meta, cmd.backendOptions.Bucket+cmd.TenantID+"/"+cmd.BlockID+"/"+dataFilename, addToBloom)
	if err != nil {
		fmt.Println("error replaying block", err)
		return err
	}

	// write to the local backend
	bloomBytes, err := bloom.Marshal()
	if err != nil {
		fmt.Println("error marshalling bloom filter")
		return err
	}

	for i := 0; i < len(bloomBytes); i++ {
		err = w.Write(context.TODO(), bloomFilePrefix+strconv.Itoa(i), blockID, cmd.TenantID, bloomBytes[i], false)
		if err != nil {
			fmt.Println("error writing bloom filter to backend", err)
			return err
		}
	}

	fmt.Println("bloom written to backend successfully")

	// verify generated bloom
	shardedBloomFilter := make([]*willf_bloom.BloomFilter, meta.BloomShardCount)
	for i := 0; i < int(meta.BloomShardCount); i++ {
		bloomBytes, err := r.Read(context.TODO(), bloomFilePrefix+strconv.Itoa(i), blockID, cmd.TenantID, false)
		if err != nil {
			fmt.Println("error reading bloom from backend")
			return nil
		}
		shardedBloomFilter[i] = &willf_bloom.BloomFilter{}
		_, err = shardedBloomFilter[i].ReadFrom(bytes.NewReader(bloomBytes))
		if err != nil {
			fmt.Println("error parsing bloom")
			return nil
		}
	}

	testBloom := func(id common.ID) error {
		key := common.ShardKeyForTraceID(id, int(meta.BloomShardCount))
		if !shardedBloomFilter[key].Test(id) {
			return fmt.Errorf("id not added to bloom, filter is likely corrupt")
		}
		return nil
	}
	err = ReplayBlockAndDoForEachRecord(meta, cmd.backendOptions.Bucket+cmd.TenantID+"/"+cmd.BlockID+"/"+dataFilename, testBloom)
	if err != nil {
		fmt.Println("error replaying block", err)
		return err
	}

	fmt.Println("bloom filter verified")
	return nil
}
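
For context on the verification step above: each trace ID maps to exactly one shard of the bloom filter, and membership is tested only against that shard. Below is a minimal, self-contained Go sketch of that add/test round trip using the same willf/bloom package; the first-byte shard key is an assumption standing in for Tempo's common.ShardKeyForTraceID, and the sizes are illustrative.

package main

import (
	"fmt"

	willf_bloom "github.com/willf/bloom"
)

func main() {
	// One independent filter per shard, each sized for ~1000 expected IDs
	// at a 1% false positive rate (illustrative numbers).
	const shardCount = 4
	shards := make([]*willf_bloom.BloomFilter, shardCount)
	for i := range shards {
		shards[i] = willf_bloom.NewWithEstimates(1000, 0.01)
	}

	// Route an ID to its shard, add it, then test membership — the same
	// round trip the command performs against the backend.
	id := []byte{0x42, 0x01, 0x02}
	key := int(id[0]) % shardCount // assumption: stand-in for common.ShardKeyForTraceID
	shards[key].Add(id)

	fmt.Println(shards[key].Test(id)) // true
}

A bloom filter can return false positives but never false negatives, which is why a failed Test during the replay in testBloom is proof of a corrupt or mismatched filter rather than a probabilistic miss.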
195 changes: 195 additions & 0 deletions cmd/tempo-cli/cmd-gen-index.go
@@ -0,0 +1,195 @@
package main

import (
	"bytes"
	"context"
	"fmt"
	"io"
	"os"

	"github.com/google/uuid"

	"github.com/grafana/tempo/tempodb/backend"
	"github.com/grafana/tempo/tempodb/encoding"
	"github.com/grafana/tempo/tempodb/encoding/common"
)

type indexCmd struct {
	TenantID string `arg:"" help:"tenant-id within the bucket"`
	BlockID  string `arg:"" help:"block ID to list"`
	backendOptions
}

func ReplayBlockAndGetRecords(meta *backend.BlockMeta, filepath string) ([]common.Record, error, error) {
	v, err := encoding.FromVersion(meta.Version)
	if err != nil {
		return nil, nil, err
	}

	var replayError error
	// replay file to extract records
	f, err := os.OpenFile(filepath, os.O_RDONLY, 0644)
	if err != nil {
		return nil, nil, err
	}

	dataReader, err := v.NewDataReader(backend.NewContextReaderWithAllReader(f), meta.Encoding)
	if err != nil {
		return nil, nil, err
	}
	defer dataReader.Close()

	var buffer []byte
	var records []common.Record
	objectRW := v.NewObjectReaderWriter()
	currentOffset := uint64(0)
	for {
		buffer, pageLen, err := dataReader.NextPage(buffer)
		if err == io.EOF {
			break
		}
		if err != nil {
			replayError = err
			break
		}

		iter := encoding.NewIterator(bytes.NewReader(buffer), objectRW)
		var lastID common.ID
		var iterErr error
		for {
			var id common.ID
			id, _, iterErr = iter.Next(context.TODO())
			if iterErr != nil {
				break
			}
			lastID = id
		}

		if iterErr != io.EOF {
			replayError = iterErr
			break
		}

		// make a copy so we don't hold onto the iterator buffer
		recordID := append([]byte(nil), lastID...)
		records = append(records, common.Record{
			ID:     recordID,
			Start:  currentOffset,
			Length: pageLen,
		})
		currentOffset += uint64(pageLen)
	}

	return records, replayError, nil
}

func VerifyIndex(indexReader common.IndexReader, dataReader common.DataReader) error {
	for i := 0; ; i++ {
		record, err := indexReader.At(context.TODO(), i)
		if err != nil {
			return err
		}

		if record == nil {
			break
		}

		// read data file at record position
		_, _, err = dataReader.Read(context.TODO(), []common.Record{*record}, nil)
		if err != nil {
			fmt.Println("index/data is corrupt, record/data mismatch")
			return err
		}
	}
	return nil
}

func (cmd *indexCmd) Run(ctx *globalOptions) error {
	blockID, err := uuid.Parse(cmd.BlockID)
	if err != nil {
		return err
	}

	r, w, _, err := loadBackend(&cmd.backendOptions, ctx)
	if err != nil {
		return err
	}

	meta, err := r.BlockMeta(context.TODO(), blockID, cmd.TenantID)
	if err != nil {
		return err
	}

	// replay file to extract records
	records, replayError, err := ReplayBlockAndGetRecords(meta, cmd.backendOptions.Bucket+cmd.TenantID+"/"+cmd.BlockID+"/"+dataFilename)
	if replayError != nil {
		fmt.Println("error replaying block. data file likely corrupt", replayError)
		return replayError
	}
	if err != nil {
		fmt.Println("error accessing data/meta file")
		return err
	}

	// write using IndexWriter
	v, err := encoding.FromVersion(meta.Version)
	if err != nil {
		fmt.Println("error creating versioned encoding", err)
		return err
	}

	indexWriter := v.NewIndexWriter(int(meta.IndexPageSize))
	indexBytes, err := indexWriter.Write(records)
	if err != nil {
		fmt.Println("error writing records to indexWriter", err)
		return err
	}

	// write to the local backend
	err = w.Write(context.TODO(), "index", blockID, cmd.TenantID, indexBytes, false)
	if err != nil {
		fmt.Println("error writing index to backend", err)
		return err
	}

	fmt.Println("index written to backend successfully")

	// verify generated index

	// get index file with records
	indexFilePath := cmd.backendOptions.Bucket + cmd.TenantID + "/" + cmd.BlockID + "/" + indexFilename
	indexFile, err := os.OpenFile(indexFilePath, os.O_RDONLY, 0644)
	if err != nil {
		fmt.Println("error opening index file")
		return err
	}

	indexReader, err := v.NewIndexReader(backend.NewContextReaderWithAllReader(indexFile), int(meta.IndexPageSize), len(records))
	if err != nil {
		fmt.Println("error reading index file")
		return err
	}

	// data reader
	dataFilePath := cmd.backendOptions.Bucket + cmd.TenantID + "/" + cmd.BlockID + "/" + dataFilename
	dataFile, err := os.OpenFile(dataFilePath, os.O_RDONLY, 0644)
	if err != nil {
		fmt.Println("error opening data file")
		return err
	}

	dataReader, err := v.NewDataReader(backend.NewContextReaderWithAllReader(dataFile), meta.Encoding)
	if err != nil {
		fmt.Println("error reading data file")
		return err
	}
	defer dataReader.Close()

	err = VerifyIndex(indexReader, dataReader)
	if err != nil {
		return err
	}

	fmt.Println("index verified!")
	return nil
}
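
A note on the design the commit message calls out ("use the last id in the index"): because trace IDs within a block are sorted, storing only the last ID of each page is enough to locate any ID with a binary search over the records. Below is a hedged, self-contained Go sketch of that lookup; record and findPage are illustrative names, not Tempo's actual IndexReader API.

package main

import (
	"bytes"
	"fmt"
	"sort"
)

// record mirrors the shape of common.Record for illustration only.
type record struct {
	id            []byte // last (highest) trace ID in the page
	start, length uint64 // byte range of the page within the data file
}

// findPage returns the index of the only page that can contain id:
// the first record whose last ID is >= the target.
func findPage(records []record, id []byte) int {
	i := sort.Search(len(records), func(i int) bool {
		return bytes.Compare(records[i].id, id) >= 0
	})
	if i == len(records) {
		return -1 // id is larger than the last ID of the final page
	}
	return i
}

func main() {
	recs := []record{
		{id: []byte{0x10}, start: 0, length: 100},
		{id: []byte{0x20}, start: 100, length: 100},
	}
	fmt.Println(findPage(recs, []byte{0x15})) // 1 — only the second page can hold 0x15
}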
2 changes: 1 addition & 1 deletion cmd/tempo-cli/cmd-list-block.go
@@ -25,7 +25,7 @@ type listBlockCmd struct {
 }
 
 func (cmd *listBlockCmd) Run(ctx *globalOptions) error {
-	r, c, err := loadBackend(&cmd.backendOptions, ctx)
+	r, _, c, err := loadBackend(&cmd.backendOptions, ctx)
 	if err != nil {
 		return err
 	}
2 changes: 1 addition & 1 deletion cmd/tempo-cli/cmd-list-blocks.go
@@ -17,7 +17,7 @@ type listBlocksCmd struct {
 }
 
 func (l *listBlocksCmd) Run(ctx *globalOptions) error {
-	r, c, err := loadBackend(&l.backendOptions, ctx)
+	r, _, c, err := loadBackend(&l.backendOptions, ctx)
 	if err != nil {
 		return err
 	}
2 changes: 1 addition & 1 deletion cmd/tempo-cli/cmd-list-cachesummary.go
@@ -14,7 +14,7 @@ type listCacheSummaryCmd struct {
 }
 
 func (l *listCacheSummaryCmd) Run(ctx *globalOptions) error {
-	r, c, err := loadBackend(&l.backendOptions, ctx)
+	r, _, c, err := loadBackend(&l.backendOptions, ctx)
 	if err != nil {
 		return err
 	}