
Merging to a table with multiple distinct partitions in parallel fails #2227

Closed
ldacey opened this issue Feb 27, 2024 · 3 comments · Fixed by #2291
Labels
bug Something isn't working

Comments

@ldacey
Contributor

ldacey commented Feb 27, 2024

Environment

Delta-rs version: 0.15.3

Environment: GKE

  • Cloud provider: Google

Bug

What happened:

I don't have a good MRE, but errors are raised when multiple partitions are merged to in parallel. This does not happen with "overwrite" or "append" table writes, only when we .execute() a merge. For example, if I pass the unique partitions that were updated (based on the transaction log) to dynamic Airflow tasks, with each task processing a single partition, only one of the tasks succeeds. The other tasks fail:

  File "/home/airflow/.local/lib/python3.11/site-packages/deltalake/table.py", line 1597, in execute
    metrics = self.table._table.merge_execute(
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
_internal.DeltaError: Generic DeltaTable error: Schema error: No field named s.unique_row_hash. Valid fields are

Here is an example of an Airflow DAG where each mapped task merges one of these distinct partitions, so the tasks should not conflict with each other (a sketch of the per-partition merge follows the screenshot). Right now I need to limit my schedule so that only one partition is updated at a time, which slows the pipeline down enormously compared to using "overwrite" instead of merges.

[screenshot: mapped Airflow tasks, one per partition]
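Roughly, each mapped task runs a per-partition merge like the sketch below (illustrative only: the table URI, the date partition column, and the unique_row_hash join key are placeholders inferred from the error above, not the exact production code):

# Minimal sketch of one mapped Airflow task: merge a single partition's batch
# into the Delta table. Paths, column names and the unique_row_hash key are
# illustrative placeholders, not the actual pipeline code.
import pyarrow as pa
from deltalake import DeltaTable

def merge_partition(table_uri: str, batch: pa.Table, partition_date: str) -> None:
    dt = DeltaTable(table_uri)
    (
        dt.merge(
            source=batch,
            predicate=(
                "s.unique_row_hash = t.unique_row_hash "
                f"AND t.date = '{partition_date}'"  # restrict the merge to one partition
            ),
            source_alias="s",
            target_alias="t",
        )
        .when_matched_update_all()
        .when_not_matched_insert_all()
        .execute()
    )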

What you expected to happen:

Since each partition is distinct (for example, each partition is an individual date), they should be able to be written to in parallel. The error message about the column not existing is also misleading, because the column does exist; clearing (re-running) the failed task makes it succeed.

How to reproduce it:

Merge to a Delta Table using partition_filters where multiple partitions are written to in parallel.

More details:

@ldacey ldacey added the bug Something isn't working label Feb 27, 2024
@ion-elgreco
Collaborator

Can you share the exact filter you are passing?

@ion-elgreco
Collaborator

I am able to reproduce it with some random data by running two merges at the same time:

initial table:

import polars as pl

# 100 rows, partitioned on "bar" (10 distinct partition values)
df = pl.DataFrame({
    "foo": [*range(0, 100)],
    "baz": [*range(0, 1000, 10)],
    "bar": [str(i) for i in range(0, 10)] * 10,
})

df.write_delta(
    "test_table",
    delta_write_options={
        "partition_by": ["bar"],
        "configuration": {"delta.isolationLevel": "WriteSerializable"},
    },
)

writer 1

df = pl.DataFrame({
    "foo": [*range(0,10)]*1000,
    "baz": [*range(100,200,10)]*1000,
    "bar": [str(i) for i in range(0,1)]*10000
    })
df.write_delta("test_table", 
               mode='merge', 
               delta_merge_options={"source_alias":"s", "target_alias":"t", 
                                    "predicate":"s.foo = t.foo AND s.bar = t.bar AND t.bar = '0'"}
               ).when_matched_update_all().execute()

writer 2

import polars as pl
df = pl.DataFrame({
    "foo": [*range(0,10)]*1000,
    "baz": [*range(100,200,10)]*1000,
    "bar": [str(i) for i in range(1,2)]*10000
    })

df.write_delta("test_table", 
               mode='merge', 
               delta_merge_options={"source_alias":"s", "target_alias":"t", 
                                    "predicate":"s.foo = t.foo AND s.bar = t.bar AND t.bar = '1'"}
               ).when_matched_update_all().execute()
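
To actually run the two writers at the same time, something like the following works (a sketch that wraps the snippets above in a helper parametrized by the bar partition value; threads are just one convenient way to make the two merges overlap):

from concurrent.futures import ThreadPoolExecutor

import polars as pl

def run_merge(partition: str) -> None:
    # same data and merge as writer 1 / writer 2 above, parametrized by partition
    df = pl.DataFrame({
        "foo": [*range(0, 10)] * 1000,
        "baz": [*range(100, 200, 10)] * 1000,
        "bar": [partition] * 10000,
    })
    df.write_delta(
        "test_table",
        mode="merge",
        delta_merge_options={
            "source_alias": "s",
            "target_alias": "t",
            "predicate": f"s.foo = t.foo AND s.bar = t.bar AND t.bar = '{partition}'",
        },
    ).when_matched_update_all().execute()

with ThreadPoolExecutor(max_workers=2) as pool:
    futures = [pool.submit(run_merge, p) for p in ("0", "1")]
    for f in futures:
        f.result()  # one of the merges typically raises the schema error above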

@germanmunuera

Related bug #2084
