-
Notifications
You must be signed in to change notification settings - Fork 421
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Implement simple delete case #832
Comments
I would like this! |
Hi, If i understand this correctly, the dummy approach to implement this would be to:
Then for each file with matched rows (count > 0), rewrite it with (with filters applied), add proper delete actions and commit. |
Surely, as a first step we would need to check if predicate columns matches table partitioning. |
Hey @dudzicp, Thanks for offering to help, we are more then happy to support along the way! Your initial thought is going in the right direction I think. One of the beautiful things about delta though is, that we have a lot of valuable information available in the delta log already that we can leverage to exclude some files a-priori and avoid the expensive IO that comes with it. The good news is, the integration with datafusion can already leverage this. Without having thought about it too much yet, the way I would get started is to leverage the pruning predicate that is implemented for delta-rs/rust/src/delta_datafusion.rs Lines 367 to 381 in 8a82f79
So one way to go could be to pre-select the files from the log that might have matches using As you already mentioned in this case we might already be doing too much work, however we could leverage a lot of the existing implementation. I.e. we get all the parallelism etc that comes out of the box with datafusion almost for free and don't need to implement that here. An alternative way, that might already get us a bit along the way to also supporting deletion vectors is to create a query plan ( I guess to get us started and establish the higher level APIs, my suggestion would be to use the pruning predicate to get all relevant files, use these to create a Another angle on this might be to start from the same list of files as above, read all the data we want to keep and for each file-partition check if we have as many rows as indicated in the meta-data, if so, there were no hits on the delete predicate and can (have to) disregard the data. if we have less rows, we saw have a hit, and should write the data. This is so far just thinking out loud, and I have to think a bit more about the best approach I guess for the first iteration we should aim for scanning each relevant file at most once, and leverage the scheduling / execution capabilities from datafusion as best we can. Then again I may also have missed something obvious and @houqp, @wjones127, or @schuermannator have some better ideas how to proceed 😆. |
I may also be wrong about scanning files only once. Specifically fi we write the data, we need all columns etc, deciding if we need it may be done on just a small subset of columns and thus potentially require much less IO ... |
Yeah I'd almost be tempted to start there. I imagine the flow is something like (sort of Rust, sort of pseudo code): enum FilterAction {
Ignore, // No matching rows in the file
Remove, // The entire file matches the predicate
Rewrite // At least part of the file matches the predicate
}
/// For an add action, decide whether the predicate applies to the full file,
/// none of the file, or part of the file.
fn check_filter(action: AddAction, predicate: Predicate) -> FilterAction {
todo!()
}
fn filter_operation(table: DeltaTable, predicate: Predicate) -> {
let new_actions: Vec<Action> = vec![];
let files_to_rewrite: Vec<&AddAction> = vec![];
for action in table.add_actions() {
match check_filter(action, predicate) {
Ignore => {},
Remove => {
new_actions.push(make_remove_action(action));
},
Rewrite => {
new_actions.push(make_remove_action(action));
files_to_rewrite.push(action);
}
}
}
// Read and filter parquet files we need to rewrite
let plan = ctx.read_parquet(files_to_rewrite).filter(!predicate);
let files_written = write(plan);
new_actions.extend(create_add_actions(files_written));
commit_transaction(new_actions);
} (This is basically what Robert described.)
I think once you know you need to rewrite a file, there isn't a much more efficient way to do the filtering. (Except maybe you could you Parquet row group statistics to determine which row groups can be passed through to the new file as is without any decoding/decompression. But that would require getting deep into the Parquet implementation, so we'll keep that out of scope.) The nice thing is a scan then filter has no pipeline breakers, so it can be done in a totally streaming fashion (don't have to load the entire table into memory at once). |
Hi @dudzicp are you still interesting on taking this work on? |
Yes, I will get back to this topic probably in a month as I am swamped at my primary job. Feel free to push this further if you have time. |
# Description This is a full implementation of the Delta Delete Command. Users can now delete records that match a predicate. This implementation is not limited to only partition columns and allows non-partition columns. This also implements a `find_files` function which can be used to implement the Update command. # Related Issue(s) - closes #832 # Documentation
Description
It's not uncommon to delete along partition boundaries. In those cases the delete should be very simple to implement; just commit some remove actions.
I think we should probably design the API so we can eventually support any where clause in deletes, rather than have a separate function for this simple case and the case where we have to rewrite files.
See the
write
operation implementation for inspiration: https://github.com/delta-io/delta-rs/blob/main/rust/src/operations/write.rsUse Case
Related Issue(s)
The text was updated successfully, but these errors were encountered: