diff --git a/pkg/storage/bloom/v1/block.go b/pkg/storage/bloom/v1/block.go index 1167a98d8d6d6..6d13349855f40 100644 --- a/pkg/storage/bloom/v1/block.go +++ b/pkg/storage/bloom/v1/block.go @@ -40,7 +40,8 @@ func (b *Block) LoadHeaders() error { return errors.Wrap(err, "getting index reader") } - if err := b.index.DecodeHeaders(idx); err != nil { + indexChecksum, err := b.index.DecodeHeaders(idx) + if err != nil { return errors.Wrap(err, "decoding index") } @@ -57,16 +58,31 @@ func (b *Block) LoadHeaders() error { if err != nil { return errors.Wrap(err, "getting blooms reader") } - if err := b.blooms.DecodeHeaders(blooms); err != nil { + bloomChecksum, err := b.blooms.DecodeHeaders(blooms) + if err != nil { return errors.Wrap(err, "decoding blooms") } b.initialized = true + if !b.metadata.Options.Schema.Compatible(b.blooms.schema) { + return fmt.Errorf( + "schema mismatch: index (%v) vs blooms (%v)", + b.metadata.Options.Schema, b.blooms.schema, + ) + } + + b.metadata.Checksum = combineChecksums(indexChecksum, bloomChecksum) } return nil } +// XOR checksums as a simple checksum combiner with the benefit that +// each part can be recomputed by XORing the result against the other +func combineChecksums(index, blooms uint32) uint32 { + return index ^ blooms +} + // convenience method func (b *Block) Querier() *BlockQuerier { return NewBlockQuerier(b) diff --git a/pkg/storage/bloom/v1/bloom.go b/pkg/storage/bloom/v1/bloom.go index be95f96862eac..20c310ef695c0 100644 --- a/pkg/storage/bloom/v1/bloom.go +++ b/pkg/storage/bloom/v1/bloom.go @@ -171,9 +171,9 @@ func NewBloomBlock(encoding chunkenc.Encoding) BloomBlock { } } -func (b *BloomBlock) DecodeHeaders(r io.ReadSeeker) error { +func (b *BloomBlock) DecodeHeaders(r io.ReadSeeker) (uint32, error) { if err := b.schema.DecodeFrom(r); err != nil { - return errors.Wrap(err, "decoding schema") + return 0, errors.Wrap(err, "decoding schema") } var ( @@ -182,35 +182,36 @@ func (b *BloomBlock) DecodeHeaders(r io.ReadSeeker) error { ) // last 12 bytes are (headers offset: 8 byte u64, checksum: 4 byte u32) if _, err := r.Seek(-12, io.SeekEnd); err != nil { - return errors.Wrap(err, "seeking to bloom headers metadata") + return 0, errors.Wrap(err, "seeking to bloom headers metadata") } dec.B, err = io.ReadAll(r) if err != nil { - return errors.Wrap(err, "reading bloom headers metadata") + return 0, errors.Wrap(err, "reading bloom headers metadata") } headerOffset := dec.Be64() + checksum := dec.Be32() if _, err := r.Seek(int64(headerOffset), io.SeekStart); err != nil { - return errors.Wrap(err, "seeking to bloom headers") + return 0, errors.Wrap(err, "seeking to bloom headers") } dec.B, err = io.ReadAll(r) if err != nil { - return errors.Wrap(err, "reading bloom page headers") + return 0, errors.Wrap(err, "reading bloom page headers") } if err := dec.CheckCrc(castagnoliTable); err != nil { - return errors.Wrap(err, "checksumming page headers") + return 0, errors.Wrap(err, "checksumming page headers") } b.pageHeaders = make([]BloomPageHeader, dec.Uvarint()) for i := 0; i < len(b.pageHeaders); i++ { header := &b.pageHeaders[i] if err := header.Decode(&dec); err != nil { - return errors.Wrapf(err, "decoding %dth series header", i) + return 0, errors.Wrapf(err, "decoding %dth series header", i) } } - return nil + return checksum, nil } func (b *BloomBlock) BloomPageDecoder(r io.ReadSeeker, pageIdx int) (*BloomPageDecoder, error) { diff --git a/pkg/storage/bloom/v1/builder.go b/pkg/storage/bloom/v1/builder.go index d4a35af9ff906..ac7a83baad374 100644 --- a/pkg/storage/bloom/v1/builder.go +++ b/pkg/storage/bloom/v1/builder.go @@ -120,14 +120,19 @@ func (b *BlockBuilder) BuildFrom(itr Iterator[SeriesWithBloom]) (uint32, error) return 0, errors.Wrap(err, "iterating series with blooms") } - checksum, err := b.blooms.Close() + return b.Close() +} + +func (b *BlockBuilder) Close() (uint32, error) { + bloomChecksum, err := b.blooms.Close() if err != nil { return 0, errors.Wrap(err, "closing bloom file") } - if err := b.index.Close(); err != nil { + indexCheckSum, err := b.index.Close() + if err != nil { return 0, errors.Wrap(err, "closing series file") } - return checksum, nil + return combineChecksums(indexCheckSum, bloomChecksum), nil } func (b *BlockBuilder) AddSeries(series SeriesWithBloom) error { @@ -457,10 +462,10 @@ func (b *IndexBuilder) flushPage() error { return nil } -func (b *IndexBuilder) Close() error { +func (b *IndexBuilder) Close() (uint32, error) { if b.page.Count() > 0 { if err := b.flushPage(); err != nil { - return errors.Wrap(err, "flushing final series page") + return 0, errors.Wrap(err, "flushing final series page") } } @@ -480,9 +485,9 @@ func (b *IndexBuilder) Close() error { b.scratch.PutHash(crc32Hash) _, err := b.writer.Write(b.scratch.Get()) if err != nil { - return errors.Wrap(err, "writing series page headers") + return 0, errors.Wrap(err, "writing series page headers") } - return errors.Wrap(b.writer.Close(), "closing series writer") + return crc32Hash.Sum32(), errors.Wrap(b.writer.Close(), "closing series writer") } // Simplistic implementation of a merge builder that builds a single block @@ -585,12 +590,9 @@ func (mb *MergeBuilder) Build(builder *BlockBuilder) (uint32, error) { } } - checksum, err := builder.blooms.Close() + checksum, err := builder.Close() if err != nil { - return 0, errors.Wrap(err, "closing bloom file") - } - if err := builder.index.Close(); err != nil { - return 0, errors.Wrap(err, "closing series file") + return 0, errors.Wrap(err, "closing block") } return checksum, nil } diff --git a/pkg/storage/bloom/v1/builder_test.go b/pkg/storage/bloom/v1/builder_test.go index 9c2e0df338d25..6bf2c26e7b585 100644 --- a/pkg/storage/bloom/v1/builder_test.go +++ b/pkg/storage/bloom/v1/builder_test.go @@ -358,7 +358,7 @@ func TestMergeBuilder_Roundtrip(t *testing.T) { checksum, err := mb.Build(builder) require.Nil(t, err) - require.Equal(t, uint32(0x2ec4fd6a), checksum) + require.Equal(t, uint32(0xe306ec6e), checksum) // ensure the new block contains one copy of all the data // by comparing it against an iterator over the source data diff --git a/pkg/storage/bloom/v1/index.go b/pkg/storage/bloom/v1/index.go index e6c045de80d93..e3a14dc5453ea 100644 --- a/pkg/storage/bloom/v1/index.go +++ b/pkg/storage/bloom/v1/index.go @@ -2,6 +2,7 @@ package v1 import ( "bytes" + "fmt" "io" "github.com/pkg/errors" @@ -17,6 +18,10 @@ type Schema struct { nGramLength, nGramSkip uint64 } +func (s Schema) String() string { + return fmt.Sprintf("v%d,encoding=%s,ngram=%d,skip=%d", s.version, s.encoding, s.nGramLength, s.nGramSkip) +} + func (s Schema) Compatible(other Schema) bool { return s == other } @@ -94,9 +99,9 @@ type BlockIndex struct { pageHeaders []SeriesPageHeaderWithOffset // headers for each series page } -func (b *BlockIndex) DecodeHeaders(r io.ReadSeeker) error { +func (b *BlockIndex) DecodeHeaders(r io.ReadSeeker) (uint32, error) { if err := b.opts.DecodeFrom(r); err != nil { - return errors.Wrap(err, "decoding block options") + return 0, errors.Wrap(err, "decoding block options") } var ( @@ -106,24 +111,25 @@ func (b *BlockIndex) DecodeHeaders(r io.ReadSeeker) error { // last 12 bytes are (headers offset: 8 byte u64, checksum: 4 byte u32) if _, err := r.Seek(-12, io.SeekEnd); err != nil { - return errors.Wrap(err, "seeking to bloom headers metadata") + return 0, errors.Wrap(err, "seeking to bloom headers metadata") } dec.B, err = io.ReadAll(r) if err != nil { - return errors.Wrap(err, "reading bloom headers metadata") + return 0, errors.Wrap(err, "reading bloom headers metadata") } headerOffset := dec.Be64() + checksum := dec.Be32() if _, err := r.Seek(int64(headerOffset), io.SeekStart); err != nil { - return errors.Wrap(err, "seeking to index headers") + return 0, errors.Wrap(err, "seeking to index headers") } dec.B, err = io.ReadAll(r) if err != nil { - return errors.Wrap(err, "reading index page headers") + return 0, errors.Wrap(err, "reading index page headers") } if err := dec.CheckCrc(castagnoliTable); err != nil { - return errors.Wrap(err, "checksumming page headers") + return 0, errors.Wrap(err, "checksumming page headers") } b.pageHeaders = make( @@ -134,12 +140,12 @@ func (b *BlockIndex) DecodeHeaders(r io.ReadSeeker) error { for i := 0; i < len(b.pageHeaders); i++ { var s SeriesPageHeaderWithOffset if err := s.Decode(&dec); err != nil { - return errors.Wrapf(err, "decoding %dth series header", i) + return 0, errors.Wrapf(err, "decoding %dth series header", i) } b.pageHeaders[i] = s } - return nil + return checksum, nil } // decompress page and return an iterator over the bytes