Successful writes return error when using concurrent writers #2279

Closed
helanto opened this issue Mar 11, 2024 · 5 comments · Fixed by #2396
Labels
bug Something isn't working

Comments

@helanto
Contributor

helanto commented Mar 11, 2024

Environment

Delta-rs version: deltalake 0.17.1

Binding: rust

Environment:

  • Cloud provider: S3
  • OS: Tested locally on MacOs Sonoma 14.2.1 and using Docker base image rust:1.76-bookworm
  • Other:

Bug

What happened:
I have two concurrent writers into the same table. Each writer performs an Append operation on the table, inserting one row. I use S3DynamoDbLogStore to prevent data loss. One writer returns an error while the other succeeds. However, when querying the table, both rows have been successfully inserted.

What you expected to happen:
When the writer succeeds, I expect the corresponding data to be part of the resulting table.
When the writer fails, I expect the data corresponding to that write not to be part of the resulting table.

How to reproduce it:
First we set up the S3DynamoDbLogStore. Then we spawn two parallel tasks that insert two rows concurrently.

use std::collections::HashMap;
use std::sync::Arc;

use deltalake::arrow::array::Int32Array;
use deltalake::arrow::datatypes::{DataType, Field, Schema};
use deltalake::arrow::record_batch::RecordBatch;

async fn insert_to_table(uri: &str, value: i32) {
    // Set up access to S3 and DynamoDB. Put values for
    // AWS_S3_LOCKING_PROVIDER=dynamodb and DELTA_DYNAMO_TABLE_NAME.
    let storage_options: HashMap<String, String> = HashMap::new();
    let delta_table: deltalake::DeltaOps =
        deltalake::DeltaOps::try_from_uri_with_storage_options(uri, storage_options)
            .await
            .unwrap();

    // Create one row
    let field1 = Int32Array::from(vec![value]);
    let schema = Arc::new(Schema::new(vec![Field::new("foo", DataType::Int32, false)]));
    let record_batch =
        vec![RecordBatch::try_new(schema.clone(), vec![Arc::new(field1)]).unwrap()];

    // Write the row to the Delta table
    match delta_table.write(record_batch).await {
        Ok(_) => log::info!("Wrote to delta table at '{}'", uri),
        Err(e) => log::info!("Error writing to delta table: {}", e),
    }
}

#[tokio::main]
async fn main() {
    let uri = "s3://..."; // the table URI
    deltalake::aws::register_handlers(None);
    let task1 = tokio::spawn(insert_to_table(uri, 1001));
    let task2 = tokio::spawn(insert_to_table(uri, 1002));
    let _ = tokio::try_join!(task1, task2);
}

We get back:

> cargo run
[WARN  deltalake_aws::logstore] LockClientError::VersionAlreadyExists(1)
[INFO my_project] Wrote to delta table at 's3://...'
[INFO my_project] Error writing to delta table: Generic DeltaTable error: Version mismatch

When reading the table, I get back both values (even though one of the writes returned an error).

More details:
Digging a bit deeper into the code, it seems the error originates in the final part of the write operation. At the end of the write, the writer attempts to advance its (in-memory) snapshot so as to include the commit it just made. However, the snapshot was already advanced past that version by a different writer that this writer is not aware of, and we get back a DeltaTableError::Generic("Version mismatch").

Making the change here fixes the issue. I am not sure, however, whether this is the correct path to follow.
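To make the failure mode concrete, here is a self-contained sketch of the optimistic-commit flow described above. All names here are illustrative, not the actual delta-rs API: the "log" is modeled as a list of committed versions, and a commit retries on a version conflict, just as the LockClientError::VersionAlreadyExists warning in the output suggests.

```rust
/// Stand-in for the commit log (e.g. backed by DynamoDB): a commit
/// succeeds only if the proposed version is not already taken.
fn try_commit(log_versions: &mut Vec<i64>, proposed: i64) -> Result<i64, String> {
    if log_versions.contains(&proposed) {
        Err(format!("VersionAlreadyExists({proposed})"))
    } else {
        log_versions.push(proposed);
        Ok(proposed)
    }
}

/// Commit with retry: on a version conflict, advance and try again.
fn commit_with_retry(log_versions: &mut Vec<i64>, mut proposed: i64) -> i64 {
    loop {
        match try_commit(log_versions, proposed) {
            Ok(v) => return v,
            Err(_) => proposed += 1, // another writer took this version
        }
    }
}

fn main() {
    let mut log = vec![0]; // the table already has version 0
    // Writer A commits version 1 first.
    let a = commit_with_retry(&mut log, 1);
    // Writer B also proposes 1 (its snapshot is still at 0),
    // hits the conflict, and lands at version 2 instead.
    let b = commit_with_retry(&mut log, 1);
    assert_eq!(a, 1);
    assert_eq!(b, 2);
    // Writer B's in-memory snapshot is still at version 0. A check that
    // requires snapshot version + 1 == committed version then fails with
    // "Version mismatch", even though the commit itself succeeded.
    let snapshot_version = 0;
    assert_ne!(snapshot_version + 1, b);
}
```

Under this model, the commit is durable in the log before the snapshot-advance check runs, which is why the data appears in the table despite the returned error.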

@helanto added the bug label Mar 11, 2024
@ion-elgreco
Collaborator

ion-elgreco commented Mar 11, 2024

@helanto thanks for the issue :)

I am not sure, though, that the change you made in your repo would be the best way to solve this. Ideally you update the state of the table to (commit version - 1), and then merge that state with the actions this write performed, so you end up at the committed version.

@vegarsti
Contributor

vegarsti commented Mar 11, 2024

> @helanto thanks for the issue :)
>
> I am not sure, though, that the change you made in your repo would be the best way to solve this. Ideally you update the state of the table to (commit version - 1), and then merge that state with the actions this write performed, so you end up at the committed version.

Thanks for the quick reply! By "commit version", do you mean this?
https://github.com/duneanalytics/delta-rs/blob/33a16bbbb5bd74cc55e53a47e7bcb4a0190d5918/crates/core/src/operations/write.rs#L807-L814

Maybe I'm misunderstanding, but I don't see how this will produce a correct snapshot. We don't necessarily know which version is the newest, or which other actions have been committed since our own commit succeeded.

@helanto
Contributor Author

helanto commented Mar 11, 2024

> @helanto thanks for the issue :)
>
> I am not sure, though, that the change you made in your repo would be the best way to solve this. Ideally you update the state of the table to (commit version - 1), and then merge that state with the actions this write performed, so you end up at the committed version.

Thank you @ion-elgreco for your reply!

This means we need to read the state from the log_store, right? Is something like the following what you have in mind?

if let Some(mut snapshot) = this.snapshot {
    // In case the existing snapshot version is not the previous version
    if snapshot.version() != version - 1 {
        // Update the snapshot to version - 1
        snapshot.update(this.log_store.clone(), Some(version - 1)).await?;
    }
    // Then we are good to merge!
    snapshot.merge(actions, &operation, version)?;
    Ok(DeltaTable::new_with_state(this.log_store, snapshot))
} else {
    // The same
}

@ion-elgreco
Collaborator

Yes, the version that is returned after doing the commit action.

So I am saying you update the table snapshot to that version minus one, since only then will merging the state with this writer's actions lead to the version of the commit action.
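The sequence being suggested can be modeled in a self-contained way. The Snapshot type and method names below are illustrative stand-ins, not the delta-rs API: update brings the stale snapshot up to (commit version - 1) by replaying the log, after which merging this writer's own actions lands exactly on the committed version.

```rust
struct Snapshot {
    version: i64,
}

impl Snapshot {
    /// Stand-in for snapshot.update(log_store, Some(v)):
    /// replay the log up to and including version v.
    fn update_to(&mut self, v: i64) {
        self.version = v;
    }

    /// Stand-in for snapshot.merge(actions, &operation, version):
    /// apply this writer's own actions, advancing by exactly one version.
    fn merge(&mut self, commit_version: i64) -> Result<(), String> {
        if self.version + 1 != commit_version {
            return Err("Version mismatch".to_string());
        }
        self.version = commit_version;
        Ok(())
    }
}

/// The suggested post-commit step: catch up to commit_version - 1
/// if another writer advanced the table, then merge our actions.
fn finalize(snapshot: &mut Snapshot, commit_version: i64) -> Result<(), String> {
    if snapshot.version != commit_version - 1 {
        snapshot.update_to(commit_version - 1);
    }
    snapshot.merge(commit_version)
}

fn main() {
    // Our writer's snapshot is stale at version 0, but its commit landed at 2
    // because a concurrent writer took version 1.
    let mut s = Snapshot { version: 0 };
    assert!(finalize(&mut s, 2).is_ok());
    assert_eq!(s.version, 2);
}
```

Without the catch-up step, merge would fail exactly as in the original report; with it, the final snapshot version matches the committed version.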

@ion-elgreco
Collaborator

@helanto Yes that should work I believe
