Log when a trace is too large to compact
scalalang2 committed Feb 23, 2023
1 parent 79f0b27 commit df93a32
Showing 9 changed files with 65 additions and 22 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -8,6 +8,7 @@
* [BUGFIX] Apply `rate()` to bytes/s panel in tenant's dashboard. [#2081](https://github.com/grafana/tempo/pull/2081) (@mapno)
* [BUGFIX] Correctly coalesce trace level data when combining Parquet traces. [#2095](https://github.com/grafana/tempo/pull/2095) (@joe-elliott)
* [CHANGE] Update Go to 1.20 [#2079](https://github.com/grafana/tempo/pull/2079) (@scalalang2)
* [ENHANCEMENT] Log when a trace is too large to compact [#2105](https://github.com/grafana/tempo/pull/2105) (@scalalang2)

## v2.0.0 / 2023-01-31

21 changes: 14 additions & 7 deletions modules/compactor/compactor.go
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/grafana/dskit/kv"
"github.com/grafana/dskit/ring"
"github.com/grafana/dskit/services"
tempoUtil "github.com/grafana/tempo/pkg/util"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"

@@ -236,12 +237,13 @@ func (c *Compactor) Combine(dataEncoding string, tenantID string, objs ...[]byte
return objs[0], wasCombined, nil
}

spansDiscarded := countSpans(dataEncoding, objs[1:]...)
overrides.RecordDiscardedSpans(spansDiscarded, reasonCompactorDiscardedSpans, tenantID)
totalDiscarded := countSpans(dataEncoding, objs[1:]...)
overrides.RecordDiscardedSpans(totalDiscarded, reasonCompactorDiscardedSpans, tenantID)
return objs[0], wasCombined, nil
}

func (c *Compactor) RecordDiscardedSpans(count int, tenantID string) {
func (c *Compactor) RecordDiscardedSpans(count int, tenantID string, traceID string) {
level.Warn(log.Logger).Log("msg", "max size of trace exceeded", "tenant", tenantID, "traceId", traceID, "discarded_span_count", count)
overrides.RecordDiscardedSpans(count, reasonCompactorDiscardedSpans, tenantID)
}

@@ -293,12 +295,12 @@ func (c *Compactor) OnRingInstanceStopping(lifecycler *ring.BasicLifecycler) {}
func (c *Compactor) OnRingInstanceHeartbeat(lifecycler *ring.BasicLifecycler, ringDesc *ring.Desc, instanceDesc *ring.InstanceDesc) {
}

func countSpans(dataEncoding string, objs ...[]byte) int {
func countSpans(dataEncoding string, objs ...[]byte) (total int) {
var traceID string
decoder, err := model.NewObjectDecoder(dataEncoding)
if err != nil {
return 0
}
spans := 0

for _, o := range objs {
t, err := decoder.PrepareForRead(o)
@@ -308,10 +310,15 @@ func countSpans(dataEncoding string, objs ...[]byte) int {

for _, b := range t.Batches {
for _, ilm := range b.ScopeSpans {
spans += len(ilm.Spans)
if len(ilm.Spans) > 0 && traceID == "" {
traceID = tempoUtil.TraceIDToHexString(ilm.Spans[0].TraceId)
}
total += len(ilm.Spans)
}
}
}

return spans
level.Debug(log.Logger).Log("msg", "max size of trace exceeded", "traceId", traceID, "discarded_span_count", total)

return
}
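
Note (not part of this commit): for reference, this is roughly what the new warning looks like once rendered by go-kit's logfmt logger. The tenant, trace ID, and span count below are invented values, and the configured logger will usually add further fields such as ts and caller. A minimal, self-contained sketch:

package main

import (
	"os"

	"github.com/go-kit/log"
	"github.com/go-kit/log/level"
)

func main() {
	logger := log.NewLogfmtLogger(os.Stderr)
	// Prints approximately:
	// level=warn msg="max size of trace exceeded" tenant=single-tenant traceId=0123456789abcdef0123456789abcdef discarded_span_count=1500
	level.Warn(logger).Log(
		"msg", "max size of trace exceeded",
		"tenant", "single-tenant", // invented tenant name
		"traceId", "0123456789abcdef0123456789abcdef", // invented trace ID
		"discarded_span_count", 1500, // invented count
	)
}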
12 changes: 7 additions & 5 deletions modules/compactor/compactor_test.go
@@ -113,11 +113,13 @@ func TestCountSpans(t *testing.T) {
b1 := encode(t, t1)
b2 := encode(t, t2)

assert.Equal(t, t1ExpectedSpans, countSpans(model.CurrentEncoding, b1))
assert.Equal(t, t2ExpectedSpans, countSpans(model.CurrentEncoding, b2))
assert.Equal(t,
t1ExpectedSpans+t2ExpectedSpans,
countSpans(model.CurrentEncoding, b1, b2))
b1Total := countSpans(model.CurrentEncoding, b1)
b2Total := countSpans(model.CurrentEncoding, b2)
total := countSpans(model.CurrentEncoding, b1, b2)

assert.Equal(t, t1ExpectedSpans, b1Total)
assert.Equal(t, t2ExpectedSpans, b2Total)
assert.Equal(t, t1ExpectedSpans+t2ExpectedSpans, total)
}

func encode(t *testing.T, tr *tempopb.Trace) []byte {
4 changes: 2 additions & 2 deletions tempodb/compactor.go
@@ -194,8 +194,8 @@ func (rw *readerWriter) compact(blockMetas []*backend.BlockMeta, tenantID string
ObjectsWritten: func(compactionLevel, objs int) {
metricCompactionObjectsWritten.WithLabelValues(strconv.Itoa(compactionLevel)).Add(float64(objs))
},
SpansDiscarded: func(spans int) {
rw.compactorSharder.RecordDiscardedSpans(spans, tenantID)
SpansDiscarded: func(traceId string, spans int) {
rw.compactorSharder.RecordDiscardedSpans(spans, tenantID, traceId)
},
}

2 changes: 1 addition & 1 deletion tempodb/compactor_test.go
@@ -42,7 +42,7 @@ func (m *mockSharder) Combine(dataEncoding string, tenantID string, objs ...[]by
return model.StaticCombiner.Combine(dataEncoding, objs...)
}

func (m *mockSharder) RecordDiscardedSpans(count int, tenantID string) {}
func (m *mockSharder) RecordDiscardedSpans(count int, tenantID string, traceID string) {}

type mockJobSharder struct{}

2 changes: 1 addition & 1 deletion tempodb/encoding/common/interfaces.go
@@ -74,7 +74,7 @@ type CompactionOptions struct {
ObjectsCombined func(compactionLevel, objects int)
ObjectsWritten func(compactionLevel, objects int)
BytesWritten func(compactionLevel, bytes int)
SpansDiscarded func(spans int)
SpansDiscarded func(traceID string, spans int)
}

type Iterator interface {
18 changes: 14 additions & 4 deletions tempodb/encoding/vparquet/compactor.go
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/google/uuid"
tempoUtil "github.com/grafana/tempo/pkg/util"
"github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
"github.com/segmentio/parquet-go"
@@ -315,18 +316,27 @@ func estimateMarshalledSizeFromParquetRow(row parquet.Row) (size int) {
}

// countSpans counts the number of spans in the given trace in deconstructed
// parquet row format. It simply counts the number of values for span ID, which
// is always present.
func countSpans(schema *parquet.Schema, row parquet.Row) (spans int) {
// parquet row format and returns traceId.
// It simply counts the number of values for span ID, which is always present.
func countSpans(schema *parquet.Schema, row parquet.Row) (traceID string, spans int) {
traceIDColumn, found := schema.Lookup(TraceIDColumnName)
if !found {
return "", 0
}

spanID, found := schema.Lookup("rs", "ils", "Spans", "ID")
if !found {
return 0
return "", 0
}

for _, v := range row {
if v.Column() == spanID.ColumnIndex {
spans++
}

if v.Column() == traceIDColumn.ColumnIndex {
traceID = tempoUtil.TraceIDToHexString(v.ByteArray())
}
}

return
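
Note (not part of this commit): a hedged sketch of how the two return values of countSpans feed the widened SpansDiscarded callback. The helper below does not exist in the vparquet package, and the surrounding size check that decides a trace is too large is assumed rather than shown.

// Sketch only: once a deconstructed trace row has been judged too large to
// keep, count its spans and report them together with the trace ID.
func reportOversizedRow(opts common.CompactionOptions, schema *parquet.Schema, row parquet.Row) {
	traceID, spans := countSpans(schema, row)
	if opts.SpansDiscarded != nil {
		opts.SpansDiscarded(traceID, spans)
	}
}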
25 changes: 24 additions & 1 deletion tempodb/encoding/vparquet/compactor_test.go
@@ -2,13 +2,16 @@ package vparquet

import (
"context"
crand "crypto/rand"
"encoding/binary"
"math/rand"
"time"

"testing"

"github.com/go-kit/log"
"github.com/google/uuid"
tempoUtil "github.com/grafana/tempo/pkg/util"
"github.com/segmentio/parquet-go"

tempo_io "github.com/grafana/tempo/pkg/io"
@@ -97,7 +100,7 @@ func BenchmarkCompactorDupes(b *testing.B) {
FlushSizeBytes: 30_000_000,
MaxBytesPerTrace: 50_000_000,
ObjectsCombined: func(compactionLevel, objects int) {},
SpansDiscarded: func(spans int) {},
SpansDiscarded: func(traceID string, spans int) {},
})

_, err = c.Compact(ctx, l, r, func(*backend.BlockMeta, time.Time) backend.Writer { return w }, inputs)
@@ -142,3 +145,23 @@ func createTestBlock(t testing.TB, ctx context.Context, cfg *common.BlockConfig,
func TestValueAlloc(t *testing.T) {
_ = make([]parquet.Value, 1_000_000)
}

func TestCountSpans(t *testing.T) {
batchSize := rand.Intn(1000) + 1
spansEach := rand.Intn(1000) + 1

sch := parquet.SchemaOf(new(Trace))
traceID := make([]byte, 16)
_, err := crand.Read(traceID)
require.NoError(t, err)

// make Trace and convert to parquet.Row
tr := test.MakeTraceWithSpanCount(batchSize, spansEach, traceID)
trp := traceToParquet(traceID, tr, nil)
row := sch.Deconstruct(nil, trp)

// count spans for generated rows.
tID, spans := countSpans(sch, row)
require.Equal(t, tID, tempoUtil.TraceIDToHexString(traceID))
require.Equal(t, spans, batchSize*spansEach)
}
2 changes: 1 addition & 1 deletion tempodb/tempodb.go
@@ -93,7 +93,7 @@ type Compactor interface {
type CompactorSharder interface {
Combine(dataEncoding string, tenantID string, objs ...[]byte) ([]byte, bool, error)
Owns(hash string) bool
RecordDiscardedSpans(count int, tenantID string)
RecordDiscardedSpans(count int, tenantID string, traceID string)
}

type CompactorOverrides interface {
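
Note (not part of this commit): any out-of-tree CompactorSharder implementation has to pick up the extra traceID parameter. A minimal no-op sketch of the widened interface; the type name and behavior are illustrative only:

// Sketch: a no-op sharder updated to the new three-argument signature.
type noopSharder struct{}

func (noopSharder) Combine(dataEncoding string, tenantID string, objs ...[]byte) ([]byte, bool, error) {
	if len(objs) == 0 {
		return nil, false, nil
	}
	// No real combining: hand back the first object unchanged.
	return objs[0], false, nil
}

func (noopSharder) Owns(hash string) bool { return true }

// The trace ID is now part of the contract so implementations can log it.
func (noopSharder) RecordDiscardedSpans(count int, tenantID string, traceID string) {}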
