Skip to content

Commit

Permalink
Multiple fixes in block-builder (grafana#4364)
Browse files Browse the repository at this point in the history
  • Loading branch information
mapno committed Jan 10, 2025
1 parent e810eab commit d27b489
Show file tree
Hide file tree
Showing 6 changed files with 121 additions and 56 deletions.
4 changes: 2 additions & 2 deletions modules/blockbuilder/blockbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,8 +227,8 @@ func (b *BlockBuilder) consumePartitionSection(ctx context.Context, partition in
"lag", lag.Lag,
)

// TODO - Review what ts is used here
writer := newPartitionSectionWriter(b.logger, sectionEndTime.UnixMilli(), b.cfg.blockConfig, b.overrides, b.wal, b.enc)
// TODO - Review what endTimestamp is used here
writer := newPartitionSectionWriter(b.logger, int64(partition), sectionEndTime.UnixMilli(), b.cfg.blockConfig, b.overrides, b.wal, b.enc)

// We always rewind the partition's offset to the commit offset by reassigning the partition to the client (this triggers partition assignment).
// This is so the cycle started exactly at the commit offset, and not at what was (potentially over-) consumed previously.
Expand Down
2 changes: 2 additions & 0 deletions modules/blockbuilder/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"time"

"github.com/grafana/tempo/pkg/ingest"
"github.com/grafana/tempo/tempodb/encoding"
"github.com/grafana/tempo/tempodb/encoding/common"
)

Expand All @@ -23,6 +24,7 @@ func (c *BlockConfig) RegisterFlags(f *flag.FlagSet) {
// RegisterFlagsAndApplyDefaults registers the block-builder's block flags on f,
// each flag name prefixed with prefix, and applies default values to c.
func (c *BlockConfig) RegisterFlagsAndApplyDefaults(prefix string, f *flag.FlagSet) {
// Maximum block size flag; the default is 20 MiB (20*1024*1024 bytes).
f.Uint64Var(&c.MaxBlockBytes, prefix+".max-block-bytes", 20*1024*1024, "Maximum size of a block.") // TODO - Review default

// Default the block version to the version of the default encoding, then let
// the nested block config register its own flags and apply its defaults.
// NOTE(review): whether the nested call can overwrite Version is not visible here - confirm.
c.BlockCfg.Version = encoding.DefaultEncoding().Version()
c.BlockCfg.RegisterFlagsAndApplyDefaults(prefix, f)
}

Expand Down
18 changes: 12 additions & 6 deletions modules/blockbuilder/partition_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package blockbuilder
import (
"context"
"fmt"
"sync"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
Expand All @@ -22,25 +23,27 @@ type partitionSectionWriter interface {
type writer struct {
logger log.Logger

blockCfg BlockConfig
cycleEndTs int64
blockCfg BlockConfig
partition, cycleEndTs int64

overrides Overrides
wal *wal.WAL
enc encoding.VersionedEncoding

// TODO - Lock
m map[string]*tenantStore
mtx sync.Mutex
m map[string]*tenantStore
}

func newPartitionSectionWriter(logger log.Logger, cycleEndTs int64, blockCfg BlockConfig, overrides Overrides, wal *wal.WAL, enc encoding.VersionedEncoding) *writer {
func newPartitionSectionWriter(logger log.Logger, partition, cycleEndTs int64, blockCfg BlockConfig, overrides Overrides, wal *wal.WAL, enc encoding.VersionedEncoding) *writer {
return &writer{
logger: logger,
partition: partition,
cycleEndTs: cycleEndTs,
blockCfg: blockCfg,
overrides: overrides,
wal: wal,
enc: enc,
mtx: sync.Mutex{},
m: make(map[string]*tenantStore),
}
}
Expand Down Expand Up @@ -84,11 +87,14 @@ func (p *writer) flush(ctx context.Context, store tempodb.Writer) error {
}

func (p *writer) instanceForTenant(tenant string) (*tenantStore, error) {
p.mtx.Lock()
defer p.mtx.Unlock()

if i, ok := p.m[tenant]; ok {
return i, nil
}

i, err := newTenantStore(tenant, p.cycleEndTs, p.blockCfg, p.logger, p.wal, p.enc, p.overrides)
i, err := newTenantStore(tenant, p.partition, p.cycleEndTs, p.blockCfg, p.logger, p.wal, p.enc, p.overrides)
if err != nil {
return nil, err
}
Expand Down
97 changes: 61 additions & 36 deletions modules/blockbuilder/tenant_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@ package blockbuilder

import (
"context"
"sync"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/google/uuid"
"github.com/grafana/tempo/modules/blockbuilder/util"
"github.com/grafana/tempo/pkg/model"
"github.com/grafana/tempo/pkg/tempopb"
Expand All @@ -28,53 +30,69 @@ var metricBlockBuilderFlushedBlocks = promauto.NewCounterVec(

// TODO - This needs locking
type tenantStore struct {
tenantID string
ts int64

cfg BlockConfig
logger log.Logger
tenantID string
idGenerator util.IDGenerator

cfg BlockConfig
logger log.Logger
overrides Overrides
wal *wal.WAL
headBlock common.WALBlock
walBlocks []common.WALBlock
enc encoding.VersionedEncoding

wal *wal.WAL

headBlockMtx sync.Mutex
headBlock common.WALBlock

blocksMtx sync.Mutex
walBlocks []common.WALBlock
}

func newTenantStore(tenantID string, ts int64, cfg BlockConfig, logger log.Logger, wal *wal.WAL, enc encoding.VersionedEncoding, o Overrides) (*tenantStore, error) {
func newTenantStore(tenantID string, partitionID, endTimestamp int64, cfg BlockConfig, logger log.Logger, wal *wal.WAL, enc encoding.VersionedEncoding, o Overrides) (*tenantStore, error) {
s := &tenantStore{
tenantID: tenantID,
ts: ts,
cfg: cfg,
logger: logger,
overrides: o,
wal: wal,
enc: enc,
tenantID: tenantID,
idGenerator: util.NewDeterministicIDGenerator(partitionID, endTimestamp),
cfg: cfg,
logger: logger,
overrides: o,
wal: wal,
headBlockMtx: sync.Mutex{},
blocksMtx: sync.Mutex{},
enc: enc,
}

return s, s.resetHeadBlock()
}

func (s *tenantStore) cutHeadBlock() error {
// Flush the current head block if it exists
if s.headBlock != nil {
if err := s.headBlock.Flush(); err != nil {
return err
}
s.walBlocks = append(s.walBlocks, s.headBlock)
s.headBlock = nil
// TODO - periodically flush
func (s *tenantStore) cutHeadBlock(immediate bool) error {
s.headBlockMtx.Lock()
defer s.headBlockMtx.Unlock()

dataLen := s.headBlock.DataLength()

if s.headBlock == nil || dataLen == 0 {
return nil
}

return nil
}
if !immediate && dataLen < s.cfg.MaxBlockBytes {
return nil
}

s.blocksMtx.Lock()
defer s.blocksMtx.Unlock()

func (s *tenantStore) newUUID() backend.UUID {
return backend.UUID(util.NewDeterministicID(s.ts, int64(len(s.walBlocks))))
if err := s.headBlock.Flush(); err != nil {
return err
}
s.walBlocks = append(s.walBlocks, s.headBlock)
s.headBlock = nil

return s.resetHeadBlock()
}

func (s *tenantStore) resetHeadBlock() error {
meta := &backend.BlockMeta{
BlockID: s.newUUID(),
BlockID: s.idGenerator.NewID(),
TenantID: s.tenantID,
DedicatedColumns: s.overrides.DedicatedColumns(s.tenantID),
ReplicationFactor: backend.MetricsGeneratorReplicationFactor,
Expand All @@ -88,11 +106,9 @@ func (s *tenantStore) resetHeadBlock() error {
}

func (s *tenantStore) AppendTrace(traceID []byte, tr *tempopb.Trace, start, end uint32) error {
// TODO - Do this async? This slows down consumption, but we need to be precise
if s.headBlock.DataLength() > s.cfg.MaxBlockBytes {
if err := s.resetHeadBlock(); err != nil {
return err
}
// TODO - Do this async, it slows down consumption
if err := s.cutHeadBlock(false); err != nil {
return err
}

return s.headBlock.AppendTrace(traceID, tr, start, end)
Expand All @@ -102,9 +118,13 @@ func (s *tenantStore) Flush(ctx context.Context, store tempodb.Writer) error {
// TODO - Advance some of this work if possible

// Cut head block
if err := s.cutHeadBlock(); err != nil {
if err := s.cutHeadBlock(true); err != nil {
return err
}

s.blocksMtx.Lock()
defer s.blocksMtx.Unlock()

completeBlocks := make([]tempodb.WriteableBlock, 0, len(s.walBlocks))
// Write all blocks
for _, block := range s.walBlocks {
Expand All @@ -126,9 +146,14 @@ func (s *tenantStore) Flush(ctx context.Context, store tempodb.Writer) error {
}

// Clear the blocks
for _, block := range s.walBlocks {
if err := s.wal.LocalBackend().ClearBlock((uuid.UUID)(block.BlockMeta().BlockID), s.tenantID); err != nil {
return err
}
}
s.walBlocks = s.walBlocks[:0]

return s.resetHeadBlock()
return nil
}

func (s *tenantStore) buildWriteableBlock(ctx context.Context, b common.WALBlock) (tempodb.WriteableBlock, error) {
Expand Down
41 changes: 34 additions & 7 deletions modules/blockbuilder/util/id.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,26 +5,53 @@ import (
"encoding/binary"

"github.com/google/uuid"
"github.com/grafana/tempo/tempodb/backend"
"go.uber.org/atomic"
)

var (
// ns is the fixed namespace UUID used when deriving deterministic IDs.
ns = uuid.MustParse("28840903-6eb5-4ffb-8880-93a4fa98dbcb") // Random UUID
// hash is a single package-level SHA-1 hasher.
// NOTE(review): a hash.Hash carries internal state and is presumably not safe
// for concurrent use - confirm before sharing it across goroutines.
hash = sha1.New()
)

func NewDeterministicID(ts, seq int64) uuid.UUID {
b := int64ToBytes(ts, seq)
// IDGenerator produces block IDs.
type IDGenerator interface {
// NewID returns a new backend.UUID.
NewID() backend.UUID
}

// Compile-time check that *DeterministicIDGenerator implements IDGenerator.
var _ IDGenerator = (*DeterministicIDGenerator)(nil)

// DeterministicIDGenerator is an IDGenerator whose IDs are derived from a fixed
// list of seeds plus a per-generator sequence number.
type DeterministicIDGenerator struct {
// seeds are the values supplied at construction; they are part of the input for every ID.
seeds []int64
// seq is the sequence counter, incremented on every NewID call.
seq *atomic.Int64
}

// NewDeterministicIDGenerator returns a generator whose IDs are derived from
// the given seeds and a sequence counter that starts at zero.
func NewDeterministicIDGenerator(seeds ...int64) *DeterministicIDGenerator {
	gen := new(DeterministicIDGenerator)
	gen.seeds = seeds
	gen.seq = atomic.NewInt64(0)
	return gen
}

// NewID returns the next ID in this generator's sequence.
//
// The sequence counter is incremented atomically, and the ID is derived from
// the generator's seeds followed by the incremented counter value.
func (d *DeterministicIDGenerator) NewID() backend.UUID {
	seq := d.seq.Inc()

	// Build the hash input in a fresh slice. Appending directly to d.seeds
	// would write into its backing array whenever it has spare capacity,
	// which mutates shared state and races between concurrent callers.
	seeds := make([]int64, 0, len(d.seeds)+1)
	seeds = append(seeds, d.seeds...)
	seeds = append(seeds, seq)

	return backend.UUID(newDeterministicID(seeds))
}

// newDeterministicID derives a version-5 (SHA-1) UUID in the ns namespace from
// the little-endian byte encoding of seeds. Equal seeds always yield the same UUID.
func newDeterministicID(seeds []int64) uuid.UUID {
	b := int64ToBytes(seeds...)

	// Use a fresh hasher per call. uuid.NewHash resets, writes to and sums the
	// hasher it is given, so sharing one instance between goroutines would
	// interleave those steps and corrupt the resulting ID.
	return uuid.NewHash(sha1.New(), ns, b, 5)
}

func int64ToBytes(val1, val2 int64) []byte {
// 16 bytes = 8 bytes (int64) + 8 bytes (int64)
bytes := make([]byte, 16)
func int64ToBytes(seeds ...int64) []byte {
l := len(seeds)
bytes := make([]byte, l*8)

// Use binary.LittleEndian or binary.BigEndian depending on your requirement
binary.LittleEndian.PutUint64(bytes[0:8], uint64(val1))
binary.LittleEndian.PutUint64(bytes[8:16], uint64(val2))
for i, seed := range seeds {
binary.LittleEndian.PutUint64(bytes[i*8:], uint64(seed))
}

return bytes
}
15 changes: 10 additions & 5 deletions modules/blockbuilder/util/id_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,23 @@ import (
"time"

"github.com/google/uuid"
"github.com/grafana/tempo/tempodb/backend"
)

func TestDeterministicID(t *testing.T) {
func TestDeterministicIDGenerator(t *testing.T) {
ts := time.Now().UnixMilli()

firstPassIDs := make(map[uuid.UUID]struct{})
gen := NewDeterministicIDGenerator(ts)

firstPassIDs := make(map[backend.UUID]struct{})
for seq := int64(0); seq < 10; seq++ {
id := NewDeterministicID(ts, seq)
id := gen.NewID()
firstPassIDs[id] = struct{}{}
}

gen = NewDeterministicIDGenerator(ts)
for seq := int64(0); seq < 10; seq++ {
id := NewDeterministicID(ts, seq)
id := gen.NewID()
if _, ok := firstPassIDs[id]; !ok {
t.Errorf("ID %s not found in first pass IDs", id)
}
Expand All @@ -26,8 +30,9 @@ func TestDeterministicID(t *testing.T) {

func BenchmarkDeterministicID(b *testing.B) {
ts := time.Now().UnixMilli()
gen := NewDeterministicIDGenerator(ts)
for i := 0; i < b.N; i++ {
_ = NewDeterministicID(ts, int64(i))
_ = gen.NewID()
}
}

Expand Down

0 comments on commit d27b489

Please sign in to comment.