
Handle merging of evolved schemas in ParquetExec #1622

Merged (14 commits) on Jan 23, 2022

Conversation

thinkharderdev (Contributor):

Which issue does this PR close?

Closes #132

Rationale for this change

Currently, it is assumed that all parquet files in a listing scan have the same schema. This change enables support for schema evolution in the underlying storage layer by merging parquet schemas on read.

What changes are included in this PR?

There are three parts (a sketch of the merge and backfill follows this list):

  1. Modify ParquetFormat to merge schemas for all listed parquet files using the underlying Schema::try_merge method in arrow-rs
  2. When reading individual parquet files, map projected column indexes from the merged schema to the file schema.
  3. "Backfill" any missing columns from the merged schema with null-valued columns.

Are there any user-facing changes?

Scans that contain heterogeneous schemas will now attempt to merge the schemas; where a schema-mismatch error was previously expected, the observed behavior will differ.

github-actions bot added the datafusion label (Changes in the datafusion crate) on Jan 20, 2022.
alamb (Contributor) commented on Jan 20, 2022:

Thanks @thinkharderdev! I am hoping to review this tomorrow.

alamb (Contributor) left a review:

Thank you so much @thinkharderdev -- this is a wonderful first contribution.

I think other than some additional testing, this is basically ready to go. I left comments about what cases I think should be covered and a suggestion of how to do so (and avoid having to mess with parquet-data)

@@ -473,6 +536,69 @@ mod tests {
schema::types::SchemaDescPtr,
};

#[tokio::test]
Contributor:

Thanks! This is a great start on testing

I was thinking there is no reason to use checked in parquet files for this test -- we can create files as part of the test. I went ahead and coded this up as coralogix#1 as the scaffolding was a bit annoying.

With that code I suggest we test:

  1. columns in different orders in different files (e.g. one file has (a, b, c) columns and one has (b, a))
  2. projection (which I do see is covered here)
  3. Column with the same name and different types
  4. Columns covering different subsets of the data, e.g. (a, b) and (b, c)

thinkharderdev (author):

On point 1, I actually noticed this morning that my implementation would fail in the case where the columns are in different orders but all projected columns are present in the file. The easiest way to fix that would be to remove the condition on re-mapping the columns in the output batch (so do that mapping in all cases).

Are we concerned about the runtime cost of that operation, and should we try to avoid it when unnecessary? I'm relatively new to Rust, so I'm not sure how expensive cloning an Arc is.

Contributor:

> I'm relatively new to Rust, so I'm not sure how expensive cloning an Arc is.

Cloning an Arc is very fast (it increments an atomic counter) 🚤
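
(A minimal standalone sketch of that point, not code from this PR:)

```rust
use std::sync::Arc;

fn main() {
    let schema = Arc::new(vec!["a", "b", "c"]); // stand-in for an Arc<Schema>
    // Cloning the Arc bumps an atomic reference count; the underlying
    // data is never copied.
    let shared = Arc::clone(&schema);
    assert_eq!(Arc::strong_count(&schema), 2);
    drop(shared);
    assert_eq!(Arc::strong_count(&schema), 1);
}
```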

[Resolved review thread on datafusion/src/physical_plan/file_format/parquet.rs (outdated)]
thinkharderdev (author):

> Thank you so much @thinkharderdev -- this is a wonderful first contribution.
>
> I think other than some additional testing, this is basically ready to go. I left comments about what cases I think should be covered and a suggestion of how to do so (and avoid having to mess with parquet-data)

Thanks! I'll add the additional test coverage.

Add round trip parquet testing
@@ -385,9 +388,33 @@ fn build_row_group_predicate(
}
}

// Map projections from the schema which merges all file schemas to projections on a particular
// file
fn map_projections(
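
A minimal sketch of what this mapping might do — the signature and error handling below are assumptions, not the PR's exact code. Each projected index into the merged schema is resolved by column name against the file's own schema; columns the file lacks are skipped (and later backfilled with nulls), and a type conflict surfaces as an error:

```rust
use arrow::datatypes::Schema;
use arrow::error::ArrowError;

fn map_projections(
    merged_schema: &Schema,
    file_schema: &Schema,
    projections: &[usize],
) -> Result<Vec<usize>, ArrowError> {
    projections
        .iter()
        .filter_map(|&i| {
            let merged_field = merged_schema.field(i);
            // Look the column up by name; skip it if this file lacks it.
            file_schema.index_of(merged_field.name()).ok().map(|idx| {
                if file_schema.field(idx).data_type() == merged_field.data_type() {
                    Ok(idx)
                } else {
                    Err(ArrowError::SchemaError(format!(
                        "column {} has a type conflicting with the merged schema",
                        merged_field.name()
                    )))
                }
            })
        })
        .collect()
}
```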
tustvold (Contributor) commented on Jan 21, 2022:

I wonder if this logic and the logic in read_partition might be extracted into some sort of SchemaAdapter, akin to PartitionColumnProjector. This would allow the logic to be reused with other file formats, e.g. JSON or CSV, whilst also allowing it to be tested in isolation.

Contributor:

That is a good idea. If not in this PR, I'll file a ticket to do it in a follow-on.
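
For reference, a rough sketch of what such a SchemaAdapter might look like (entirely hypothetical; no such type exists in this PR as written):

```rust
use arrow::array::new_null_array;
use arrow::datatypes::SchemaRef;
use arrow::error::ArrowError;
use arrow::record_batch::RecordBatch;

// Hypothetical adapter: owns the merged ("table") schema and fixes up
// batches read with a file's own schema so they conform to it.
struct SchemaAdapter {
    merged_schema: SchemaRef,
}

impl SchemaAdapter {
    fn adapt(&self, batch: RecordBatch) -> Result<RecordBatch, ArrowError> {
        let columns = self
            .merged_schema
            .fields()
            .iter()
            .map(|field| match batch.schema().index_of(field.name()) {
                // Column present in the file: reorder it into place.
                Ok(idx) => batch.column(idx).clone(),
                // Column missing from the file: backfill with nulls.
                Err(_) => new_null_array(field.data_type(), batch.num_rows()),
            })
            .collect();
        RecordBatch::try_new(self.merged_schema.clone(), columns)
    }
}
```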

1. Add additional test cases

2. Map projected column indexes in all cases

3. Raise an explicit error in the case where there is a conflict between a file schema and the merged schema.
use crate::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
use arrow::array::{
BinaryArray, BooleanArray, Float32Array, Float64Array, Int32Array,
TimestampNanosecondArray,
};
use futures::StreamExt;

#[tokio::test]
async fn test_merge_schema() -> Result<()> {
thinkharderdev (author):

@alamb Not sure we need a test case here since it gets tested implicitly in the ParquetExec tests? If you'd rather have an explicit test case here, then I can use the same utility from those test cases over here.

Contributor:

I think coverage at the ParquetExec level is more than adequate.

alamb (Contributor) left a review:

This is looking really close @thinkharderdev -- thank you so much. I think the test for incompatible schema needs to be fixed but then this is ready to merge

[Resolved review thread on datafusion/src/physical_plan/file_format/parquet.rs (outdated)]

// read/write the files:
let read =
round_trip_to_parquet(vec![batch1, batch2], Some(vec![0, 3]), None).await;
Contributor:

Using the projection index 3 is good 👍 (as it requires projecting on the merged schema).

}

#[tokio::test]
async fn evolved_schema_incompatible_types() {
Contributor:

This test name implies it is for incompatible types, but then merges two files with compatible types.

Perhaps you could switch one of the column types and then assert a panic like

Suggested change:
- async fn evolved_schema_incompatible_types() {
+ #[should_panic(expected = "incorrect types")]
+ async fn evolved_schema_incompatible_types() {

thinkharderdev (author):

That is what I tried to do originally, but the issue is that the panic happens on the reader thread. In effect, it just causes the read on that partition to get dropped. We could test for a panic by calling read_partition directly from the test, if you'd rather do it that way.

Contributor:

👍 I think this is good enough for now -- I'll file a ticket to improve the behavior (error) on schema mismatch

Contributor:

Filed #1651

Contributor:

Fixed in #1837

thinkharderdev and others added 4 commits January 22, 2022 09:56
…ser error rather than something to investigate) Using a DataFusionError rather than one from Parquet (the rationale being that this error comes from DataFusion, and is not related to being able to read the parquet files)

Co-authored-by: Andrew Lamb <[email protected]>
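
As a hedged illustration of that choice (the helper function and message text below are assumptions, not the PR's code):

```rust
use datafusion::error::DataFusionError;

// Hypothetical: report a schema conflict detected while merging file
// schemas as a DataFusion execution error rather than a parquet error,
// since the parquet file itself was read successfully and the mismatch
// is detected by DataFusion.
fn schema_conflict_error(column: &str) -> DataFusionError {
    DataFusionError::Execution(format!(
        "file schema for column {} conflicts with the merged schema",
        column
    ))
}
```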
alamb (Contributor) commented on Jan 23, 2022:

Thanks @thinkharderdev!

thinkharderdev (author):

> Thanks @thinkharderdev!

Thank you for your help!

alamb (Contributor) commented on Jan 24, 2022:

FYI @thinkharderdev this also fixed @capkurmagati's issue in #1527 🎉
