Skip to content

Commit

Permalink
Add new list compaction-summary command to tempo-cli (#588)
Browse files Browse the repository at this point in the history
* Add new list compaction-summary command to tempo-cli

* Update changelog
  • Loading branch information
mdisibio authored Mar 11, 2021
1 parent 73546da commit cbd6102
Show file tree
Hide file tree
Showing 5 changed files with 266 additions and 136 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
* [ENHANCEMENT] Add a Shutdown handler to flush data to backend, at "/shutdown". [#526](https://github.com/grafana/tempo/pull/526)
* [ENHANCEMENT] Queriers now query all (healthy) ingesters for a trace to mitigate 404s on ingester rollouts/scaleups.
This is a **breaking change** and will likely result in query errors on rollout as the query signature b/n QueryFrontend & Querier has changed. [#557](https://github.com/grafana/tempo/pull/557)
* [ENHANCEMENT] Add list compaction-summary command to tempo-cli [#588](https://github.com/grafana/tempo/pull/588)
* [BUGFIX] Fixes permissions errors on startup in GCS. [#554](https://github.com/grafana/tempo/pull/554)
* [BUGFIX] Fixes error where Dell ECS cannot list objects. [#561](https://github.com/grafana/tempo/pull/561)
* [BUGFIX] Fixes listing blocks in S3 when the list is truncated. [#567](https://github.com/grafana/tempo/pull/567)
Expand Down
76 changes: 1 addition & 75 deletions cmd/tempo-cli/cmd-list-blocks.go
Original file line number Diff line number Diff line change
@@ -1,24 +1,18 @@
package main

import (
"context"
"fmt"
"os"
"sort"
"strconv"
"time"

"github.com/dustin/go-humanize"
"github.com/google/uuid"
"github.com/grafana/tempo/pkg/boundedwaitgroup"
tempodb_backend "github.com/grafana/tempo/tempodb/backend"
"github.com/olekukonko/tablewriter"
)

type listBlocksCmd struct {
TenantID string `arg:"" help:"tenant-id within the bucket"`
IncludeCompacted bool `help:"include compacted blocks"`

backendOptions
}

Expand All @@ -36,76 +30,8 @@ func (l *listBlocksCmd) Run(ctx *globalOptions) error {
}

displayResults(results, windowDuration, l.IncludeCompacted)
return nil
}

type blockStats struct {
unifiedBlockMeta
}

func loadBucket(r tempodb_backend.Reader, c tempodb_backend.Compactor, tenantID string, windowRange time.Duration, includeCompacted bool) ([]blockStats, error) {
blockIDs, err := r.Blocks(context.Background(), tenantID)
if err != nil {
return nil, err
}

fmt.Println("total blocks: ", len(blockIDs))

// Load in parallel
wg := boundedwaitgroup.New(10)
resultsCh := make(chan blockStats, len(blockIDs))

for _, id := range blockIDs {
wg.Add(1)

go func(id2 uuid.UUID) {
defer wg.Done()

b, err := loadBlock(r, c, tenantID, id2, windowRange, includeCompacted)
if err != nil {
fmt.Println("Error loading block:", id2, err)
return
}

if b != nil {
resultsCh <- *b
}
}(id)
}

wg.Wait()
close(resultsCh)

results := make([]blockStats, 0)
for b := range resultsCh {
results = append(results, b)
}

sort.Slice(results, func(i, j int) bool {
return results[i].end.Before(results[j].end)
})

return results, nil
}

func loadBlock(r tempodb_backend.Reader, c tempodb_backend.Compactor, tenantID string, id uuid.UUID, windowRange time.Duration, includeCompacted bool) (*blockStats, error) {
fmt.Print(".")

meta, err := r.BlockMeta(context.Background(), id, tenantID)
if err == tempodb_backend.ErrMetaDoesNotExist && !includeCompacted {
return nil, nil
} else if err != nil && err != tempodb_backend.ErrMetaDoesNotExist {
return nil, err
}

compactedMeta, err := c.CompactedBlockMeta(id, tenantID)
if err != nil && err != tempodb_backend.ErrMetaDoesNotExist {
return nil, err
}

return &blockStats{
unifiedBlockMeta: getMeta(meta, compactedMeta, windowRange),
}, nil
return nil
}

func displayResults(results []blockStats, windowDuration time.Duration, includeCompacted bool) {
Expand Down
124 changes: 124 additions & 0 deletions cmd/tempo-cli/cmd-list-compactionsummary.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
package main

import (
"fmt"
"os"
"sort"
"strconv"
"time"

"github.com/dustin/go-humanize"
"github.com/olekukonko/tablewriter"
)

type listCompactionSummaryCmd struct {
TenantID string `arg:"" help:"tenant-id within the bucket"`
backendOptions
}

func (l *listCompactionSummaryCmd) Run(ctx *globalOptions) error {
r, c, err := loadBackend(&l.backendOptions, ctx)
if err != nil {
return err
}

windowDuration := time.Hour

results, err := loadBucket(r, c, l.TenantID, windowDuration, false)
if err != nil {
return err
}

displayCompactionSummary(results)

return nil
}

func displayCompactionSummary(results []blockStats) {
fmt.Println()
fmt.Println("Stats by compaction level:")
resultsByLevel := make(map[int][]blockStats)
var levels []int
for _, r := range results {
l := int(r.compactionLevel)

s, ok := resultsByLevel[l]
if !ok {
s = make([]blockStats, 0)
levels = append(levels, l)
}

s = append(s, r)
resultsByLevel[l] = s
}

sort.Ints(levels)

columns := []string{"lvl", "blocks", "total", "smallest block", "largest block", "earliest", "latest"}

out := make([][]string, 0)

for _, l := range levels {
sizeSum := uint64(0)
sizeMin := uint64(0)
sizeMax := uint64(0)
countSum := 0
countMin := 0
countMax := 0
var newest time.Time
var oldest time.Time
for _, r := range resultsByLevel[l] {
sizeSum += r.size
countSum += r.objects

if r.size < sizeMin || sizeMin == 0 {
sizeMin = r.size
}
if r.size > sizeMax {
sizeMax = r.size
}
if r.objects < countMin || countMin == 0 {
countMin = r.objects
}
if r.objects > countMax {
countMax = r.objects
}
if r.start.Before(oldest) || oldest.IsZero() {
oldest = r.start
}
if r.end.After(newest) {
newest = r.end
}
}

line := make([]string, 0)

for _, c := range columns {
s := ""
switch c {
case "lvl":
s = strconv.Itoa(l)
case "blocks":
s = fmt.Sprintf("%d (%d %%)", len(resultsByLevel[l]), len(resultsByLevel[l])*100/len(results))
case "total":
s = fmt.Sprintf("%s objects (%s)", humanize.Comma(int64(countSum)), humanize.Bytes(sizeSum))
case "smallest block":
s = fmt.Sprintf("%s objects (%s)", humanize.Comma(int64(countMin)), humanize.Bytes(sizeMin))
case "largest block":
s = fmt.Sprintf("%s objects (%s)", humanize.Comma(int64(countMax)), humanize.Bytes(sizeMax))
case "earliest":
s = fmt.Sprint(time.Since(oldest).Round(time.Second), " ago")
case "latest":
s = fmt.Sprint(time.Since(newest).Round(time.Second), " ago")
}
line = append(line, s)
}
out = append(out, line)
}

fmt.Println()
w := tablewriter.NewWriter(os.Stdout)
w.SetHeader(columns)
w.AppendBulk(out)
w.Render()
}
67 changes: 6 additions & 61 deletions cmd/tempo-cli/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,9 @@ import (
"flag"
"fmt"
"io/ioutil"
"time"

"github.com/google/uuid"
"github.com/grafana/tempo/cmd/tempo/app"
"github.com/grafana/tempo/tempodb/backend"
tempodb_backend "github.com/grafana/tempo/tempodb/backend"
"github.com/grafana/tempo/tempodb/backend/local"
"gopkg.in/yaml.v2"

Expand All @@ -36,8 +33,9 @@ var cli struct {
globalOptions

List struct {
Block listBlockCmd `cmd:"" help:"List information about a block"`
Blocks listBlocksCmd `cmd:"" help:"List information about all blocks in a bucket"`
Block listBlockCmd `cmd:"" help:"List information about a block"`
Blocks listBlocksCmd `cmd:"" help:"List information about all blocks in a bucket"`
CompactionSummary listCompactionSummaryCmd `cmd:"" help:"List summary of data by compaction level"`
} `cmd:""`

Query queryCmd `cmd:"" help:"query tempo api"`
Expand All @@ -54,7 +52,7 @@ func main() {
ctx.FatalIfErrorf(err)
}

func loadBackend(b *backendOptions, g *globalOptions) (tempodb_backend.Reader, tempodb_backend.Compactor, error) {
func loadBackend(b *backendOptions, g *globalOptions) (backend.Reader, backend.Compactor, error) {
// Defaults
cfg := app.Config{}
cfg.RegisterFlagsAndApplyDefaults("", &flag.FlagSet{})
Expand Down Expand Up @@ -89,8 +87,8 @@ func loadBackend(b *backendOptions, g *globalOptions) (tempodb_backend.Reader, t
}

var err error
var r tempodb_backend.Reader
var c tempodb_backend.Compactor
var r backend.Reader
var c backend.Compactor

switch cfg.StorageConfig.Trace.Backend {
case "local":
Expand All @@ -111,56 +109,3 @@ func loadBackend(b *backendOptions, g *globalOptions) (tempodb_backend.Reader, t

return r, c, nil
}

type unifiedBlockMeta struct {
id uuid.UUID
compactionLevel uint8
objects int
size uint64
window int64
start time.Time
end time.Time
compacted bool
version string
encoding string
}

func getMeta(meta *backend.BlockMeta, compactedMeta *backend.CompactedBlockMeta, windowRange time.Duration) unifiedBlockMeta {
if meta != nil {
return unifiedBlockMeta{
id: meta.BlockID,
compactionLevel: meta.CompactionLevel,
objects: meta.TotalObjects,
size: meta.Size,
window: meta.EndTime.Unix() / int64(windowRange/time.Second),
start: meta.StartTime,
end: meta.EndTime,
compacted: false,
version: meta.Version,
encoding: meta.Encoding.String(),
}
}
if compactedMeta != nil {
return unifiedBlockMeta{
id: compactedMeta.BlockID,
compactionLevel: compactedMeta.CompactionLevel,
objects: compactedMeta.TotalObjects,
size: compactedMeta.Size,
window: compactedMeta.EndTime.Unix() / int64(windowRange/time.Second),
start: compactedMeta.StartTime,
end: compactedMeta.EndTime,
compacted: true,
version: compactedMeta.Version,
encoding: compactedMeta.Encoding.String(),
}
}
return unifiedBlockMeta{
id: uuid.UUID{},
compactionLevel: 0,
objects: -1,
window: -1,
start: time.Unix(0, 0),
end: time.Unix(0, 0),
compacted: false,
}
}
Loading

0 comments on commit cbd6102

Please sign in to comment.