DataFusion filter on partition column doesn't work (when the physical schema ordering differs from the logical one) #2494

Closed
Veiasai opened this issue May 9, 2024 · 5 comments · Fixed by #2614
Assignees: rtyler
Labels: binding/rust (Issues for the Rust crate), bug (Something isn't working)

Comments

Veiasai commented May 9, 2024

Environment

Delta-rs version: 0.17.3
Binding: Rust
OS: Linux

Bug

What happened:
The filter expression didn't return the expected rows. My table is relatively big, so I constructed a minimal test to reproduce it; see the code below.
From what I see in the log, my guess is:

  1. The delta scan is fine: it successfully prunes unrelated files.
  2. At https://github.com/delta-io/delta-rs/pull/1071/files#diff-f3a4847c9506848f6f5bf021b5f10fb24602373580e58739bd2a2a24f9878e77R438 we use Inexact filter push-down, so DataFusion applies the same filter again after the scan; however, the physical plan gets the wrong column index.
  3. I am not an expert on DataFusion or delta-rs, so I'll stop here. Thank you in advance for any help.
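
To make the guess in step 2 concrete, here is a toy model of the suspected mechanism, not delta-rs or DataFusion code. It assumes the filter's column index is resolved against one column order while the rows are laid out in another. The names `LOGICAL`, `PHYSICAL`, `ROW`, `index_of`, and `filter_matches` are hypothetical, and the two column orders are assumptions taken from the test table in this issue.

```rust
/// Assumed logical order: partition columns keep their schema order.
pub const LOGICAL: [&str; 3] = ["value", "id", "modified"];
/// Assumed physical order: partition columns follow `partition_columns`.
pub const PHYSICAL: [&str; 3] = ["value", "modified", "id"];
/// One row, laid out in the physical order.
pub const ROW: [&str; 3] = ["1", "2021-02-01", "A"];

/// Resolve a column name to its position in a schema (a list of names).
pub fn index_of(schema: &[&str], name: &str) -> Option<usize> {
    schema.iter().position(|c| *c == name)
}

/// Evaluate `column == value` on `row`, using an index resolved against
/// `plan_schema`, which may not match the row's real layout.
pub fn filter_matches(plan_schema: &[&str], row: &[&str], column: &str, value: &str) -> bool {
    match index_of(plan_schema, column) {
        Some(i) => row.get(i) == Some(&value),
        None => false,
    }
}
```

Under these assumptions, `filter_matches(&LOGICAL, &ROW, "id", "A")` compares "A" against the `modified` value and returns `false`, while resolving against `PHYSICAL` returns `true`. A filter on `value` matches either way because that column sits at index 0 in both orders, which is consistent with only the partition-column filter failing in the test.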

What you expected to happen:

How to reproduce it:

I wrote a unit test to check it, but it seems like I don't have permission to push it?

    #[tokio::test]
    async fn delta_scan_mixed_partition_order_and_filter() {
        let schema = Arc::new(ArrowSchema::new(vec![
            Field::new("modified", DataType::Utf8, true),
            Field::new("id", DataType::Utf8, true),
            Field::new("value", DataType::Int32, true),
        ]));

        let table = crate::DeltaOps::new_in_memory()
            .create()
            .with_columns(get_delta_schema().fields().clone())
            .with_partition_columns(["modified", "id"])
            .await
            .unwrap();
        assert_eq!(table.version(), 0);

        let batch = RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(arrow::array::StringArray::from(vec![
                    "2021-02-01",
                ])),
                Arc::new(arrow::array::StringArray::from(vec!["A"])),
                Arc::new(arrow::array::Int32Array::from(vec![1])),
            ],
        )
        .unwrap();
        // write some data
        let table = crate::DeltaOps(table)
            .write(vec![batch.clone()])
            .with_save_mode(crate::protocol::SaveMode::Append)
            .await
            .unwrap();

        let provider = Arc::new(table);
        let ctx = SessionContext::new();
        let df = ctx.read_table(provider).unwrap();

        let actual = df.clone().collect().await.unwrap();
        let expected = vec![
            "+-------+------------+----+",
            "| value | modified   | id |",
            "+-------+------------+----+",
            "| 1     | 2021-02-01 | A  |",
            "+-------+------------+----+",
        ];
        assert_batches_sorted_eq!(&expected, &actual);

        let actual = df.clone().filter(col("value").eq(lit(1))).unwrap().collect().await.unwrap();
        assert_batches_sorted_eq!(&expected, &actual);

        let actual = df.clone().filter(col("id").eq(lit("A"))).unwrap().collect().await.unwrap();
        assert_batches_sorted_eq!(&expected, &actual);
    }

More details:

expected:

[
    "+-------+------------+----+",
    "| value | modified   | id |",
    "+-------+------------+----+",
    "| 1     | 2021-02-01 | A  |",
    "+-------+------------+----+",
]
actual:

[
    "++",
    "++",
]


  left: ["+-------+------------+----+", "| value | modified   | id |", "+-------+------------+----+", "| 1     | 2021-02-01 | A  |", "+-------+------------+----+"]
 right: ["++", "++"]

Veiasai added the bug label May 9, 2024

Veiasai (Author) commented May 9, 2024

One more suggestion:
Could we return the filter push-down flag dynamically, depending on the expression?

pub enum TableProviderFilterPushDown {
    /// The expression cannot be used by the provider.
    Unsupported,
    /// The expression can be used to reduce the data retrieved,
    /// but the provider cannot guarantee it will omit all tuples that
    /// may be filtered. In this case, DataFusion will apply an additional
    /// `Filter` operation after the scan to ensure all rows are filtered correctly.
    Inexact,
    /// The provider **guarantees** that it will omit **all** tuples that are
    /// filtered by the filter expression. This is the fastest option, if available
    /// as DataFusion will not apply additional filtering.
    Exact,
}

When the expression references only partition columns, we should return Exact.
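
A minimal sketch of that decision, using only the standard library. `PushDown` is a local stand-in for DataFusion's `TableProviderFilterPushDown`, and `pushdown_for` is a hypothetical helper that receives the column names a filter references; in a real provider those names would have to be extracted from the expression. It also assumes file pruning evaluates the whole expression on partition values, which is what would make `Exact` safe.

```rust
use std::collections::HashSet;

/// Local stand-in for DataFusion's `TableProviderFilterPushDown`.
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub enum PushDown {
    Unsupported,
    Inexact,
    Exact,
}

/// Hypothetical helper: choose a push-down level for one filter, given the
/// names of the columns the filter references.
pub fn pushdown_for(referenced: &[&str], partition_columns: &[&str]) -> PushDown {
    let partitions: HashSet<&str> = partition_columns.iter().copied().collect();
    // Exact only when the filter touches at least one column and every
    // referenced column is a partition column.
    if !referenced.is_empty() && referenced.iter().all(|c| partitions.contains(c)) {
        PushDown::Exact
    } else {
        // Conservative fallback: DataFusion re-applies the filter after the scan.
        PushDown::Inexact
    }
}
```

With partition columns `["modified", "id"]`, a filter on `id` alone would be `Exact`, while a filter that also touches `value` stays `Inexact`.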

rtyler added the binding/rust label May 9, 2024

rtyler (Member) commented May 9, 2024

Thanks for taking the time to write a test, @Veiasai! I'll take a look at this shortly.

rtyler self-assigned this May 9, 2024

Veiasai (Author) commented May 15, 2024

hey, any updates?

aditanase (Contributor) commented Jun 18, 2024

@rtyler I have a local fix for this issue. I am not sure what the Delta protocol dictates, but in some of our test tables the partition columns appear in a different order in the JSON schema than in the partition columns array.

_arrow_schema builds its field list with an iterator, a chain, and two filters over the schema, so partition columns keep their schema order, while the rest of the code (e.g. DeltaScanBuilder.build) filters them out and then appends them explicitly in the order dictated by partition_columns.

This is the essence of my fix:

fn _arrow_schema(snapshot: &Snapshot, wrap_partitions: bool) -> DeltaResult<ArrowSchemaRef> {
    let meta = snapshot.metadata();

    let schema = meta.schema()?;
    let fields = schema
        .fields()
        .filter(|f| !meta.partition_columns.contains(&f.name().to_string()))
        .map(|f| f.try_into())
        .chain(
            // keep consistent order of partitioning columns
            meta.partition_columns.iter().map(|partition_col| {
                let f = schema.field(partition_col).unwrap();
                let field = Field::try_from(f)?;
                // ...
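
For readers without the crate at hand, the two orderings described above can be modelled on plain column names. This is a simplified sketch, not the actual patch: `schema_order` and `scan_order` are hypothetical names, and real code works on Delta and Arrow field types rather than strings.

```rust
/// Ordering attributed above to the old `_arrow_schema`: non-partition
/// columns first, then partition columns in the order the schema lists them.
pub fn schema_order<'a>(schema_fields: &[&'a str], partition_columns: &[&'a str]) -> Vec<&'a str> {
    schema_fields
        .iter()
        .copied()
        .filter(|f| !partition_columns.contains(f))
        .chain(
            schema_fields
                .iter()
                .copied()
                .filter(|f| partition_columns.contains(f)),
        )
        .collect()
}

/// Ordering used by the scan (and by the fix): non-partition columns first,
/// then partition columns in `partition_columns` order.
pub fn scan_order<'a>(schema_fields: &[&'a str], partition_columns: &[&'a str]) -> Vec<&'a str> {
    schema_fields
        .iter()
        .copied()
        .filter(|f| !partition_columns.contains(f))
        .chain(partition_columns.iter().copied())
        .collect()
}
```

Assuming the test table's schema lists its fields as `id`, `value`, `modified` and is partitioned by `["modified", "id"]`, `schema_order` yields `value, id, modified` while `scan_order` yields `value, modified, id`, the column order shown in the expected output of the test in this issue.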

LMK if this is enough as a pointer or I should send a PR with this.

aditanase (Contributor) commented

@rtyler I've sent a PR just in case: #2614

I would be glad to add some tests if you point me at the correct suite or an example. I was looking for a test with more than one partition column and didn't find one.

aditanase added a commit to aditanase/delta-rs that referenced this issue Jun 27, 2024
@rtyler rtyler closed this as completed in f432c4f Jul 11, 2024