bump arrow2 to main #4212
Conversation
Some notes: first, the new async reader interface `read_columns_many_async` has two improvements; however, the current `ParquetSource` brings a reader with it and thus cannot use that function, so I wrote a similar version of it. Maybe we can adopt the reader factory pattern later. Second, some arrow2 functions now return `Chunk` instead of `RecordBatch`.
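For context, a minimal sketch of that change, assuming the arrow2 `Chunk` API on main at this commit (approximate, for illustration only):

```rust
use arrow2::array::{Array, Int32Array};
use arrow2::chunk::Chunk;

fn main() {
    // Chunk is a plain bundle of equal-length columns; unlike the old
    // RecordBatch it does not carry a schema, which travels separately.
    let col: Box<dyn Array> = Box::new(Int32Array::from_slice([1, 2, 3]));
    let chunk = Chunk::new(vec![col]);
    assert_eq!(chunk.len(), 3); // row count
}
```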
Hi, here is a rough idea, FYI: how about, instead of converting from ...?
Looks good to me. My proxy has been down since noon; I'll come back later to make the changes. @dantengsky
```rust
let col_metas = get_field_columns(columns, field_name);
let mut cols = Vec::with_capacity(col_metas.len());
for meta in col_metas {
    cols.push((meta, _read_single_column_async(reader, meta).await?))
}
```
How can we make the IO run in parallel?
How about accepting an `Operator` or `Object` here, to create different readers for different columns?
My version of `read_columns_many_async` is not parallel. I need it because `SourceFactory` passes a `Reader` to `ParquetSource`.
The one in arrow2 is parallel; it is fine to use that one in `block_reader`.
So let's export `_read_single_column_async` as pub, so we can use it in `block_reader`?
No, `block_reader` already uses it. I mean maybe `SourceFactory` can accept something like `(data_accessor, path)`, or even use a factory just like arrow2 does, with a struct around `(data_accessor, path)` as an impl of it.
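For illustration, a rough sketch of that idea; `DataAccessor`, `open`, and `ParquetPart` are hypothetical stand-ins, not databend's actual types:

```rust
use std::io::Read;
use std::sync::Arc;

// Hypothetical storage abstraction; databend's real trait will differ.
trait DataAccessor: Send + Sync {
    fn open(&self, path: &str) -> std::io::Result<Box<dyn Read + Send>>;
}

// What SourceFactory could pass to ParquetSource instead of a live Reader:
// the (data_accessor, path) pair, from which each column read can open
// its own reader on demand.
struct ParquetPart {
    data_accessor: Arc<dyn DataAccessor>,
    path: String,
}

impl ParquetPart {
    fn reader(&self) -> std::io::Result<Box<dyn Read + Send>> {
        self.data_accessor.open(&self.path)
    }
}
```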
```diff
@@ -106,7 +106,7 @@ async fn test_interpreter_interceptor_for_insert() -> Result<()> {
     "| log_type | handler_type | cpu_usage | scan_rows | scan_bytes | scan_partitions | written_rows | written_bytes | result_rows | result_bytes | query_kind | query_text | sql_user | sql_user_quota |",
     "+----------+--------------+-----------+-----------+------------+-----------------+--------------+---------------+-------------+--------------+-----------------+----------------------------------------------------+----------+---------------------------------------------------------------------------+",
     "| 1 | TestSession | 8 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | CreateTablePlan | create table t as select number from numbers_mt(1) | root | UserQuota { max_cpu: 0, max_memory_in_bytes: 0, max_storage_in_bytes: 0 } |",
-    "| 2 | TestSession | 8 | 1 | 8 | 0 | 1 | 1090 | 0 | 0 | CreateTablePlan | create table t as select number from numbers_mt(1) | root | UserQuota { max_cpu: 0, max_memory_in_bytes: 0, max_storage_in_bytes: 0 } |",
+    "| 2 | TestSession | 8 | 1 | 8 | 0 | 1 | 1330 | 0 | 0 | CreateTablePlan | create table t as select number from numbers_mt(1) | root | UserQuota { max_cpu: 0, max_memory_in_bytes: 0, max_storage_in_bytes: 0 } |",
```
`written_bytes` changed from 1090 to 1330; not sure why.
I think it's related to the HashMap --> BTreeMap change.
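For illustration only (not databend's code): a `HashMap` to `BTreeMap` swap changes iteration order to sorted-and-deterministic, so anything serialized from map iteration, e.g. field or column ordering in written metadata, can change in size:

```rust
use std::collections::{BTreeMap, HashMap};

fn main() {
    let pairs = [("b", 1), ("a", 2), ("c", 3)];
    let hash: HashMap<_, _> = pairs.into_iter().collect();
    let btree: BTreeMap<_, _> = pairs.into_iter().collect();

    // HashMap order is unspecified and may vary between runs;
    // BTreeMap always yields a, b, c.
    println!("hash:  {:?}", hash.iter().collect::<Vec<_>>());
    println!("btree: {:?}", btree.iter().collect::<Vec<_>>());
}
```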
```diff
      let fields_to_read = self
          .projection
          .clone()
          .into_iter()
-         .map(|idx| arrow_fields[idx].clone())
+         .map(|idx| {
```
The test `test_fuse_table_normal_case` failed because the parquet file is written with field names 'a', 'b', ... (from the select operator), while the block reader later reads with the field name 'id' (from the table schema).
```rust
let stream = futures::stream::iter(cols).map(|(col_meta, idx)| {
    let factory = || {
```
We use the arrow2 parallel `read_columns_many_async` here.
@Xuanwo: `(data_accessor, path)` as an impl of `F: Fn() -> BoxFuture<'b, std::io::Result<R>> + Clone` (from arrow2): https://github.com/jorgecarleitao/arrow2/blob/3d528c99589e96f0539de4c07b11843fa22f23ac/src/io/parquet/read/row_group.rs#L248
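A sketch of a factory matching the signature quoted above, with a plain file path standing in for the `(data_accessor, path)` pair; the tokio-to-futures compat shim follows arrow2's async read example:

```rust
use futures::future::BoxFuture;
use tokio::fs::File;
use tokio_util::compat::{Compat, TokioAsyncReadCompatExt};

// Each call yields a fresh reader, so arrow2 can fetch several columns
// concurrently; the closure is Clone because it only captures a String.
fn reader_factory(
    path: String,
) -> impl Fn() -> BoxFuture<'static, std::io::Result<Compat<File>>> + Clone {
    move || {
        let path = path.clone();
        Box::pin(async move { Ok(File::open(path).await?.compat()) })
    }
}
```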
some thoughts here:

- shall we control the number of futures that simultaneously read the data of columns? (a sketch follows after this list)
- about the `try_join_all` used in `read_columns_many_async`: at least, "join_all will switch to the more powerful FuturesOrdered for performance reasons" according to https://docs.rs/futures/0.3.21/futures/future/fn.join_all.html#see-also; not sure if it's worth bothering to replace it.
- reading the column chunk in one go might be a huge benefit (as @youngsofun already mentioned); right now `read_page_header` reads bytes piecemeal from an in-memory Cursor (https://github.com/jorgecarleitao/parquet2/blob/85d1f01597907c0cc30a234a6d6209c3d7ef17cf/src/read/page_iterator.rs#L65-L68), and maybe we can eliminate the BufReader used when reading a column: https://github.com/datafuselabs/databend/blob/b41d39b05dcf281db921377d8b027477d333f8b6/query/src/storages/fuse/io/block_reader.rs#L108-L111
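On the first point, a minimal sketch (not databend's actual API) that caps in-flight column reads with `StreamExt::buffered` instead of joining everything at once:

```rust
use futures::future::BoxFuture;
use futures::stream::{self, StreamExt, TryStreamExt};

// Poll at most `max_io` reads concurrently; output preserves input order,
// like try_join_all, but with bounded concurrency.
async fn read_columns_bounded<T>(
    reads: Vec<BoxFuture<'static, std::io::Result<T>>>,
    max_io: usize,
) -> std::io::Result<Vec<T>> {
    stream::iter(reads)
        .buffered(max_io)
        .try_collect()
        .await
}
```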
The cursor is only used for file metadata: https://github.com/jorgecarleitao/parquet2/search?q=cursor
But we can eliminate the BufReader, because "we execute exactly 1 seek and 1 read on them": https://github.com/jorgecarleitao/arrow2/blob/3d528c99589e96f0539de4c07b11843fa22f23ac/examples/parquet_read_async.rs#L31
PR #4230
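Given "exactly 1 seek and 1 read" per column, a buffered wrapper buys nothing; an illustrative sketch of the access pattern (not the actual block_reader code):

```rust
use std::io::{Read, Seek, SeekFrom};

// One seek to the column chunk's offset, then one exact read of its bytes:
// with this access pattern a BufReader only adds an extra copy.
fn read_column_chunk<R: Read + Seek>(
    reader: &mut R,
    offset: u64,
    length: usize,
) -> std::io::Result<Vec<u8>> {
    reader.seek(SeekFrom::Start(offset))?;
    let mut buf = vec![0u8; length];
    reader.read_exact(&mut buf)?;
    Ok(buf)
}
```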
/lgtm
I hereby agree to the terms of the CLA available at: https://databend.rs/dev/policies/cla/
Summary
Replace `RecordBatch` by `Chunk`.
Is there any other consideration, given that the tests pass?
Changelog
Related Issues
Fixes #3746
Test Plan