diff --git a/cpp/src/parquet/bloom_filter.cc b/cpp/src/parquet/bloom_filter.cc
index 427e73b9e6428..5201d2f6c514a 100644
--- a/cpp/src/parquet/bloom_filter.cc
+++ b/cpp/src/parquet/bloom_filter.cc
@@ -105,16 +105,24 @@ static ::arrow::Status ValidateBloomFilterHeader(
 }
 
 BlockSplitBloomFilter BlockSplitBloomFilter::Deserialize(
-    const ReaderProperties& properties, ArrowInputStream* input) {
-  // NOTE: we don't know the bloom filter header size upfront, and we can't rely on
-  // InputStream::Peek() which isn't always implemented. Therefore, we must first
-  // Read() with an upper bound estimate of the header size, then once we know
-  // the bloom filter data size, we can Read() the exact number of remaining data bytes.
+    const ReaderProperties& properties, ArrowInputStream* input,
+    std::optional<int64_t> bloom_filter_length) {
   ThriftDeserializer deserializer(properties);
   format::BloomFilterHeader header;
+  int64_t bloom_filter_header_read_size = 0;
+  if (bloom_filter_length.has_value()) {
+    bloom_filter_header_read_size = bloom_filter_length.value();
+  } else {
+    // NOTE: we don't know the bloom filter header size upfront without
+    // bloom_filter_length, and we can't rely on InputStream::Peek() which isn't always
+    // implemented. Therefore, we must first Read() with an upper bound estimate of the
+    // header size, then once we know the bloom filter data size, we can Read() the exact
+    // number of remaining data bytes.
+    bloom_filter_header_read_size = kBloomFilterHeaderSizeGuess;
+  }
 
   // Read and deserialize bloom filter header
-  PARQUET_ASSIGN_OR_THROW(auto header_buf, input->Read(kBloomFilterHeaderSizeGuess));
+  PARQUET_ASSIGN_OR_THROW(auto header_buf, input->Read(bloom_filter_header_read_size));
   // This gets used, then set by DeserializeThriftMsg
   uint32_t header_size = static_cast<uint32_t>(header_buf->size());
   try {
@@ -136,6 +144,14 @@ BlockSplitBloomFilter BlockSplitBloomFilter::Deserialize(
     bloom_filter.Init(header_buf->data() + header_size, bloom_filter_size);
     return bloom_filter;
   }
+  if (bloom_filter_length && *bloom_filter_length != bloom_filter_size + header_size) {
+    // We know the bloom filter data size, but the real size is different.
+    std::stringstream ss;
+    ss << "Bloom filter length (" << bloom_filter_length.value()
+       << ") does not match the actual bloom filter (size: "
+       << bloom_filter_size + header_size << ").";
+    throw ParquetException(ss.str());
+  }
   // We have read a part of the bloom filter already, copy it to the target buffer
   // and read the remaining part from the InputStream.
   auto buffer = AllocateBuffer(properties.memory_pool(), bloom_filter_size);
diff --git a/cpp/src/parquet/bloom_filter.h b/cpp/src/parquet/bloom_filter.h
index e8ef5c0bd60db..909563d013fed 100644
--- a/cpp/src/parquet/bloom_filter.h
+++ b/cpp/src/parquet/bloom_filter.h
@@ -310,10 +310,13 @@ class PARQUET_EXPORT BlockSplitBloomFilter : public BloomFilter {
   /// a Bloom filter from a parquet filter.
   ///
   /// @param properties The parquet reader properties.
-  /// @param input_stream The input stream from which to construct the Bloom filter.
+  /// @param input_stream The input stream from which to construct the bloom filter.
+  /// @param bloom_filter_length The length of the serialized bloom filter including
+  /// header.
   /// @return The BlockSplitBloomFilter.
-  static BlockSplitBloomFilter Deserialize(const ReaderProperties& properties,
-                                           ArrowInputStream* input_stream);
+  static BlockSplitBloomFilter Deserialize(
+      const ReaderProperties& properties, ArrowInputStream* input_stream,
+      std::optional<int64_t> bloom_filter_length = std::nullopt);
 
  private:
   inline void InsertHashImpl(uint64_t hash);
diff --git a/cpp/src/parquet/bloom_filter_reader.cc b/cpp/src/parquet/bloom_filter_reader.cc
index 4e27a940c2f5e..3518d2ba1eb76 100644
--- a/cpp/src/parquet/bloom_filter_reader.cc
+++ b/cpp/src/parquet/bloom_filter_reader.cc
@@ -63,9 +63,20 @@ std::unique_ptr<BloomFilter> RowGroupBloomFilterReaderImpl::GetColumnBloomFilter
   if (file_size <= *bloom_filter_offset) {
     throw ParquetException("file size less or equal than bloom offset");
   }
+  std::optional<int32_t> bloom_filter_length = col_chunk->bloom_filter_length();
+  if (bloom_filter_length.has_value()) {
+    if (*bloom_filter_length < 0) {
+      throw ParquetException("bloom_filter_length less than 0");
+    }
+    if (*bloom_filter_length + *bloom_filter_offset > file_size) {
+      throw ParquetException(
+          "bloom filter length + bloom filter offset greater than file size");
+    }
+  }
   auto stream = ::arrow::io::RandomAccessFile::GetStream(
       input_, *bloom_filter_offset, file_size - *bloom_filter_offset);
-  auto bloom_filter = BlockSplitBloomFilter::Deserialize(properties_, stream->get());
+  auto bloom_filter =
+      BlockSplitBloomFilter::Deserialize(properties_, stream->get(), bloom_filter_length);
   return std::make_unique<BlockSplitBloomFilter>(std::move(bloom_filter));
 }
 
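Usage note (a sketch, not part of the patch): with the reader change above, callers never pass `bloom_filter_length` themselves; it flows from the column chunk metadata into `Deserialize()`. A minimal consumer might look like the following. `MightContain` and the row-group-0/column-0 assumption are illustrative, while `ParquetFileReader::OpenFile`, `GetBloomFilterReader()`, `GetColumnBloomFilter()`, `ByteArray`, `Hash()`, and `FindHash()` are the APIs this diff exercises.

    #include <string>
    #include <string_view>

    #include "parquet/bloom_filter.h"
    #include "parquet/bloom_filter_reader.h"
    #include "parquet/file_reader.h"
    #include "parquet/types.h"

    // Probe the bloom filter of row group 0, column 0. Returns true if the
    // value may be present, false only if it is definitely absent.
    bool MightContain(const std::string& path, std::string_view value) {
      auto reader = parquet::ParquetFileReader::OpenFile(path, /*memory_map=*/false);
      auto row_group = reader->GetBloomFilterReader().RowGroup(0);
      auto bloom_filter = row_group->GetColumnBloomFilter(0);
      if (bloom_filter == nullptr) {
        return true;  // no bloom filter was written; nothing can be ruled out
      }
      parquet::ByteArray ba{value};
      return bloom_filter->FindHash(bloom_filter->Hash(&ba));
    }

When the file carries `bloom_filter_length`, the validated path above lets `Deserialize()` fetch the filter with one exact-sized `Read()`; otherwise it falls back to the header-size guess as before.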
diff --git a/cpp/src/parquet/bloom_filter_reader_test.cc b/cpp/src/parquet/bloom_filter_reader_test.cc
index e297ab7045120..f732b4a8e22b7 100644
--- a/cpp/src/parquet/bloom_filter_reader_test.cc
+++ b/cpp/src/parquet/bloom_filter_reader_test.cc
@@ -15,6 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
+#include <gmock/gmock.h>
 #include <gtest/gtest.h>
 
 #include "parquet/bloom_filter.h"
@@ -25,31 +26,41 @@ namespace parquet::test {
 
 TEST(BloomFilterReader, ReadBloomFilter) {
-  std::string dir_string(parquet::test::get_data_dir());
-  std::string path = dir_string + "/data_index_bloom_encoding_stats.parquet";
-  auto reader = ParquetFileReader::OpenFile(path, false);
-  auto file_metadata = reader->metadata();
-  EXPECT_FALSE(file_metadata->is_encryption_algorithm_set());
-  auto& bloom_filter_reader = reader->GetBloomFilterReader();
-  auto row_group_0 = bloom_filter_reader.RowGroup(0);
-  ASSERT_NE(nullptr, row_group_0);
-  EXPECT_THROW(bloom_filter_reader.RowGroup(1), ParquetException);
-  auto bloom_filter = row_group_0->GetColumnBloomFilter(0);
-  ASSERT_NE(nullptr, bloom_filter);
-  EXPECT_THROW(row_group_0->GetColumnBloomFilter(1), ParquetException);
+  std::vector<std::string> files = {"data_index_bloom_encoding_stats.parquet",
+                                    "data_index_bloom_encoding_with_length.parquet"};
+  for (const auto& test_file : files) {
+    std::string dir_string(parquet::test::get_data_dir());
+    std::string path = dir_string + "/" + test_file;
+    auto reader = ParquetFileReader::OpenFile(path, /*memory_map=*/false);
+    auto file_metadata = reader->metadata();
+    EXPECT_FALSE(file_metadata->is_encryption_algorithm_set());
+    auto& bloom_filter_reader = reader->GetBloomFilterReader();
+    auto row_group_0 = bloom_filter_reader.RowGroup(0);
+    ASSERT_NE(nullptr, row_group_0);
+    EXPECT_THROW_THAT(
+        [&]() { bloom_filter_reader.RowGroup(1); }, ParquetException,
+        ::testing::Property(&ParquetException::what,
+                            ::testing::HasSubstr("Invalid row group ordinal")));
+    auto bloom_filter = row_group_0->GetColumnBloomFilter(0);
+    ASSERT_NE(nullptr, bloom_filter);
+    EXPECT_THROW_THAT([&]() { row_group_0->GetColumnBloomFilter(1); }, ParquetException,
+                      ::testing::Property(&ParquetException::what,
+                                          ::testing::HasSubstr(
+                                              "Invalid column index at column ordinal")));
 
-  // assert exists
-  {
-    std::string_view sv = "Hello";
-    ByteArray ba{sv};
-    EXPECT_TRUE(bloom_filter->FindHash(bloom_filter->Hash(&ba)));
-  }
+    // assert exists
+    {
+      std::string_view sv = "Hello";
+      ByteArray ba{sv};
+      EXPECT_TRUE(bloom_filter->FindHash(bloom_filter->Hash(&ba)));
+    }
 
-  // no exists
-  {
-    std::string_view sv = "NOT_EXISTS";
-    ByteArray ba{sv};
-    EXPECT_FALSE(bloom_filter->FindHash(bloom_filter->Hash(&ba)));
+    // no exists
+    {
+      std::string_view sv = "NOT_EXISTS";
+      ByteArray ba{sv};
+      EXPECT_FALSE(bloom_filter->FindHash(bloom_filter->Hash(&ba)));
+    }
   }
 }
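The unit-test change below (bloom_filter_test.cc) drives the same round trip with and without an explicit length. As a standalone illustration of that loop, here is a minimal sketch; `RoundTripExample`, the 1024-byte filter size, and the inserted value are illustrative assumptions, while the parquet and Arrow calls mirror the ones the tests use.

    #include <cassert>
    #include <cstdint>
    #include <optional>

    #include "arrow/io/memory.h"
    #include "parquet/bloom_filter.h"
    #include "parquet/exception.h"
    #include "parquet/properties.h"

    void RoundTripExample() {
      parquet::BlockSplitBloomFilter filter;
      filter.Init(/*num_bytes=*/1024);
      const uint64_t hash = filter.Hash(static_cast<int64_t>(42));
      filter.InsertHash(hash);

      // Serialize: thrift header followed by the block-split bitset.
      PARQUET_ASSIGN_OR_THROW(auto sink, ::arrow::io::BufferOutputStream::Create());
      filter.WriteTo(sink.get());
      PARQUET_ASSIGN_OR_THROW(auto buffer, sink->Finish());

      parquet::ReaderProperties properties;
      for (std::optional<int64_t> length :
           {std::optional<int64_t>{}, std::optional<int64_t>{buffer->size()}}) {
        ::arrow::io::BufferReader source(buffer);
        // With a length, Deserialize() reads exactly that many bytes and
        // cross-checks it against the header; without one it falls back to
        // the header-size guess.
        auto de_bloom =
            parquet::BlockSplitBloomFilter::Deserialize(properties, &source, length);
        assert(de_bloom.FindHash(hash));
      }
    }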
diff --git a/cpp/src/parquet/bloom_filter_test.cc b/cpp/src/parquet/bloom_filter_test.cc
index b7d93bce4d37b..ff83b97302274 100644
--- a/cpp/src/parquet/bloom_filter_test.cc
+++ b/cpp/src/parquet/bloom_filter_test.cc
@@ -107,24 +107,26 @@ TEST(BasicTest, TestBloomFilter) {
 
     // Deserialize Bloom filter from memory
     ASSERT_OK_AND_ASSIGN(auto buffer, sink->Finish());
-    ::arrow::io::BufferReader source(buffer);
     ReaderProperties reader_properties;
-    BlockSplitBloomFilter de_bloom =
-        BlockSplitBloomFilter::Deserialize(reader_properties, &source);
-
-    // Lookup previously inserted values
-    for (const auto v : kIntInserts) {
-      EXPECT_TRUE(de_bloom.FindHash(de_bloom.Hash(v)));
-    }
-    for (const auto v : kFloatInserts) {
-      EXPECT_TRUE(de_bloom.FindHash(de_bloom.Hash(v)));
+    for (std::optional<int64_t> bloom_filter_length :
+         std::vector<std::optional<int64_t>>{std::nullopt, buffer->size()}) {
+      ::arrow::io::BufferReader source(buffer);
+      BlockSplitBloomFilter de_bloom = BlockSplitBloomFilter::Deserialize(
+          reader_properties, &source, bloom_filter_length);
+      // Lookup previously inserted values
+      for (const auto v : kIntInserts) {
+        EXPECT_TRUE(de_bloom.FindHash(de_bloom.Hash(v)));
+      }
+      for (const auto v : kFloatInserts) {
+        EXPECT_TRUE(de_bloom.FindHash(de_bloom.Hash(v)));
+      }
+      false_positives = 0;
+      for (const auto v : kNegativeIntLookups) {
+        false_positives += de_bloom.FindHash(de_bloom.Hash(v));
+      }
+      EXPECT_LE(false_positives, 2);
     }
-    false_positives = 0;
-    for (const auto v : kNegativeIntLookups) {
-      false_positives += de_bloom.FindHash(de_bloom.Hash(v));
-    }
-    EXPECT_LE(false_positives, 2);
   }
 }
diff --git a/cpp/src/parquet/metadata.cc b/cpp/src/parquet/metadata.cc
index f43187c2dd4e5..d651ea5db0f18 100644
--- a/cpp/src/parquet/metadata.cc
+++ b/cpp/src/parquet/metadata.cc
@@ -286,6 +286,13 @@ class ColumnChunkMetaData::ColumnChunkMetaDataImpl {
     return std::nullopt;
   }
 
+  inline std::optional<int32_t> bloom_filter_length() const {
+    if (column_metadata_->__isset.bloom_filter_length) {
+      return column_metadata_->bloom_filter_length;
+    }
+    return std::nullopt;
+  }
+
   inline bool has_dictionary_page() const {
     return column_metadata_->__isset.dictionary_page_offset;
   }
@@ -399,6 +406,10 @@ std::optional<int64_t> ColumnChunkMetaData::bloom_filter_offset() const {
   return impl_->bloom_filter_offset();
 }
 
+std::optional<int32_t> ColumnChunkMetaData::bloom_filter_length() const {
+  return impl_->bloom_filter_length();
+}
+
 bool ColumnChunkMetaData::has_dictionary_page() const {
   return impl_->has_dictionary_page();
 }
diff --git a/cpp/src/parquet/metadata.h b/cpp/src/parquet/metadata.h
index 6609cff48bac2..e47c45ff0492a 100644
--- a/cpp/src/parquet/metadata.h
+++ b/cpp/src/parquet/metadata.h
@@ -168,6 +168,7 @@ class PARQUET_EXPORT ColumnChunkMetaData {
   const std::vector<Encoding::type>& encodings() const;
   const std::vector<PageEncodingStats>& encoding_stats() const;
   std::optional<int64_t> bloom_filter_offset() const;
+  std::optional<int32_t> bloom_filter_length() const;
   bool has_dictionary_page() const;
   int64_t dictionary_page_offset() const;
   int64_t data_page_offset() const;
diff --git a/cpp/src/parquet/printer.cc b/cpp/src/parquet/printer.cc
index 2c81abb9eee79..f11397ab96ed8 100644
--- a/cpp/src/parquet/printer.cc
+++ b/cpp/src/parquet/printer.cc
@@ -314,6 +314,34 @@ void ParquetFilePrinter::JSONPrint(std::ostream& stream, std::list<int> selected
              << "\"UncompressedSize\": \"" << column_chunk->total_uncompressed_size()
              << "\", \"CompressedSize\": \"" << column_chunk->total_compressed_size();
 
+      if (column_chunk->bloom_filter_offset()) {
+        // Output BloomFilter {offset, length}
+        stream << "\", BloomFilter {"
+               << "\"offset\": \"" << column_chunk->bloom_filter_offset().value();
+        if (column_chunk->bloom_filter_length()) {
+          stream << "\", \"length\": \"" << column_chunk->bloom_filter_length().value();
+        }
+        stream << "\"}";
+      }
+
+      if (column_chunk->GetColumnIndexLocation()) {
+        auto location = column_chunk->GetColumnIndexLocation().value();
+        // Output ColumnIndex {offset, length}
+        stream << "\", ColumnIndex {"
+               << "\"offset\": \"" << location.offset;
+        stream << "\", \"length\": \"" << location.length;
+        stream << "\"}";
+      }
+
+      if (column_chunk->GetOffsetIndexLocation()) {
+        auto location = column_chunk->GetOffsetIndexLocation().value();
+        // Output OffsetIndex {offset, length}
+        stream << "\", OffsetIndex {"
+               << "\"offset\": \"" << location.offset;
+        stream << "\", \"length\": \"" << location.length;
+        stream << "\"}";
+      }
+
       // end of a ColumnChunk
       stream << "\" }";
       c1++;
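Finally, a note on the metadata surface added above: `bloom_filter_length` is an optional thrift field (older writers set only `bloom_filter_offset`), so both accessors return `std::optional`. A small inspection sketch follows; the function name and output format are illustrative assumptions, while the metadata accessors are the ones introduced or touched by this diff.

    #include <iostream>
    #include <string>

    #include "parquet/file_reader.h"
    #include "parquet/metadata.h"

    void PrintBloomFilterLocations(const std::string& path) {
      auto reader = parquet::ParquetFileReader::OpenFile(path, /*memory_map=*/false);
      auto file_metadata = reader->metadata();
      for (int rg = 0; rg < file_metadata->num_row_groups(); ++rg) {
        auto row_group = file_metadata->RowGroup(rg);
        for (int col = 0; col < row_group->num_columns(); ++col) {
          auto chunk = row_group->ColumnChunk(col);
          if (auto offset = chunk->bloom_filter_offset()) {
            std::cout << "row group " << rg << ", column " << col
                      << ": bloom filter offset=" << *offset;
            // Length is only present for files written with the newer field.
            if (auto length = chunk->bloom_filter_length()) {
              std::cout << " length=" << *length;
            }
            std::cout << "\n";
          }
        }
      }
    }

This reports the same offset/length pair that the printer.cc change emits in its JSON output and that the bloom filter reader validates against the file size before seeking.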