fix(rust): handle 429 from GCS #2454

Merged
merged 6 commits into from
Apr 30, 2024


adriangb
Contributor

@adriangb adriangb commented Apr 25, 2024

Fixes #2451

This is probably not the best implementation but it at least let me test the outcome.

Here's my test script:

import os
from uuid import uuid4

os.environ['RUST_LOG'] = 'INFO'

from time import time
from concurrent.futures import ProcessPoolExecutor

import polars as pl
from deltalake import DeltaTable, write_deltalake

def worker(uri: str, df: pl.DataFrame) -> None:
    table = DeltaTable(uri, log_buffer_size=1024 * 1024)
    table.update_incremental()

    print('writing in a loop')
    while True:
        try:
            start = time()
            write_deltalake(table, df.to_arrow(), engine='rust', mode='append')
            print('Time taken:', time() - start)
        except Exception as e:
            print('Python error:', e)
            continue

if __name__ == '__main__':
    df = pl.DataFrame({
        'a': [1, 2, 3],
        'b': [4, 5, 6],
    })

    uri = f'gs://bucket-00d0331/test/delta/{uuid4()}'

    print('initializing delta table')
    write_deltalake(uri, df.to_arrow(), engine='rust', mode='append')

    with ProcessPoolExecutor(32) as executor:
        futures = [
            executor.submit(worker, uri, df)
            for _ in range(32)
        ]
        for future in futures:
            future.result()

With this change I now get:

Attempt 1 failed: Version 1 already exists
Attempt 1 failed: Version 1 already exists
Python error: Generic DeltaTable error: Version mismatch
Time taken: 1.4619898796081543
Attempt 1 failed: Version 1 already exists
Attempt 2 failed: Version 2 already exists
Attempt 1 failed: Version 1 already exists
Attempt 2 failed: Version 2 already exists
Python error: Generic DeltaTable error: Version mismatch
Attempt 1 failed: Version 1 already exists
Attempt 1 failed: Version 1 already exists
Attempt 1 failed: Version 1 already exists
Attempt 1 failed: Version 1 already exists
Attempt 1 failed: Version 1 already exists
Attempt 2 failed: Version 2 already exists
Attempt 1 failed: Version 1 already exists
Attempt 1 failed: Version 1 already exists
Attempt 1 failed: Version 1 already exists
Attempt 1 failed: Version 1 already exists
Attempt 1 failed: Version 1 already exists
Attempt 1 failed: Version 1 already exists
Attempt 1 failed: Version 1 already exists
Attempt 1 failed: Version 1 already exists
Attempt 3 failed: Version 3 already exists
Attempt 1 failed: Version 1 already exists
Attempt 1 failed: Version 1 already exists
Attempt 3 failed: Version 3 already exists
Attempt 1 failed: Version 2 already exists
Attempt 2 failed: Version 2 already exists
Attempt 2 failed: Version 2 already exists
Attempt 1 failed: Version 1 already exists
Attempt 2 failed: Version 2 already exists
Attempt 1 failed: Version 2 already exists
Attempt 2 failed: Version 2 already exists
Attempt 2 failed: Version 2 already exists
Attempt 2 failed: Version 2 already exists
Attempt 1 failed: Version 1 already exists
Attempt 1 failed: Version 1 already exists
Attempt 2 failed: Version 2 already exists
Attempt 2 failed: Version 2 already exists
Attempt 1 failed: Version 2 already exists
Attempt 2 failed: Version 2 already exists
Attempt 1 failed: Version 2 already exists
Attempt 2 failed: Version 2 already exists
Attempt 1 failed: Version 1 already exists
Python error: Generic DeltaTable error: Version mismatch
Attempt 2 failed: Version 2 already exists
Attempt 2 failed: Version 2 already exists
Attempt 1 failed: Version 2 already exists
Attempt 3 failed: Version 3 already exists
Attempt 1 failed: Version 2 already exists
Attempt 1 failed: Version 2 already exists
Attempt 2 failed: Version 2 already exists
Attempt 2 failed: Version 2 already exists
Attempt 2 failed: Version 2 already exists
Attempt 2 failed: Version 2 already exists
Attempt 3 failed: Version 3 already exists
Attempt 4 failed: Version 4 already exists
Attempt 3 failed: Version 3 already exists
Attempt 3 failed: Version 3 already exists
Attempt 2 failed: Version 3 already exists
Attempt 3 failed: Version 3 already exists
Attempt 3 failed: Version 3 already exists
Attempt 2 failed: Version 3 already exists
Attempt 3 failed: Version 3 already exists
Attempt 2 failed: Version 3 already exists
Attempt 2 failed: Version 2 already exists
Attempt 2 failed: Version 2 already exists
Attempt 1 failed: Version 2 already exists
Attempt 3 failed: Version 3 already exists
Attempt 3 failed: Version 3 already exists
Attempt 3 failed: Version 3 already exists
Attempt 2 failed: Version 3 already exists
Attempt 2 failed: Version 2 already exists
Attempt 3 failed: Version 3 already exists
Python error: Generic DeltaTable error: Version mismatch

So this is an improvement, but the behavior is still problematic: I'm not sure that it moves on to the next version every time a commit fails. I also don't know whether moving on to the next version is even the right thing to do; maybe it should refresh the table state before retrying, in case it has gotten far out of sync (e.g. because writing the data took a long time). But maybe these are crate-wide or protocol-wide issues, not related to the particularities of GCS.
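
To make the two retry strategies in question concrete, here is a toy in-memory sketch. It is not delta-rs code: `ToyLog`, `commit_next_version` and `commit_with_refresh` are hypothetical names made up for illustration, and conflict checking is left out entirely. It only shows how "bump the version by one" and "refresh, then retry" behave when the writer has fallen several versions behind.

```python
from dataclasses import dataclass, field


class VersionExists(Exception):
    """A commit file for the requested version is already in the log."""


@dataclass
class ToyLog:
    """Hypothetical in-memory stand-in for a table's commit log."""

    versions: set = field(default_factory=lambda: {0})

    def latest(self) -> int:
        return max(self.versions)

    def put_if_absent(self, version: int) -> None:
        # Models an atomic "create only if it does not exist" write.
        if version in self.versions:
            raise VersionExists(version)
        self.versions.add(version)


def commit_next_version(log: ToyLog, read_version: int, max_attempts: int):
    """On conflict, bump the target version by one and try again."""
    version = read_version + 1
    for attempt in range(1, max_attempts + 1):
        try:
            log.put_if_absent(version)
            return version, attempt
        except VersionExists:
            version += 1
    raise RuntimeError(f"gave up after {max_attempts} attempts")


def commit_with_refresh(log: ToyLog, read_version: int, max_attempts: int):
    """On conflict, re-read the latest version and target the one after it."""
    version = read_version + 1
    for attempt in range(1, max_attempts + 1):
        try:
            log.put_if_absent(version)
            return version, attempt
        except VersionExists:
            version = log.latest() + 1
    raise RuntimeError(f"gave up after {max_attempts} attempts")


if __name__ == "__main__":
    # The writer read version 0, but versions 1..5 landed while it wrote data.
    print(commit_with_refresh(ToyLog(set(range(6))), read_version=0, max_attempts=3))
    try:
        commit_next_version(ToyLog(set(range(6))), read_version=0, max_attempts=3)
    except RuntimeError as exc:
        print("next-version strategy:", exc)
```

In this toy scenario the refresh variant lands on version 6 on its second attempt, while the bump-by-one variant burns all three attempts on versions 1 to 3. A real implementation would also have to re-run conflict checking against every commit it skipped over, which this sketch does not model.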

@github-actions github-actions bot added the binding/rust (Issues for the Rust crate) label Apr 25, 2024

ACTION NEEDED

delta-rs follows the Conventional Commits specification for release automation.

The PR title and description are used as the merge commit message. Please update your PR title and description to match the specification.

@adriangb adriangb changed the title from "Handle 429 from GCS" to "fix(rust): Handle 429 from GCS" Apr 25, 2024
@adriangb
Contributor Author

cc @ion-elgreco

@@ -556,6 +556,7 @@ impl<'a> std::future::IntoFuture for PreparedCommit<'a> {
                     );
                     match conflict_checker.check_conflicts() {
                         Ok(_) => {
+                            println!("Attempt {} failed: Version {} already exists", attempt_number, version);
Collaborator

@ion-elgreco ion-elgreco Apr 26, 2024

If you can remove this and fix the linting/fmt, then we are good to go. Also, our GCS integration tests are disabled at the moment, so I'm trusting that this works ;P

@adriangb adriangb marked this pull request as ready for review April 26, 2024 19:00
ion-elgreco previously approved these changes Apr 26, 2024
rtyler previously approved these changes Apr 29, 2024
@rtyler rtyler enabled auto-merge (rebase) April 29, 2024 05:12
This change also loosens the meta-crate version dependency to allow easier upgrades in the future
@rtyler rtyler dismissed stale reviews from ion-elgreco and themself via ff78c63 April 29, 2024 05:18
@rtyler rtyler changed the title from "fix(rust): Handle 429 from GCS" to "fix(rust): handle 429 from GCS" Apr 29, 2024
@rtyler rtyler merged commit 41cb9d7 into delta-io:main Apr 30, 2024
21 of 22 checks passed
Labels
binding/rust (Issues for the Rust crate), storage/gcp
Development

Successfully merging this pull request may close these issues.

Handle rate limiting during write contention
3 participants