Skip to content

Commit

Permalink
block checksums XOR individual file header checksums
Browse files Browse the repository at this point in the history
Signed-off-by: Owen Diehl <[email protected]>
  • Loading branch information
owen-d committed Feb 2, 2024
1 parent 0cc60d7 commit 2abb381
Show file tree
Hide file tree
Showing 5 changed files with 58 additions and 33 deletions.
20 changes: 18 additions & 2 deletions pkg/storage/bloom/v1/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ func (b *Block) LoadHeaders() error {
return errors.Wrap(err, "getting index reader")
}

if err := b.index.DecodeHeaders(idx); err != nil {
indexChecksum, err := b.index.DecodeHeaders(idx)
if err != nil {
return errors.Wrap(err, "decoding index")
}

Expand All @@ -57,16 +58,31 @@ func (b *Block) LoadHeaders() error {
if err != nil {
return errors.Wrap(err, "getting blooms reader")
}
if err := b.blooms.DecodeHeaders(blooms); err != nil {
bloomChecksum, err := b.blooms.DecodeHeaders(blooms)
if err != nil {
return errors.Wrap(err, "decoding blooms")
}
b.initialized = true

if !b.metadata.Options.Schema.Compatible(b.blooms.schema) {
return fmt.Errorf(
"schema mismatch: index (%v) vs blooms (%v)",
b.metadata.Options.Schema, b.blooms.schema,
)
}

b.metadata.Checksum = combineChecksums(indexChecksum, bloomChecksum)
}
return nil

}

// combineChecksums folds two CRC32 checksums into one via XOR.
// Because XOR is its own inverse (a^b^b == a), either input can be
// recovered by XORing the combined value with the other input.
func combineChecksums(index, blooms uint32) uint32 {
	combined := index
	combined ^= blooms
	return combined
}

// convenience method
func (b *Block) Querier() *BlockQuerier {
return NewBlockQuerier(b)
Expand Down
19 changes: 10 additions & 9 deletions pkg/storage/bloom/v1/bloom.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,9 +171,9 @@ func NewBloomBlock(encoding chunkenc.Encoding) BloomBlock {
}
}

func (b *BloomBlock) DecodeHeaders(r io.ReadSeeker) error {
func (b *BloomBlock) DecodeHeaders(r io.ReadSeeker) (uint32, error) {
if err := b.schema.DecodeFrom(r); err != nil {
return errors.Wrap(err, "decoding schema")
return 0, errors.Wrap(err, "decoding schema")
}

var (
Expand All @@ -182,35 +182,36 @@ func (b *BloomBlock) DecodeHeaders(r io.ReadSeeker) error {
)
// last 12 bytes are (headers offset: 8 byte u64, checksum: 4 byte u32)
if _, err := r.Seek(-12, io.SeekEnd); err != nil {
return errors.Wrap(err, "seeking to bloom headers metadata")
return 0, errors.Wrap(err, "seeking to bloom headers metadata")
}
dec.B, err = io.ReadAll(r)
if err != nil {
return errors.Wrap(err, "reading bloom headers metadata")
return 0, errors.Wrap(err, "reading bloom headers metadata")
}

headerOffset := dec.Be64()
checksum := dec.Be32()

if _, err := r.Seek(int64(headerOffset), io.SeekStart); err != nil {
return errors.Wrap(err, "seeking to bloom headers")
return 0, errors.Wrap(err, "seeking to bloom headers")
}
dec.B, err = io.ReadAll(r)
if err != nil {
return errors.Wrap(err, "reading bloom page headers")
return 0, errors.Wrap(err, "reading bloom page headers")
}

if err := dec.CheckCrc(castagnoliTable); err != nil {
return errors.Wrap(err, "checksumming page headers")
return 0, errors.Wrap(err, "checksumming page headers")
}

b.pageHeaders = make([]BloomPageHeader, dec.Uvarint())
for i := 0; i < len(b.pageHeaders); i++ {
header := &b.pageHeaders[i]
if err := header.Decode(&dec); err != nil {
return errors.Wrapf(err, "decoding %dth series header", i)
return 0, errors.Wrapf(err, "decoding %dth series header", i)
}
}
return nil
return checksum, nil
}

func (b *BloomBlock) BloomPageDecoder(r io.ReadSeeker, pageIdx int) (*BloomPageDecoder, error) {
Expand Down
26 changes: 14 additions & 12 deletions pkg/storage/bloom/v1/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,14 +120,19 @@ func (b *BlockBuilder) BuildFrom(itr Iterator[SeriesWithBloom]) (uint32, error)
return 0, errors.Wrap(err, "iterating series with blooms")
}

checksum, err := b.blooms.Close()
return b.Close()
}

func (b *BlockBuilder) Close() (uint32, error) {
bloomChecksum, err := b.blooms.Close()
if err != nil {
return 0, errors.Wrap(err, "closing bloom file")
}
if err := b.index.Close(); err != nil {
indexCheckSum, err := b.index.Close()
if err != nil {
return 0, errors.Wrap(err, "closing series file")
}
return checksum, nil
return combineChecksums(indexCheckSum, bloomChecksum), nil
}

func (b *BlockBuilder) AddSeries(series SeriesWithBloom) error {
Expand Down Expand Up @@ -457,10 +462,10 @@ func (b *IndexBuilder) flushPage() error {
return nil
}

func (b *IndexBuilder) Close() error {
func (b *IndexBuilder) Close() (uint32, error) {
if b.page.Count() > 0 {
if err := b.flushPage(); err != nil {
return errors.Wrap(err, "flushing final series page")
return 0, errors.Wrap(err, "flushing final series page")
}
}

Expand All @@ -480,9 +485,9 @@ func (b *IndexBuilder) Close() error {
b.scratch.PutHash(crc32Hash)
_, err := b.writer.Write(b.scratch.Get())
if err != nil {
return errors.Wrap(err, "writing series page headers")
return 0, errors.Wrap(err, "writing series page headers")
}
return errors.Wrap(b.writer.Close(), "closing series writer")
return crc32Hash.Sum32(), errors.Wrap(b.writer.Close(), "closing series writer")
}

// Simplistic implementation of a merge builder that builds a single block
Expand Down Expand Up @@ -585,12 +590,9 @@ func (mb *MergeBuilder) Build(builder *BlockBuilder) (uint32, error) {
}
}

checksum, err := builder.blooms.Close()
checksum, err := builder.Close()
if err != nil {
return 0, errors.Wrap(err, "closing bloom file")
}
if err := builder.index.Close(); err != nil {
return 0, errors.Wrap(err, "closing series file")
return 0, errors.Wrap(err, "closing block")
}
return checksum, nil
}
2 changes: 1 addition & 1 deletion pkg/storage/bloom/v1/builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -358,7 +358,7 @@ func TestMergeBuilder_Roundtrip(t *testing.T) {

checksum, err := mb.Build(builder)
require.Nil(t, err)
require.Equal(t, uint32(0x2ec4fd6a), checksum)
require.Equal(t, uint32(0xe306ec6e), checksum)

// ensure the new block contains one copy of all the data
// by comparing it against an iterator over the source data
Expand Down
24 changes: 15 additions & 9 deletions pkg/storage/bloom/v1/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package v1

import (
"bytes"
"fmt"
"io"

"github.com/pkg/errors"
Expand All @@ -17,6 +18,10 @@ type Schema struct {
nGramLength, nGramSkip uint64
}

// String implements fmt.Stringer, rendering the schema's version,
// encoding, and n-gram parameters in a compact comma-separated form.
func (s Schema) String() string {
	version, enc := s.version, s.encoding
	length, skip := s.nGramLength, s.nGramSkip
	return fmt.Sprintf("v%d,encoding=%s,ngram=%d,skip=%d", version, enc, length, skip)
}

// Compatible reports whether two schemas are interchangeable.
// Schema is a comparable struct, so field-wise equality suffices.
func (s Schema) Compatible(other Schema) bool {
	return other == s
}
Expand Down Expand Up @@ -94,9 +99,9 @@ type BlockIndex struct {
pageHeaders []SeriesPageHeaderWithOffset // headers for each series page
}

func (b *BlockIndex) DecodeHeaders(r io.ReadSeeker) error {
func (b *BlockIndex) DecodeHeaders(r io.ReadSeeker) (uint32, error) {
if err := b.opts.DecodeFrom(r); err != nil {
return errors.Wrap(err, "decoding block options")
return 0, errors.Wrap(err, "decoding block options")
}

var (
Expand All @@ -106,24 +111,25 @@ func (b *BlockIndex) DecodeHeaders(r io.ReadSeeker) error {

// last 12 bytes are (headers offset: 8 byte u64, checksum: 4 byte u32)
if _, err := r.Seek(-12, io.SeekEnd); err != nil {
return errors.Wrap(err, "seeking to bloom headers metadata")
return 0, errors.Wrap(err, "seeking to bloom headers metadata")
}
dec.B, err = io.ReadAll(r)
if err != nil {
return errors.Wrap(err, "reading bloom headers metadata")
return 0, errors.Wrap(err, "reading bloom headers metadata")
}

headerOffset := dec.Be64()
checksum := dec.Be32()
if _, err := r.Seek(int64(headerOffset), io.SeekStart); err != nil {
return errors.Wrap(err, "seeking to index headers")
return 0, errors.Wrap(err, "seeking to index headers")
}
dec.B, err = io.ReadAll(r)
if err != nil {
return errors.Wrap(err, "reading index page headers")
return 0, errors.Wrap(err, "reading index page headers")
}

if err := dec.CheckCrc(castagnoliTable); err != nil {
return errors.Wrap(err, "checksumming page headers")
return 0, errors.Wrap(err, "checksumming page headers")
}

b.pageHeaders = make(
Expand All @@ -134,12 +140,12 @@ func (b *BlockIndex) DecodeHeaders(r io.ReadSeeker) error {
for i := 0; i < len(b.pageHeaders); i++ {
var s SeriesPageHeaderWithOffset
if err := s.Decode(&dec); err != nil {
return errors.Wrapf(err, "decoding %dth series header", i)
return 0, errors.Wrapf(err, "decoding %dth series header", i)
}
b.pageHeaders[i] = s
}

return nil
return checksum, nil
}

// decompress page and return an iterator over the bytes
Expand Down

0 comments on commit 2abb381

Please sign in to comment.