-
Notifications
You must be signed in to change notification settings - Fork 421
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: move and update Optimize operation #1154
Conversation
return Err(DeltaWriterError::PartialParquetWrite { | ||
sample_error: match &partial_writes[0].1 { | ||
ParquetError::General(msg) => ParquetError::General(msg.to_owned()), | ||
ParquetError::ArrowError(msg) => ParquetError::ArrowError(msg.to_owned()), | ||
ParquetError::EOF(msg) => ParquetError::EOF(msg.to_owned()), | ||
ParquetError::External(err) => ParquetError::General(err.to_string()), | ||
ParquetError::IndexOutOfBound(u, v) => { | ||
ParquetError::IndexOutOfBound(u.to_owned(), v.to_owned()) | ||
} | ||
ParquetError::NYI(msg) => ParquetError::NYI(msg.to_owned()), | ||
}, | ||
skipped_values: partial_writes, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was not too happy doing this, but the parquet errors are no longer cloneable, so to_onwed
does not do much. My assumption was that we have to keep all the individual errors, since kafka-delta-ingest relies on them? @rtyler
assert!(maybe_metrics.is_err()); | ||
assert_eq!(dt.version(), version + 1); | ||
Ok(()) | ||
} | ||
|
||
#[ignore = "we do not yet re-try in operations commits."] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm finally working up to doing the commits conflict resolution. so this will be re-enabled soonishly ..
@roeap The optimize changes LGTM.
The bin packing optimization must be idempotent so I think just passing the entire partition would violate that. Sorting files by their size ensures that. It's good that you kept the check for only one add action so any undesired behaviour should be caught and reported. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just a few minor suggestions. Overall looks good :)
Co-authored-by: Will Jones <[email protected]>
# Description This PR moves the optimize operation into the operations module. As part of this we do a few updates to the operation as well - adopt `IntoFuture` pattern - use writer from operations module - replace `SerializedFileReader` with `ParquetRecordBatchStream` As part of this we also update datafusion and arrow, which in turn requires updating pyo3. This requires updating some deprecated features. i.e. how function signatures are annotated. It also leads to a breaking change in the python funcitons - specifically the order of arguments in `dataset_partitions`. cc @Blajda # Related Issue(s) part of delta-io#1136 # Documentation <!--- Share links to useful documentation ---> --------- Co-authored-by: Will Jones <[email protected]>
Description
This PR moves the optimize operation into the operations module. As part of this we do a few updates to the operation as well
IntoFuture
patternSerializedFileReader
withParquetRecordBatchStream
As part of this we also update datafusion and arrow, which in turn requires updating pyo3. This requires updating some deprecated features. i.e. how function signatures are annotated. It also leads to a breaking change in the python funcitons - specifically the order of arguments in
dataset_partitions
.There is one more thing is was considering while doing these updates. Essentially we may be getting some undesired behaviour, as the writer in the operations module already considers file size, so not all data in a bin might actually end up in the same file. there may also be a naïve yet much simpler way to do this now, by just passing all data in a partition through the partition writer. This should yield files of the desired size within the accuracy of the configured row-group size. It may of course lead to a rather small remainder file.
cc @Blajda
Related Issue(s)
part of #1136
Documentation