Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[cli] Add command to generate index/bloom from a data file #903

Merged
merged 14 commits into from
Aug 30, 2021
163 changes: 163 additions & 0 deletions cmd/tempo-cli/cmd-gen-bloom.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
package main

import (
"bytes"
"context"
"fmt"
"io"
"os"
"strconv"

"github.com/google/uuid"
willf_bloom "github.com/willf/bloom"

"github.com/grafana/tempo/tempodb/backend"
"github.com/grafana/tempo/tempodb/encoding"
"github.com/grafana/tempo/tempodb/encoding/common"
)

type bloomCmd struct {
TenantID string `arg:"" help:"tenant-id within the bucket"`
BlockID string `arg:"" help:"block ID to list"`
BloomFP float64 `arg:"" help:"bloom filter false positive rate (use prod settings!)"`
BloomShardSize int `arg:"" help:"bloom filter shard size (use prod settings!)"`
backendOptions
}

type forEachRecord func(id common.ID) error

func ReplayBlockAndDoForEachRecord(meta *backend.BlockMeta, filepath string, forEach forEachRecord) error {
v, err := encoding.FromVersion(meta.Version)
if err != nil {
return err
}

// replay file to extract records
f, err := os.OpenFile(filepath, os.O_RDONLY, 0644)
if err != nil {
return err
}

dataReader, err := v.NewDataReader(backend.NewContextReaderWithAllReader(f), meta.Encoding)
if err != nil {
return fmt.Errorf("error creating data reader: %w", err)
}
defer dataReader.Close()

var buffer []byte
objectRW := v.NewObjectReaderWriter()
for {
buffer, _, err := dataReader.NextPage(buffer)
if err == io.EOF {
break
}
if err != nil {
return fmt.Errorf("error reading page from datareader: %w", err)
}

iter := encoding.NewIterator(bytes.NewReader(buffer), objectRW)
var iterErr error
for {
var id common.ID
id, _, iterErr = iter.Next(context.TODO())
if iterErr != nil {
break
}
err := forEach(id)
if err != nil {
return fmt.Errorf("error adding to bloom filter: %w", err)
}
}

if iterErr != io.EOF {
return iterErr
}
}

return nil
}

func (cmd *bloomCmd) Run(ctx *globalOptions) error {
blockID, err := uuid.Parse(cmd.BlockID)
if err != nil {
return err
}

r, w, _, err := loadBackend(&cmd.backendOptions, ctx)
if err != nil {
return err
}

meta, err := r.BlockMeta(context.TODO(), blockID, cmd.TenantID)
if err != nil {
return err
}

// replay file and add records to bloom filter
bloom := common.NewBloom(cmd.BloomFP, uint(cmd.BloomShardSize), uint(meta.TotalObjects))
if bloom.GetShardCount() != int(meta.BloomShardCount) {
err := fmt.Errorf("shards in generated bloom filter do not match block meta, please use prod settings for bloom shard size and FP")
fmt.Println(err.Error())
return err
}

addToBloom := func(id common.ID) error {
bloom.Add(id)
return nil
}

err = ReplayBlockAndDoForEachRecord(meta, cmd.backendOptions.Bucket+cmd.TenantID+"/"+cmd.BlockID+"/"+dataFilename, addToBloom)
if err != nil {
fmt.Println("error replaying block", err)
return err
}

// write to the local backend
bloomBytes, err := bloom.Marshal()
if err != nil {
fmt.Println("error marshalling bloom filter")
return err
}

for i := 0; i < len(bloomBytes); i++ {
err = w.Write(context.TODO(), bloomFilePrefix+strconv.Itoa(i), blockID, cmd.TenantID, bloomBytes[i], false)
if err != nil {
fmt.Println("error writing bloom filter to backend", err)
return err
}
}

fmt.Println("bloom written to backend successfully")

// verify generated bloom
shardedBloomFilter := make([]*willf_bloom.BloomFilter, meta.BloomShardCount)
for i := 0; i < int(meta.BloomShardCount); i++ {
bloomBytes, err := r.Read(context.TODO(), bloomFilePrefix+strconv.Itoa(i), blockID, cmd.TenantID, false)
if err != nil {
fmt.Println("error reading bloom from backend")
return nil
}
shardedBloomFilter[i] = &willf_bloom.BloomFilter{}
_, err = shardedBloomFilter[i].ReadFrom(bytes.NewReader(bloomBytes))
if err != nil {
fmt.Println("error parsing bloom")
return nil
}
}

testBloom := func(id common.ID) error {
key := common.ShardKeyForTraceID(id, int(meta.BloomShardCount))
if !shardedBloomFilter[key].Test(id) {
return fmt.Errorf("id not added to bloom, filter is likely corrupt")
}
return nil
}
err = ReplayBlockAndDoForEachRecord(meta, cmd.backendOptions.Bucket+cmd.TenantID+"/"+cmd.BlockID+"/"+dataFilename, testBloom)
if err != nil {
fmt.Println("error replaying block", err)
return err
}

fmt.Println("bloom filter verified")
return nil
}
195 changes: 195 additions & 0 deletions cmd/tempo-cli/cmd-gen-index.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,195 @@
package main

import (
"bytes"
"context"
"fmt"
"io"
"os"

"github.com/google/uuid"

"github.com/grafana/tempo/tempodb/backend"
"github.com/grafana/tempo/tempodb/encoding"
"github.com/grafana/tempo/tempodb/encoding/common"
)

type indexCmd struct {
TenantID string `arg:"" help:"tenant-id within the bucket"`
BlockID string `arg:"" help:"block ID to list"`
backendOptions
}

func ReplayBlockAndGetRecords(meta *backend.BlockMeta, filepath string) ([]common.Record, error, error) {
v, err := encoding.FromVersion(meta.Version)
if err != nil {
return nil, nil, err
}

var replayError error
// replay file to extract records
f, err := os.OpenFile(filepath, os.O_RDONLY, 0644)
if err != nil {
return nil, nil, err
}

dataReader, err := v.NewDataReader(backend.NewContextReaderWithAllReader(f), meta.Encoding)
if err != nil {
return nil, nil, err
}
defer dataReader.Close()

var buffer []byte
var records []common.Record
objectRW := v.NewObjectReaderWriter()
currentOffset := uint64(0)
for {
buffer, pageLen, err := dataReader.NextPage(buffer)
if err == io.EOF {
break
}
if err != nil {
replayError = err
break
}

iter := encoding.NewIterator(bytes.NewReader(buffer), objectRW)
var lastID common.ID
var iterErr error
for {
var id common.ID
id, _, iterErr = iter.Next(context.TODO())
if iterErr != nil {
break
}
lastID = id
}

if iterErr != io.EOF {
replayError = iterErr
break
}

// make a copy so we don't hold onto the iterator buffer
recordID := append([]byte(nil), lastID...)
records = append(records, common.Record{
ID: recordID,
Start: currentOffset,
Length: pageLen,
})
currentOffset += uint64(pageLen)
}

return records, replayError, nil
}

func VerifyIndex(indexReader common.IndexReader, dataReader common.DataReader) error {
for i := 0; ; i++ {
record, err := indexReader.At(context.TODO(), i)
if err != nil {
return err
}

if record == nil {
break
}

// read data file at record position
_, _, err = dataReader.Read(context.TODO(), []common.Record{*record}, nil)
if err != nil {
fmt.Println("index/data is corrupt, record/data mismatch")
return err
}
}
return nil
}

func (cmd *indexCmd) Run(ctx *globalOptions) error {
blockID, err := uuid.Parse(cmd.BlockID)
if err != nil {
return err
}

r, w, _, err := loadBackend(&cmd.backendOptions, ctx)
if err != nil {
return err
}

meta, err := r.BlockMeta(context.TODO(), blockID, cmd.TenantID)
if err != nil {
return err
}

// replay file to extract records
records, replayError, err := ReplayBlockAndGetRecords(meta, cmd.backendOptions.Bucket+cmd.TenantID+"/"+cmd.BlockID+"/"+dataFilename)
if replayError != nil {
fmt.Println("error replaying block. data file likely corrupt", replayError)
return replayError
}
if err != nil {
fmt.Println("error accessing data/meta file")
return err
}

// write using IndexWriter
v, err := encoding.FromVersion(meta.Version)
if err != nil {
fmt.Println("error creating versioned encoding", err)
return err
}

indexWriter := v.NewIndexWriter(int(meta.IndexPageSize))
indexBytes, err := indexWriter.Write(records)
if err != nil {
fmt.Println("error writing records to indexWriter", err)
return err
}

// write to the local backend
err = w.Write(context.TODO(), "index", blockID, cmd.TenantID, indexBytes, false)
if err != nil {
fmt.Println("error writing index to backend", err)
return err
}

fmt.Println("index written to backend successfully")

// verify generated index

// get index file with records
indexFilePath := cmd.backendOptions.Bucket + cmd.TenantID + "/" + cmd.BlockID + "/" + indexFilename
indexFile, err := os.OpenFile(indexFilePath, os.O_RDONLY, 0644)
if err != nil {
fmt.Println("error opening index file")
return err
}

indexReader, err := v.NewIndexReader(backend.NewContextReaderWithAllReader(indexFile), int(meta.IndexPageSize), len(records))
if err != nil {
fmt.Println("error reading index file")
return err
}

// data reader
dataFilePath := cmd.backendOptions.Bucket + cmd.TenantID + "/" + cmd.BlockID + "/" + dataFilename
dataFile, err := os.OpenFile(dataFilePath, os.O_RDONLY, 0644)
if err != nil {
fmt.Println("error opening data file")
return err
}

dataReader, err := v.NewDataReader(backend.NewContextReaderWithAllReader(dataFile), meta.Encoding)
if err != nil {
fmt.Println("error reading data file")
return err
}
defer dataReader.Close()

err = VerifyIndex(indexReader, dataReader)
if err != nil {
return err
}

fmt.Println("index verified!")
return nil
}
2 changes: 1 addition & 1 deletion cmd/tempo-cli/cmd-list-block.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ type listBlockCmd struct {
}

func (cmd *listBlockCmd) Run(ctx *globalOptions) error {
r, c, err := loadBackend(&cmd.backendOptions, ctx)
r, _, c, err := loadBackend(&cmd.backendOptions, ctx)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/tempo-cli/cmd-list-blocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ type listBlocksCmd struct {
}

func (l *listBlocksCmd) Run(ctx *globalOptions) error {
r, c, err := loadBackend(&l.backendOptions, ctx)
r, _, c, err := loadBackend(&l.backendOptions, ctx)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/tempo-cli/cmd-list-cachesummary.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ type listCacheSummaryCmd struct {
}

func (l *listCacheSummaryCmd) Run(ctx *globalOptions) error {
r, c, err := loadBackend(&l.backendOptions, ctx)
r, _, c, err := loadBackend(&l.backendOptions, ctx)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/tempo-cli/cmd-list-compactionsummary.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ type listCompactionSummaryCmd struct {
}

func (l *listCompactionSummaryCmd) Run(ctx *globalOptions) error {
r, c, err := loadBackend(&l.backendOptions, ctx)
r, _, c, err := loadBackend(&l.backendOptions, ctx)
if err != nil {
return err
}
Expand Down
Loading