Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update blockbuilder to periodically flush wals and sort traces #4550

Merged
merged 6 commits into from
Jan 15, 2025
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 53 additions & 5 deletions modules/blockbuilder/blockbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,29 @@ const (
blockBuilderServiceName = "block-builder"
ConsumerGroup = "block-builder"
pollTimeout = 2 * time.Second
cutTime = 10 * time.Second
)

var (
metricFetchDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
Namespace: "tempo",
Subsystem: "block_builder",
Name: "fetch_duration_seconds",
Help: "Time spent fetching from Kafka.",
NativeHistogramBucketFactor: 1.1,
}, []string{"partition"})
metricFetchBytesTotal = promauto.NewGaugeVec(prometheus.GaugeOpts{
Namespace: "tempo",
Subsystem: "block_builder",
Name: "fetch_bytes_total",
Help: "Total number of bytes fetched from Kafka",
}, []string{"partition"})
metricFetchRecordsTotal = promauto.NewGaugeVec(prometheus.GaugeOpts{
Namespace: "tempo",
Subsystem: "block_builder",
Name: "fetch_records_total",
Help: "Total number of records fetched from Kafka",
}, []string{"partition"})
mapno marked this conversation as resolved.
Show resolved Hide resolved
metricPartitionLag = promauto.NewGaugeVec(prometheus.GaugeOpts{
Namespace: "tempo",
Subsystem: "block_builder",
Expand Down Expand Up @@ -180,6 +200,12 @@ func (b *BlockBuilder) consume(ctx context.Context) error {
level.Info(b.logger).Log("msg", "starting consume cycle", "cycle_end", end, "active_partitions", partitions)
defer func(t time.Time) { metricConsumeCycleDuration.Observe(time.Since(t).Seconds()) }(time.Now())

// Clear all previous remnants
err := b.wal.Clear()
if err != nil {
return err
}

for _, partition := range partitions {
// Consume partition while data remains.
// TODO - round-robin one consumption per partition instead to equalize catch-up time.
Expand Down Expand Up @@ -207,10 +233,12 @@ func (b *BlockBuilder) consumePartition(ctx context.Context, partition int32, ov
dur = b.cfg.ConsumeCycleDuration
topic = b.cfg.IngestStorageConfig.Kafka.Topic
group = b.cfg.IngestStorageConfig.Kafka.ConsumerGroup
partLabel = strconv.Itoa(int(partition))
startOffset kgo.Offset
init bool
writer *writer
lastRec *kgo.Record
nextCut time.Time
end time.Time
)

Expand Down Expand Up @@ -247,6 +275,7 @@ func (b *BlockBuilder) consumePartition(ctx context.Context, partition int32, ov
outer:
for {
fetches := func() kgo.Fetches {
defer func(t time.Time) { metricFetchDuration.WithLabelValues(partLabel).Observe(time.Since(t).Seconds()) }(time.Now())
ctx2, cancel := context.WithTimeout(ctx, pollTimeout)
defer cancel()
return b.kafkaClient.PollFetches(ctx2)
Expand All @@ -257,7 +286,7 @@ outer:
// No more data
break
}
metricFetchErrors.WithLabelValues(strconv.Itoa(int(partition))).Inc()
metricFetchErrors.WithLabelValues(partLabel).Inc()
return false, err
}

Expand All @@ -267,19 +296,23 @@ outer:

for iter := fetches.RecordIter(); !iter.Done(); {
rec := iter.Next()
metricFetchBytesTotal.WithLabelValues(partLabel).Add(float64(len(rec.Value)))
metricFetchRecordsTotal.WithLabelValues(partLabel).Inc()

level.Debug(b.logger).Log(
"msg", "processing record",
"partition", rec.Partition,
"offset", rec.Offset,
"timestamp", rec.Timestamp,
"len", len(rec.Value),
)

// Initialize on first record
if !init {
end = rec.Timestamp.Add(dur) // When block will be cut
metricPartitionLagSeconds.WithLabelValues(strconv.Itoa(int(partition))).Set(time.Since(rec.Timestamp).Seconds())
metricPartitionLagSeconds.WithLabelValues(partLabel).Set(time.Since(rec.Timestamp).Seconds())
writer = newPartitionSectionWriter(b.logger, uint64(partition), uint64(rec.Offset), b.cfg.BlockConfig, b.overrides, b.wal, b.enc)
nextCut = rec.Timestamp.Add(cutTime)
init = true
}

Expand All @@ -295,7 +328,16 @@ outer:
break outer
}

err := b.pushTraces(rec.Key, rec.Value, writer)
if rec.Timestamp.After(nextCut) {
// Cut before appending this trace
err = writer.cutidle(rec.Timestamp.Add(-cutTime), false)
if err != nil {
return false, err
}
nextCut = rec.Timestamp.Add(cutTime)
}

err := b.pushTraces(rec.Timestamp, rec.Key, rec.Value, writer)
if err != nil {
return false, err
}
Expand All @@ -313,6 +355,12 @@ outer:
return false, nil
}

// Cut any remaining
err = writer.cutidle(time.Time{}, true)
if err != nil {
return false, err
}

err = writer.flush(ctx, b.writer)
if err != nil {
return false, err
Expand Down Expand Up @@ -370,14 +418,14 @@ func (b *BlockBuilder) stopping(err error) error {
return err
}

func (b *BlockBuilder) pushTraces(tenantBytes, reqBytes []byte, p partitionSectionWriter) error {
func (b *BlockBuilder) pushTraces(ts time.Time, tenantBytes, reqBytes []byte, p partitionSectionWriter) error {
req, err := b.decoder.Decode(reqBytes)
if err != nil {
return fmt.Errorf("failed to decode trace: %w", err)
}
defer b.decoder.Reset()

return p.pushBytes(string(tenantBytes), req)
return p.pushBytes(ts, string(tenantBytes), req)
}

func (b *BlockBuilder) getAssignedActivePartitions() []int32 {
Expand Down
51 changes: 46 additions & 5 deletions modules/blockbuilder/blockbuilder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,7 @@ func TestBlockbuilder_committingFails(t *testing.T) {
requireLastCommitEquals(t, ctx, client, producedRecords[len(producedRecords)-1].Offset+1)
}

func blockbuilderConfig(t *testing.T, address string) Config {
func blockbuilderConfig(t testing.TB, address string) Config {
cfg := Config{}
flagext.DefaultValues(&cfg)

Expand All @@ -327,7 +327,7 @@ type ownEverythingSharder struct{}

func (o *ownEverythingSharder) Owns(string) bool { return true }

func newStore(ctx context.Context, t *testing.T) storage.Store {
func newStore(ctx context.Context, t testing.TB) storage.Store {
tmpDir := t.TempDir()
s, err := storage.NewStore(storage.Config{
Trace: tempodb.Config{
Expand Down Expand Up @@ -404,7 +404,7 @@ type mockOverrides struct {

func (m *mockOverrides) DedicatedColumns(_ string) backend.DedicatedColumns { return m.dc }

func newKafkaClient(t *testing.T, config ingest.KafkaConfig) *kgo.Client {
func newKafkaClient(t testing.TB, config ingest.KafkaConfig) *kgo.Client {
writeClient, err := kgo.NewClient(
kgo.SeedBrokers(config.Address),
kgo.AllowAutoTopicCreation(),
Expand All @@ -427,7 +427,7 @@ func countFlushedTraces(store storage.Store) int {
}

// nolint: revive
func sendReq(t *testing.T, ctx context.Context, client *kgo.Client) []*kgo.Record {
func sendReq(t testing.TB, ctx context.Context, client *kgo.Client) []*kgo.Record {
traceID := generateTraceID(t)

req := test.MakePushBytesRequest(t, 10, traceID)
Expand Down Expand Up @@ -463,7 +463,7 @@ func sendTracesFor(t *testing.T, ctx context.Context, client *kgo.Client, dur, i
}
}

func generateTraceID(t *testing.T) []byte {
func generateTraceID(t testing.TB) []byte {
traceID := make([]byte, 16)
_, err := rand.Read(traceID)
require.NoError(t, err)
Expand All @@ -478,3 +478,44 @@ func requireLastCommitEquals(t testing.TB, ctx context.Context, client *kgo.Clie
require.True(t, ok)
require.Equal(t, expectedOffset, offset.At)
}

// BenchmarkBlockBuilder measures one full consume cycle over a fixed batch of
// produced records, reporting throughput via b.SetBytes. Record production is
// excluded from the timed region with StopTimer/StartTimer.
func BenchmarkBlockBuilder(b *testing.B) {
	var (
		ctx        = context.Background()
		_, address = testkafka.CreateCluster(b, 1, testTopic)
		store      = newStore(ctx, b)
		cfg        = blockbuilderConfig(b, address)
		client     = newKafkaClient(b, cfg.IngestStorageConfig.Kafka)
	)

	// A long cycle duration ensures a single consume() call drains everything
	// produced for this iteration.
	cfg.ConsumeCycleDuration = 1 * time.Hour

	b.ResetTimer()

	for i := 0; i < b.N; i++ {
		// BUGFIX: the original called b.ResetTimer() here, which for b.N > 1
		// discarded the timing of every previous iteration while still
		// dividing by N. Pause/resume instead so only the consume path is
		// measured and all iterations accumulate.
		b.StopTimer()

		var records []*kgo.Record

		// j, not i: the original shadowed the benchmark loop counter.
		for j := 0; j < 1000; j++ {
			records = append(records, sendReq(b, ctx, client)...)
		}

		var size int
		for _, r := range records {
			size += len(r.Value)
		}

		bb := New(cfg, test.NewTestingLogger(b), newPartitionRingReader(), &mockOverrides{}, store)

		b.StartTimer()

		// Startup (without starting the background consume cycle)
		err := bb.starting(ctx)
		require.NoError(b, err)

		err = bb.consume(ctx)
		require.NoError(b, err)

		// SetBytes expects bytes processed per iteration; size is exactly the
		// payload produced above for this iteration.
		b.SetBytes(int64(size))
	}
}
28 changes: 10 additions & 18 deletions modules/blockbuilder/partition_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import (
)

// partitionSectionWriter accumulates trace data for one section of a Kafka
// partition and flushes completed blocks to backend storage.
type partitionSectionWriter interface {
	// pushBytes appends the traces in req for the given tenant, stamped with
	// the originating record's timestamp ts.
	pushBytes(ts time.Time, tenant string, req *tempopb.PushBytesRequest) error
	// flush writes all completed blocks to the backend via store.
	flush(ctx context.Context, store tempodb.Writer) error
}

Expand Down Expand Up @@ -50,7 +50,7 @@ func newPartitionSectionWriter(logger log.Logger, partition, cycleEndTs uint64,
}
}

func (p *writer) pushBytes(tenant string, req *tempopb.PushBytesRequest) error {
func (p *writer) pushBytes(ts time.Time, tenant string, req *tempopb.PushBytesRequest) error {
level.Debug(p.logger).Log(
"msg", "pushing bytes",
"tenant", tenant,
Expand All @@ -69,28 +69,20 @@ func (p *writer) pushBytes(tenant string, req *tempopb.PushBytesRequest) error {
return fmt.Errorf("failed to unmarshal trace: %w", err)
}

var start, end uint64
for _, b := range tr.ResourceSpans {
for _, ss := range b.ScopeSpans {
for _, s := range ss.Spans {
if start == 0 || s.StartTimeUnixNano < start {
start = s.StartTimeUnixNano
}
if s.EndTimeUnixNano > end {
end = s.EndTimeUnixNano
}
}
}
if err := i.AppendTrace(req.Ids[j], tr, ts); err != nil {
return err
}
}

startSeconds := uint32(start / uint64(time.Second))
endSeconds := uint32(end / uint64(time.Second))
return nil
}

if err := i.AppendTrace(req.Ids[j], tr, startSeconds, endSeconds); err != nil {
// cutidle invokes CutIdle on every instance tracked by this writer,
// forwarding since and immediate unchanged. The sweep stops at the first
// error. NOTE(review): p.m is presumably keyed by tenant — confirm against
// the writer's declaration.
func (p *writer) cutidle(since time.Time, immediate bool) error {
	for _, inst := range p.m {
		if err := inst.CutIdle(since, immediate); err != nil {
			return err
		}
	}
	return nil
}

Expand Down
Loading