MERGE logical plan vs execution plan schema mismatch #2104

Closed
ion-elgreco opened this issue Jan 22, 2024 · 7 comments · Fixed by #2129
Labels
binding/python Issues for the Python package binding/rust Issues for the Rust crate bug Something isn't working

Comments

@ion-elgreco
Collaborator

ion-elgreco commented Jan 22, 2024

Environment

Delta-rs version: 0.15.1

Binding: python


Bug

What happened:
We do a merge on the predicate id == id and only use when_not_matched_insert_all. This works in local tests. Nothing special is going on here, since it just dispatches to TableMerger and then to the merge execute in Rust:

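import polars as pl

pldf = pl.DataFrame()  # source data, contents omitted
(
    pldf.write_delta(
        dt,  # target DeltaTable (or table path), defined elsewhere
        mode="merge",
        delta_merge_options=delta_merge_options,  # predicate and aliases, defined elsewhere
    )
    .when_not_matched_insert_all()
    .execute()
)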

For context, the table is partitioned on year_month and code.
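Semantically, a merge whose only clause is `when_not_matched_insert_all` behaves like an anti-join: every source row with no target row satisfying the predicate is inserted, and existing target rows are left alone. The pure-Python sketch below illustrates only those semantics on lists of dicts, assuming the predicate is an equality on an `id` column; it is not how delta-rs executes the merge (which goes through DataFusion logical and execution plans), and the function name is hypothetical.

```python
from typing import Any, Dict, List

Row = Dict[str, Any]


def merge_insert_not_matched(target: List[Row], source: List[Row], key: str = "id") -> List[Row]:
    """Return target rows plus every source row whose key has no match in target.

    Mirrors MERGE ... WHEN NOT MATCHED THEN INSERT * with predicate s.key = t.key.
    There is no WHEN MATCHED clause, so existing target rows are never modified.
    Unlike SQL (where NULL = NULL is not true), a None key here matches another None key.
    """
    existing_keys = {row[key] for row in target}
    inserted = [dict(row) for row in source if row[key] not in existing_keys]
    return target + inserted


if __name__ == "__main__":
    target = [{"id": 1, "code": "A"}, {"id": 2, "code": "B"}]
    source = [{"id": 2, "code": "B2"}, {"id": 3, "code": "C"}]
    # Only id 3 is new, so only that source row is appended.
    print(merge_insert_not_matched(target, source))
```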

However, we get the following schema mismatch, even though the same merge works in the tests:

LogicalPlan schema: DFSchema { fields: [
 DFField { qualifier: Some(Bare { table: "t" }), field: Field { name: "col1", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} } },
 DFField { qualifier: Some(Bare { table: "t" }), field: Field { name: "col2", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} } }, 
 DFField { qualifier: Some(Bare { table: "t" }), field: Field { name: "date", data_type: Timestamp(Microsecond, None), nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} } }, 
 DFField { qualifier: Some(Bare { table: "t" }), field: Field { name: "text1", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} } }, 
 DFField { qualifier: Some(Bare { table: "t" }), field: Field { name: "text2", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} } }, 
 DFField { qualifier: Some(Bare { table: "t" }), field: Field { name: "_commit_version", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} } }, 
 DFField { qualifier: Some(Bare { table: "t" }), field: Field { name: "m_rec_inserted_utc", data_type: Timestamp(Microsecond, None), nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} } }, 
 DFField { qualifier: Some(Bare { table: "t" }), field: Field { name: "year_month", data_type: Dictionary(UInt16, Utf8), nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} } }, 
 DFField { qualifier: Some(Bare { table: "t" }), field: Field { name: "code", data_type: Dictionary(UInt16, Utf8), nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} } }, 
 DFField { qualifier: Some(Bare { table: "t" }), field: Field { name: "__delta_rs_path", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} } }]

 ExecutionPlan schema: Schema { fields: [
 Field { name: "col1", data_type: LargeUtf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, 
 Field { name: "col2", data_type: LargeUtf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, 
 Field { name: "date", data_type: Timestamp(Microsecond, None), nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, 
 Field { name: "text1", data_type: LargeUtf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, 
 Field { name: "text2", data_type: LargeUtf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, 
 Field { name: "_commit_version", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, 
 Field { name: "m_rec_inserted_utc", data_type: Timestamp(Microsecond, None), nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, 
 Field { name: "code", data_type: Dictionary(UInt16, Utf8), nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, 
 Field { name: "year_month", data_type: Dictionary(UInt16, Utf8), nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, 
 Field { name: "__delta_rs_path", data_type: Dictionary(UInt16, Utf8), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }], metadata: {} }

Maybe the main culprit here is that __delta_rs_path is a nullable Utf8 in the logical plan but a non-nullable dictionary in the execution plan. @Blajda

Otherwise, is the LargeUtf8 vs Utf8 mismatch still an issue? That seems less likely, since it worked before.
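To separate the candidate causes, the two dumps above can be compared slot by slot and by name. The sketch below transcribes the (name, type, nullable) triples from those dumps into plain Python; `positional_diff` and `name_diff` are hypothetical helpers, and which kind of comparison actually raised the error is not shown in this thread.

```python
from typing import List, Tuple

# (name, data_type, nullable), transcribed from the two schema dumps above
Field = Tuple[str, str, bool]

LOGICAL: List[Field] = [
    ("col1", "Utf8", True),
    ("col2", "Utf8", True),
    ("date", "Timestamp(Microsecond, None)", True),
    ("text1", "Utf8", True),
    ("text2", "Utf8", True),
    ("_commit_version", "Int32", True),
    ("m_rec_inserted_utc", "Timestamp(Microsecond, None)", True),
    ("year_month", "Dictionary(UInt16, Utf8)", True),
    ("code", "Dictionary(UInt16, Utf8)", True),
    ("__delta_rs_path", "Utf8", True),
]

PHYSICAL: List[Field] = [
    ("col1", "LargeUtf8", True),
    ("col2", "LargeUtf8", True),
    ("date", "Timestamp(Microsecond, None)", True),
    ("text1", "LargeUtf8", True),
    ("text2", "LargeUtf8", True),
    ("_commit_version", "Int32", True),
    ("m_rec_inserted_utc", "Timestamp(Microsecond, None)", True),
    ("code", "Dictionary(UInt16, Utf8)", True),
    ("year_month", "Dictionary(UInt16, Utf8)", True),
    ("__delta_rs_path", "Dictionary(UInt16, Utf8)", False),
]


def positional_diff(logical: List[Field], physical: List[Field]) -> List[str]:
    """Compare fields slot by slot, as a position-based schema check would."""
    issues: List[str] = []
    if len(logical) != len(physical):
        issues.append(f"field count: {len(logical)} vs {len(physical)}")
    for i, (lf, pf) in enumerate(zip(logical, physical)):
        if lf != pf:
            issues.append(f"slot {i}: logical {lf} vs physical {pf}")
    return issues


def name_diff(logical: List[Field], physical: List[Field]) -> List[str]:
    """Compare fields by column name, ignoring their order."""
    by_name = {name: (dtype, nullable) for name, dtype, nullable in physical}
    issues: List[str] = []
    for name, dtype, nullable in logical:
        if name not in by_name:
            issues.append(f"{name}: missing from physical schema")
        elif by_name[name] != (dtype, nullable):
            pdtype, pnull = by_name[name]
            issues.append(f"{name}: logical {dtype}/{nullable} vs physical {pdtype}/{pnull}")
    return issues


if __name__ == "__main__":
    for line in positional_diff(LOGICAL, PHYSICAL):
        print("by position:", line)
    for line in name_diff(LOGICAL, PHYSICAL):
        print("by name:", line)
```

By position, seven slots differ, including slots 7 and 8 where `year_month` and `code` trade places. By name, only the four `Utf8`/`LargeUtf8` columns and `__delta_rs_path` differ, so the partition-column order only shows up when the check is positional.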

@ion-elgreco ion-elgreco added the bug Something isn't working label Jan 22, 2024
@Blajda
Collaborator

Blajda commented Jan 22, 2024

Does the error happen consistently in this case? It would be nice to have the exact stack trace / error.
Since this data is partitioned on two columns, another possibility is that the distinct partition value scan has an issue.
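The distinct partition value scan presumably collects the partition values present in the source so that only matching target partitions need to be read. The general idea, sketched in plain Python under that assumption and not taken from the delta-rs source, is to turn the distinct `(year_month, code)` tuples into an OR of AND terms. All function names are hypothetical, and the alias `t` is borrowed from the qualifier in the logical plan dump above.

```python
from typing import Any, Dict, Iterable, List, Sequence, Tuple


def distinct_partition_values(
    rows: Iterable[Dict[str, Any]], partition_cols: Sequence[str]
) -> List[Tuple[Any, ...]]:
    """Distinct partition-value tuples in first-seen order, ordered like partition_cols."""
    seen: Dict[Tuple[Any, ...], None] = {}
    for row in rows:
        seen.setdefault(tuple(row[c] for c in partition_cols), None)
    return list(seen)


def sql_literal(value: Any) -> str:
    """Quote strings (doubling embedded quotes); render other values with str()."""
    if isinstance(value, str):
        return "'" + value.replace("'", "''") + "'"
    return str(value)


def partition_filter(
    values: Sequence[Tuple[Any, ...]], partition_cols: Sequence[str], alias: str = "t"
) -> str:
    """OR of per-partition AND terms; each tuple must follow partition_cols' order."""
    terms = []
    for combo in values:
        parts = []
        for col, value in zip(partition_cols, combo):
            if value is None:
                parts.append(f"{alias}.{col} IS NULL")
            else:
                parts.append(f"{alias}.{col} = {sql_literal(value)}")
        terms.append("(" + " AND ".join(parts) + ")")
    # No source rows means no target partition can match.
    return " OR ".join(terms) if terms else "false"


if __name__ == "__main__":
    cols = ["year_month", "code"]
    source = [
        {"id": 1, "year_month": "2024-01", "code": "A"},
        {"id": 2, "year_month": "2024-01", "code": "A"},
        {"id": 3, "year_month": "2024-02", "code": "B"},
    ]
    print(partition_filter(distinct_partition_values(source, cols), cols))
```

With two partition columns, pairing tuples built in one column order with names listed in the other (for example `code, year_month`) would silently produce a filter that targets the wrong partitions, which is one place a column-order mix-up could plausibly surface.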

@ion-elgreco
Collaborator Author

ion-elgreco commented Jan 23, 2024

@Blajda Yeah, it happens consistently. I'll try to find some time to reproduce it with an MRE.

I can try to give you the full stack trace, but I have to rename a bunch of things since it's confidential. I'll get back to you on that tomorrow :)

@sylvanayelda

I am seeing the exact same behavior. My table is partitioned on two columns. When I change it to be partitioned by a single column, the error no longer occurs.

@ion-elgreco
Collaborator Author

@sylvanayelda In which version do you see the issue? Could you try it on 0.14.0 as well?

@sylvanayelda

@ion-elgreco I was using version 0.15.1, but with 0.14.0 I see the same issue (although __delta_rs_path no longer appears in either the LogicalPlan or the ExecutionPlan schema).

@ion-elgreco ion-elgreco added binding/python Issues for the Python package binding/rust Issues for the Rust crate labels Jan 25, 2024
@Blajda
Collaborator

Blajda commented Jan 26, 2024

@sylvanayelda Are you merging using the polars interface?
Please also provide the schema of the table you are using.

@sylvanayelda

sylvanayelda commented Jan 26, 2024

@Blajda I'm not using polars. Here is a sample of my code:

from deltalake import DeltaTable, write_deltalake
import pyarrow as pa

# schema is a pyarrow schema
source_table = pa.Table.from_pylist(records, schema=schema)
target_table = DeltaTable(table_path, storage_options=storage_opts)
(
    target_table.merge(
        source=source_table,
        source_alias=SOURCE_ALIAS,
        target_alias=TARGET_ALIAS,
        predicate=get_predicate(),
        large_dtypes=False,
    )
    .when_not_matched_insert(updates=get_inserts(schema.names))
    .execute()
)
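The helpers `get_predicate` and `get_inserts` and the constants `SOURCE_ALIAS` and `TARGET_ALIAS` are not shown in the comment. `when_not_matched_insert` takes a mapping from target column name to a SQL expression over the source, so a plausible shape is sketched below, assuming the predicate is an equality on one or more key columns. The alias values and the default key column `col2` are hypothetical, and column names are assumed to need no quoting.

```python
from typing import Dict, Sequence

# Hypothetical alias values; the real ones are not shown in the issue.
SOURCE_ALIAS = "source"
TARGET_ALIAS = "target"


def get_predicate(key_columns: Sequence[str] = ("col2",)) -> str:
    """Equality predicate on the (hypothetical) key columns, joined with AND."""
    return " AND ".join(f"{TARGET_ALIAS}.{c} = {SOURCE_ALIAS}.{c}" for c in key_columns)


def get_inserts(column_names: Sequence[str]) -> Dict[str, str]:
    """Map every target column to the same-named source column expression."""
    return {name: f"{SOURCE_ALIAS}.{name}" for name in column_names}


if __name__ == "__main__":
    print(get_predicate())
    print(get_inserts(["partition_col1", "col2", "partition_date"]))
```

When the source and target share column names, mapping every column this way should be equivalent in effect to `when_not_matched_insert_all()`.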

The table has the following schema:

Schema(
    [
        Field(partition_col1, PrimitiveType("string"), nullable=True),
        Field(col2, PrimitiveType("string"), nullable=True),
        Field(col3, PrimitiveType("string"), nullable=True),
        Field(col4, PrimitiveType("long"), nullable=True),
        Field(col5, PrimitiveType("long"), nullable=True),
        Field(col6, PrimitiveType("string"), nullable=True),
        Field(col7, PrimitiveType("string"), nullable=True),
        Field(col8, PrimitiveType("long"), nullable=True),
        Field(col9, PrimitiveType("long"), nullable=True),
        Field(col10, PrimitiveType("long"), nullable=True),
        Field(col11, PrimitiveType("long"), nullable=True),
        Field(
            col12,
            ArrayType(PrimitiveType("long"), contains_null=True),
            nullable=True,
        ),
        Field(
            col13,
            ArrayType(PrimitiveType("long"), contains_null=True),
            nullable=True,
        ),
        Field(partition_date, PrimitiveType("string"), nullable=True),
    ]
)

I should point out that we are also storing the data in ADLS2. Could that be causing any issues here?

Blajda added a commit that referenced this issue Jan 27, 2024
# Description
When using logical plans with DF (DataFusion), the order and location of partition
columns did not match the physical schema. This caused errors when
logical relations were converted to physical relations.

# Related Issue(s)
- closes #2104
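The fix description amounts to making the logical schema list columns in the same positions as the physical scan. A minimal sketch of that alignment step follows, operating on plain (name, type) pairs rather than on DataFusion's DFSchema; the function is hypothetical and is not the code from the actual fix.

```python
from typing import List, Sequence, Tuple

Field = Tuple[str, str]  # (name, data_type)


def align_to_physical(logical: Sequence[Field], physical_names: Sequence[str]) -> List[Field]:
    """Reorder logical fields so that field i carries the name of physical column i.

    Raises ValueError if the two sides do not contain exactly the same column names.
    """
    by_name = {name: (name, dtype) for name, dtype in logical}
    if len(by_name) != len(logical):
        raise ValueError("duplicate column names in logical schema")
    if set(by_name) != set(physical_names) or len(physical_names) != len(by_name):
        differing = sorted(set(physical_names) ^ set(by_name))
        raise ValueError(f"column sets differ: {differing}")
    return [by_name[name] for name in physical_names]


if __name__ == "__main__":
    logical = [("col1", "Utf8"), ("year_month", "Dictionary(UInt16, Utf8)"), ("code", "Dictionary(UInt16, Utf8)")]
    # The physical scan in this issue emitted code before year_month.
    print(align_to_physical(logical, ["col1", "code", "year_month"]))
```

Aligning positions only addresses the ordering problem; type differences such as `Utf8` vs `LargeUtf8` would still need a cast or a consistent choice of types on both sides.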
RobinLin666 pushed a commit to RobinLin666/delta-rs that referenced this issue Feb 2, 2024