GH-38860: [C++][Parquet] Using length to optimize bloom filter read (#38863)

### Rationale for this change

Parquet has supported a `bloom_filter_length` field in the column chunk metadata since format version 2.10 [1]. We'd like to use this length when reading.

The current implementation [2] reads the filter in two steps:

1. Read the header using a "guessed" header length. The actual header is likely around 40 bytes, but we use a larger value so the read still covers headers that grow in future format versions.
2. From the deserialized header, obtain the bloom filter size and read that many remaining bytes from the input.

Now, when `bloom_filter_length` is present, we can load the whole bloom filter in a single read instead of two. We shouldn't remove the old two-step path, because files written without `bloom_filter_length` still need to be readable. (A simplified sketch of both strategies follows the references below.)

We also need to generate a new parquet-testing file (I can do this ASAP).

[1] https://github.com/apache/parquet-format/blob/master/src/main/thrift/parquet.thrift#L824
[2] https://github.com/apache/arrow/blob/main/cpp/src/parquet/bloom_filter.cc#L117
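
Below is a minimal, self-contained sketch of the two strategies. It is illustrative only: `SimpleStream`, `kHeaderSizeGuess`, `ReadFilterTwoStep`, and `ReadFilterOneStep` are stand-ins, not Arrow's actual API, and Thrift header parsing is reduced to a known `header_size`.

```cpp
#include <cstddef>
#include <string>

// Toy stand-in for ArrowInputStream (illustrative, not the Arrow API).
struct SimpleStream {
  std::string data;
  std::size_t pos = 0;
  std::string Read(std::size_t n) {
    std::string out = data.substr(pos, n);
    pos += out.size();
    return out;
  }
};

// Upper-bound guess for the Thrift header size (the real header is ~40B).
constexpr std::size_t kHeaderSizeGuess = 256;

// Old path: over-read with a guessed header size, then issue a second
// read for whatever part of the filter body is still missing.
std::string ReadFilterTwoStep(SimpleStream& in, std::size_t header_size,
                              std::size_t body_size) {
  std::string first = in.Read(kHeaderSizeGuess);  // may read past the header
  std::string body = first.substr(header_size);   // body bytes already fetched
  if (body.size() < body_size) {
    body += in.Read(body_size - body.size());     // second read for the rest
  }
  return body;
}

// New path: bloom_filter_length covers header + body, so one exact-size
// read fetches everything; no second I/O call is needed.
std::string ReadFilterOneStep(SimpleStream& in, std::size_t bloom_filter_length,
                              std::size_t header_size) {
  std::string all = in.Read(bloom_filter_length);
  return all.substr(header_size);
}
```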

### What changes are included in this PR?

* [x] Support basic reads using `bloom_filter_length`
* [x] Enhance the JSON printer
* [x] Testing

### Are these changes tested?

* [x] Tested using parquet-testing files

### Are there any user-facing changes?

* Closes: #38860

Lead-authored-by: mwish <[email protected]>
Co-authored-by: mwish <[email protected]>
Co-authored-by: Antoine Pitrou <[email protected]>
Signed-off-by: Antoine Pitrou <[email protected]>
mapleFU and pitrou authored Nov 29, 2023
1 parent 9aff60a commit be1dcdb
Showing 8 changed files with 131 additions and 48 deletions.
28 changes: 22 additions & 6 deletions cpp/src/parquet/bloom_filter.cc
@@ -105,16 +105,24 @@ static ::arrow::Status ValidateBloomFilterHeader(
 }
 
 BlockSplitBloomFilter BlockSplitBloomFilter::Deserialize(
-    const ReaderProperties& properties, ArrowInputStream* input) {
-  // NOTE: we don't know the bloom filter header size upfront, and we can't rely on
-  // InputStream::Peek() which isn't always implemented. Therefore, we must first
-  // Read() with an upper bound estimate of the header size, then once we know
-  // the bloom filter data size, we can Read() the exact number of remaining data bytes.
+    const ReaderProperties& properties, ArrowInputStream* input,
+    std::optional<int64_t> bloom_filter_length) {
   ThriftDeserializer deserializer(properties);
   format::BloomFilterHeader header;
+  int64_t bloom_filter_header_read_size = 0;
+  if (bloom_filter_length.has_value()) {
+    bloom_filter_header_read_size = bloom_filter_length.value();
+  } else {
+    // NOTE: we don't know the bloom filter header size upfront without
+    // bloom_filter_length, and we can't rely on InputStream::Peek() which isn't always
+    // implemented. Therefore, we must first Read() with an upper bound estimate of the
+    // header size, then once we know the bloom filter data size, we can Read() the exact
+    // number of remaining data bytes.
+    bloom_filter_header_read_size = kBloomFilterHeaderSizeGuess;
+  }
 
   // Read and deserialize bloom filter header
-  PARQUET_ASSIGN_OR_THROW(auto header_buf, input->Read(kBloomFilterHeaderSizeGuess));
+  PARQUET_ASSIGN_OR_THROW(auto header_buf, input->Read(bloom_filter_header_read_size));
   // This gets used, then set by DeserializeThriftMsg
   uint32_t header_size = static_cast<uint32_t>(header_buf->size());
   try {
@@ -136,6 +144,14 @@ BlockSplitBloomFilter BlockSplitBloomFilter::Deserialize(
     bloom_filter.Init(header_buf->data() + header_size, bloom_filter_size);
     return bloom_filter;
   }
+  if (bloom_filter_length && *bloom_filter_length != bloom_filter_size + header_size) {
+    // We know the bloom filter data size, but the real size is different.
+    std::stringstream ss;
+    ss << "Bloom filter length (" << bloom_filter_length.value()
+       << ") does not match the actual bloom filter (size: "
+       << bloom_filter_size + header_size << ").";
+    throw ParquetException(ss.str());
+  }
   // We have read a part of the bloom filter already, copy it to the target buffer
   // and read the remaining part from the InputStream.
   auto buffer = AllocateBuffer(properties.memory_pool(), bloom_filter_size);
9 changes: 6 additions & 3 deletions cpp/src/parquet/bloom_filter.h
@@ -310,10 +310,13 @@ class PARQUET_EXPORT BlockSplitBloomFilter : public BloomFilter {
   /// a Bloom filter from a parquet filter.
   ///
   /// @param properties The parquet reader properties.
-  /// @param input_stream The input stream from which to construct the Bloom filter.
+  /// @param input_stream The input stream from which to construct the bloom filter.
+  /// @param bloom_filter_length The length of the serialized bloom filter including
+  /// header.
   /// @return The BlockSplitBloomFilter.
-  static BlockSplitBloomFilter Deserialize(const ReaderProperties& properties,
-                                           ArrowInputStream* input_stream);
+  static BlockSplitBloomFilter Deserialize(
+      const ReaderProperties& properties, ArrowInputStream* input_stream,
+      std::optional<int64_t> bloom_filter_length = std::nullopt);
 
  private:
   inline void InsertHashImpl(uint64_t hash);
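For context, a hedged sketch of how a caller can take advantage of the new defaulted parameter. The helper name `LoadBloomFilter`, the include set, and the stream positioning are assumptions for illustration, not part of the patch:

```cpp
#include <optional>

#include "arrow/io/interfaces.h"   // ::arrow::io::InputStream
#include "parquet/bloom_filter.h"  // BlockSplitBloomFilter
#include "parquet/metadata.h"      // ColumnChunkMetaData

// Sketch: forward the optional length from the column chunk metadata so that
// Deserialize can issue a single exact-size read; for older files the field
// is absent and std::nullopt selects the two-read fallback.
parquet::BlockSplitBloomFilter LoadBloomFilter(
    const parquet::ReaderProperties& properties,
    const parquet::ColumnChunkMetaData& col_chunk,
    ::arrow::io::InputStream* stream /* positioned at bloom_filter_offset */) {
  std::optional<int64_t> length = col_chunk.bloom_filter_length();
  return parquet::BlockSplitBloomFilter::Deserialize(properties, stream, length);
}
```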
13 changes: 12 additions & 1 deletion cpp/src/parquet/bloom_filter_reader.cc
@@ -63,9 +63,20 @@ std::unique_ptr<BloomFilter> RowGroupBloomFilterReaderImpl::GetColumnBloomFilter
   if (file_size <= *bloom_filter_offset) {
     throw ParquetException("file size less or equal than bloom offset");
   }
+  std::optional<int64_t> bloom_filter_length = col_chunk->bloom_filter_length();
+  if (bloom_filter_length.has_value()) {
+    if (*bloom_filter_length < 0) {
+      throw ParquetException("bloom_filter_length less than 0");
+    }
+    if (*bloom_filter_length + *bloom_filter_offset > file_size) {
+      throw ParquetException(
+          "bloom filter length + bloom filter offset greater than file size");
+    }
+  }
   auto stream = ::arrow::io::RandomAccessFile::GetStream(
       input_, *bloom_filter_offset, file_size - *bloom_filter_offset);
-  auto bloom_filter = BlockSplitBloomFilter::Deserialize(properties_, stream->get());
+  auto bloom_filter =
+      BlockSplitBloomFilter::Deserialize(properties_, stream->get(), bloom_filter_length);
   return std::make_unique<BlockSplitBloomFilter>(std::move(bloom_filter));
 }
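The checks above guard against corrupt or adversarial metadata before any read is issued. A condensed restatement as a standalone predicate (a hypothetical helper for illustration, not part of the patch):

```cpp
#include <cstdint>
#include <optional>

// Returns true when a bloom filter described by (offset, length) can lie
// inside a file of file_size bytes. Mirrors the checks in
// RowGroupBloomFilterReaderImpl::GetColumnBloomFilter above.
bool BloomFilterRegionIsValid(int64_t file_size, int64_t offset,
                              std::optional<int64_t> length) {
  if (file_size <= offset) return false;  // filter must start inside the file
  if (length.has_value()) {
    if (*length < 0) return false;        // negative length is corrupt metadata
    if (*length + offset > file_size) return false;  // must end inside the file
  }
  return true;
}
```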
57 changes: 34 additions & 23 deletions cpp/src/parquet/bloom_filter_reader_test.cc
@@ -15,6 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
+#include <gmock/gmock.h>
 #include <gtest/gtest.h>
 
 #include "parquet/bloom_filter.h"
@@ -25,31 +26,41 @@
 namespace parquet::test {
 
 TEST(BloomFilterReader, ReadBloomFilter) {
-  std::string dir_string(parquet::test::get_data_dir());
-  std::string path = dir_string + "/data_index_bloom_encoding_stats.parquet";
-  auto reader = ParquetFileReader::OpenFile(path, false);
-  auto file_metadata = reader->metadata();
-  EXPECT_FALSE(file_metadata->is_encryption_algorithm_set());
-  auto& bloom_filter_reader = reader->GetBloomFilterReader();
-  auto row_group_0 = bloom_filter_reader.RowGroup(0);
-  ASSERT_NE(nullptr, row_group_0);
-  EXPECT_THROW(bloom_filter_reader.RowGroup(1), ParquetException);
-  auto bloom_filter = row_group_0->GetColumnBloomFilter(0);
-  ASSERT_NE(nullptr, bloom_filter);
-  EXPECT_THROW(row_group_0->GetColumnBloomFilter(1), ParquetException);
+  std::vector<std::string> files = {"data_index_bloom_encoding_stats.parquet",
+                                    "data_index_bloom_encoding_with_length.parquet"};
+  for (const auto& test_file : files) {
+    std::string dir_string(parquet::test::get_data_dir());
+    std::string path = dir_string + "/" + test_file;
+    auto reader = ParquetFileReader::OpenFile(path, /*memory_map=*/false);
+    auto file_metadata = reader->metadata();
+    EXPECT_FALSE(file_metadata->is_encryption_algorithm_set());
+    auto& bloom_filter_reader = reader->GetBloomFilterReader();
+    auto row_group_0 = bloom_filter_reader.RowGroup(0);
+    ASSERT_NE(nullptr, row_group_0);
+    EXPECT_THROW_THAT(
+        [&]() { bloom_filter_reader.RowGroup(1); }, ParquetException,
+        ::testing::Property(&ParquetException::what,
+                            ::testing::HasSubstr("Invalid row group ordinal")));
+    auto bloom_filter = row_group_0->GetColumnBloomFilter(0);
+    ASSERT_NE(nullptr, bloom_filter);
+    EXPECT_THROW_THAT([&]() { row_group_0->GetColumnBloomFilter(1); }, ParquetException,
+                      ::testing::Property(&ParquetException::what,
                                          ::testing::HasSubstr(
+                                              "Invalid column index at column ordinal")));
 
-  // assert exists
-  {
-    std::string_view sv = "Hello";
-    ByteArray ba{sv};
-    EXPECT_TRUE(bloom_filter->FindHash(bloom_filter->Hash(&ba)));
-  }
+    // assert exists
+    {
+      std::string_view sv = "Hello";
+      ByteArray ba{sv};
+      EXPECT_TRUE(bloom_filter->FindHash(bloom_filter->Hash(&ba)));
+    }
 
-  // no exists
-  {
-    std::string_view sv = "NOT_EXISTS";
-    ByteArray ba{sv};
-    EXPECT_FALSE(bloom_filter->FindHash(bloom_filter->Hash(&ba)));
+    // no exists
+    {
+      std::string_view sv = "NOT_EXISTS";
+      ByteArray ba{sv};
+      EXPECT_FALSE(bloom_filter->FindHash(bloom_filter->Hash(&ba)));
+    }
   }
 }
32 changes: 17 additions & 15 deletions cpp/src/parquet/bloom_filter_test.cc
@@ -107,24 +107,26 @@ TEST(BasicTest, TestBloomFilter) {
 
   // Deserialize Bloom filter from memory
   ASSERT_OK_AND_ASSIGN(auto buffer, sink->Finish());
-  ::arrow::io::BufferReader source(buffer);
 
   ReaderProperties reader_properties;
-  BlockSplitBloomFilter de_bloom =
-      BlockSplitBloomFilter::Deserialize(reader_properties, &source);
-
-  // Lookup previously inserted values
-  for (const auto v : kIntInserts) {
-    EXPECT_TRUE(de_bloom.FindHash(de_bloom.Hash(v)));
-  }
-  for (const auto v : kFloatInserts) {
-    EXPECT_TRUE(de_bloom.FindHash(de_bloom.Hash(v)));
+  for (std::optional<int64_t> bloom_filter_length :
+       std::vector<std::optional<int64_t>>{std::nullopt, buffer->size()}) {
+    ::arrow::io::BufferReader source(buffer);
+    BlockSplitBloomFilter de_bloom = BlockSplitBloomFilter::Deserialize(
+        reader_properties, &source, bloom_filter_length);
+    // Lookup previously inserted values
+    for (const auto v : kIntInserts) {
+      EXPECT_TRUE(de_bloom.FindHash(de_bloom.Hash(v)));
+    }
+    for (const auto v : kFloatInserts) {
+      EXPECT_TRUE(de_bloom.FindHash(de_bloom.Hash(v)));
+    }
+    false_positives = 0;
+    for (const auto v : kNegativeIntLookups) {
+      false_positives += de_bloom.FindHash(de_bloom.Hash(v));
+    }
+    EXPECT_LE(false_positives, 2);
   }
-  false_positives = 0;
-  for (const auto v : kNegativeIntLookups) {
-    false_positives += de_bloom.FindHash(de_bloom.Hash(v));
-  }
-  EXPECT_LE(false_positives, 2);
 }
}

11 changes: 11 additions & 0 deletions cpp/src/parquet/metadata.cc
@@ -286,6 +286,13 @@ class ColumnChunkMetaData::ColumnChunkMetaDataImpl {
     return std::nullopt;
   }
 
+  inline std::optional<int64_t> bloom_filter_length() const {
+    if (column_metadata_->__isset.bloom_filter_length) {
+      return column_metadata_->bloom_filter_length;
+    }
+    return std::nullopt;
+  }
+
   inline bool has_dictionary_page() const {
     return column_metadata_->__isset.dictionary_page_offset;
   }
@@ -399,6 +406,10 @@ std::optional<int64_t> ColumnChunkMetaData::bloom_filter_offset() const {
   return impl_->bloom_filter_offset();
 }
 
+std::optional<int64_t> ColumnChunkMetaData::bloom_filter_length() const {
+  return impl_->bloom_filter_length();
+}
+
 bool ColumnChunkMetaData::has_dictionary_page() const {
   return impl_->has_dictionary_page();
 }
1 change: 1 addition & 0 deletions cpp/src/parquet/metadata.h
@@ -168,6 +168,7 @@ class PARQUET_EXPORT ColumnChunkMetaData {
   const std::vector<Encoding::type>& encodings() const;
   const std::vector<PageEncodingStats>& encoding_stats() const;
   std::optional<int64_t> bloom_filter_offset() const;
+  std::optional<int64_t> bloom_filter_length() const;
   bool has_dictionary_page() const;
   int64_t dictionary_page_offset() const;
   int64_t data_page_offset() const;
28 changes: 28 additions & 0 deletions cpp/src/parquet/printer.cc
@@ -314,6 +314,34 @@ void ParquetFilePrinter::JSONPrint(std::ostream& stream, std::list<int> selected
         << "\"UncompressedSize\": \"" << column_chunk->total_uncompressed_size()
         << "\", \"CompressedSize\": \"" << column_chunk->total_compressed_size();
 
+    if (column_chunk->bloom_filter_offset()) {
+      // Output BloomFilter {offset, length}
+      stream << "\", BloomFilter {"
+             << "\"offset\": \"" << column_chunk->bloom_filter_offset().value();
+      if (column_chunk->bloom_filter_length()) {
+        stream << "\", \"length\": \"" << column_chunk->bloom_filter_length().value();
+      }
+      stream << "\"}";
+    }
+
+    if (column_chunk->GetColumnIndexLocation()) {
+      auto location = column_chunk->GetColumnIndexLocation().value();
+      // Output ColumnIndex {offset, length}
+      stream << "\", ColumnIndex {"
+             << "\"offset\": \"" << location.offset;
+      stream << "\", \"length\": \"" << location.length;
+      stream << "\"}";
+    }
+
+    if (column_chunk->GetOffsetIndexLocation()) {
+      auto location = column_chunk->GetOffsetIndexLocation().value();
+      // Output OffsetIndex {offset, length}
+      stream << "\", OffsetIndex {"
+             << "\"offset\": \"" << location.offset;
+      stream << "\", \"length\": \"" << location.length;
+      stream << "\"}";
+    }
+
     // end of a ColumnChunk
     stream << "\" }";
     c1++;
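For illustration, with made-up values (bloom filter at offset 38 with length 47, page index locations 200/30 and 230/20), the ColumnChunk fragment emitted by `JSONPrint` would look roughly like the line below; the stray quote placement mirrors how the printer chains quoted tokens:

```
"UncompressedSize": "1024", "CompressedSize": "512", BloomFilter {"offset": "38", "length": "47"}", ColumnIndex {"offset": "200", "length": "30"}", OffsetIndex {"offset": "230", "length": "20"}" }
```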
