GH-45201: [C++][Parquet] Improve performance of generating size statistics (#45202)

We found out in #45085 that there is non-trivial overhead when size statistics writing is enabled.
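
For context, size statistics writing is opted into via `WriterProperties`. A minimal sketch, following the builder calls exercised by the round-trip test in this diff (the function name and the surrounding writer setup are illustrative):

```cpp
#include <memory>

#include "parquet/properties.h"

// Sketch: build writer properties with size statistics enabled.
// SizeStatisticsLevel::ColumnChunk records chunk-level statistics only;
// PageAndColumnChunk additionally records per-page statistics.
std::shared_ptr<parquet::WriterProperties> MakeWriterProperties() {
  return parquet::WriterProperties::Builder()
      .enable_write_page_index()
      ->enable_statistics()
      ->set_size_statistics_level(parquet::SizeStatisticsLevel::PageAndColumnChunk)
      ->build();
}
```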

Dramatically reduce that overhead by speeding up the def/rep level histogram updates.
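
The key routine is the new `UpdateLevelHistogram` in `cpp/src/parquet/size_statistics.cc` (full diff below). Here is a condensed, self-contained sketch of its two fast paths; `std::span` stands in for `::arrow::util::span`, and the debug checks are dropped:

```cpp
#include <algorithm>
#include <array>
#include <cstdint>
#include <functional>
#include <numeric>
#include <span>
#include <vector>

void UpdateLevelHistogramSketch(std::span<const int16_t> levels,
                                std::span<int64_t> histogram) {
  const int64_t num_levels = static_cast<int64_t>(levels.size());
  const int16_t max_level = static_cast<int16_t>(histogram.size() - 1);
  if (max_level == 0) {
    // Only one possible level value: no need to look at the data.
    histogram[0] += num_levels;
    return;
  }
  if (max_level == 1) {
    // Levels are all 0 or 1, so summing them counts the 1s. Sums are done
    // in int16_t, which compilers vectorize well; chunking keeps each
    // partial sum below the int16_t overflow threshold.
    constexpr int64_t kChunkSize = 1 << 14;
    int64_t ones = 0;
    for (int64_t offset = 0; offset < num_levels; offset += kChunkSize) {
      const int64_t n = std::min(kChunkSize, num_levels - offset);
      ones += std::accumulate(levels.begin() + offset,
                              levels.begin() + offset + n, int16_t{0});
    }
    histogram[0] += num_levels - ones;
    histogram[1] += ones;
    return;
  }
  // General case: scatter counts into kUnroll partial histograms to break
  // store-to-load dependency chains, then merge them once at the end.
  constexpr int kUnroll = 4;
  std::array<std::vector<int64_t>, kUnroll> partial;
  for (auto& hist : partial) hist.assign(histogram.size(), 0);
  int64_t i = 0;
  for (; i + kUnroll <= num_levels; i += kUnroll) {
    for (int j = 0; j < kUnroll; ++j) ++partial[j][levels[i + j]];
  }
  for (; i < num_levels; ++i) ++partial[0][levels[i]];
  for (const auto& hist : partial) {
    std::transform(histogram.begin(), histogram.end(), hist.begin(),
                   histogram.begin(), std::plus<>());
  }
}
```

In addition, the histogram update is moved to the start of `WriteLevels`/`WriteLevelsSpaced`, before the levels are consumed by level encoding, so the level buffers are still hot in cache.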

Performance results on the author's machine:
```
------------------------------------------------------------------------------------------------------------------------------------------------
Benchmark                                                                                      Time             CPU   Iterations UserCounters...
------------------------------------------------------------------------------------------------------------------------------------------------
BM_WritePrimitiveColumn<SizeStatisticsLevel::None, ::arrow::Int64Type>                   8103053 ns      8098569 ns           86 bytes_per_second=1003.26Mi/s items_per_second=129.477M/s output_size=537.472k page_index_size=33
BM_WritePrimitiveColumn<SizeStatisticsLevel::ColumnChunk, ::arrow::Int64Type>            8153499 ns      8148492 ns           86 bytes_per_second=997.117Mi/s items_per_second=128.683M/s output_size=537.488k page_index_size=33
BM_WritePrimitiveColumn<SizeStatisticsLevel::PageAndColumnChunk, ::arrow::Int64Type>     8212560 ns      8207754 ns           83 bytes_per_second=989.918Mi/s items_per_second=127.754M/s output_size=537.502k page_index_size=47

BM_WritePrimitiveColumn<SizeStatisticsLevel::None, ::arrow::StringType>                 10405020 ns     10400775 ns           67 bytes_per_second=444.142Mi/s items_per_second=100.817M/s output_size=848.305k page_index_size=34
BM_WritePrimitiveColumn<SizeStatisticsLevel::ColumnChunk, ::arrow::StringType>          10464784 ns     10460778 ns           66 bytes_per_second=441.594Mi/s items_per_second=100.239M/s output_size=848.325k page_index_size=34
BM_WritePrimitiveColumn<SizeStatisticsLevel::PageAndColumnChunk, ::arrow::StringType>   10469832 ns     10465739 ns           67 bytes_per_second=441.385Mi/s items_per_second=100.191M/s output_size=848.344k page_index_size=48

BM_WriteListColumn<SizeStatisticsLevel::None, ::arrow::Int64Type>                       13004962 ns     12992678 ns           52 bytes_per_second=657.101Mi/s items_per_second=80.7052M/s output_size=617.464k page_index_size=34
BM_WriteListColumn<SizeStatisticsLevel::ColumnChunk, ::arrow::Int64Type>                13718352 ns     13705599 ns           50 bytes_per_second=622.921Mi/s items_per_second=76.5071M/s output_size=617.486k page_index_size=34
BM_WriteListColumn<SizeStatisticsLevel::PageAndColumnChunk, ::arrow::Int64Type>         13845553 ns     13832138 ns           52 bytes_per_second=617.222Mi/s items_per_second=75.8072M/s output_size=617.506k page_index_size=54

BM_WriteListColumn<SizeStatisticsLevel::None, ::arrow::StringType>                      15715263 ns     15702707 ns           44 bytes_per_second=320.449Mi/s items_per_second=66.7768M/s output_size=927.326k page_index_size=35
BM_WriteListColumn<SizeStatisticsLevel::ColumnChunk, ::arrow::StringType>               16507328 ns     16493800 ns           43 bytes_per_second=305.079Mi/s items_per_second=63.5739M/s output_size=927.352k page_index_size=35
BM_WriteListColumn<SizeStatisticsLevel::PageAndColumnChunk, ::arrow::StringType>        16575359 ns     16561311 ns           42 bytes_per_second=303.836Mi/s items_per_second=63.3148M/s output_size=927.377k page_index_size=55
```

Performance results without this PR:
```
------------------------------------------------------------------------------------------------------------------------------------------------
Benchmark                                                                                      Time             CPU   Iterations UserCounters...
------------------------------------------------------------------------------------------------------------------------------------------------
BM_WritePrimitiveColumn<SizeStatisticsLevel::None, ::arrow::Int64Type>                   8042576 ns      8037678 ns           87 bytes_per_second=1010.86Mi/s items_per_second=130.458M/s output_size=537.472k page_index_size=33
BM_WritePrimitiveColumn<SizeStatisticsLevel::ColumnChunk, ::arrow::Int64Type>            9576627 ns      9571279 ns           73 bytes_per_second=848.894Mi/s items_per_second=109.554M/s output_size=537.488k page_index_size=33
BM_WritePrimitiveColumn<SizeStatisticsLevel::PageAndColumnChunk, ::arrow::Int64Type>     9570204 ns      9563595 ns           73 bytes_per_second=849.576Mi/s items_per_second=109.642M/s output_size=537.502k page_index_size=47

BM_WritePrimitiveColumn<SizeStatisticsLevel::None, ::arrow::StringType>                 10165397 ns     10160868 ns           69 bytes_per_second=454.628Mi/s items_per_second=103.197M/s output_size=848.305k page_index_size=34
BM_WritePrimitiveColumn<SizeStatisticsLevel::ColumnChunk, ::arrow::StringType>          11662568 ns     11657396 ns           60 bytes_per_second=396.265Mi/s items_per_second=89.9494M/s output_size=848.325k page_index_size=34
BM_WritePrimitiveColumn<SizeStatisticsLevel::PageAndColumnChunk, ::arrow::StringType>   11657135 ns     11653063 ns           60 bytes_per_second=396.412Mi/s items_per_second=89.9829M/s output_size=848.344k page_index_size=48

BM_WriteListColumn<SizeStatisticsLevel::None, ::arrow::Int64Type>                       13182006 ns     13168704 ns           51 bytes_per_second=648.318Mi/s items_per_second=79.6264M/s output_size=617.464k page_index_size=34
BM_WriteListColumn<SizeStatisticsLevel::ColumnChunk, ::arrow::Int64Type>                16438205 ns     16421762 ns           43 bytes_per_second=519.89Mi/s items_per_second=63.8528M/s output_size=617.486k page_index_size=34
BM_WriteListColumn<SizeStatisticsLevel::PageAndColumnChunk, ::arrow::Int64Type>         16424615 ns     16409032 ns           42 bytes_per_second=520.293Mi/s items_per_second=63.9024M/s output_size=617.506k page_index_size=54

BM_WriteListColumn<SizeStatisticsLevel::None, ::arrow::StringType>                      15387808 ns     15373086 ns           46 bytes_per_second=327.32Mi/s items_per_second=68.2086M/s output_size=927.326k page_index_size=35
BM_WriteListColumn<SizeStatisticsLevel::ColumnChunk, ::arrow::StringType>               18319628 ns     18302938 ns           37 bytes_per_second=274.924Mi/s items_per_second=57.29M/s output_size=927.352k page_index_size=35
BM_WriteListColumn<SizeStatisticsLevel::PageAndColumnChunk, ::arrow::StringType>        18346665 ns     18329336 ns           37 bytes_per_second=274.528Mi/s items_per_second=57.2075M/s output_size=927.377k page_index_size=55
```

In these benchmarks, the overhead of enabling size statistics drops from roughly 15-25% to under 7%.

Tested by existing tests; validated by existing benchmarks.

No user-facing changes.

* GitHub Issue: #45201

Authored-by: Antoine Pitrou <[email protected]>
Signed-off-by: Antoine Pitrou <[email protected]>
pitrou authored and amoeba committed Jan 31, 2025 (parent e696b26, commit 1b9079c)
Showing 4 changed files with 189 additions and 42 deletions.
45 changes: 20 additions & 25 deletions cpp/src/parquet/column_writer.cc
@@ -1468,42 +1468,43 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, public TypedColumnWriter<
// which case we call back to the dense write path)
std::shared_ptr<::arrow::Array> preserved_dictionary_;

- int64_t WriteLevels(int64_t num_values, const int16_t* def_levels,
+ int64_t WriteLevels(int64_t num_levels, const int16_t* def_levels,
const int16_t* rep_levels) {
+ // Update histograms now, to maximize cache efficiency.
+ UpdateLevelHistogram(num_levels, def_levels, rep_levels);

int64_t values_to_write = 0;
// If the field is required and non-repeated, there are no definition levels
if (descr_->max_definition_level() > 0) {
- for (int64_t i = 0; i < num_values; ++i) {
+ for (int64_t i = 0; i < num_levels; ++i) {
if (def_levels[i] == descr_->max_definition_level()) {
++values_to_write;
}
}

- WriteDefinitionLevels(num_values, def_levels);
+ WriteDefinitionLevels(num_levels, def_levels);
} else {
// Required field, write all values
- values_to_write = num_values;
+ values_to_write = num_levels;
}

// Not present for non-repeated fields
if (descr_->max_repetition_level() > 0) {
// A row could include more than one value
// Count the occasions where we start a new row
- for (int64_t i = 0; i < num_values; ++i) {
+ for (int64_t i = 0; i < num_levels; ++i) {
if (rep_levels[i] == 0) {
rows_written_++;
num_buffered_rows_++;
}
}

- WriteRepetitionLevels(num_values, rep_levels);
+ WriteRepetitionLevels(num_levels, rep_levels);
} else {
// Each value is exactly one row
- rows_written_ += num_values;
- num_buffered_rows_ += num_values;
+ rows_written_ += num_levels;
+ num_buffered_rows_ += num_levels;
}

- UpdateLevelHistogram(num_values, def_levels, rep_levels);
return values_to_write;
}

@@ -1575,6 +1576,9 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, public TypedColumnWriter<

void WriteLevelsSpaced(int64_t num_levels, const int16_t* def_levels,
const int16_t* rep_levels) {
+ // Update histograms now, to maximize cache efficiency.
+ UpdateLevelHistogram(num_levels, def_levels, rep_levels);

// If the field is required and non-repeated, there are no definition levels
if (descr_->max_definition_level() > 0) {
WriteDefinitionLevels(num_levels, def_levels);
@@ -1595,8 +1599,6 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, public TypedColumnWriter<
rows_written_ += num_levels;
num_buffered_rows_ += num_levels;
}

- UpdateLevelHistogram(num_levels, def_levels, rep_levels);
}

void UpdateLevelHistogram(int64_t num_levels, const int16_t* def_levels,
@@ -1614,19 +1616,12 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, public TypedColumnWriter<
::parquet::UpdateLevelHistogram(levels, level_histogram);
};

- if (descr_->max_definition_level() > 0) {
-   add_levels(page_size_statistics_->definition_level_histogram,
-              {def_levels, static_cast<size_t>(num_levels)});
- } else {
-   page_size_statistics_->definition_level_histogram[0] += num_levels;
- }
-
- if (descr_->max_repetition_level() > 0) {
-   add_levels(page_size_statistics_->repetition_level_histogram,
-              {rep_levels, static_cast<size_t>(num_levels)});
- } else {
-   page_size_statistics_->repetition_level_histogram[0] += num_levels;
- }
+ add_levels(page_size_statistics_->definition_level_histogram,
+            {def_levels, static_cast<size_t>(num_levels)},
+            descr_->max_definition_level());
+ add_levels(page_size_statistics_->repetition_level_histogram,
+            {rep_levels, static_cast<size_t>(num_levels)},
+            descr_->max_repetition_level());
}

// Update the unencoded data bytes for ByteArray only per the specification.
106 changes: 100 additions & 6 deletions cpp/src/parquet/size_statistics.cc
@@ -18,13 +18,27 @@
#include "parquet/size_statistics.h"

#include <algorithm>
#include <numeric>
#include <ostream>
#include <string_view>

#include "arrow/util/logging.h"
#include "parquet/exception.h"
#include "parquet/schema.h"

namespace parquet {

namespace {

void MergeLevelHistogram(::arrow::util::span<int64_t> histogram,
::arrow::util::span<const int64_t> other) {
ARROW_DCHECK_EQ(histogram.size(), other.size());
std::transform(histogram.begin(), histogram.end(), other.begin(), histogram.begin(),
std::plus<>());
}

} // namespace

void SizeStatistics::Merge(const SizeStatistics& other) {
if (repetition_level_histogram.size() != other.repetition_level_histogram.size()) {
throw ParquetException("Repetition level histogram size mismatch");
@@ -36,12 +50,8 @@ void SizeStatistics::Merge(const SizeStatistics& other) {
other.unencoded_byte_array_data_bytes.has_value()) {
throw ParquetException("Unencoded byte array data bytes are not consistent");
}
- std::transform(repetition_level_histogram.begin(), repetition_level_histogram.end(),
-                other.repetition_level_histogram.begin(),
-                repetition_level_histogram.begin(), std::plus<>());
- std::transform(definition_level_histogram.begin(), definition_level_histogram.end(),
-                other.definition_level_histogram.begin(),
-                definition_level_histogram.begin(), std::plus<>());
+ MergeLevelHistogram(repetition_level_histogram, other.repetition_level_histogram);
+ MergeLevelHistogram(definition_level_histogram, other.definition_level_histogram);
if (unencoded_byte_array_data_bytes.has_value()) {
unencoded_byte_array_data_bytes = unencoded_byte_array_data_bytes.value() +
other.unencoded_byte_array_data_bytes.value();
@@ -103,4 +113,88 @@ std::unique_ptr<SizeStatistics> SizeStatistics::Make(const ColumnDescriptor* des
return size_stats;
}

std::ostream& operator<<(std::ostream& os, const SizeStatistics& size_stats) {
constexpr std::string_view kComma = ", ";
os << "SizeStatistics{";
std::string_view sep = "";
if (size_stats.unencoded_byte_array_data_bytes.has_value()) {
os << "unencoded_byte_array_data_bytes="
<< *size_stats.unencoded_byte_array_data_bytes;
sep = kComma;
}
auto print_histogram = [&](std::string_view name,
const std::vector<int64_t>& histogram) {
if (!histogram.empty()) {
os << sep << name << "={";
sep = kComma;
std::string_view value_sep = "";
for (int64_t v : histogram) {
os << value_sep << v;
value_sep = kComma;
}
os << "}";
}
};
print_histogram("repetition_level_histogram", size_stats.repetition_level_histogram);
print_histogram("definition_level_histogram", size_stats.definition_level_histogram);
os << "}";
return os;
}

void UpdateLevelHistogram(::arrow::util::span<const int16_t> levels,
::arrow::util::span<int64_t> histogram) {
const int64_t num_levels = static_cast<int64_t>(levels.size());
DCHECK_GE(histogram.size(), 1);
const int16_t max_level = static_cast<int16_t>(histogram.size() - 1);
if (max_level == 0) {
histogram[0] += num_levels;
return;
}

#ifndef NDEBUG
for (auto level : levels) {
ARROW_DCHECK_LE(level, max_level);
}
#endif

if (max_level == 1) {
// Specialize the common case for non-repeated non-nested columns.
// Summing the levels gives us the number of 1s, and the number of 0s follows.
// We do repeated sums in the int16_t space, which the compiler is likely
// to vectorize efficiently.
constexpr int64_t kChunkSize = 1 << 14; // to avoid int16_t overflows
int64_t hist1 = 0;
auto it = levels.begin();
while (it != levels.end()) {
const auto chunk_size = std::min<int64_t>(levels.end() - it, kChunkSize);
hist1 += std::accumulate(it, it + chunk_size, int16_t{0});
it += chunk_size;
}
histogram[0] += num_levels - hist1;
histogram[1] += hist1;
return;
}

// The generic implementation issues a series of histogram load-stores.
// However, it limits store-to-load dependencies by interleaving partial histogram
// updates.
constexpr int kUnroll = 4;
std::array<std::vector<int64_t>, kUnroll> partial_hist;
for (auto& hist : partial_hist) {
hist.assign(histogram.size(), 0);
}
int64_t i = 0;
for (; i <= num_levels - kUnroll; i += kUnroll) {
for (int j = 0; j < kUnroll; ++j) {
++partial_hist[j][levels[i + j]];
}
}
for (; i < num_levels; ++i) {
++partial_hist[0][levels[i]];
}
for (const auto& hist : partial_hist) {
MergeLevelHistogram(histogram, hist);
}
}

} // namespace parquet
10 changes: 10 additions & 0 deletions cpp/src/parquet/size_statistics.h
@@ -17,9 +17,12 @@

#pragma once

#include <cstdint>
#include <iosfwd>
#include <optional>
#include <vector>

#include "arrow/util/span.h"
#include "parquet/platform.h"
#include "parquet/type_fwd.h"

@@ -89,4 +92,11 @@ struct PARQUET_EXPORT SizeStatistics {
static std::unique_ptr<SizeStatistics> Make(const ColumnDescriptor* descr);
};

PARQUET_EXPORT
std::ostream& operator<<(std::ostream&, const SizeStatistics&);

PARQUET_EXPORT
void UpdateLevelHistogram(::arrow::util::span<const int16_t> levels,
::arrow::util::span<int64_t> histogram);

} // namespace parquet
70 changes: 59 additions & 11 deletions cpp/src/parquet/size_statistics_test.cc
@@ -19,16 +19,14 @@
#include "gtest/gtest.h"

#include <algorithm>
#include <ostream>
#include <random>

#include "arrow/buffer.h"
#include "arrow/table.h"
#include "arrow/testing/builder.h"
#include "arrow/testing/gtest_util.h"
#include "arrow/util/bit_util.h"
#include "arrow/util/span.h"
#include "parquet/arrow/reader.h"
#include "parquet/arrow/reader_internal.h"
#include "parquet/arrow/schema.h"
#include "parquet/arrow/writer.h"
#include "parquet/column_writer.h"
@@ -42,6 +40,29 @@

namespace parquet {

TEST(SizeStatistics, UpdateLevelHistogram) {
{
// max_level = 1
std::vector<int64_t> histogram(2, 0);
UpdateLevelHistogram(std::vector<int16_t>{0, 1, 1, 1, 0}, histogram);
EXPECT_THAT(histogram, ::testing::ElementsAre(2, 3));
UpdateLevelHistogram(std::vector<int16_t>{1, 1, 0}, histogram);
EXPECT_THAT(histogram, ::testing::ElementsAre(3, 5));
UpdateLevelHistogram(std::vector<int16_t>{}, histogram);
EXPECT_THAT(histogram, ::testing::ElementsAre(3, 5));
}
{
// max_level > 1
std::vector<int64_t> histogram(3, 0);
UpdateLevelHistogram(std::vector<int16_t>{0, 1, 2, 2, 0}, histogram);
EXPECT_THAT(histogram, ::testing::ElementsAre(2, 1, 2));
UpdateLevelHistogram(std::vector<int16_t>{1, 1, 0}, histogram);
EXPECT_THAT(histogram, ::testing::ElementsAre(3, 3, 2));
UpdateLevelHistogram(std::vector<int16_t>{}, histogram);
EXPECT_THAT(histogram, ::testing::ElementsAre(3, 3, 2));
}
}

TEST(SizeStatistics, ThriftSerDe) {
const std::vector<int64_t> kDefLevels = {128, 64, 32, 16};
const std::vector<int64_t> kRepLevels = {100, 80, 60, 40, 20};
Expand Down Expand Up @@ -88,13 +109,38 @@ struct PageSizeStatistics {
}
};

std::ostream& operator<<(std::ostream& os, const PageSizeStatistics& page_stats) {
constexpr std::string_view kComma = ", ";
os << "PageSizeStatistics{";
std::string_view sep = "";
auto print_vector = [&](std::string_view name, const std::vector<int64_t>& values) {
if (!values.empty()) {
os << sep << name << "={";
sep = kComma;
std::string_view value_sep = "";
for (int64_t v : values) {
os << value_sep << v;
value_sep = kComma;
}
os << "}";
}
};
print_vector("def_levels", page_stats.def_levels);
print_vector("rep_levels", page_stats.rep_levels);
print_vector("byte_array_bytes", page_stats.byte_array_bytes);
os << "}";
return os;
}

class SizeStatisticsRoundTripTest : public ::testing::Test {
public:
- void WriteFile(SizeStatisticsLevel level,
-                const std::shared_ptr<::arrow::Table>& table) {
+ void WriteFile(SizeStatisticsLevel level, const std::shared_ptr<::arrow::Table>& table,
+                int max_row_group_length, int page_size,
+                int write_batch_size = DEFAULT_WRITE_BATCH_SIZE) {
auto writer_properties = WriterProperties::Builder()
- .max_row_group_length(2) /* every row group has 2 rows */
- ->data_pagesize(1) /* every page has 1 row */
+ .max_row_group_length(max_row_group_length)
+ ->data_pagesize(page_size)
+ ->write_batch_size(write_batch_size)
->enable_write_page_index()
->enable_statistics()
->set_size_statistics_level(level)
@@ -127,6 +173,7 @@ class SizeStatisticsRoundTripTest : public ::testing::Test {
ParquetFileReader::Open(std::make_shared<::arrow::io::BufferReader>(buffer_));

// Read row group size statistics in order.
row_group_stats_.clear();
auto metadata = reader->metadata();
for (int i = 0; i < metadata->num_row_groups(); ++i) {
auto row_group_metadata = metadata->RowGroup(i);
@@ -138,6 +185,7 @@
}

// Read page size statistics in order.
page_stats_.clear();
auto page_index_reader = reader->GetPageIndexReader();
ASSERT_NE(page_index_reader, nullptr);

@@ -197,7 +245,7 @@ TEST_F(SizeStatisticsRoundTripTest, EnableSizeStats) {
::arrow::field("a", ::arrow::list(::arrow::list(::arrow::int32()))),
::arrow::field("b", ::arrow::list(::arrow::list(::arrow::utf8()))),
});
- // First two rows are in one row group, and the other two rows are in another row group.
+ // First two rows will be in one row group, and the other two rows in another row group.
auto table = ::arrow::TableFromJSON(schema, {R"([
[ [[1],[1,1],[1,1,1]], [["a"],["a","a"],["a","a","a"]] ],
[ [[0,1,null]], [["foo","bar",null]] ],
@@ -208,7 +256,7 @@
for (auto size_stats_level :
{SizeStatisticsLevel::None, SizeStatisticsLevel::ColumnChunk,
SizeStatisticsLevel::PageAndColumnChunk}) {
- WriteFile(size_stats_level, table);
+ WriteFile(size_stats_level, table, /*max_row_group_length=*/2, /*page_size=*/1);
ReadSizeStatistics();

if (size_stats_level == SizeStatisticsLevel::None) {
@@ -261,8 +309,8 @@ TEST_F(SizeStatisticsRoundTripTest, WriteDictionaryArray) {
{::arrow::field("a", ::arrow::dictionary(::arrow::int16(), ::arrow::utf8()))});
WriteFile(
SizeStatisticsLevel::PageAndColumnChunk,
- ::arrow::TableFromJSON(schema, {R"([["aa"],["aaa"],[null],["a"],["aaa"],["a"]])"}));
+ ::arrow::TableFromJSON(schema, {R"([["aa"],["aaa"],[null],["a"],["aaa"],["a"]])"}),
+ /*max_row_group_length=*/2, /*page_size=*/1);
ReadSizeStatistics();
EXPECT_THAT(row_group_stats_,
::testing::ElementsAre(SizeStatistics{/*def_levels=*/{0, 2},
