Skip to content

Commit

Permalink
Measure the number of Parquet row groups filtered by predicate pushdown (#17594)
Browse files Browse the repository at this point in the history

Closes #17164

This PR adds a method to measure the number of remaining row groups after stats and bloom filtering during predicate pushdown.

Authors:
  - Muhammad Haseeb (https://github.com/mhaseeb123)
  - GALI PREM SAGAR (https://github.com/galipremsagar)

Approvers:
  - Vukasin Milovanovic (https://github.com/vuule)
  - MithunR (https://github.com/mythrocks)

URL: #17594
  • Loading branch information
mhaseeb123 authored Jan 31, 2025
1 parent 0ce2d2d commit 10c1fb4
Show file tree
Hide file tree
Showing 9 changed files with 332 additions and 129 deletions.
17 changes: 14 additions & 3 deletions cpp/include/cudf/io/types.hpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2019-2024, NVIDIA CORPORATION.
* Copyright (c) 2019-2025, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -277,13 +277,24 @@ struct column_name_info {
struct table_metadata {
std::vector<column_name_info>
schema_info; //!< Detailed name information for the entire output hierarchy
std::vector<size_t> num_rows_per_source; //!< Number of rows read from each data source.
std::vector<size_t> num_rows_per_source; //!< Number of rows read from each data source
//!< Currently only computed for Parquet readers if no
//!< AST filters being used. Empty vector otherwise.
//!< AST filters being used. Empty vector otherwise
std::map<std::string, std::string> user_data; //!< Format-dependent metadata of the first input
//!< file as key-values pairs (deprecated)
std::vector<std::unordered_map<std::string, std::string>>
per_file_user_data; //!< Per file format-dependent metadata as key-values pairs

// The following variables are currently only computed for Parquet reader
size_type num_input_row_groups{0}; //!< Total number of input row groups across all data sources
std::optional<size_type>
num_row_groups_after_stats_filter; //!< Number of remaining row groups after stats filter.
//!< std::nullopt if no filtering done. Currently only
//!< reported by Parquet readers
std::optional<size_type>
num_row_groups_after_bloom_filter; //!< Number of remaining row groups after bloom filter.
//!< std::nullopt if no filtering done. Currently only
//!< reported by Parquet readers
};

/**
Expand Down
36 changes: 15 additions & 21 deletions cpp/src/io/parquet/bloom_filter_reader.cu
Original file line number Diff line number Diff line change
Expand Up @@ -599,9 +599,11 @@ std::vector<Type> aggregate_reader_metadata::get_parquet_types(
return parquet_types;
}

std::optional<std::vector<std::vector<size_type>>> aggregate_reader_metadata::apply_bloom_filters(
std::pair<std::optional<std::vector<std::vector<size_type>>>, bool>
aggregate_reader_metadata::apply_bloom_filters(
host_span<std::unique_ptr<datasource> const> sources,
host_span<std::vector<size_type> const> input_row_group_indices,
size_type total_row_groups,
host_span<data_type const> output_dtypes,
host_span<int const> output_column_schemas,
std::reference_wrapper<ast::expression const> filter,
Expand All @@ -610,17 +612,6 @@ std::optional<std::vector<std::vector<size_type>>> aggregate_reader_metadata::ap
// Number of input table columns
auto const num_input_columns = static_cast<cudf::size_type>(output_dtypes.size());

// Total number of row groups after StatsAST filtration
auto const total_row_groups = std::accumulate(
input_row_group_indices.begin(),
input_row_group_indices.end(),
size_t{0},
[](size_t sum, auto const& per_file_row_groups) { return sum + per_file_row_groups.size(); });

// Check if we have less than 2B total row groups.
CUDF_EXPECTS(total_row_groups <= std::numeric_limits<cudf::size_type>::max(),
"Total number of row groups exceed the size_type's limit");

// Collect equality literals for each input table column
auto const equality_literals =
equality_literals_collector{filter.get(), num_input_columns}.get_equality_literals();
Expand All @@ -635,7 +626,7 @@ std::optional<std::vector<std::vector<size_type>>> aggregate_reader_metadata::ap
[](auto& eq_literals) { return not eq_literals.empty(); });

// Return early if no column with equality predicate(s)
if (equality_col_schemas.empty()) { return std::nullopt; }
if (equality_col_schemas.empty()) { return {std::nullopt, false}; }

// Required alignment:
// https://github.com/NVIDIA/cuCollections/blob/deab5799f3e4226cb8a49acf2199c03b14941ee4/include/cuco/detail/bloom_filter/bloom_filter_impl.cuh#L55-L67
Expand All @@ -654,8 +645,8 @@ std::optional<std::vector<std::vector<size_type>>> aggregate_reader_metadata::ap
auto bloom_filter_data = read_bloom_filters(
sources, input_row_group_indices, equality_col_schemas, total_row_groups, stream, aligned_mr);

// No bloom filter buffers, return the original row group indices
if (bloom_filter_data.empty()) { return std::nullopt; }
// No bloom filter buffers, return early
if (bloom_filter_data.empty()) { return {std::nullopt, false}; }

// Get parquet types for the predicate columns
auto const parquet_types = get_parquet_types(input_row_group_indices, equality_col_schemas);
Expand All @@ -676,8 +667,10 @@ std::optional<std::vector<std::vector<size_type>>> aggregate_reader_metadata::ap
h_bloom_filter_spans, stream, cudf::get_current_device_resource_ref());

// Create a bloom filter query table caster
bloom_filter_caster const bloom_filter_col{
bloom_filter_spans, parquet_types, total_row_groups, equality_col_schemas.size()};
bloom_filter_caster const bloom_filter_col{bloom_filter_spans,
parquet_types,
static_cast<size_t>(total_row_groups),
equality_col_schemas.size()};

// Converts bloom filter membership for equality predicate columns to a table
// containing a column for each `col[i] == literal` predicate to be evaluated.
Expand Down Expand Up @@ -714,10 +707,11 @@ std::optional<std::vector<std::vector<size_type>>> aggregate_reader_metadata::ap

// Filter bloom filter membership table with the BloomfilterAST expression and collect
// filtered row group indices
return collect_filtered_row_group_indices(bloom_filter_membership_table,
bloom_filter_expr.get_bloom_filter_expr(),
input_row_group_indices,
stream);
return {collect_filtered_row_group_indices(bloom_filter_membership_table,
bloom_filter_expr.get_bloom_filter_expr(),
input_row_group_indices,
stream),
true};
}

} // namespace cudf::io::parquet::detail
75 changes: 39 additions & 36 deletions cpp/src/io/parquet/predicate_pushdown.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -388,40 +388,17 @@ class stats_expression_converter : public ast::detail::expression_transformer {
};
} // namespace

std::optional<std::vector<std::vector<size_type>>> aggregate_reader_metadata::filter_row_groups(
std::pair<std::optional<std::vector<std::vector<size_type>>>, surviving_row_group_metrics>
aggregate_reader_metadata::filter_row_groups(
host_span<std::unique_ptr<datasource> const> sources,
host_span<std::vector<size_type> const> row_group_indices,
host_span<std::vector<size_type> const> input_row_group_indices,
size_type total_row_groups,
host_span<data_type const> output_dtypes,
host_span<int const> output_column_schemas,
std::reference_wrapper<ast::expression const> filter,
rmm::cuda_stream_view stream) const
{
auto mr = cudf::get_current_device_resource_ref();
// Create row group indices.
std::vector<std::vector<size_type>> all_row_group_indices;
host_span<std::vector<size_type> const> input_row_group_indices;
if (row_group_indices.empty()) {
std::transform(per_file_metadata.cbegin(),
per_file_metadata.cend(),
std::back_inserter(all_row_group_indices),
[](auto const& file_meta) {
std::vector<size_type> rg_idx(file_meta.row_groups.size());
std::iota(rg_idx.begin(), rg_idx.end(), 0);
return rg_idx;
});
input_row_group_indices = host_span<std::vector<size_type> const>(all_row_group_indices);
} else {
input_row_group_indices = row_group_indices;
}
auto const total_row_groups = std::accumulate(
input_row_group_indices.begin(),
input_row_group_indices.end(),
size_t{0},
[](size_t sum, auto const& per_file_row_groups) { return sum + per_file_row_groups.size(); });

// Check if we have less than 2B total row groups.
CUDF_EXPECTS(total_row_groups <= std::numeric_limits<cudf::size_type>::max(),
"Total number of row groups exceed the size_type's limit");

// Converts Column chunk statistics to a table
// where min(col[i]) = columns[i*2], max(col[i])=columns[i*2+1]
Expand Down Expand Up @@ -451,29 +428,55 @@ std::optional<std::vector<std::vector<size_type>>> aggregate_reader_metadata::fi
// Converts AST to StatsAST with reference to min, max columns in above `stats_table`.
stats_expression_converter const stats_expr{filter.get(),
static_cast<size_type>(output_dtypes.size())};
auto stats_ast = stats_expr.get_stats_expr();
auto predicate_col = cudf::detail::compute_column(stats_table, stats_ast.get(), stream, mr);
auto predicate = predicate_col->view();
CUDF_EXPECTS(predicate.type().id() == cudf::type_id::BOOL8,
"Filter expression must return a boolean column");

// Filter stats table with StatsAST expression and collect filtered row group indices
auto const filtered_row_group_indices = collect_filtered_row_group_indices(
stats_table, stats_expr.get_stats_expr(), input_row_group_indices, stream);

// Number of surviving row groups after applying stats filter
auto const num_stats_filtered_row_groups =
filtered_row_group_indices.has_value()
? std::accumulate(filtered_row_group_indices.value().cbegin(),
filtered_row_group_indices.value().cend(),
size_type{0},
[](auto& sum, auto const& per_file_row_groups) {
return sum + per_file_row_groups.size();
})
: total_row_groups;

// Span of row groups to apply bloom filtering on.
auto const bloom_filter_input_row_groups =
filtered_row_group_indices.has_value()
? host_span<std::vector<size_type> const>(filtered_row_group_indices.value())
: input_row_group_indices;

// Apply bloom filtering on the bloom filter input row groups
auto const bloom_filtered_row_groups = apply_bloom_filters(
sources, bloom_filter_input_row_groups, output_dtypes, output_column_schemas, filter, stream);
auto const [bloom_filtered_row_groups, bloom_filters_exist] =
apply_bloom_filters(sources,
bloom_filter_input_row_groups,
num_stats_filtered_row_groups,
output_dtypes,
output_column_schemas,
filter,
stream);

// Number of surviving row groups after applying bloom filter
auto const num_bloom_filtered_row_groups =
bloom_filters_exist
? (bloom_filtered_row_groups.has_value()
? std::make_optional(std::accumulate(bloom_filtered_row_groups.value().cbegin(),
bloom_filtered_row_groups.value().cend(),
size_type{0},
[](auto& sum, auto const& per_file_row_groups) {
return sum + per_file_row_groups.size();
}))
: std::make_optional(num_stats_filtered_row_groups))
: std::nullopt;

// Return bloom filtered row group indices iff collected
return bloom_filtered_row_groups.has_value() ? bloom_filtered_row_groups
: filtered_row_group_indices;
return {
bloom_filtered_row_groups.has_value() ? bloom_filtered_row_groups : filtered_row_group_indices,
{std::make_optional(num_stats_filtered_row_groups), num_bloom_filtered_row_groups}};
}

// convert column named expression to column index reference expression
Expand Down
11 changes: 11 additions & 0 deletions cpp/src/io/parquet/reader_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -610,6 +610,17 @@ table_with_metadata reader::impl::read_chunk_internal(read_mode mode)
auto out_columns = std::vector<std::unique_ptr<column>>{};
out_columns.reserve(_output_buffers.size());

// Copy number of total input row groups and number of surviving row groups from predicate
// pushdown.
out_metadata.num_input_row_groups = _file_itm_data.num_input_row_groups;
// Copy the number surviving row groups from each predicate pushdown only if the filter has value.
if (_expr_conv.get_converted_expr().has_value()) {
out_metadata.num_row_groups_after_stats_filter =
_file_itm_data.surviving_row_groups.after_stats_filter;
out_metadata.num_row_groups_after_bloom_filter =
_file_itm_data.surviving_row_groups.after_bloom_filter;
}

// no work to do (this can happen on the first pass if we have no rows to read)
if (!has_more_work()) {
// Check if number of rows per source should be included in output metadata.
Expand Down
5 changes: 5 additions & 0 deletions cpp/src/io/parquet/reader_impl_chunking.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,11 @@ struct file_intermediate_data {
// partial sum of the number of rows per data source
std::vector<std::size_t> exclusive_sum_num_rows_per_source{};

size_type num_input_row_groups{0}; // total number of input row groups across all data sources

// struct containing the number of remaining row groups after each predicate pushdown filter
surviving_row_group_metrics surviving_row_groups;

size_t _current_input_pass{0}; // current input pass index
size_t _output_chunk_count{0}; // how many output chunks we have produced

Expand Down
Loading

0 comments on commit 10c1fb4

Please sign in to comment.