Skip to content

Commit

Permalink
[rhythm] Make ID generator more robust (#4416) (#4507)
Browse files Browse the repository at this point in the history
* Make ID generator more robust

* Simplify
  • Loading branch information
mapno authored Jan 3, 2025
1 parent f04eeb3 commit 01828c3
Show file tree
Hide file tree
Showing 5 changed files with 52 additions and 36 deletions.
2 changes: 1 addition & 1 deletion modules/blockbuilder/blockbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,7 @@ func (b *BlockBuilder) consumePartitionSection(ctx context.Context, partition in
}(time.Now())

// TODO - Review what endTimestamp is used here
writer := newPartitionSectionWriter(b.logger, int64(partition), sectionEndTime.UnixMilli(), b.cfg.BlockConfig, b.overrides, b.wal, b.enc)
writer := newPartitionSectionWriter(b.logger, uint64(partition), uint64(sectionEndTime.UnixMilli()), b.cfg.BlockConfig, b.overrides, b.wal, b.enc)

// We always rewind the partition's offset to the commit offset by reassigning the partition to the client (this triggers partition assignment).
// This is so the cycle started exactly at the commit offset, and not at what was (potentially over-) consumed previously.
Expand Down
4 changes: 2 additions & 2 deletions modules/blockbuilder/partition_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ type writer struct {
logger log.Logger

blockCfg BlockConfig
partition, cycleEndTs int64
partition, cycleEndTs uint64

overrides Overrides
wal *wal.WAL
Expand All @@ -36,7 +36,7 @@ type writer struct {
m map[string]*tenantStore
}

func newPartitionSectionWriter(logger log.Logger, partition, cycleEndTs int64, blockCfg BlockConfig, overrides Overrides, wal *wal.WAL, enc encoding.VersionedEncoding) *writer {
func newPartitionSectionWriter(logger log.Logger, partition, cycleEndTs uint64, blockCfg BlockConfig, overrides Overrides, wal *wal.WAL, enc encoding.VersionedEncoding) *writer {
return &writer{
logger: logger,
partition: partition,
Expand Down
2 changes: 1 addition & 1 deletion modules/blockbuilder/tenant_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ type tenantStore struct {
walBlocks []common.WALBlock
}

func newTenantStore(tenantID string, partitionID, endTimestamp int64, cfg BlockConfig, logger log.Logger, wal *wal.WAL, enc encoding.VersionedEncoding, o Overrides) (*tenantStore, error) {
func newTenantStore(tenantID string, partitionID, endTimestamp uint64, cfg BlockConfig, logger log.Logger, wal *wal.WAL, enc encoding.VersionedEncoding, o Overrides) (*tenantStore, error) {
s := &tenantStore{
tenantID: tenantID,
idGenerator: util.NewDeterministicIDGenerator(tenantID, partitionID, endTimestamp),
Expand Down
55 changes: 26 additions & 29 deletions modules/blockbuilder/util/id.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,61 +3,58 @@ package util
import (
"crypto/sha1"
"encoding/binary"
"hash"

"github.com/google/uuid"
"github.com/grafana/tempo/tempodb/backend"
"go.uber.org/atomic"
)

// NOTE(review): the `var (` group and the `const (` group below look like the
// before/after sides of one change shown without +/- markers — confirm against
// the committed file.
var (
ns = uuid.MustParse("28840903-6eb5-4ffb-8880-93a4fa98dbcb") // Random UUID
hash = sha1.New()
const (
// sha1Version5 is the version argument passed to uuid.NewHash by
// newDeterministicID.
sha1Version5 = 5
)

// ns is the namespace UUID passed to uuid.NewHash for every generated ID.
var ns = uuid.MustParse("28840903-6eb5-4ffb-8880-93a4fa98dbcb") // Random UUID

// IDGenerator is a source of backend.UUID values.
type IDGenerator interface {
// NewID returns one backend.UUID per call.
NewID() backend.UUID
}

var _ IDGenerator = (*DeterministicIDGenerator)(nil)

// DeterministicIDGenerator implements IDGenerator.
//
// NOTE(review): the field list mixes two variants (seeds plus an Int64 seq,
// and buf plus a Uint64 seq plus hash); seq is declared twice. This looks like
// the before/after sides of one change — confirm which set is current.
type DeterministicIDGenerator struct {
seeds []int64
seq *atomic.Int64
// buf is the byte buffer built by newBuf in the constructor and handed to
// newDeterministicID on every NewID call.
buf []byte
// seq is the counter incremented by NewID.
seq *atomic.Uint64
// hash is the SHA-1 instance created in the constructor and handed to
// newDeterministicID on every NewID call.
hash hash.Hash
}

// NewDeterministicIDGenerator returns a generator built from tenantID and the
// given seeds, with its sequence counter starting at 0.
//
// NOTE(review): two signatures and two sets of field initialisers appear below
// (int64 seeds stored in a slice vs. uint64 seeds packed by newBuf). They look
// like the before/after sides of one change — confirm which is current.
func NewDeterministicIDGenerator(tenantID string, seeds ...int64) *DeterministicIDGenerator {
seeds = append(seeds, int64(binary.LittleEndian.Uint64(stringToBytes(tenantID))))
func NewDeterministicIDGenerator(tenantID string, seeds ...uint64) *DeterministicIDGenerator {
return &DeterministicIDGenerator{
seeds: seeds,
seq: atomic.NewInt64(0),
buf: newBuf([]byte(tenantID), seeds),
seq: atomic.NewUint64(0),
hash: sha1.New(),
}
}

// NewID increments the sequence counter and derives an ID from the stored
// seeds followed by the new counter value.
//
// NOTE(review): append(d.seeds, seq) may write into the backing array of
// d.seeds when it has spare capacity, so the stored slice can be aliased by
// the temporary one — verify this is intended. A second NewID that takes
// d.hash and d.buf also appears in this listing; presumably it supersedes
// this one — confirm.
func (d *DeterministicIDGenerator) NewID() backend.UUID {
seq := d.seq.Inc()
seeds := append(d.seeds, seq)
return backend.UUID(newDeterministicID(seeds))
}
// newBuf builds the byte buffer that is hashed to produce IDs. Its layout is:
//
//	tenantID bytes | each seed as 8 little-endian bytes | 8 zero bytes
//
// The trailing 8 bytes are reserved for the sequence number, which the caller
// fills in before hashing. The returned slice has length
// len(tenantID) + 8*len(seeds) + 8. tenantID is only read, never modified.
func newBuf(tenantID []byte, seeds []uint64) []byte {
	dl, sl := len(tenantID), len(seeds)
	data := make([]byte, dl+sl*8+8) // tenantID bytes + 8 bytes per uint64 + 8 bytes for seq

	// copy is (dst, src). With the arguments reversed the tenant prefix of data
	// stays all zeros and the caller's tenantID slice is zeroed instead, so the
	// tenant would never influence the generated IDs.
	copy(data, tenantID)

	for i, seed := range seeds {
		binary.LittleEndian.PutUint64(data[dl+i*8:], seed)
	}

	return data
}

// stringToBytes converts s to a []byte.
// TODO - Try to avoid allocs here
func stringToBytes(s string) []byte {
return []byte(s)
// NewID increments the sequence counter and returns the ID that
// newDeterministicID derives from the generator's hash, its buffer and the new
// counter value.
//
// NOTE(review): d.buf and d.hash are shared by every call and are passed on
// without any locking, while newDeterministicID writes into the buffer. The
// atomic counter alone does not make concurrent calls safe — confirm callers
// use a generator from a single goroutine.
func (d *DeterministicIDGenerator) NewID() backend.UUID {
return backend.UUID(newDeterministicID(d.hash, d.buf, d.seq.Inc()))
}

// int64ToBytes encodes each seed as 8 little-endian bytes, in argument order.
func int64ToBytes(seeds ...int64) []byte {
l := len(seeds)
bytes := make([]byte, l*8)

// Use binary.LittleEndian or binary.BigEndian depending on your requirement
for i, seed := range seeds {
binary.LittleEndian.PutUint64(bytes[i*8:], uint64(seed))
}
// newDeterministicID writes seq into the last 8 bytes of data (little-endian)
// and passes the result to uuid.NewHash together with hash, the ns namespace
// and sha1Version5.
//
// data is modified in place and must be at least 8 bytes long; a shorter slice
// makes the slice expression below panic.
func newDeterministicID(hash hash.Hash, data []byte, seq uint64) uuid.UUID {
// update last 8 bytes of data with seq
binary.LittleEndian.PutUint64(data[len(data)-8:], seq)

return bytes
return uuid.NewHash(hash, ns, data, sha1Version5)
}
25 changes: 22 additions & 3 deletions modules/blockbuilder/util/id_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
func TestDeterministicIDGenerator(t *testing.T) {
ts := time.Now().UnixMilli()

gen := NewDeterministicIDGenerator(util.FakeTenantID, 0, ts)
gen := NewDeterministicIDGenerator(util.FakeTenantID, 0, uint64(ts))

firstPassIDs := make(map[backend.UUID]struct{})
for seq := int64(0); seq < 10; seq++ {
Expand All @@ -27,7 +27,7 @@ func TestDeterministicIDGenerator(t *testing.T) {
assert.NoError(t, err)
}

gen = NewDeterministicIDGenerator(util.FakeTenantID, 0, ts)
gen = NewDeterministicIDGenerator(util.FakeTenantID, 0, uint64(ts))
for seq := int64(0); seq < 10; seq++ {
id := gen.NewID()
if _, ok := firstPassIDs[id]; !ok {
Expand All @@ -36,9 +36,28 @@ func TestDeterministicIDGenerator(t *testing.T) {
}
}

// FuzzDeterministicIDGenerator feeds arbitrary tenant IDs and seed pairs to the
// generator and checks that the first few IDs it produces render as strings
// that uuid.Parse accepts.
func FuzzDeterministicIDGenerator(f *testing.F) {
	// The target is skipped unconditionally: nothing below runs until this
	// call is removed.
	f.Skip()

	// Number of consecutive IDs checked for every fuzz input.
	const idsPerInput = 3

	f.Add(util.FakeTenantID, uint64(42), uint64(100))
	f.Fuzz(func(t *testing.T, tenantID string, seed1, seed2 uint64) {
		generator := NewDeterministicIDGenerator(tenantID, seed1, seed2)

		for remaining := idsPerInput; remaining > 0; remaining-- {
			rendered := generator.NewID().String()
			if _, err := uuid.Parse(rendered); err != nil {
				t.Fatalf("failed to parse UUID: %v", err)
			}
		}
	})
}

func BenchmarkDeterministicID(b *testing.B) {
tenant := util.FakeTenantID
ts := time.Now().UnixMilli()
gen := NewDeterministicIDGenerator(util.FakeTenantID, ts)
partitionID := uint64(0)
gen := NewDeterministicIDGenerator(tenant, partitionID, uint64(ts))
for i := 0; i < b.N; i++ {
_ = gen.NewID()
}
Expand Down

0 comments on commit 01828c3

Please sign in to comment.