Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[rhythm] Make ID generator more robust #4416

Merged
merged 7 commits into from
Dec 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion modules/blockbuilder/blockbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,7 @@ func (b *BlockBuilder) consumePartitionSection(ctx context.Context, partition in
}(time.Now())

// TODO - Review what endTimestamp is used here
writer := newPartitionSectionWriter(b.logger, int64(partition), sectionEndTime.UnixMilli(), b.cfg.BlockConfig, b.overrides, b.wal, b.enc)
writer := newPartitionSectionWriter(b.logger, uint64(partition), uint64(sectionEndTime.UnixMilli()), b.cfg.BlockConfig, b.overrides, b.wal, b.enc)

// We always rewind the partition's offset to the commit offset by reassigning the partition to the client (this triggers partition assignment).
// This is so the cycle started exactly at the commit offset, and not at what was (potentially over-) consumed previously.
Expand Down
4 changes: 2 additions & 2 deletions modules/blockbuilder/partition_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ type writer struct {
logger log.Logger

blockCfg BlockConfig
partition, cycleEndTs int64
partition, cycleEndTs uint64

overrides Overrides
wal *wal.WAL
Expand All @@ -36,7 +36,7 @@ type writer struct {
m map[string]*tenantStore
}

func newPartitionSectionWriter(logger log.Logger, partition, cycleEndTs int64, blockCfg BlockConfig, overrides Overrides, wal *wal.WAL, enc encoding.VersionedEncoding) *writer {
func newPartitionSectionWriter(logger log.Logger, partition, cycleEndTs uint64, blockCfg BlockConfig, overrides Overrides, wal *wal.WAL, enc encoding.VersionedEncoding) *writer {
return &writer{
logger: logger,
partition: partition,
Expand Down
2 changes: 1 addition & 1 deletion modules/blockbuilder/tenant_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ type tenantStore struct {
walBlocks []common.WALBlock
}

func newTenantStore(tenantID string, partitionID, endTimestamp int64, cfg BlockConfig, logger log.Logger, wal *wal.WAL, enc encoding.VersionedEncoding, o Overrides) (*tenantStore, error) {
func newTenantStore(tenantID string, partitionID, endTimestamp uint64, cfg BlockConfig, logger log.Logger, wal *wal.WAL, enc encoding.VersionedEncoding, o Overrides) (*tenantStore, error) {
s := &tenantStore{
tenantID: tenantID,
idGenerator: util.NewDeterministicIDGenerator(tenantID, partitionID, endTimestamp),
Expand Down
55 changes: 26 additions & 29 deletions modules/blockbuilder/util/id.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,61 +3,58 @@ package util
import (
"crypto/sha1"
"encoding/binary"
"hash"

"github.com/google/uuid"
"github.com/grafana/tempo/tempodb/backend"
"go.uber.org/atomic"
)

var (
ns = uuid.MustParse("28840903-6eb5-4ffb-8880-93a4fa98dbcb") // Random UUID
hash = sha1.New()
const (
sha1Version5 = 5
)

var ns = uuid.MustParse("28840903-6eb5-4ffb-8880-93a4fa98dbcb") // Random UUID

type IDGenerator interface {
NewID() backend.UUID
}

var _ IDGenerator = (*DeterministicIDGenerator)(nil)

type DeterministicIDGenerator struct {
seeds []int64
seq *atomic.Int64
buf []byte
seq *atomic.Uint64
hash hash.Hash
}

func NewDeterministicIDGenerator(tenantID string, seeds ...int64) *DeterministicIDGenerator {
seeds = append(seeds, int64(binary.LittleEndian.Uint64(stringToBytes(tenantID))))
func NewDeterministicIDGenerator(tenantID string, seeds ...uint64) *DeterministicIDGenerator {
return &DeterministicIDGenerator{
seeds: seeds,
seq: atomic.NewInt64(0),
buf: newBuf([]byte(tenantID), seeds),
seq: atomic.NewUint64(0),
hash: sha1.New(),
}
}

func (d *DeterministicIDGenerator) NewID() backend.UUID {
seq := d.seq.Inc()
seeds := append(d.seeds, seq)
return backend.UUID(newDeterministicID(seeds))
}
// newBuf builds the hash-input buffer for a generator. The layout is the
// tenantID bytes, followed by each seed encoded as 8 little-endian bytes,
// followed by 8 zero bytes reserved for the sequence number.
//
// tenantID is only read; the returned slice is freshly allocated.
func newBuf(tenantID []byte, seeds []uint64) []byte {
	dl, sl := len(tenantID), len(seeds)
	data := make([]byte, dl+sl*8+8) // tenantID bytes + 8 bytes per uint64 + 8 bytes for seq

	// copy takes (dst, src). With the arguments reversed the tenant ID was
	// overwritten with zeros and never reached the buffer, so only its length
	// influenced the generated IDs.
	copy(data, tenantID)

	for i, seed := range seeds {
		binary.LittleEndian.PutUint64(data[dl+i*8:], seed)
	}

	return data
}

// TODO - Try to avoid allocs here
func stringToBytes(s string) []byte {
return []byte(s)
// NewID advances the generator's sequence counter and returns the ID derived
// from the generator's buffer and the new counter value.
func (d *DeterministicIDGenerator) NewID() backend.UUID {
	next := d.seq.Inc()
	id := newDeterministicID(d.hash, d.buf, next)
	return backend.UUID(id)
}

func int64ToBytes(seeds ...int64) []byte {
l := len(seeds)
bytes := make([]byte, l*8)

// Use binary.LittleEndian or binary.BigEndian depending on your requirement
for i, seed := range seeds {
binary.LittleEndian.PutUint64(bytes[i*8:], uint64(seed))
}
// newDeterministicID writes seq (little-endian) into the trailing 8 bytes of
// data and returns the UUID that uuid.NewHash derives from hash, the package
// namespace ns and data, using sha1Version5 as the version.
//
// data is modified in place and must be at least 8 bytes long, otherwise the
// slice expression below panics.
func newDeterministicID(hash hash.Hash, data []byte, seq uint64) uuid.UUID {
// update last 8 bytes of data with seq
binary.LittleEndian.PutUint64(data[len(data)-8:], seq)

// NOTE(review): the next line looks like removed-side residue from the
// rendered diff (no `bytes` variable is declared in this function) — confirm.
return bytes
return uuid.NewHash(hash, ns, data, sha1Version5)
}
25 changes: 22 additions & 3 deletions modules/blockbuilder/util/id_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
func TestDeterministicIDGenerator(t *testing.T) {
ts := time.Now().UnixMilli()

gen := NewDeterministicIDGenerator(util.FakeTenantID, 0, ts)
gen := NewDeterministicIDGenerator(util.FakeTenantID, 0, uint64(ts))

firstPassIDs := make(map[backend.UUID]struct{})
for seq := int64(0); seq < 10; seq++ {
Expand All @@ -27,7 +27,7 @@ func TestDeterministicIDGenerator(t *testing.T) {
assert.NoError(t, err)
}

gen = NewDeterministicIDGenerator(util.FakeTenantID, 0, ts)
gen = NewDeterministicIDGenerator(util.FakeTenantID, 0, uint64(ts))
for seq := int64(0); seq < 10; seq++ {
id := gen.NewID()
if _, ok := firstPassIDs[id]; !ok {
Expand All @@ -36,9 +36,28 @@ func TestDeterministicIDGenerator(t *testing.T) {
}
}

// FuzzDeterministicIDGenerator feeds arbitrary tenant IDs and seed pairs to the
// generator and checks that each produced ID renders to a string that
// uuid.Parse accepts.
//
// The f.Skip call at the top skips the target, so the seed corpus and fuzz
// body below do not run until it is removed.
func FuzzDeterministicIDGenerator(f *testing.F) {
	f.Skip()

	f.Add(util.FakeTenantID, uint64(42), uint64(100))
	f.Fuzz(func(t *testing.T, tenantID string, seed1, seed2 uint64) {
		// Number of consecutive IDs checked per fuzz input.
		const idsPerInput = 3

		generator := NewDeterministicIDGenerator(tenantID, seed1, seed2)
		for n := 0; n < idsPerInput; n++ {
			rendered := generator.NewID().String()
			if _, parseErr := uuid.Parse(rendered); parseErr != nil {
				t.Fatalf("failed to parse UUID: %v", parseErr)
			}
		}
	})
}

func BenchmarkDeterministicID(b *testing.B) {
tenant := util.FakeTenantID
ts := time.Now().UnixMilli()
gen := NewDeterministicIDGenerator(util.FakeTenantID, ts)
partitionID := uint64(0)
gen := NewDeterministicIDGenerator(tenant, partitionID, uint64(ts))
for i := 0; i < b.N; i++ {
_ = gen.NewID()
}
Expand Down
Loading