
fix: Reduce cast.rs and utils.rs logic from parquet_support.rs for experimental native scans #1387

Open · wants to merge 10 commits into main
Conversation

@mbutrovich (Contributor) commented Feb 11, 2025

Which issue does this PR close?

Closes #.

Rationale for this change

Reduces the number of test failures in native_datafusion and native_iceberg_compat modes:

  • native_datafusion: succeeded 651, failed 27, canceled 2, ignored 54, pending 0
  • native_iceberg_compat: succeeded 628, failed 50, canceled 2, ignored 54, pending 0

What changes are included in this PR?

After staring at the conversions and hoisting code from arrow-rs to debug them, it turns out we can just use arrow's cast in this case.
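
To make that concrete, here is a minimal, self-contained sketch of what "arrow's cast" refers to: the cast_with_options kernel from arrow-rs. The input array and target type below are illustrative, not taken from the PR:

    use std::sync::Arc;

    use arrow::array::{ArrayRef, Int32Array};
    use arrow::compute::{cast_with_options, CastOptions};
    use arrow::datatypes::DataType;
    use arrow::error::ArrowError;

    fn main() -> Result<(), ArrowError> {
        // Illustrative input: an Int32 column as it might come out of a Parquet scan.
        let array: ArrayRef = Arc::new(Int32Array::from(vec![Some(1), None, Some(3)]));

        // Let arrow's cast kernel do the conversion instead of hand-rolled logic.
        let casted = cast_with_options(&array, &DataType::Int64, &CastOptions::default())?;
        assert_eq!(casted.data_type(), &DataType::Int64);
        Ok(())
    }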

How are these changes tested?

Ran make test-jvm for native_datafusion and native_iceberg_compat modes.

@mbutrovich (Contributor Author):

utils.rs's array_with_timezone mentions that it doesn't support converting to NTZ, but we were calling it anyway. I am going to try an assertion in there and see if that breaks anything.
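
A minimal sketch of the assertion idea, assuming the guard would sit at the top of array_with_timezone; the standalone helper name here is hypothetical:

    use arrow::datatypes::DataType;

    // Hypothetical guard: reject a to_type of timestamp-without-timezone (NTZ),
    // which array_with_timezone documents as unsupported.
    fn assert_supported_to_type(to_type: Option<&DataType>) {
        debug_assert!(
            !matches!(to_type, Some(DataType::Timestamp(_, None))),
            "array_with_timezone does not support a to_type of Timestamp NTZ"
        );
    }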

    @@ -596,7 +595,10 @@ fn cast_array(
         parquet_options: &SparkParquetOptions,
     ) -> DataFusionResult<ArrayRef> {
         use DataType::*;
    -    let array = array_with_timezone(array, parquet_options.timezone.clone(), Some(to_type))?;
@mbutrovich (Contributor Author) commented Feb 11, 2025

This was causing one of the duplicate time zone conversions.

@codecov-commenter commented Feb 11, 2025

Codecov Report

All modified and coverable lines are covered by tests ✅

Project coverage is 39.06%. Comparing base (f09f8af) to head (79af331).
Report is 26 commits behind head on main.

Additional details and impacted files
@@              Coverage Diff              @@
##               main    #1387       +/-   ##
=============================================
- Coverage     56.12%   39.06%   -17.06%     
- Complexity      976     2071     +1095     
=============================================
  Files           119      263      +144     
  Lines         11743    60746    +49003     
  Branches       2251    12909    +10658     
=============================================
+ Hits           6591    23733    +17142     
- Misses         4012    32530    +28518     
- Partials       1140     4483     +3343     


    @@ -596,7 +595,10 @@ fn cast_array(
         parquet_options: &SparkParquetOptions,
     ) -> DataFusionResult<ArrayRef> {
         use DataType::*;
    -    let array = array_with_timezone(array, parquet_options.timezone.clone(), Some(to_type))?;
    +    let array = match to_type {
    +        Timestamp(_, None) => array, // array_with_timezone does not support to_type of NTZ.
Contributor:

This will skip all the matches inside array_with_timezone:

    match array.data_type() {
        DataType::Timestamp(_, None) => {
            ...
            match to_type {
                ...
                Some(DataType::Timestamp(_, None)) => {
                    timestamp_ntz_to_timestamp(array, timezone.as_str(), None)
                }
                ...

Is it intentional?

@mbutrovich (Contributor Author) commented Feb 12, 2025

Yeah, it shouldn't be called with that type:

    // | Any other type | Not Supported |

The condition that matches inside array_with_timezone was added by #1348...

    timestamp_ntz_to_timestamp(array, timezone.as_str(), None)

...and we likely didn't understand the blast radius at the time.

@mbutrovich (Contributor Author):

In fact maybe I'll remove that in this PR if nothing breaks.

Contributor:

The utils.rs timezone functions are intended for use by cast, and specifically convert between the Spark representation and the Arrow representation. Changing them is likely to break some timezone-related unit tests.

@kazuyukitanimura (Contributor) commented Feb 12, 2025

Or what about updating the case match in array_with_timezone rather than fixing it at this location?

@mbutrovich (Contributor Author) commented Feb 12, 2025

Yep, I updated array_with_timezone:

    -    let array = array_with_timezone(array, parquet_options.timezone.clone(), Some(to_type))?;
    +    let array = match to_type {
    +        Timestamp(_, None) => array, // array_with_timezone does not support to_type of NTZ.
    +        _ => array_with_timezone(array, parquet_options.timezone.clone(), Some(to_type))?,
Contributor:

Can you log an issue to review the use of array_with_timezone? The method was written to cast timestamps provided by Spark, while here we are casting timestamps provided by Parquet, and there might be subtle cases where the behavior is different. As long as the unit tests pass, I believe we are safe for all use cases, but it would be a good idea to review.

@mbutrovich (Contributor Author):

I'm slowly going through all of the behavior that we hoisted from cast.rs to parquet_support.rs, but yep, I can file an issue.

@mbutrovich changed the title from "fix: Reduce timestamp issues in native_datafusion and native_icerberg_compat Parquet modes" to "fix: Reduce timestamp issues in native_datafusion and native_iceberg_compat Parquet modes" on Feb 12, 2025
@mbutrovich changed the title from "fix: Reduce timestamp issues in native_datafusion and native_iceberg_compat Parquet modes" to "fix: Remove cast.rs logic from parquet_support.rs for experimental native readers" on Feb 12, 2025
@mbutrovich (Contributor Author):

For context: when we were working in the experimental branch and added the SchemaAdapter, we were calling into cast.rs for type conversions. However, we started making changes in cast.rs to get more tests to pass for the experimental native scans. Not wanting to commingle logic and accidentally break cast behavior, we duplicated cast.rs and created parquet_support.rs to start making changes in. I now realize that duplicating cast.rs might have been a mistake, and have removed most of that logic from parquet_support.rs. This PR now defaults to DataFusion's cast logic unless we need specialized logic as in cast_struct_to_struct.
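
A minimal sketch of that dispatch shape, assuming arrow's cast_with_options as the fallback; the function name and simplified signature here are illustrative, not the PR's actual code:

    use arrow::array::ArrayRef;
    use arrow::compute::{cast_with_options, CastOptions};
    use arrow::datatypes::DataType;
    use datafusion::common::Result as DataFusionResult;

    // Specialized conversions stay in parquet_support.rs; everything else
    // defers to arrow's cast kernel.
    fn cast_array_sketch(array: ArrayRef, to_type: &DataType) -> DataFusionResult<ArrayRef> {
        match (array.data_type(), to_type) {
            (DataType::Struct(_), DataType::Struct(_)) => {
                // Specialized logic (cast_struct_to_struct in the PR) goes here;
                // this placeholder just falls back to arrow's cast.
                Ok(cast_with_options(&array, to_type, &CastOptions::default())?)
            }
            _ => Ok(cast_with_options(&array, to_type, &CastOptions::default())?),
        }
    }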

This change does not introduce any new failures for the experimental native scans, but that doesn't mean this approach is bug-free. @andygrove is working on some fuzz testing that will help us have more confidence in the experimental native scans in the future. I propose we move forward with this design, and bring over (or call into) cast.rs logic as we discover issues.

@mbutrovich changed the title from "fix: Remove cast.rs logic from parquet_support.rs for experimental native readers" to "fix: Reduce cast.rs logic from parquet_support.rs for experimental native readers" on Feb 12, 2025
@mbutrovich (Contributor Author):

Updated the test failures in the original description. native_iceberg_compat really does not like this change.

    @@ -72,9 +72,6 @@ pub fn array_with_timezone(
             Some(DataType::Timestamp(_, Some(_))) => {
                 timestamp_ntz_to_timestamp(array, timezone.as_str(), Some(timezone.as_str()))
             }
    -        Some(DataType::Timestamp(_, None)) => {
Contributor:

IIRC this was introduced to fix cast issues caught by the fuzz tests, which are not part of the CI. Can we run the fuzz tests again to verify that there are no regressions?
Ideally, parquet_support should not depend on any functions in utils.rs.

@mbutrovich (Contributor Author):

I would be surprised if it came in for the fuzz tests, since the blame has the line coming in with the experimental native scans PR. AFAIK, parquet_support no longer depends on utils.rs after this PR. I was also just removing this errant line.

            matches!(to_type, DataType::Utf8)
        }
        _ => false,

        _ => Ok(cast_with_options(&array, to_type, &PARQUET_OPTIONS)?),
Contributor:

+1
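
For context, the PARQUET_OPTIONS referenced in the hunk above is a crate-level arrow CastOptions value; a sketch of what such a constant can look like (the field values below are assumptions, not the PR's actual definition):

    use arrow::compute::CastOptions;
    use arrow::util::display::FormatOptions;

    // Assumed configuration: safe = false surfaces cast errors instead of
    // silently replacing failed conversions with nulls.
    pub const PARQUET_OPTIONS: CastOptions = CastOptions {
        safe: false,
        format_options: FormatOptions::new(),
    };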

@parthchandra (Contributor):

I had the impression that this PR had originally reduced the number of failures in native_iceberg_compat as well, but that is no longer true after the cleanup. Is that correct?

@parthchandra (Contributor):

I applied this PR to my working branch: https://github.com/parthchandra/datafusion-comet/commits/complex_test_failures_with_PR1387/
The branch contains all the changes in #1376 and some additional fixes I have been working on. The results are:
native_datafusion: *** 18 TESTS FAILED ***
native_iceberg_compat: *** 27 TESTS FAILED ***

The differences are due to:

  1. 5 tests related to fix: native Unsafe row accessors return incorrect results, which seems to fall back to Spark with native_datafusion (and so always succeeds).
  2. 6 tests related to int96 that fail in native_iceberg_compat. The failure might be related to the Spark configuration PARQUET_INT96_TIMESTAMP_CONVERSION, i.e. "spark.sql.parquet.int96TimestampConversion". Investigating this, but it is unlikely to be related to this PR.

So it looks like this PR does not have a negative impact on native_iceberg_compat.

@mbutrovich (Contributor Author):

> I had the impression that this PR had originally reduced the number of failures in native_iceberg_compat as well, but that is no longer true after the cleanup. Is that correct?

Initially that was the case, then I got more aggressive with pruning parquet_support.

@mbutrovich (Contributor Author) commented Feb 13, 2025

> So it looks like this PR does not have a negative impact on native_iceberg_compat.

Well that's good news!

@parthchandra (Contributor):

More info. With native_iceberg_compat:

from_type: Timestamp(Nanosecond, None)  to_type: Timestamp(Microsecond, Some("America/Los_Angeles"))
array: PrimitiveArray<Timestamp(Nanosecond, None)>
[
  2020-01-01T09:02:03.123456,
  2020-01-01T09:02:03.123456,
  2020-01-01T09:02:03.123456,
  2020-01-01T09:02:03.123456,
  2020-01-01T09:02:03.123456,
  2020-01-01T09:02:03.123456,
  2020-01-01T09:02:03.123456,
  2020-01-01T09:02:03.123456,
  2020-01-01T09:02:03.123456,
  2020-01-01T09:02:03.123456,
], 
casted: Ok(PrimitiveArray<Timestamp(Microsecond, Some("America/Los_Angeles"))>
[
  2020-01-01T09:02:03.123456-08:00,
  2020-01-01T09:02:03.123456-08:00,
  2020-01-01T09:02:03.123456-08:00,
  2020-01-01T09:02:03.123456-08:00,
  2020-01-01T09:02:03.123456-08:00,
  2020-01-01T09:02:03.123456-08:00,
  2020-01-01T09:02:03.123456-08:00,
  2020-01-01T09:02:03.123456-08:00,
  2020-01-01T09:02:03.123456-08:00,
  2020-01-01T09:02:03.123456-08:00,
])

With native_datafusion:

from_type: Timestamp(Nanosecond, None)  to_type: Timestamp(Microsecond, Some("UTC"))
array: PrimitiveArray<Timestamp(Nanosecond, None)>
[
  2020-01-01T09:02:03.123456,
  2020-01-01T09:02:03.123456,
  2020-01-01T09:02:03.123456,
  2020-01-01T09:02:03.123456,
  2020-01-01T09:02:03.123456,
  2020-01-01T09:02:03.123456,
  2020-01-01T09:02:03.123456,
  2020-01-01T09:02:03.123456,
  2020-01-01T09:02:03.123456,
  2020-01-01T09:02:03.123456,
],
casted: Ok(PrimitiveArray<Timestamp(Microsecond, Some("UTC"))>
[
  2020-01-01T09:02:03.123456+00:00,
  2020-01-01T09:02:03.123456+00:00,
  2020-01-01T09:02:03.123456+00:00,
  2020-01-01T09:02:03.123456+00:00,
  2020-01-01T09:02:03.123456+00:00,
  2020-01-01T09:02:03.123456+00:00,
  2020-01-01T09:02:03.123456+00:00,
  2020-01-01T09:02:03.123456+00:00,
  2020-01-01T09:02:03.123456+00:00,
  2020-01-01T09:02:03.123456+00:00,
])

So the difference in behavior is because of a different timezone being applied. Both are using the session timezone, but one of them is wrong. Either way, we cannot pin the failures on this PR.
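
A minimal repro sketch for isolating which layer applies the timezone: feed the same naive input through arrow's cast kernel alone and compare against the output above. The epoch value encodes 2020-01-01T09:02:03.123456 from the debug output; everything else about the failing path is outside this snippet:

    use std::sync::Arc;

    use arrow::array::{ArrayRef, TimestampNanosecondArray};
    use arrow::compute::{cast_with_options, CastOptions};
    use arrow::datatypes::{DataType, TimeUnit};
    use arrow::error::ArrowError;

    fn main() -> Result<(), ArrowError> {
        // 2020-01-01T09:02:03.123456 as nanoseconds since the epoch, no timezone.
        let array: ArrayRef =
            Arc::new(TimestampNanosecondArray::from(vec![1_577_869_323_123_456_000_i64]));

        // The same target type as the native_iceberg_compat debug output above.
        let to_type = DataType::Timestamp(TimeUnit::Microsecond, Some("America/Los_Angeles".into()));
        let casted = cast_with_options(&array, &to_type, &CastOptions::default())?;

        // Compare this against the session-timezone-converted output to see
        // which code path introduced the shift.
        println!("{casted:?}");
        Ok(())
    }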

@mbutrovich changed the title from "fix: Reduce cast.rs logic from parquet_support.rs for experimental native readers" to "fix: Reduce cast.rs and utils.rs logic from parquet_support.rs for experimental native readers" on Feb 13, 2025
@mbutrovich changed the title from "fix: Reduce cast.rs and utils.rs logic from parquet_support.rs for experimental native readers" to "fix: Reduce cast.rs and utils.rs logic from parquet_support.rs for experimental native scans" on Feb 13, 2025