
Support statistics pruning for formats other than parquet #380

Closed
alamb wants to merge 6 commits into master from alamb/generic_pruning_input

Conversation

alamb (Contributor) commented May 21, 2021

Closes #363

Edit: Note: There is an alternate API in #426

Rationale

As explained in #363, the high-level goal is to make the parquet row group pruning logic generic over any source of min/max statistics (not just parquet metadata)

Changes:

  1. Introduce a new PruningStatistics trait (sketched below)
  2. Refactor PruningPredicateBuilder to be generic in terms of PruningStatistics
  3. Add documentation and tests
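
For reference, the trait looks roughly like this (a sketch assembled from snippets later in this conversation; max_value is shown by analogy with min_value, so check pruning.rs for the exact definition):

    /// Interface to pass statistics information to the pruning logic
    pub trait PruningStatistics {
        /// return the minimum value for the named column, if known
        fn min_value(&self, column: &str) -> Option<ScalarValue>;

        /// return the maximum value for the named column, if known
        fn max_value(&self, column: &str) -> Option<ScalarValue>;
    }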

Example

Here is a brief snippet of one of the tests that shows the new API:

        // Prune using s2 > 5
        let expr = col("s2").gt(lit(5));

        // s2 [0, 5] ==> no rows should pass
        let stats1 = TestStatistics::new()
            .with("s1", MinMax::new(None, None))
            .with("s2", MinMax::new(Some(0i32.into()), Some(5i32.into())));

        // s2 [4, 6] ==> some rows could pass
        let stats2 = TestStatistics::new()
            .with("s1", MinMax::new(None, None))
            .with("s2", MinMax::new(Some(4i32.into()), Some(6i32.into())));

        let p = PruningPredicate::try_new(&expr, schema).unwrap();
        let result = p.prune(&[stats1, stats2]).unwrap();

        // false means no rows could possibly match (can prune)
        // true means some rows might match (can not prune)
        let expected = vec![false, true];

        assert_eq!(expected, result);
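
TestStatistics and MinMax are test-only helpers; a minimal sketch of how such a helper could implement the trait is below (the internals here are assumptions for illustration, not the PR's actual test code):

    use std::collections::HashMap;

    struct MinMax {
        min: Option<ScalarValue>,
        max: Option<ScalarValue>,
    }

    impl MinMax {
        fn new(min: Option<ScalarValue>, max: Option<ScalarValue>) -> Self {
            Self { min, max }
        }
    }

    #[derive(Default)]
    struct TestStatistics {
        // per-column min/max for a single container (e.g. one row group)
        stats: HashMap<String, MinMax>,
    }

    impl TestStatistics {
        fn new() -> Self {
            Self::default()
        }

        // builder-style helper used in the test above
        fn with(mut self, name: &str, min_max: MinMax) -> Self {
            self.stats.insert(name.to_string(), min_max);
            self
        }
    }

    impl PruningStatistics for TestStatistics {
        fn min_value(&self, column: &str) -> Option<ScalarValue> {
            self.stats.get(column).and_then(|mm| mm.min.clone())
        }

        fn max_value(&self, column: &str) -> Option<ScalarValue> {
            self.stats.get(column).and_then(|mm| mm.max.clone())
        }
    }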

Sequence:

I am trying to do this in a few small PRs to reduce review burden; here is how they connect together:

Planned changes:

/// Note this function takes a slice of statistics as a parameter
/// to amortize the cost of the evaluation of the predicate
/// against a single record batch.
pub fn prune<S: PruningStatistics>(&self, statistics: &[S]) -> Result<Vec<bool>> {
alamb (Contributor, Author):

Here is the heart of the change -- rather than building up Arrays directly from ParquetMetadata, this PR now builds the arrays up from ScalarValues provided by the PruningStatistics trait.

I also tried to improve the comments to make it easier to follow what is going on

// here the first max value is None and not the Some(10) value which was actually set
// because the min value is None
assert_eq!(int32_vec, vec![None, Some(20), Some(30)]);
fn test_build_statistics_record_batch() {
alamb (Contributor, Author):

This shows how creating the statistics record batch works.

@@ -748,4 +821,40 @@ mod tests {

Ok(())
}

#[test]
fn prune_api() {
alamb (Contributor, Author):

This shows end-to-end how to use the prune API (what I want to be able to do in IOx)

@@ -283,6 +283,155 @@ impl ScalarValue {
self.to_array_of_size(1)
}

/// Converts an iterator of references [`ScalarValue`] into an [`ArrayRef`]
alamb (Contributor, Author):

I couldn't find any other way to take a bunch of ScalarValues and turn them back into an Array; perhaps I missed something.
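
For illustration, here is a hedged sketch of how such a conversion gets used when assembling the statistics columns; the name ScalarValue::iter_to_array is assumed here, so see the diff for the actual function:

    use arrow::array::ArrayRef;
    use datafusion::error::Result;
    use datafusion::scalar::ScalarValue;

    /// Gather the min values reported by each container for one column
    /// and convert them into a single contiguous Arrow array.
    fn min_array<S: PruningStatistics>(stats: &[S], column: &str) -> Result<ArrayRef> {
        let scalars = stats
            .iter()
            // a container with no known minimum becomes a null entry
            .map(|s| s.min_value(column).unwrap_or(ScalarValue::Int32(None)));
        ScalarValue::iter_to_array(scalars)
    }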

codecov-commenter:

Codecov Report

Merging #380 (159f1da) into master (db4f098) will increase coverage by 0.04%.
The diff coverage is 83.62%.

❗ Current head 159f1da differs from pull request most recent head 896d261. Consider uploading reports for the commit 896d261 to get more accurate results

@@            Coverage Diff             @@
##           master     #380      +/-   ##
==========================================
+ Coverage   74.94%   74.98%   +0.04%     
==========================================
  Files         146      146              
  Lines       24314    24448     +134     
==========================================
+ Hits        18221    18332     +111     
- Misses       6093     6116      +23     
Impacted Files                                     Coverage Δ
datafusion/src/scalar.rs                           60.61% <74.33%> (+5.13%) ⬆️
datafusion/src/physical_plan/parquet.rs            81.96% <86.66%> (+0.69%) ⬆️
datafusion/src/physical_optimizer/pruning.rs       90.61% <90.29%> (ø)
datafusion/src/physical_optimizer/repartition.rs   96.92% <100.00%> (+0.09%) ⬆️

Legend: Δ = absolute <relative> (impact), ø = not affected, ? = missing data. Last update db4f098...896d261.

Dandandan (Contributor) commented May 21, 2021

I think this is really cool, and I think it would also be great to have this for in-memory tables.

alamb (Contributor, Author) commented May 21, 2021

> I think this is really cool, and I think it would also be great to have this for in-memory tables.

I agree -- I think all that is needed is to calculate the min/max statistics for each partition (or maybe even each record batch), though we might have to be careful not to slow down queries where it wouldn't help. Maybe it could be opt-in. Or perhaps we could compute the statistics "on demand" (after we have created a PruningPredicate)
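
As a rough sketch of what that could look like, the per-batch statistics could come straight from Arrow's aggregate kernels; BatchStatistics below is hypothetical and assumes an Int32 column for brevity:

    use arrow::array::Int32Array;
    use arrow::compute::{max, min};
    use arrow::record_batch::RecordBatch;
    use datafusion::scalar::ScalarValue;

    /// Min/max statistics computed on demand from one in-memory batch
    struct BatchStatistics<'a> {
        batch: &'a RecordBatch,
    }

    impl PruningStatistics for BatchStatistics<'_> {
        fn min_value(&self, column: &str) -> Option<ScalarValue> {
            let idx = self.batch.schema().index_of(column).ok()?;
            let array = self.batch.column(idx).as_any().downcast_ref::<Int32Array>()?;
            // arrow's `min` kernel scans the array once, skipping nulls
            min(array).map(|v| ScalarValue::Int32(Some(v)))
        }

        fn max_value(&self, column: &str) -> Option<ScalarValue> {
            let idx = self.batch.schema().index_of(column).ok()?;
            let array = self.batch.column(idx).as_any().downcast_ref::<Int32Array>()?;
            max(array).map(|v| ScalarValue::Int32(Some(v)))
        }
    }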

NGA-TRAN (Contributor) left a review:

LGTM

(three resolved review comments on datafusion/src/physical_optimizer/pruning.rs, now outdated)
/// ```text
/// s1_min | s2_maxx
/// -------+--------
/// 5 | 10
A contributor commented:

Do you mean s2_max = 1000?

alamb (Contributor, Author) replied:

oh, yes -- that is a great catch

(four resolved review comments on datafusion/src/physical_optimizer/pruning.rs, now outdated)

let p = PruningPredicate::try_new(&expr, schema).unwrap();
let result = p.prune(&[stats1, stats2, stats3, stats4]).unwrap();
let expected = vec![false, true, true, true];
A contributor commented:
Nice

(a resolved review comment on datafusion/src/scalar.rs, now outdated)
Dandandan (Contributor) commented:
> I think this is really cool, and I think it would also be great to have this for in-memory tables.

> I agree -- I think all that is needed is to calculate the min/max statistics for each partition (or maybe even each record batch), though we might have to be careful not to slow down queries where it wouldn't help. Maybe it could be opt-in. Or perhaps we could compute the statistics "on demand" (after we have created a PruningPredicate)

Yes, I think we might also want to look into supporting something like ANALYZE ..., besides having an option when loading. Together with sorting of data, that could become very interesting for in-memory analytics, I think.

alamb force-pushed the alamb/generic_pruning_input branch from 896d261 to 6b1a261 on May 24, 2021 13:35
alamb marked this pull request as ready for review May 24, 2021 16:28
alamb (Contributor, Author) commented May 24, 2021

@Dandandan I think this PR is now ready for review.

/// Interface to pass statistics information to [`PruningPredicates`]
pub trait PruningStatistics {
/// return the minimum value for the named column, if known
fn min_value(&self, column: &str) -> Option<ScalarValue>;
Dandandan (Contributor):

One concern I have (in general with the current state of DataFusion) is that we use ScalarValue a lot in code, which can be detrimental to performance in some cases (compared to a typed array).
Also in this case, if we had a large dataset with 1000s of statistics values, calculating statistics might be slower than it could be if they were stored in contiguous memory.
Just a thought; not something we should block this PR for.

A member asked:

What would be a better alternative in the long run? Generics?

Dandandan (Contributor) commented May 24, 2021:

In the longer run I would say we should start pushing more towards typed contiguous arrays (Arrays or Vec), indeed using generics. For example, here the min and max values per group could be stored in two arrays of corresponding types, which would be faster and use less memory.

Vectorized processing is what we try to use Arrow for already, so I would say it is good to try to use it in more places where it makes sense.

The member replied:

I very much agree with you, @Dandandan. I have a PR on arrow2 exactly about this: jorgecarleitao/arrow2#56

I am waiting for the experimental repos to be available so that we can discuss it further in apache/*

alamb (Contributor, Author) commented May 24, 2021:

Ok, I can take a shot at trying to do this.

@Dandandan are you thinking something like:

pub trait PruningStatistics {
    fn min_values(&self, column: &str) -> Option<ArrayRef>;
    fn max_values(&self, column: &str) -> Option<ArrayRef>;
} 

Which would be constrained to return Arrays with the same number of rows?

Dandandan (Contributor) replied:

@alamb yes - something along those lines.

alamb (Contributor, Author):

I'll give it a go and see what it looks like

alamb (Contributor, Author):

Here is what the alternate API looks like: #426

I would say it is now harder to implement the PruningStatistics API in the parquet reader, but the implementation in pruning.rs is simpler
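
To illustrate that trade-off, here is a hedged sketch of an implementor of the array-based trait, which has to hand back entire min/max columns itself (VecStatistics and the hard-coded column name are hypothetical):

    use std::sync::Arc;
    use arrow::array::ArrayRef;

    /// Hypothetical statistics store: row i of each array holds the
    /// min/max of container i for the "s2" column.
    struct VecStatistics {
        s2_min: ArrayRef,
        s2_max: ArrayRef,
    }

    impl PruningStatistics for VecStatistics {
        fn min_values(&self, column: &str) -> Option<ArrayRef> {
            // contiguous, typed storage: no per-container ScalarValue needed
            (column == "s2").then(|| Arc::clone(&self.s2_min))
        }

        fn max_values(&self, column: &str) -> Option<ArrayRef> {
            (column == "s2").then(|| Arc::clone(&self.s2_max))
        }
    }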

alamb (Contributor, Author) commented May 26, 2021

#426 appears to be the more popular option; closing in favor of that one.

alamb closed this May 26, 2021
alamb deleted the alamb/generic_pruning_input branch October 6, 2022 18:30
Successfully merging this pull request may close these issues:

Reusable "row group pruning" logic (#363)