Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: prevent writing checkpoints with a version that does not exist in table state #1863

Merged
merged 1 commit into from
Nov 14, 2023

Conversation

rtyler
Copy link
Member

@rtyler rtyler commented Nov 14, 2023

In some situations where the same writer is issuing append transactions using the operations API, which returns the newly created version, such as 10.

If the caller then attempts to create a checkpoint for version 10, the operation will produce an inconsistency in the _last_checkpoint file, if the callers in-memory table state has not been reloaded since the append operation completed.

In this scenario the _delta_log/ directory may contain: .
├── 00000000000000000000.json
├── 00000000000000000001.json
├── 00000000000000000002.json
├── 00000000000000000003.json
├── 00000000000000000004.json
├── 00000000000000000005.json
├── 00000000000000000006.json
├── 00000000000000000007.json
├── 00000000000000000008.json
├── 00000000000000000009.json
├── 00000000000000000010.checkpoint.parquet
├── 00000000000000000010.json
└── _last_checkpoint

While _last_checkpoint contains the following:
{"num_of_add_files":null,"parts":null,"size":342,"size_in_bytes":95104,"version":9}

This will result in an error on any attempts to read the Delta table:

>>> from deltalake import DeltaTable
>>> dt = DeltaTable('.')
[2023-11-14T18:05:59Z DEBUG deltalake_core::protocol] loading checkpoint from _delta_log/_last_checkpoint
[2023-11-14T18:05:59Z DEBUG deltalake_core::table] update with latest checkpoint CheckPoint { version: 9, size: 342, parts: None, size_in_bytes: Some(95104), num_of_add_files: None }
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/home/tyler/venv/lib64/python3.11/site-packages/deltalake/table.py", line 253, in __init__
    self._table = RawDeltaTable(
                ^^^^^^^^^^^^^^
FileNotFoundError: Object at location /home/tyler/corrupted-table/_delta_log/00000000000000000009.checkpoint.parquet not found: No such file or directory (os error 2)
>>>

To prevent this error condition, the create_checkpoint_for() function should ensure that the provided checkpoint version (used to write the .checkpoint.parquet file) matches the table state's version (used to write the _last_checkpoint file).

This has the added benefit of helping prevent the caller from passing in a nonsensical version number that could also lead to a broken table.

@rtyler rtyler added bug Something isn't working binding/rust Issues for the Rust crate labels Nov 14, 2023
Comment on lines +126 to +130
error!(
"create_checkpoint_for called with version {version} but table state contains: {}. The table state may need to be reloaded",
state.version()
);
return Err(CheckpointError::StaleTableVersion(version, state.version()).into());
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why can't we just checkout the correct version?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@wjones127 how do you mean "checkout?" I am less versed in the nuance of checkpoints. If the last version in the table is 100, is it valid to call this with _last_checkpoint of version: 5?

I honestly don't know whether we should even allow a user-specified version here at all, rather than creating a checkpoint with the version of the table state.

HOWEVER, if a writer has a stale table state creates _last_checkpoint after another writer has created a later checkpoint, this becomes confusing and out of sync anyways 😖

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess to expand, I like the failure option here because the caller is asking us to do something here that will result in incorrect state, and we don't know what they intend or what performance concerns they have. I.e. I don't think we should reload the state here, as an example, because that would be surprising and may actually result in a later version than the user specified too

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I feel like (a) failing is the easiest solution that avoids any potential new bug introduction and (b) reloading correct version could have unexpected perf impact, esp for things like streaming workloads that are managing their own table state

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh one more thing since my thoughts are disjointed here 😆 I think we already have the path you describe covered with create_checkpoint() 😄

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay re-read the signature. Erroring makes sense to me.

I was thinking in the situation you were describing in the issue report, it sounds like the correct thing to do is to checkout the desired version first then create the checkpoint.

…le state

I have seen this in a production environment where the same writer is issuing
append transactions using the operations API, which returns the newly created
version, such as 10.

If the caller then attempts to create a checkpoint for version 10, the operation
will produce an inconsistency in the `_last_checkpoint` file, if the callers
in-memory table state has *not* been reloaded since the append operation
completed.

In this scenario the _delta_log/ directory may contain:
.
    ├── 00000000000000000000.json
    ├── 00000000000000000001.json
    ├── 00000000000000000002.json
    ├── 00000000000000000003.json
    ├── 00000000000000000004.json
    ├── 00000000000000000005.json
    ├── 00000000000000000006.json
    ├── 00000000000000000007.json
    ├── 00000000000000000008.json
    ├── 00000000000000000009.json
    ├── 00000000000000000010.checkpoint.parquet
    ├── 00000000000000000010.json
    └── _last_checkpoint

While `_last_checkpoint` contains the following:
    {"num_of_add_files":null,"parts":null,"size":342,"size_in_bytes":95104,"version":9}

This will result in an error on any attempts to read the Delta table:

    >>> from deltalake import DeltaTable
    >>> dt = DeltaTable('.')
    [2023-11-14T18:05:59Z DEBUG deltalake_core::protocol] loading checkpoint from _delta_log/_last_checkpoint
    [2023-11-14T18:05:59Z DEBUG deltalake_core::table] update with latest checkpoint CheckPoint { version: 9, size: 342, parts: None, size_in_bytes: Some(95104), num_of_add_files: None }
    Traceback (most recent call last):
    File "<stdin>", line 1, in <module>
    File "/home/tyler/venv/lib64/python3.11/site-packages/deltalake/table.py", line 253, in __init__
        self._table = RawDeltaTable(
                    ^^^^^^^^^^^^^^
    FileNotFoundError: Object at location /home/tyler/corrupted-table/_delta_log/00000000000000000009.checkpoint.parquet not found: No such file or directory (os error 2)
    >>>

To prevent this error condition, the create_checkpoint_for() function should ensure
that the provided checkpoint version (used to write the `.checkpoint.parquet` file) matches
the table state's version (used to write the `_last_checkpoint` file).

This has the added benefit of helping prevent the caller from passing in a
nonsensical version number that could also lead to a broken table.

Sponsored-by: Scribd Inc
@rtyler rtyler force-pushed the checkpoint-table-state branch from 2334f7d to 6b98ff5 Compare November 14, 2023 20:40
@rtyler rtyler marked this pull request as ready for review November 14, 2023 20:55
@rtyler rtyler requested a review from roeap as a code owner November 14, 2023 20:55
Copy link
Collaborator

@roeap roeap left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

makes a lot of sense :).

Currently there also is a draft PR in the works to be able to handle v2 checkpoints and honour the latest recommendations from the protocol.

@rtyler rtyler merged commit cdf52df into delta-io:main Nov 14, 2023
20 of 21 checks passed
@rtyler rtyler deleted the checkpoint-table-state branch November 14, 2023 23:32
@rtyler rtyler added this to the Rust v0.17 milestone Nov 14, 2023
rtyler added a commit to buoyant-data/oxbow that referenced this pull request Nov 15, 2023
… oxbow can provide

This change incorporates some lessons learned from identifying a production
table inconsistency issue where a checkpoint would be written for a version that
was newer than the loaded table state.

See also delta-io/delta-rs#1863

oxbow is now a bit more conservative and will skip writing checkpoints if a
version mismatch between table state and the desired version to write are
present
rtyler added a commit to buoyant-data/oxbow that referenced this pull request Nov 15, 2023
Without reloading the table state the creation of a checkpoint may result in an
inconsistent table state with a checkpoint for version X and a _last_checkpoint
file for version X-1 being created (a file which does not exist).

This was quite pesky to track down, so the lambda also is restructured to more
cleanly create a table if one does not exist but return an error for all other
errored execution

See also delta-io/delta-rs#1863
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
binding/rust Issues for the Rust crate bug Something isn't working crate/core
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants