
Commit

feat: tempo-cli: add support for dropping multiple traces in single operation
ndk committed Nov 5, 2024
1 parent e19ab71 commit 1114f31
Showing 17 changed files with 221 additions and 40 deletions.
104 changes: 66 additions & 38 deletions cmd/tempo-cli/cmd-rewrite-blocks.go
@@ -7,6 +7,7 @@ import (
"fmt"
"os"
"strconv"
"strings"

"github.com/go-kit/log"
"github.com/google/uuid"
@@ -19,16 +20,16 @@ import (
"github.com/grafana/tempo/tempodb/encoding/common"
)

type dropTraceCmd struct {
type dropTracesCmd struct {
backendOptions

TraceID string `arg:"" help:"trace ID to retrieve"`
TenantID string `arg:"" help:"tenant ID to search"`
DropTrace bool `name:"drop-trace" help:"actually attempt to drop the trace" default:"false"`
}

func (cmd *dropTraceCmd) Run(ctx *globalOptions) error {
fmt.Printf("beginning process to drop trace %v from tenant %v\n", cmd.TraceID, cmd.TenantID)
func (cmd *dropTracesCmd) Run(opts *globalOptions) error {
fmt.Printf("beginning process to drop traces %v from tenant %v\n", cmd.TraceID, cmd.TenantID)
fmt.Println("**warning**: compaction must be disabled or a compactor may duplicate a block as this process is rewriting it")
fmt.Println("")
if cmd.DropTrace {
@@ -38,51 +39,72 @@ func (cmd *dropTraceCmd) Run(ctx *globalOptions) error {
fmt.Println("")
}

r, w, c, err := loadBackend(&cmd.backendOptions, ctx)
if err != nil {
return err
}
ctx := context.Background()

id, err := util.HexStringToTraceID(cmd.TraceID)
r, w, c, err := loadBackend(&cmd.backendOptions, opts)
if err != nil {
return err
}

blocks, err := blocksWithTraceID(context.Background(), r, cmd.TenantID, id)
if err != nil {
return err
type pair struct {
traceIDs []common.ID
blockMeta *backend.BlockMeta
}
tracesByBlock := map[backend.UUID]pair{}

if len(blocks) == 0 {
fmt.Println("\ntrace not found in any block. aborting")
return nil
}
// Group trace IDs by blocks
ids := strings.Split(cmd.TraceID, ",")
for _, id := range ids {
traceID, err := util.HexStringToTraceID(id)
if err != nil {
return err
}

// print out blocks that have the trace id
fmt.Println("\n\ntrace found in:")
for _, block := range blocks {
fmt.Printf(" %v sz: %d traces: %d\n", block.BlockID, block.Size_, block.TotalObjects)
}
// It might be significantly improved if common.BackendBlock supported bulk searches.
blocks, err := blocksWithTraceID(ctx, r, cmd.TenantID, traceID)
if err != nil {
return err
}

if !cmd.DropTrace {
fmt.Println("**not dropping trace, use --drop-trace to actually drop**")
return nil
if len(blocks) == 0 {
fmt.Printf("\ntrace %s not found in any block. skipping", traceID)
}
for _, block := range blocks {
p, ok := tracesByBlock[block.BlockID]
if !ok {
p = pair{blockMeta: block}
}
p.traceIDs = append(p.traceIDs, traceID)
tracesByBlock[block.BlockID] = p
}
}

fmt.Println("rewriting blocks:")
for _, block := range blocks {
fmt.Printf(" rewriting %v\n", block.BlockID)
newBlock, err := rewriteBlock(context.Background(), r, w, block, id)
// Remove traces from blocks
for _, p := range tracesByBlock {
// print out trace IDs to be removed in the block
{
traceIDs := make([]string, len(p.traceIDs))
for i, tid := range p.traceIDs {
traceIDs[i] = util.SpanIDToHexString(tid)
}
fmt.Printf("\n\ntraces %v found in:\n", traceIDs)
}
fmt.Printf(" %v sz: %d traces: %d\n", p.blockMeta.BlockID, p.blockMeta.Size_, p.blockMeta.TotalObjects)

if !cmd.DropTrace {
fmt.Println("**not dropping trace, use --drop-trace to actually drop**")
continue
}

fmt.Printf(" rewriting %v\n", p.blockMeta.BlockID)
newMeta, err := rewriteBlock(ctx, r, w, p.blockMeta, p.traceIDs)
if err != nil {
return err
}
fmt.Printf(" rewrote to new block: %v\n", newBlock.BlockID)
}
fmt.Printf(" rewrote to new block: %v\n", newMeta.BlockID)

fmt.Println("marking old blocks compacted")
for _, block := range blocks {
fmt.Printf(" marking %v\n", block.BlockID)
err = c.MarkBlockCompacted((uuid.UUID)(block.BlockID), block.TenantID)
fmt.Printf(" marking %v compacted\n", p.blockMeta.BlockID)
err = c.MarkBlockCompacted((uuid.UUID)(p.blockMeta.BlockID), p.blockMeta.TenantID)
if err != nil {
return err
}
@@ -93,7 +115,7 @@ func (cmd *dropTraceCmd) Run(ctx *globalOptions) error {
return nil
}

func rewriteBlock(ctx context.Context, r backend.Reader, w backend.Writer, meta *backend.BlockMeta, traceID common.ID) (*backend.BlockMeta, error) {
func rewriteBlock(ctx context.Context, r backend.Reader, w backend.Writer, meta *backend.BlockMeta, traceIDs []common.ID) (*backend.BlockMeta, error) {
enc, err := encoding.FromVersion(meta.Version)
if err != nil {
return nil, fmt.Errorf("error getting encoder: %w", err)
@@ -131,7 +153,13 @@ func rewriteBlock(ctx context.Context, r backend.Reader, w backend.Writer, meta

// hook to drop the trace
DropObject: func(id common.ID) bool {
return bytes.Equal(id, traceID)
// Should it be replaced with binary search, trie or other ones?
for _, tid := range traceIDs {
if bytes.Equal(id, tid) {
return true
}
}
return false
},

// setting to prevent panics. should we track and report these?
Expand All @@ -157,15 +185,15 @@ func rewriteBlock(ctx context.Context, r backend.Reader, w backend.Writer, meta

newMeta := out[0]

if newMeta.TotalObjects != meta.TotalObjects-1 {
if newMeta.TotalObjects != meta.TotalObjects-int64(len(traceIDs)) {
return nil, fmt.Errorf("expected output to have one less object then in. out: %d in: %d", newMeta.TotalObjects, meta.TotalObjects)
}

return newMeta, nil
}

func blocksWithTraceID(ctx context.Context, r backend.Reader, tenantID string, traceID common.ID) ([]*backend.BlockMeta, error) {
blockIDs, _, err := r.Blocks(context.Background(), tenantID)
blockIDs, _, err := r.Blocks(ctx, tenantID)
if err != nil {
return nil, err
}
@@ -210,7 +238,7 @@ func isInBlock(ctx context.Context, r backend.Reader, blockNum int, id uuid.UUID
fmt.Print(strconv.Itoa(blockNum))
}

meta, err := r.BlockMeta(context.Background(), id, tenantID)
meta, err := r.BlockMeta(ctx, id, tenantID)
if err != nil && !errors.Is(err, backend.ErrDoesNotExist) {
return nil, err
}
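The `DropObject` hook in the diff above checks every object ID against `traceIDs` with a linear `bytes.Equal` scan, and the in-code comment asks whether a faster structure (binary search, a trie) is warranted. A minimal sketch of one alternative — a hash-set lookup keyed by the raw trace-ID bytes — not part of this commit; it assumes the `common` package already imported in the file above, where `common.ID` is a plain byte slice:

```go
// dropSet builds a set-backed predicate with the same behavior as the
// linear scan in the DropObject hook: return true for IDs that should
// be dropped, false otherwise.
func dropSet(traceIDs []common.ID) func(id common.ID) bool {
	set := make(map[string]struct{}, len(traceIDs))
	for _, tid := range traceIDs {
		// string([]byte) copies the bytes, so the key stays stable even
		// if the caller later mutates the original slice.
		set[string(tid)] = struct{}{}
	}
	return func(id common.ID) bool {
		_, drop := set[string(id)]
		return drop
	}
}
```

For the handful of IDs a typical invocation passes, the existing linear scan is likely fine; a set (or a sorted slice with binary search, as the comment suggests) only starts to pay off when many traces are dropped from the same block.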
127 changes: 127 additions & 0 deletions cmd/tempo-cli/cmd-rewrite-blocks_test.go
@@ -0,0 +1,127 @@
package main

import (
"context"
"encoding/hex"
"fmt"
"os"
"strings"
"testing"

"github.com/parquet-go/parquet-go"
"github.com/stretchr/testify/require"

tempo_io "github.com/grafana/tempo/pkg/io"
"github.com/grafana/tempo/pkg/parquetquery"
"github.com/grafana/tempo/tempodb/backend"
"github.com/grafana/tempo/tempodb/backend/local"
"github.com/grafana/tempo/tempodb/encoding/common"
"github.com/grafana/tempo/tempodb/encoding/vparquet4"
)

func TestDropTraceCmd(t *testing.T) {
for s := 2; s < 10; s++ {
s := s
t.Run(fmt.Sprintf("drop every %d trace", s), func(t *testing.T) {
tempDir := t.TempDir()
err := os.CopyFS(tempDir, os.DirFS("./test-data2"))
require.NoError(t, err)

cmd := dropTracesCmd{
backendOptions: backendOptions{
Backend: "local",
Bucket: tempDir,
},
TenantID: "single-tenant",
DropTrace: true,
}

before := getAllTraceIDs(t, cmd.backendOptions.Bucket, cmd.TenantID)

var toRemove []string
for i, traceID := range before {
if i%s == 0 {
toRemove = append(toRemove, traceID)
}
}
cmd.TraceID = strings.Join(toRemove, ",")

err = cmd.Run(&globalOptions{})
require.NoError(t, err)

after := getAllTraceIDs(t, cmd.backendOptions.Bucket, cmd.TenantID)
after = append(after, toRemove...)

require.ElementsMatch(t, before, after)
})
}
}

func getAllTraceIDs(t *testing.T, dir string, tenant string) []string {
t.Helper()

rawR, _, _, err := local.New(&local.Config{
Path: dir,
})
require.NoError(t, err)

reader := backend.NewReader(rawR)
ctx := context.Background()

tenants, err := reader.Tenants(ctx)
require.NoError(t, err)
require.Equal(t, []string{tenant}, tenants)

blocks, _, err := reader.Blocks(ctx, tenant)
require.NoError(t, err)

var traceIDs []string
for _, block := range blocks {
meta, err := reader.BlockMeta(ctx, block, tenant)
require.NoError(t, err)
rr := vparquet4.NewBackendReaderAt(ctx, reader, vparquet4.DataFileName, meta)
br := tempo_io.NewBufferedReaderAt(rr, int64(meta.Size_), 2*1024*1024, 64)
parquetSchema := parquet.SchemaOf(&vparquet4.Trace{})
o := []parquet.FileOption{
parquet.SkipBloomFilters(true),
parquet.SkipPageIndex(true),
parquet.FileSchema(parquetSchema),
parquet.FileReadMode(parquet.ReadModeAsync),
}
pf, err := parquet.OpenFile(br, int64(meta.Size_), o...)
require.NoError(t, err)
r := parquet.NewReader(pf, parquetSchema)
defer func() {
err := r.Close()
require.NoError(t, err)
}()
traceIDIndex, _ := parquetquery.GetColumnIndexByPath(pf, vparquet4.TraceIDColumnName)
require.GreaterOrEqual(t, traceIDIndex, 0)
defer func() {
err := r.Close()
require.NoError(t, err)
}()

rows := make([]parquet.Row, r.NumRows())
n, err := r.ReadRows(rows)
require.NoError(t, err)
require.Len(t, rows, n)

getTraceID := func(row parquet.Row) common.ID {
for _, v := range row {
if v.Column() == traceIDIndex {
return v.ByteArray()
}
}

return nil
}

for _, row := range rows {
traceID := getTraceID(row)
traceIDs = append(traceIDs, hex.EncodeToString(traceID))
}
}

return traceIDs
}
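To exercise the new test locally, something along these lines should work — a sketch that assumes a repository checkout containing the `test-data2` fixtures added in this commit (note that `os.CopyFS`, used by the test, requires Go 1.23 or newer):

```bash
go test ./cmd/tempo-cli/ -run TestDropTraceCmd -v
```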
3 changes: 2 additions & 1 deletion cmd/tempo-cli/main.go
@@ -76,7 +76,8 @@ var cli struct {
} `cmd:""`

RewriteBlocks struct {
DropTrace dropTraceCmd `cmd:"" help:"rewrite blocks with a given trace id redacted"`
DropTrace dropTracesCmd `cmd:"" help:"rewrite blocks with a given trace id redacted"`
DropTraces dropTracesCmd `cmd:"" help:"rewrite blocks with the given trace ids redacted"`
} `cmd:""`

Parquet struct {
@@ -0,0 +1 @@
{"rowGroups":["6IyULoGuhkdAfMbAitQ5UQ=="]}
@@ -0,0 +1 @@
{"format":"vParquet4","blockID":"54aa0b42-d9e3-40ae-ab2e-3f96b2036232","tenantID":"single-tenant","startTime":"2024-11-03T12:50:54Z","endTime":"2024-11-03T12:51:57Z","totalObjects":10,"size":59698,"compactionLevel":0,"encoding":"none","indexPageSize":0,"totalRecords":1,"dataEncoding":"","bloomShards":1,"footerSize":17870}
@@ -0,0 +1 @@
{"rowGroups":["+dkSCCdj5rqDenl14oO13Q=="]}
@@ -0,0 +1 @@
{"format":"vParquet4","blockID":"715bd008-5ae1-4c5d-a020-53b7313c2b0c","tenantID":"single-tenant","startTime":"2024-11-03T12:50:55Z","endTime":"2024-11-03T12:51:52Z","totalObjects":5,"size":47783,"compactionLevel":0,"encoding":"none","indexPageSize":0,"totalRecords":1,"dataEncoding":"","bloomShards":1,"footerSize":17770}
@@ -0,0 +1 @@
{"rowGroups":["9gWrxA79L52xrT54Fi9mJw=="]}
@@ -0,0 +1 @@
{"format":"vParquet4","blockID":"c498e1b0-3d66-465a-92a7-6e2ad0176465","tenantID":"single-tenant","startTime":"2024-11-03T12:50:56Z","endTime":"2024-11-03T12:51:42Z","totalObjects":5,"size":48843,"compactionLevel":0,"encoding":"none","indexPageSize":0,"totalRecords":1,"dataEncoding":"","bloomShards":1,"footerSize":17718}
1 change: 1 addition & 0 deletions cmd/tempo-cli/test-data2/tempo_cluster_seed.json
@@ -0,0 +1 @@
{"UID":"990fa882-530a-4c44-899d-e7011e8d673a","created_at":"2024-11-03T12:50:59.446432341Z","version":{"version":"v2.6.1","revision":"e19ab7152","branch":"main","buildUser":"","buildDate":"","goVersion":"go1.23.2"}}
20 changes: 19 additions & 1 deletion docs/sources/tempo/operations/tempo_cli.md
@@ -506,7 +506,7 @@ Options:
tempo-cli analyse blocks --backend=local --bucket=./cmd/tempo-cli/test-data/ single-tenant
```

## Drop trace by id
## Drop trace by id (deprecated)

Rewrites all blocks for a tenant that contain a specific trace id. The trace is dropped from
the new blocks and the rewritten blocks are marked compacted so they will be cleaned up.
@@ -523,3 +523,21 @@ Options:
```bash
tempo-cli rewrite-blocks drop-trace --backend=local --bucket=./cmd/tempo-cli/test-data/ single-tenant a188ea38aa3a83d74523774ad6728cc8
```

## Drop traces by id

Rewrites all blocks for a tenant that contain the given trace IDs. The traces are dropped from
the new blocks and the rewritten blocks are marked compacted so they will be cleaned up.

Arguments:
- `tenant-id` The tenant ID. Use `single-tenant` for single tenant setups.
- `trace-ids` A comma-separated list of trace IDs to drop

Options:
- [Backend options](#backend-options)
- `--drop-trace` By default this command runs in dry-run mode. Supplying this argument causes it to actually rewrite blocks with the traces dropped.

**Example:**
```bash
tempo-cli rewrite-blocks drop-traces --backend=local --bucket=./cmd/tempo-cli/test-data/ single-tenant 04d5f549746c96e4f3daed6202571db2,111fa1850042aea83c17cd7e674210b8
```
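The example above runs in dry-run mode and only reports the affected blocks. To actually rewrite them, the run has to be repeated with the drop flag — a sketch, assuming the `--drop-trace` flag name defined by the struct tag in this commit:

```bash
# Actually drop the traces: rewrites the affected blocks and marks the
# originals compacted so they can be cleaned up.
tempo-cli rewrite-blocks drop-traces --drop-trace --backend=local \
  --bucket=./cmd/tempo-cli/test-data/ single-tenant \
  04d5f549746c96e4f3daed6202571db2,111fa1850042aea83c17cd7e674210b8
```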
