Merge execution time grows exponentially with the number of columns #2107
Comments
Thanks @hmilbi for reporting the issue.
Hey @Blajda, the number of rows does not seem to matter: 100000 x 100 : 23.10 seconds
@Blajda maybe this is related: apache/datafusion#7698?
Yes, the issue in the DF repo aligns with the observations here. If I get some free time, I think it would still be worth checking whether any optimizations can be made on our end.
I can confirm this as well: a drastic slowdown in merge times using Azure Blob Storage:
The even more interesting thing is that if you melt your columns into long format instead of keeping them wide, merging becomes more efficient, but writing does not. In the first two columns, merge and write are just classic tabular style:
and the long merge and write come from this operation:
So the additional data affects the write process, but affects the update and merge not nearly as much. A normal merge looks like this:
vs. the long merge:
I would expect behaviour more similar to the long one for both, if not even a bit more efficient when running tabular, since the matching should be easier as you are matching on fewer columns.
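To make the wide versus long ("melted") distinction in the comment above concrete, here is a minimal pure-Python sketch of a melt. The helper name `melt`, its parameters, and the sample rows are illustrative assumptions, not code from the commenter; in pandas the same reshaping is normally done with `DataFrame.melt`.

```python
def melt(rows, id_keys, var_name="variable", value_name="value"):
    """Turn wide rows (one column per measurement) into long rows."""
    long_rows = []
    for row in rows:
        # Identifier columns are repeated on every long row.
        ids = {k: row[k] for k in id_keys}
        for col, val in row.items():
            if col in id_keys:
                continue
            long_rows.append({**ids, var_name: col, value_name: val})
    return long_rows


# Hypothetical sample: 1 key column and 3 value columns.
wide = [
    {"ts": 1, "col_0": 0.1, "col_1": 0.2, "col_2": 0.3},
    {"ts": 2, "col_0": 0.4, "col_1": 0.5, "col_2": 0.6},
]
long = melt(wide, id_keys=["ts"])
print(len(wide[0]), "columns wide ->", len(long[0]), "columns long,", len(long), "rows")
```

The long table always has the same few columns no matter how many measurements there are, at the cost of more rows, and a merge on it would presumably need to match on both the key and the variable name.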
Any updates on DataFusion's progress on this, or are there ways to improve this in delta-rs? When keeping data in pure tabular form (many columns, single index), it very quickly becomes more efficient to just do the operation in Python instead of using .merge + .when_matched_update_all, and there are some other performance drawbacks to using the melted versions of these datasets.
@pfwnicks for that you need to follow the DataFusion issue I mentioned above. Most of the slowness is in the plan building, where I don't think much can be done to improve it; it is caused by how DataFusion works at the moment.
@ion-elgreco Thanks for the reply. Do we know which part specifically is affecting it? Is it possible that some of the subtasks listed here will help: apache/datafusion#5637 (comment)? Also, how is it looking in terms of picking up these changes from DataFusion when they are finished?
Ran into this as well. 178 columns turned my insert merge statement into a 288-second task, even though the table was completely empty (first write) and there were only 75,000 rows. Appending or overwriting the partition takes < 2 seconds.
There should be some improvements with #2222 merged in.
Is that pull request stalled? There have been no improvements in 0.16.0 as far as I can see.
@t1g0rz those changes are in 0.16.0. If you don't see an impact, could you then share some numbers before and after?
Compared to my first test, it is now much faster: n= 10 : 0.03 seconds
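Numbers in the `n= 10 : 0.03 seconds` format above can be collected with a small timing loop. The following is only a sketch under stated assumptions: `time_by_column_count` and `fake_merge` are hypothetical names, and `fake_merge` is a placeholder that does not touch deltalake at all.

```python
import time


def time_by_column_count(merge_fn, column_counts, n_rows=1):
    """Return {n_cols: seconds} for one call of merge_fn per column count."""
    timings = {}
    for n_cols in column_counts:
        # Build a tiny wide batch: one key column plus n_cols value columns.
        rows = [
            {"id": r, **{f"col_{c}": float(c) for c in range(n_cols)}}
            for r in range(n_rows)
        ]
        start = time.perf_counter()
        merge_fn(rows)
        timings[n_cols] = time.perf_counter() - start
    return timings


def fake_merge(rows):
    """Placeholder standing in for a real merge; it only counts every cell."""
    return sum(len(row) for row in rows)


timings = time_by_column_count(fake_merge, [10, 100, 1000])
for n, seconds in timings.items():
    print(f"n= {n} : {seconds:.2f} seconds")
```

To measure a real table, `merge_fn` would need to turn `rows` into a DataFrame and run the `dt.merge(...).when_matched_update_all().when_not_matched_insert_all().execute()` chain used elsewhere in this thread; running the same sweep on two library versions gives the before and after comparison.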
@emcake amazing work 🤗🎉
@ion-elgreco
import numpy as np
import pandas as pd
import pyarrow as pa
from deltalake import DeltaTable

# CPU times: user 39.8 ms, sys: 25.5 ms, total: 65.3 ms
# Wall time: 455 ms
N_COL = 300
N_ROWS = 200_000
storage_options = {
    "AWS_S3_LOCKING_PROVIDER": "dynamodb",
    "DELTA_DYNAMO_TABLE_NAME": "delta_log_dev",
    ...}
dt = DeltaTable.create(
    "s3a://lake/test_rand/",
    schema=pa.schema(
        [
            pa.field("feature", pa.string(), nullable=True),
            pa.field("ts", pa.timestamp("us"), nullable=True),
        ]
        + [pa.field(f"col_{i}", pa.float64(), nullable=True) for i in range(N_COL)]
    ),
    mode="overwrite",
    partition_by=["feature"],
    storage_options=storage_options,
)
# CPU times: user 2min 35s, sys: 32.8 s, total: 3min 8s
# Wall time: 1min 34s
for i in range(10):
    # One batch of random values per feature partition.
    df = pd.DataFrame(np.round(np.random.random((N_ROWS, N_COL)), 3))
    df.columns = [f"col_{c}" for c in df.columns]
    df.insert(0, "feature", f"feature_{i}")
    df.insert(1, "ts", pd.date_range(start="2021-01-01", periods=N_ROWS, freq="s"))
    dt.merge(
        df,
        "s.feature = t.feature and s.ts = t.ts",
        source_alias='s',
        target_alias='t',
    ).when_matched_update_all().when_not_matched_insert_all().execute()
# CPU times: user 5.48 s, sys: 7.66 s, total: 13.1 s
# Wall time: 3.64 s
dt.to_pandas()
# CPU times: user 587 ms, sys: 734 ms, total: 1.32 s
# Wall time: 1.05 s
x = dt.to_pandas(filters=[('feature', '=', 'feature_1')]).head()
# FIRST MERGE
# CPU times: user 30.5 s, sys: 5.83 s, total: 36.3 s
# Wall time: 31.4 s
dt.merge(
    x,
    predicate="s.feature = t.feature and s.ts = s.ts",
    source_alias='s',
    target_alias='t',
).when_matched_update_all().when_not_matched_insert_all().execute()
"""
{'num_source_rows': 5,
'num_target_rows_inserted': 0,
'num_target_rows_updated': 1000000,
'num_target_rows_deleted': 0,
'num_target_rows_copied': 0,
'num_output_rows': 1000000,
'num_target_files_added': 2,
'num_target_files_removed': 1,
'execution_time_ms': 30941,
'scan_time_ms': 0,
'rewrite_time_ms': 27918}
"""
# SECOND MERGE
# CPU times: user 2min 6s, sys: 8.72 s, total: 2min 15s
# Wall time: 2min 3s
dt.merge(
    x,
    predicate="s.feature = t.feature and s.ts = s.ts",
    source_alias='s',
    target_alias='t',
).when_matched_update_all().when_not_matched_insert_all().execute()
"""
{'num_source_rows': 5,
'num_target_rows_inserted': 0,
'num_target_rows_updated': 5000000,
'num_target_rows_deleted': 0,
'num_target_rows_copied': 0,
'num_output_rows': 5000000,
'num_target_files_added': 8,
'num_target_files_removed': 2,
'execution_time_ms': 122965,
'scan_time_ms': 0,
'rewrite_time_ms': 120026}
"""
# THIRD MERGE
# CPU times: user 10min 26s, sys: 26 s, total: 10min 52s
# Wall time: 8min
dt.merge(
    x,
    predicate="s.feature = t.feature and s.ts = s.ts",
    source_alias='s',
    target_alias='t',
).when_matched_update_all().when_not_matched_insert_all().execute()
"""
{'num_source_rows': 5,
'num_target_rows_inserted': 0,
'num_target_rows_updated': 25000000,
'num_target_rows_deleted': 0,
'num_target_rows_copied': 0,
'num_output_rows': 25000000,
'num_target_files_added': 36,
'num_target_files_removed': 8,
'execution_time_ms': 477999,
'scan_time_ms': 0,
'rewrite_time_ms': 475107}
"""
@t1g0rz that's likely because you're doing
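The metrics in the three merges above grow by a factor of five each time (1,000,000, then 5,000,000, then 25,000,000 updated rows) even though the source has only 5 rows. One possible reading, offered as an assumption rather than a confirmed diagnosis, is that the predicate as posted compares `s.ts` with itself, so every source row matches every target row in the same `feature` partition: 5 × 200,000 = 1,000,000 on the first run. The scaled-down sketch below only counts matches in plain Python; `count_matches` and both predicate functions are hypothetical and do not call deltalake.

```python
def count_matches(source, target, predicate):
    """Count (source, target) row pairs for which the join predicate holds."""
    return sum(1 for s in source for t in target if predicate(s, t))


def as_written(s, t):
    # Mirrors "s.feature = t.feature and s.ts = s.ts": the ts test is always true.
    return s["feature"] == t["feature"] and s["ts"] == s["ts"]


def on_both_keys(s, t):
    # Mirrors "s.feature = t.feature and s.ts = t.ts".
    return s["feature"] == t["feature"] and s["ts"] == t["ts"]


# Scaled down: 1,000 target rows in one partition, 5 source rows taken from it.
target = [{"feature": "feature_1", "ts": ts} for ts in range(1000)]
source = target[:5]

print(count_matches(source, target, as_written))    # 5000
print(count_matches(source, target, on_both_keys))  # 5
```

With the self-comparison, the match count is the number of source rows times the number of target rows in the partition, which is consistent with the fivefold growth in the posted metrics.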
@ion-elgreco
@t1g0rz no worries!!
@Blajda shall we close this issue then? It seems that performance is now quite good. Any upstream changes to DataFusion will improve it even more.
Environment
Python 3.11
Delta-rs version:
0.15.1
Environment:
Bug
What happened:
I have several tables that have more than 100 columns. Merging into these takes too much time.
What you expected to happen:
Merging 1 row with 100 columns into a table with 1 row and 100 columns should not take 22 seconds.
How to reproduce it:
Output
More details: