-
Notifications
You must be signed in to change notification settings - Fork 421
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: logical Node for find files #2194
Conversation
ACTION NEEDED delta-rs follows the Conventional Commits specification for release automation. The PR title and description are used as the merge commit message. Please update your PR title and description to match the specification. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There's quite a bit work that's still required. When implementing this operator my recommendation is to reuse as much of the DeltaScan
functionality a possible. This will help avoid bugs in the future whenever support for schema evolution / column mapping is introduced.
pub struct FindFilesNode { | ||
pub id: String, | ||
pub input: LogicalPlan, | ||
pub predicates: Vec<Expr>, | ||
pub files: Vec<String>, | ||
pub schema: DFSchemaRef, | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The internal of this structure should be opaque and corresponding new
functions should be made.
Users will not provide the list of files to scan, instead they will provide some reference to the DeltaTable (i.e EagerSnapshot
). From that snapshot you can obtain the schema and files.
You also shouldn't need input
here. This operation should function as a source.
fn fmt_for_explain(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { | ||
write!( | ||
f, | ||
"FindFiles id={}, predicate={:?}, files={:?}", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Having files here will be a bit too verbose here. Would be better to print the version of the table.
if let Some(column) = batch.column_by_name(PATH_COLUMN) { | ||
let mut column_iter = column.as_string::<i32>().into_iter(); | ||
while let Some(Some(row)) = column_iter.next() { | ||
let df = ctx | ||
.read_parquet(row, ParquetReadOptions::default()) | ||
.await? | ||
.filter(predicate.to_owned())?; | ||
if df.count().await? > 0 { | ||
batches.push(row); | ||
} | ||
} | ||
} | ||
let str_array = Arc::new(StringArray::from(batches)); | ||
RecordBatch::try_new(only_file_path_schema(), vec![str_array]).map_err(Into::into) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Multiple enhancements can be made here. The current implementation of find_files
is able to perform a projection which reads only data required to evaluate the predicate. Keeping the projection behavior is a must since it reduces how much is read.
Something else to consider here is that FindFiles only needs to know if at least one record satisfies the predicate hence once one match is found the scan of that file can be stopped. Further optimization can be done by having a limit of 1 for the filter.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Another thing I have to add is when it's partition columns we don't need to read the file at all since it can be inferred from the path itself.
|
||
let e = p.execute(0, state.task_ctx())?; | ||
let s = collect_sendable_stream(e).await.unwrap(); | ||
print_batches(&s)?; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should assert on the final output result here. See other operations for an example of comparing a string batch representation to a batch.
} | ||
|
||
fn output_partitioning(&self) -> Partitioning { | ||
Partitioning::UnknownPartitioning(0) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This operation should not be limited a single partition. I.e think of each partition as a cpu thread here ideally we should be able to divide the files being scanned to each available cpu thread.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This was just a lazy initial implementation, I can fix that
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Significant improvements.
How much effort / what would be required to remove the dictionary output schema?
PATH_COLUMN, | ||
DataType::Dictionary(Box::new(DataType::UInt16), Box::new(DataType::Utf8)), | ||
false, | ||
)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah there's definitely a bit of a weird interface mismatch. This operation must only output a file path at most once so there is minimal value in having a dictionary being returned to end user however the dictionary encoding should be used when we perform a scan.
Is it easy to simply have string be returned?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I added to delta scan config the ability to turn off the dictionary encoding, it's on by default everywhere else, but here we use just a simple string as you mentioned
.map(|batch| batch.project(&[field_idx]).unwrap()) | ||
.collect(); | ||
|
||
let result_batches = concat_batches(&ONLY_FILES_SCHEMA.clone(), &path_batches)?; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was thinking for this operation to output file paths as soon as they are discovered to allow operations downstream to begin their work ASAP. When performing an memory only scan to makes sense to output as a single batch since IO is minimal.
For this PR I think it's okay since it aligns with the previous behaviour but its something we can change.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can you elaborate a bit more? I understand what you are suggesting here, but I'm not sure what you are expecting. Like you want the return type to be a vec or record batches or a stream or something?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes output record batches and these record batches should simply be a string. There is no benefit for using dictionaries in the output.
I was thinking longer term. The current implementation waits for all files to be scanned prior to sending a record batch downstream. There might be some benefit to send a record batch with a single record as soon as a file match is determined.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh you're talking about the actual schema of the record batch, I thought you were saying the return type of the function was wrong or something of the sort for it to be "immediately available" or something of the sort.
…ether or not to wrap partition columns in dictionary encodings, this is on by default
) -> Result<RecordBatch> { | ||
register_store(log_store.clone(), state.runtime_env().clone()); | ||
let scan_config = DeltaScanConfigBuilder::new() | ||
.wrap_partition_values(false) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So we don't want to completely disable dictionary values when performing a scan since it can provide significant memory savings benefits. I'd prefer we wrap the partition values during a scan however in the output batches for the operation they should be provided as strings.
…coding, now just turn it on and rebuild the batch
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM. Thanks for making the changes.
Description
Some of my first workings on David's proposal in #2006, this is also meant to push #2048 and general CDF forward as well by making the logical operations of delta tables more composable than they are today.
Related Issue(s)
#2006
#2048
I think and @Blajda correct me here, we can build upon this and eventually move towards a
DeltaPlanner
esq enum for operations and their associated logical plan building.Still to do
DeltaScan
so delta scan can make use of this logical node