Merge update+insert truncates a delta table if the table is big enough #2362

Closed
t1g0rz opened this issue Mar 31, 2024 · 4 comments · Fixed by #2364
Labels
bug Something isn't working

Comments

@t1g0rz
Contributor

t1g0rz commented Mar 31, 2024

Environment

Delta-rs version: 0.16.3
Binding: python
OS: ubuntu22.04.1


Bug

What happened:
The bug from #2320 still persists in 0.16.3, but it has become more subtle: if the delta table contains several Parquet files, a merge can remove some of them without any reason.

What you expected to happen:
Updates and inserts to occur according to the predicate.

How to reproduce it:

import numpy as np
import pandas as pd
from deltalake import DeltaTable
import pyarrow as pa
import polars as pl
import string

def create_mock_df(st_idx, end_idx, sets_of_data):
    diff = end_idx - st_idx
    res = []
    for i in range(sets_of_data):
        mock_df = pd.DataFrame(np.random.random((diff, 5)), columns=[f"c{i}" for i in range(1, 6)], dtype=str)
        mock_df.insert(0, 'iii', range(st_idx, end_idx))
        mock_df.insert(1, 'name', np.random.choice(list(string.ascii_uppercase), size=diff))
        res.append(mock_df)
    
    return pd.concat(res, ignore_index=True).drop_duplicates(['iii', 'name'])

settings_to_merge = [
    (0, 1_400_000, 10),
    (1_040_000, 1_045_000, 10),
    (1_450_000, 1_500_000, 10),
    (1_139_800, 1_600_000, 10),
]

path = 'test'
storage_options=None

DeltaTable.create(path, 
                  storage_options=storage_options, 
                  schema=pa.schema(
        [
            pa.field('iii', type=pa.int64(), nullable=False),
            pa.field('name', type=pa.string(), nullable=False),
            pa.field('c1', type=pa.string()),
            pa.field('c2', type=pa.string()),
            pa.field('c3', type=pa.string()),
            pa.field('c4', type=pa.string()),
            pa.field('c5', type=pa.string()),
        ]
    )
)

for st_idx, end_idx, sets_of_data in settings_to_merge:
    mock_df = create_mock_df(st_idx, end_idx, sets_of_data)
    dt = DeltaTable(path, storage_options=storage_options)
    es = (
        dt.merge(mock_df, predicate=f't.iii > {mock_df.iii.min()} and s.iii = t.iii and s.name = t.name', source_alias='s', target_alias='t')
          .when_not_matched_insert_all()
          .when_matched_update_all()
          .execute()
    )
    print(es)
    print('init df shape:', len(mock_df))
    print('delta shape after merge:', pl.scan_delta(path, storage_options=storage_options).select('name').collect().shape)
    print('----')

Here is the output:

{'num_source_rows': 11810600, 'num_target_rows_inserted': 11810600, 'num_target_rows_updated': 0, 'num_target_rows_deleted': 0, 'num_target_rows_copied': 0, 'num_output_rows': 11810600, 'num_target_files_added': 10, 'num_target_files_removed': 0, 'execution_time_ms': 15032, 'scan_time_ms': 0, 'rewrite_time_ms': 15018}
init df shape: 11810600
delta shape after merge: (11_810_600, 1)
----
{'num_source_rows': 42210, 'num_target_rows_inserted': 28587, 'num_target_rows_updated': 13623, 'num_target_rows_deleted': 0, 'num_target_rows_copied': 11779569, 'num_output_rows': 11821779, 'num_target_files_added': 14, 'num_target_files_removed': 10, 'execution_time_ms': 6020, 'scan_time_ms': 0, 'rewrite_time_ms': 6005}
init df shape: 42210
delta shape after merge: (11_821_779, 1)
----
{'num_source_rows': 421823, 'num_target_rows_inserted': 421823, 'num_target_rows_updated': 0, 'num_target_rows_deleted': 0, 'num_target_rows_copied': 0, 'num_output_rows': 421823, 'num_target_files_added': 1, 'num_target_files_removed': 0, 'execution_time_ms': 576, 'scan_time_ms': 0, 'rewrite_time_ms': 561}
init df shape: 421823
delta shape after merge: (12_243_602, 1)
----
{'num_source_rows': 3880878, 'num_target_rows_inserted': 3032571, 'num_target_rows_updated': 848307, 'num_target_rows_deleted': 0, 'num_target_rows_copied': 2344350, 'num_output_rows': 6225228, 'num_target_files_added': 10, 'num_target_files_removed': 15, 'execution_time_ms': 7357, 'scan_time_ms': 0, 'rewrite_time_ms': 7341}
init df shape: 3880878
delta shape after merge: (6_225_228, 1)
----
@t1g0rz t1g0rz added the bug Something isn't working label Mar 31, 2024
@Blajda
Collaborator

Blajda commented Mar 31, 2024

@t1g0rz Thanks for the report.
I did some digging with the sample code you provided. We use a special operator called MergeBarrier that requires the data to be partitioned based on its source. Currently this re-partitioning is not happening, which causes inconsistent results.
Looking into a fix.
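A minimal sketch (plain Python, not the delta-rs implementation) of the invariant described above: rows that share a join key must land in the same partition, otherwise a per-partition operator like MergeBarrier sees an inconsistent view of the data.

```python
def hash_partition(rows, key, n_partitions):
    """Route each row to a partition by hashing its join key.

    Rows sharing a key value always land in the same partition: the
    invariant a per-partition operator such as MergeBarrier depends on
    for consistent results.
    """
    parts = [[] for _ in range(n_partitions)]
    for row in rows:
        parts[hash(row[key]) % n_partitions].append(row)
    return parts

rows = [{"iii": i % 4, "name": "x", "v": i} for i in range(12)]
parts = hash_partition(rows, "iii", 3)
# Each distinct key shows up in exactly one partition.
keys_per_part = [{r["iii"] for r in p} for p in parts]
```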

@t1g0rz
Contributor Author

t1g0rz commented Mar 31, 2024

@Blajda, I have tested the snippet above, and it works fine on deltalake 0.16.0 and 0.16.1. However, I noticed that when the table has a partition column, the merge also breaks once a row's partition column value changes. This is likely related to what you mentioned, and I hope it is useful as well. I slightly modified the original code from issue #2320:

import numpy as np
import pandas as pd
import pyarrow as pa
import polars as pl
from deltalake import DeltaTable

dt = DeltaTable.create(
    "./test1",
    schema=pa.schema(
        [
            pa.field("ts", pa.timestamp("us"), nullable=False),
            pa.field("some_data", pa.float64(), nullable=True),
            pa.field("some_part", pa.string(), nullable=True),
        ]
    ),
    partition_by=["some_part"]
)

df = pd.DataFrame(
    {
        "ts": pd.date_range("2023-01-01", freq="1h", periods=5),
        "some_data": np.random.random(5),
        "some_part": np.random.choice(["A", "B"], 5),
    }
)
dt = DeltaTable("./test1")
dt.merge(
    df,
    predicate=f"t.ts::Timestamp >= '{df.ts.min()}'::Timestamp and s.ts = t.ts",
    source_alias="s",
    target_alias="t",
).when_matched_update_all().when_not_matched_insert_all().execute()
"""
{'num_source_rows': 5,
 'num_target_rows_inserted': 5,
 'num_target_rows_updated': 0,
 'num_target_rows_deleted': 0,
 'num_target_rows_copied': 0,
 'num_output_rows': 5,
 'num_target_files_added': 2,
 'num_target_files_removed': 0,
 'execution_time_ms': 11,
 'scan_time_ms': 0,
 'rewrite_time_ms': 1}
"""
print(pl.from_dataframe(df))
"""
┌─────────────────────┬───────────┬───────────┐
│ ts                  ┆ some_data ┆ some_part │
│ ---                 ┆ ---       ┆ ---       │
│ datetime[ns]        ┆ f64       ┆ str       │
╞═════════════════════╪═══════════╪═══════════╡
│ 2023-01-01 00:00:00 ┆ 0.031419  ┆ B         │
│ 2023-01-01 01:00:00 ┆ 0.508243  ┆ A         │
│ 2023-01-01 02:00:00 ┆ 0.260409  ┆ A         │
│ 2023-01-01 03:00:00 ┆ 0.996127  ┆ B         │
│ 2023-01-01 04:00:00 ┆ 0.774423  ┆ B         │
└─────────────────────┴───────────┴───────────┘
"""
print(pl.read_delta('./test1').sort('ts'))
"""
┌─────────────────────┬───────────┬───────────┐
│ ts                  ┆ some_data ┆ some_part │
│ ---                 ┆ ---       ┆ ---       │
│ datetime[μs]        ┆ f64       ┆ str       │
╞═════════════════════╪═══════════╪═══════════╡
│ 2023-01-01 00:00:00 ┆ 0.031419  ┆ B         │
│ 2023-01-01 01:00:00 ┆ 0.508243  ┆ A         │
│ 2023-01-01 02:00:00 ┆ 0.260409  ┆ A         │
│ 2023-01-01 03:00:00 ┆ 0.996127  ┆ B         │
│ 2023-01-01 04:00:00 ┆ 0.774423  ┆ B         │
└─────────────────────┴───────────┴───────────┘
"""
dt = DeltaTable("./test1")
df = pd.DataFrame(
    {
        "ts": pd.date_range("2023-01-01 1:00:00", freq="1h", periods=5),
        "some_data": np.random.random(5),
        "some_part": np.random.choice(["A", "B"], 5),
    }
)
dt.merge(
    df,
    predicate=f"t.ts::Timestamp >= '{df.ts.min()}'::Timestamp and s.ts = t.ts",
    source_alias="s",
    target_alias="t",
).when_matched_update_all().when_not_matched_insert_all().execute()
"""
{'num_source_rows': 5,
 'num_target_rows_inserted': 1,
 'num_target_rows_updated': 4,
 'num_target_rows_deleted': 0,
 'num_target_rows_copied': 0,
 'num_output_rows': 5,
 'num_target_files_added': 3,
 'num_target_files_removed': 1,
 'execution_time_ms': 11,
 'scan_time_ms': 0,
 'rewrite_time_ms': 2}
"""
print(pl.from_dataframe(df))
"""
┌─────────────────────┬───────────┬───────────┐
│ ts                  ┆ some_data ┆ some_part │
│ ---                 ┆ ---       ┆ ---       │
│ datetime[ns]        ┆ f64       ┆ str       │
╞═════════════════════╪═══════════╪═══════════╡
│ 2023-01-01 01:00:00 ┆ 0.374384  ┆ A         │
│ 2023-01-01 02:00:00 ┆ 0.479215  ┆ B         │
│ 2023-01-01 03:00:00 ┆ 0.658368  ┆ A         │
│ 2023-01-01 04:00:00 ┆ 0.698976  ┆ B         │
│ 2023-01-01 05:00:00 ┆ 0.913481  ┆ B         │
└─────────────────────┴───────────┴───────────┘
"""
print(pl.read_delta('./test1').sort('ts'))
"""
┌─────────────────────┬───────────┬───────────┐
│ ts                  ┆ some_data ┆ some_part │
│ ---                 ┆ ---       ┆ ---       │
│ datetime[μs]        ┆ f64       ┆ str       │
╞═════════════════════╪═══════════╪═══════════╡
│ 2023-01-01 01:00:00 ┆ 0.374384  ┆ A         │
│ 2023-01-01 02:00:00 ┆ 0.479215  ┆ B         │
│ 2023-01-01 03:00:00 ┆ 0.658368  ┆ A         │
│ 2023-01-01 04:00:00 ┆ 0.698976  ┆ B         │
│ 2023-01-01 05:00:00 ┆ 0.913481  ┆ B         │
└─────────────────────┴───────────┴───────────┘
"""

@Blajda
Collaborator

Blajda commented Mar 31, 2024

The root cause is that the filter t.ts::Timestamp >= '{df.ts.min()}'::Timestamp is also being pushed down to the parquet scan, so only the row groups that satisfy the filter are read. The current implementation of merge requires reading each file in its entirety, so a small change is needed to distinguish a prune filter (which only selects files) from record filters that can be pushed down into the scan.

My previous comment about MergeBarrier was incorrect. It turns out that after a re-partition, dictionaries can contain values that don't exist in the record batch, which caught me by surprise.

Blajda added a commit that referenced this issue Mar 31, 2024
# Description
Delta scan will push filter to the parquet scan when possible. Added a
new configuration for the special case where operations need to operate
on an entire file but still want to perform pruning.

# Related Issue(s)
- fixes #2362
@Blajda
Collaborator

Blajda commented Mar 31, 2024

@t1g0rz Let me know if any of your workflows are having issues with the latest merge. If they are I'll reopen the issue.
