merge very slow compared to delete + append on larger dataset #1846

Closed
ibabeo opened this issue Nov 13, 2023 · 6 comments · Fixed by #1958
Labels
bug Something isn't working


ibabeo commented Nov 13, 2023

Environment

Delta-rs version: 0.13

Binding: python

Environment:

  • OS: windows

Bug

What happened:
I have a dataset of shape (3066766, 18), and part of it needs to be updated from a dataset of shape (185004, 18).
The natural way to do this would be merge().when_matched_update_all(), which is extremely slow (I terminated it after waiting 20 minutes).
An alternative I tested is to call dt.delete() first and then write_deltalake(mode='append'), which is very fast (< 1 s).

What you expected to happen:
I expected merge().xx() to be faster than delete followed by append.

How to reproduce it:

import pandas as pd
import pyarrow as pa
from deltalake import DeltaTable
from deltalake.writer import write_deltalake
import tempfile

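# mkdtemp() keeps the directory alive; TemporaryDirectory().name points at a directory that is removed as soon as the object is garbage collected
temppath = tempfile.mkdtemp()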
# NYC taxi data sample
df = pd.read_parquet('https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2023-01.parquet')
# make column names lower case due to bug in uppercase #1797 
df.columns = map(str.lower, df.columns)
# timezone timestamp issue #1777 
df = df.drop(columns=['tpep_pickup_datetime', 'tpep_dropoff_datetime'])

# write to dt
df_temp = df.assign(calculationdate='2023-09-05')
write_deltalake(temppath, df_temp, mode='append', partition_by=["calculationdate"])

# load delta table
dt = DeltaTable(temppath)

# source table to be updated 
df_update = df_temp[df_temp.pulocationid.isin([161,  43])]


# method 1 - super slow
dt.merge(
    source=pa.Table.from_pandas(df_update),
    predicate="t.calculationdate = '2023-09-05' and t.calculationdate = s.calculationdate  and s.pulocationid = t.pulocationid",
    source_alias="s",
    target_alias="t",
).when_matched_update_all().execute()

# method 2 - alternative - fast
dt.delete(predicate = "calculationdate = '2023-09-05' and pulocationid in (161,43)")
write_deltalake(temppath, df_update.reset_index(drop=True), mode='append')


ibabeo added the bug label Nov 13, 2023
ibabeo changed the title from "merge performance issue on larger dataset" to "merge very slow compared to delete + append on larger dataset" Nov 13, 2023
@ion-elgreco
Collaborator

This may be because merge currently builds a physical plan directly instead of constructing a logical plan. @Blajda has an open PR for this feature.

Collaborator

Blajda commented Nov 14, 2023

Yeah, further enhancements will need to be done after the conversion to logical plans. Currently the operation rewrites the entire table, but this can be optimized when upserts are performed.
Also, if the number of records being updated is small, then I imagine the cost of the join will become a major factor.

Contributor

ldacey commented Nov 14, 2023

How would this ideally work in practice?

Currently, I process data from some window of time (previous hour, previous day, etc) and save it into a staging dataset.

Then I read the unique partitions which were just changed in the previous step into Polars, sort, and drop duplicates. Then I overwrite the partitions (normally daily or monthly) into my final dataset. The result is that I have the latest version of each unique row based on a unique constraint.

Using merge, I would instead just use when_matched_update_all and insert. Does this trigger the entire table to be overwritten? Some of my tables are hundreds of millions of rows and the data I am upserting would only exist in a single or few partitions.
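
The staging workflow described in this comment (sort, then drop duplicates so that only the latest version of each row survives) can be sketched in plain Python. This is only an illustration under stated assumptions: the column names `id` and `updated_at` and the helper `keep_latest` are hypothetical, and the actual pipeline does this step in Polars.

```python
from typing import Any, Dict, Hashable, Iterable, List, Tuple


def keep_latest(
    rows: Iterable[Dict[str, Any]],
    key_cols: Tuple[str, ...],
    order_col: str,
) -> List[Dict[str, Any]]:
    """Return one row per unique key, keeping the row with the largest order_col."""
    latest: Dict[Tuple[Hashable, ...], Dict[str, Any]] = {}
    for row in rows:
        key = tuple(row[c] for c in key_cols)
        current = latest.get(key)
        # Replace the stored row only if this one is at least as new (later rows win ties).
        if current is None or row[order_col] >= current[order_col]:
            latest[key] = row
    return list(latest.values())


# Hypothetical staged rows: id 1 appears twice, and the later version should win.
staged = [
    {"id": 1, "updated_at": "2023-11-13T10:00", "value": "old"},
    {"id": 2, "updated_at": "2023-11-13T10:05", "value": "only"},
    {"id": 1, "updated_at": "2023-11-13T11:00", "value": "new"},
]
deduped = keep_latest(staged, key_cols=("id",), order_col="updated_at")
```

The deduplicated rows are what would then be written back over the affected partitions, or passed as the source of an upsert.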

Collaborator

Blajda commented Nov 15, 2023

> Using merge, I would instead just use when_matched_update_all and insert. Does this trigger the entire table to be overwritten? Some of my tables are hundreds of millions of rows and the data I am upserting would only exist in a single or few partitions.

Yes, the current implementation does this, and it is documented on the Rust side. We should add a warning for Python users that in its current state merge should not be used with large tables.

Ideally, in the future merge will be able to perform partition pruning, plus additional pruning based on source statistics, to ensure that only files with modified/deleted records are removed.
Once those enhancements are implemented there should be no issue with using merge for your workflow.
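
As a rough illustration of the statistics-based pruning idea mentioned here, the sketch below keeps only the files whose min/max range for a key column could contain one of the source keys. It is not delta-rs code: `FileStats`, `files_possibly_touched`, and the file names are hypothetical, and real file statistics cover more than one integer column.

```python
from dataclasses import dataclass
from typing import Iterable, List


@dataclass(frozen=True)
class FileStats:
    """Hypothetical per-file metadata: path plus min/max of one key column."""
    path: str
    min_key: int
    max_key: int


def files_possibly_touched(files: Iterable[FileStats], source_keys: Iterable[int]) -> List[str]:
    """Keep only files whose [min_key, max_key] range could contain a source key."""
    keys = sorted(set(source_keys))
    if not keys:
        return []
    lowest, highest = keys[0], keys[-1]
    touched = []
    for f in files:
        # Cheap range-overlap test first, then an exact check against the keys.
        if f.max_key < lowest or f.min_key > highest:
            continue
        if any(f.min_key <= k <= f.max_key for k in keys):
            touched.append(f.path)
    return touched


files = [
    FileStats("part-0.parquet", 1, 100),
    FileStats("part-1.parquet", 101, 200),
    FileStats("part-2.parquet", 201, 300),
]
# The two pulocationid values from the original report.
to_rewrite = files_possibly_touched(files, source_keys=[43, 161])
```

Only the files in `to_rewrite` would need remove/add actions; the third file is left alone because no source key falls inside its range.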

@leodiegues

Does #1720 solve this issue?

Collaborator

Blajda commented Dec 9, 2023

No it doesn't. Additional work is still required to support efficient upserts.

ion-elgreco added a commit that referenced this issue Dec 20, 2023
# Description
This upgrades merge so that it can leverage partitions where specified
in the join predicate. There are two ways we can leverage partitions:

1. static references, i.e. `target.partition = 1`.
2. Inferring from the data, i.e `source.partition = target.partition`.

In the first case, this implements the logic described in [this
comment](https://github.com/delta-io/delta-rs/blob/main/crates/deltalake-core/src/operations/merge.rs#L670).
Any predicate mentioning the source that is not covered by (2) is
pruned, which will leave predicates on just the target columns (and will
be amenable to file pruning)
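
A toy sketch of the first case, in Python for illustration only (the real logic lives in the Rust merge operation and works on parsed expressions). The helper `target_only_conjuncts` is hypothetical; it splits on `and` with a regular expression, ignores case (2), and does not handle parentheses, `or`, or quoted strings that contain the word "and".

```python
import re
from typing import List


def target_only_conjuncts(predicate: str, source_alias: str) -> List[str]:
    """Split a predicate on AND and drop every conjunct that mentions the source alias."""
    conjuncts = [c.strip() for c in re.split(r"\s+and\s+", predicate, flags=re.IGNORECASE)]
    mentions_source = re.compile(rf"\b{re.escape(source_alias)}\.")
    return [c for c in conjuncts if c and not mentions_source.search(c)]


# The join predicate from the original report, with source alias "s" and target alias "t".
predicate = (
    "t.calculationdate = '2023-09-05' and t.calculationdate = s.calculationdate "
    " and s.pulocationid = t.pulocationid"
)
static_filter = target_only_conjuncts(predicate, source_alias="s")
```

What remains refers only to target columns, so it can be evaluated against partition values to skip files.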

In the second case, we first construct a version of the predicate with
references to source replaced with placeholders:

```sql
target.partition = source.partition and foo > 42
```

becomes:

```sql
target.partition = $1 and foo > 42
```

We then stream through the source table, gathering the distinct tuples
of the mentioned partitions:

```
| partition |
-------------
|       1   |
|       5   |
|       7   |
```

and then expand out the sql to take these into account:

```sql
(target.partition = 1 and foo > 42)
or (target.partition = 5 and foo > 42)
or (target.partition = 7 and foo > 42)
```
And insert this filter into the target chain. We also use the same
filter to process the file list, meaning we only make remove actions for
files that will be targeted by the scan.
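
The placeholder expansion walked through above can be sketched in a few lines of Python. This is an illustration rather than the Rust implementation: `sql_literal` and `expand_placeholders` are hypothetical helpers, and plain string substitution stands in for rewriting a parsed expression tree.

```python
from typing import Any, Iterable, List, Sequence, Tuple


def sql_literal(value: Any) -> str:
    """Render a Python value as a SQL literal (strings quoted, quotes doubled)."""
    if isinstance(value, str):
        return "'" + value.replace("'", "''") + "'"
    return str(value)


def expand_placeholders(template: str, source_rows: Iterable[Sequence[Any]]) -> str:
    """Substitute $1, $2, ... with each distinct tuple and OR the results together."""
    distinct: List[Tuple[Any, ...]] = []
    seen = set()
    for row in source_rows:
        key = tuple(row)
        if key not in seen:
            seen.add(key)
            distinct.append(key)
    clauses = []
    for values in distinct:
        clause = template
        # Substitute the highest index first so "$1" does not clobber "$10".
        for i in range(len(values), 0, -1):
            clause = clause.replace(f"${i}", sql_literal(values[i - 1]))
        clauses.append(f"({clause})")
    return "\nor ".join(clauses)


template = "target.partition = $1 and foo > 42"
# Partition values streamed from the source; duplicates are collapsed.
source_partitions = [(1,), (5,), (1,), (7,)]
expanded = expand_placeholders(template, source_partitions)
```

With the three distinct partitions from the table above, `expanded` is the same three-clause `or` filter shown in the SQL block.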

I considered whether it would be possible to do this via datafusion sql
in a generic manner, for example by first joining against the distinct
partitions. I don't think it's possible - because each of the filters on
the logical plans are static, there's no opportunity for it to push the
distinct partition tuples down into the scan. Another variant would be
to make it so the source and partition tables share the same
`output_partitioning` structure, but as far as I can tell you wouldn't
be able to make the partitions line up such that you can do the merge
effectively and not read the whole table (plus `DeltaScan` doesn't
guarantee that one datafusion partition is one DeltaTable partition).

I think the static bit is a no brainer but the eager read of the source
table may cause issues if the source table is of a similar size to the
target table. It may be prudent to hide that part behind a feature flag on
the merge, but would love comments on it.

# Performance

I created a 16GB table locally with 1.25 billion rows over 1k
partitions, and when updating 1 partition a full merge takes 1000-ish
seconds:

```
merge took 985.0801 seconds
merge metrics: MergeMetrics { num_source_rows: 1250000, num_target_rows_inserted: 468790, num_target_rows_updated: 781210, num_target_rows_deleted: 0, num_target_rows_copied: 1249687667, num_output_rows: 1250937667, num_target_files_added: 1001, num_target_files_removed: 1001, execution_time_ms: 983851, scan_time_ms: 0, rewrite_time_ms: 983322 }
```

but with partition pruning it takes about 3 seconds:
```
merge took 2.6337671 seconds
merge metrics: MergeMetrics { num_source_rows: 1250000, num_target_rows_inserted: 468877, num_target_rows_updated: 781123, num_target_rows_deleted: 0, num_target_rows_copied: 468877, num_output_rows: 1718877, num_target_files_added: 2, num_target_files_removed: 2, execution_time_ms: 2622, scan_time_ms: 0, rewrite_time_ms: 2316 }
```
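
To put the two runs side by side, the figures can be compared with a little arithmetic. The numbers below are copied from the two metric dumps above; the dictionary layout is just for illustration.

```python
# Values taken from the two MergeMetrics outputs above.
full = {"seconds": 985.0801, "files_removed": 1001, "rows_copied": 1_249_687_667}
pruned = {"seconds": 2.6337671, "files_removed": 2, "rows_copied": 468_877}

speedup = full["seconds"] / pruned["seconds"]
files_ratio = full["files_removed"] / pruned["files_removed"]
copied_ratio = full["rows_copied"] / pruned["rows_copied"]

print(f"wall-clock speedup: {speedup:.0f}x")
print(f"files rewritten:    {files_ratio:.1f}x fewer")
print(f"rows copied:        {copied_ratio:.0f}x fewer")
```

The wall-clock speedup works out to roughly 370x, and it tracks the drop in files rewritten (1001 versus 2) rather than the number of source rows, which is the same in both runs.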

In practice, the tables I'm wanting to use this for are terabytes in
size so using merge is currently impractical. This would be a
significant speed boost to them.


# Related Issue(s)
closes #1846

---------

Co-authored-by: Ion Koutsouris