From e95314502acb6a9bfe63f14132fa6fbfb383fb88 Mon Sep 17 00:00:00 2001
From: youngsofun
Date: Mon, 21 Feb 2022 20:43:12 +0800
Subject: [PATCH] bump arrow2 to 0.9.1.

arrow2 0.9 replaces RecordBatch with Chunk, swaps the IPC layer from
flatbuffers to planus (via arrow-format 0.4), turns the Schema/Field
accessors (fields(), name(), is_nullable(), metadata()) into plain public
struct fields, and stores schema metadata in a BTreeMap. Migrate the
DataBlock conversions and all call sites accordingly: add a
DataBlock::from_chunk helper plus a box_chunk_to_arc_chunk adaptor for the
boxed arrays returned by the compute kernels.

---
 Cargo.lock                                    | 57 +++++++++++++------
 common/arrow/Cargo.toml                       |  8 +--
 common/datablocks/src/data_block.rs           | 44 +++++++-------
 common/datablocks/src/lib.rs                  |  1 +
 common/datablocks/tests/it/data_block.rs      | 14 ++---
 common/datavalues/src/columns/column.rs       |  1 +
 common/datavalues/src/data_field.rs           |  2 +-
 common/datavalues/src/data_schema.rs          | 15 +++--
 common/datavalues/src/types/data_type.rs      | 40 +++++++------
 common/streams/src/sources/source_parquet.rs  |  4 +-
 common/streams/src/stream_limit_by.rs         |  8 ++-
 common/streams/tests/it/source.rs             |  4 +-
 metasrv/src/api/grpc/grpc_service.rs          |  4 +-
 query/src/api/rpc/flight_client_stream.rs     |  8 +--
 query/src/api/rpc/flight_service.rs           |  4 +-
 .../new/executor/executor_worker_context.rs   |  2 +-
 .../transforms/transform_limit_by.rs          |  9 ++-
 query/src/sql/optimizer/group.rs              |  2 +-
 .../src/sql/statements/analyzer_statement.rs  |  2 +-
 query/src/storages/fuse/io/block_reader.rs    |  2 +-
 query/src/storages/fuse/io/block_writer.rs    |  6 +-
 query/tests/it/tests/parquet.rs               |  4 +-
 22 files changed, 132 insertions(+), 109 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 87404728f10fe..b7862d59c7825 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -75,6 +75,12 @@ version = "1.5.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "c5d78ce20460b82d3fa150275ed9d55e21064fc7951177baacf86a145c4a4b1f"
 
+[[package]]
+name = "array-init-cursor"
+version = "0.2.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "bf7d0a018de4f6aa429b9d33d69edf69072b1c5b1cb8d3e4a5f7ef898fc3eb76"
+
 [[package]]
 name = "arrayref"
 version = "0.3.6"
@@ -89,24 +95,27 @@ checksum = "8da52d66c7071e2e3fa2a1e5c6d088fec47b593032b254f5e980de8ea54454d6"
 
 [[package]]
 name = "arrow-format"
-version = "0.3.0"
+version = "0.4.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "9f7da2d9660bfaebbdb0a44a33b3bd1dcb5a952fafa02c0dfc6a51ea471fef2a"
+checksum = "2333f8ccf0d597ba779863c57a0b61f635721187fb2fdeabae92691d7d582fe5"
 dependencies = [
- "flatbuffers",
+ "planus",
  "prost",
  "prost-derive",
+ "serde",
  "tonic",
 ]
 
 [[package]]
 name = "arrow2"
-version = "0.8.1"
-source = "git+https://github.com/datafuse-extras/arrow2?rev=d14ae86#d14ae86c69cd76957adec3b14bb62d93732b43c9"
+version = "0.9.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "aea88c49c98db9de8c72ccaa0470182857e70faa635f32fc4aa3c9e1a1dfefea"
 dependencies = [
  "ahash",
  "arrow-format",
  "base64 0.13.0",
+ "bytemuck",
  "chrono",
  "csv",
  "fallible-streaming-iterator",
@@ -1019,6 +1028,20 @@ name = "bytemuck"
 version = "1.7.3"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "439989e6b8c38d1b6570a384ef1e49c8848128f5a97f3914baef02920842712f"
+dependencies = [
+ "bytemuck_derive",
+]
+
+[[package]]
+name = "bytemuck_derive"
+version = "1.0.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "8e215f8c2f9f79cb53c8335e687ffd07d5bfcb6fe5fc80723762d0be46e7cc54"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "syn",
+]
 
 [[package]]
 name = "byteorder"
@@ -2809,17 +2832,6 @@ version = "0.4.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "279fb028e20b3c4c320317955b77c5e0c9701f05a1d309905d6fc702cdc5053e"
 
-[[package]]
-name = "flatbuffers"
-version = "2.0.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "ef4c5738bcd7fad10315029c50026f83c9da5e4a21f8ed66826f43e0e2bde5f6" -dependencies = [ - "bitflags", - "smallvec", - "thiserror", -] - [[package]] name = "flate2" version = "1.0.22" @@ -4971,9 +4983,9 @@ dependencies = [ [[package]] name = "parquet2" -version = "0.8.1" +version = "0.9.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "57e98d7da0076cead49c49580cc5771dfe0ba8a93cadff9b47c1681a4a78e1f9" +checksum = "45476d276db539ec4076f6abe62392619460fb70a1a8edebcc06e11cd93c0ec3" dependencies = [ "async-stream", "bitpacking", @@ -5218,6 +5230,15 @@ version = "0.3.24" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "58893f751c9b0412871a09abd62ecd2a00298c6c83befa223ef98c52aef40cbe" +[[package]] +name = "planus" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bffebaf174d6cad46a5f0f1bb1c45c6eb509571688bcb18dfab217f3c9f9b151" +dependencies = [ + "array-init-cursor", +] + [[package]] name = "plotters" version = "0.3.1" diff --git a/common/arrow/Cargo.toml b/common/arrow/Cargo.toml index de1e5f10452d4..f38436d963948 100644 --- a/common/arrow/Cargo.toml +++ b/common/arrow/Cargo.toml @@ -11,7 +11,7 @@ doctest = false test = false [features] -arrow-default = ["arrow/compute", "arrow/regex", "arrow/io_csv", "arrow/io_parquet", "arrow/io_json", "arrow/io_flight"] +arrow-default = ["arrow/compute", "arrow/regex", "arrow/io_csv", "arrow/io_parquet", "arrow/io_json", "arrow/io_flight", "arrow/compute_filter"] default = ["arrow-default", "parquet-default"] parquet-default = ["parquet2/stream", "parquet2/lz4"] simd = ["arrow/simd"] @@ -20,9 +20,9 @@ simd = ["arrow/simd"] # Workspace dependencies # Github dependencies -arrow = { package = "arrow2", git = "https://github.com/datafuse-extras/arrow2", default-features = false, rev = "d14ae86"} -arrow-format = { version = "0.3.0", features = ["flight-data", "flight-service"] } -parquet2 = { version = "0.8.1", default_features = false } +arrow = { package = "arrow2", version="0.9.1", default-features = false} +arrow-format = { version = "0.4.0", features = ["flight-data", "flight-service", "ipc"] } +parquet2 = { version = "0.9.0", default_features = false } # Crates.io dependencies [dev-dependencies] diff --git a/common/datablocks/src/data_block.rs b/common/datablocks/src/data_block.rs index 518fc68b1a21f..d4cdcd60c3dea 100644 --- a/common/datablocks/src/data_block.rs +++ b/common/datablocks/src/data_block.rs @@ -16,8 +16,9 @@ use std::convert::TryFrom; use std::fmt; use std::sync::Arc; -use common_arrow::arrow; -use common_arrow::arrow::record_batch::RecordBatch; +use common_arrow::arrow::array::Array; +use common_arrow::arrow::array::ArrayRef; +use common_arrow::arrow::chunk::Chunk; use common_datavalues::prelude::*; use common_exception::ErrorCode; use common_exception::Result; @@ -174,38 +175,37 @@ impl DataBlock { Ok(Self { columns, schema }) } -} - -impl TryFrom for RecordBatch { - type Error = ErrorCode; - fn try_from(v: DataBlock) -> Result { - let arrays = v + pub fn from_chunk(schema: &DataSchemaRef, chuck: &Chunk) -> Result { + let columns = chuck .columns() .iter() - .map(|c| c.as_arrow_array()) - .collect::>(); + .zip(schema.fields().iter()) + .map(|(col, f)| match f.is_nullable() { + true => col.into_nullable_column(), + false => col.into_column(), + }) + .collect(); - Ok(RecordBatch::try_new(Arc::new(v.schema.to_arrow()), arrays)?) 
+        Ok(DataBlock::create(schema.clone(), columns))
     }
 }
 
-impl TryFrom<arrow::record_batch::RecordBatch> for DataBlock {
+pub fn box_chunk_to_arc_chunk(c: Chunk<Box<dyn Array>>) -> Chunk<ArrayRef> {
+    Chunk::<ArrayRef>::new(c.into_arrays().into_iter().map(Arc::from).collect())
+}
+
+impl TryFrom<DataBlock> for Chunk<ArrayRef> {
     type Error = ErrorCode;
 
-    fn try_from(v: arrow::record_batch::RecordBatch) -> Result<DataBlock> {
-        let schema: DataSchemaRef = Arc::new(v.schema().as_ref().into());
-        let columns = v
+    fn try_from(v: DataBlock) -> Result<Chunk<ArrayRef>> {
+        let arrays = v
             .columns()
             .iter()
-            .zip(schema.fields().iter())
-            .map(|(col, f)| match f.is_nullable() {
-                true => col.into_nullable_column(),
-                false => col.into_column(),
-            })
-            .collect();
+            .map(|c| c.as_arrow_array())
+            .collect::<Vec<_>>();
 
-        Ok(DataBlock::create(schema, columns))
+        Ok(Chunk::try_new(arrays)?)
     }
 }
diff --git a/common/datablocks/src/lib.rs b/common/datablocks/src/lib.rs
index 291ee2d711c2c..8f51feaaf4efb 100644
--- a/common/datablocks/src/lib.rs
+++ b/common/datablocks/src/lib.rs
@@ -19,6 +19,7 @@ mod data_block_debug;
 mod kernels;
 mod memory;
 
+pub use data_block::box_chunk_to_arc_chunk;
 pub use data_block::DataBlock;
 pub use data_block_debug::*;
 pub use kernels::*;
diff --git a/common/datablocks/tests/it/data_block.rs b/common/datablocks/tests/it/data_block.rs
index eae56b36c13c9..30bc927d692e5 100644
--- a/common/datablocks/tests/it/data_block.rs
+++ b/common/datablocks/tests/it/data_block.rs
@@ -12,7 +12,8 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use common_arrow::arrow::record_batch::RecordBatch;
+use common_arrow::arrow::array::ArrayRef;
+use common_arrow::arrow::chunk::Chunk;
 use common_datablocks::DataBlock;
 use common_datavalues::prelude::*;
 use common_exception::Result;
@@ -60,18 +61,15 @@ fn test_data_block_convert() -> Result<()> {
     assert_eq!(3, block.num_rows());
     assert_eq!(4, block.num_columns());
 
-    let record_batch: RecordBatch = block.try_into().unwrap();
+    let chunk: Chunk<ArrayRef> = block.try_into().unwrap();
     // first and last test.
-    assert_eq!(3, record_batch.num_rows());
-    assert_eq!(4, record_batch.num_columns());
+    assert_eq!(3, chunk.len());
+    assert_eq!(4, chunk.columns().len());
 
-    let new_block: DataBlock = record_batch.try_into().unwrap();
+    let new_block: DataBlock = DataBlock::from_chunk(&schema, &chunk).unwrap();
     assert_eq!(3, new_block.num_rows());
     assert_eq!(4, new_block.num_columns());
 
-    let new_schema = new_block.schema();
-
-    assert_eq!(new_schema, &schema);
     Ok(())
 }
diff --git a/common/datavalues/src/columns/column.rs b/common/datavalues/src/columns/column.rs
index 50b7fc5c483b2..1e888413c1528 100644
--- a/common/datavalues/src/columns/column.rs
+++ b/common/datavalues/src/columns/column.rs
@@ -161,6 +161,7 @@ impl IntoColumn for &ArrayRef {
     }
 }
 
+//impl<A: AsRef<dyn Array>> IntoColumn for A {
 impl IntoColumn for ArrayRef {
     fn into_column(self) -> ColumnRef {
         use TypeID::*;
diff --git a/common/datavalues/src/data_field.rs b/common/datavalues/src/data_field.rs
index 7e26dc997bc70..00d5c3456f1e9 100644
--- a/common/datavalues/src/data_field.rs
+++ b/common/datavalues/src/data_field.rs
@@ -104,7 +104,7 @@ impl From<&ArrowField> for DataField {
     fn from(f: &ArrowField) -> Self {
         let dt: DataTypePtr = from_arrow_field(f);
 
-        DataField::new(f.name(), dt)
+        DataField::new(&f.name, dt)
     }
 }
diff --git a/common/datavalues/src/data_schema.rs b/common/datavalues/src/data_schema.rs
index 8102201774b71..fbb4a9322a9e6 100644
--- a/common/datavalues/src/data_schema.rs
+++ b/common/datavalues/src/data_schema.rs
@@ -14,7 +14,6 @@
 
 use core::fmt;
 use std::collections::BTreeMap;
-use std::collections::HashMap;
 use std::sync::Arc;
 
 use common_arrow::arrow::datatypes::Schema as ArrowSchema;
@@ -28,25 +27,25 @@ use crate::DataField;
 #[derive(serde::Serialize, serde::Deserialize, Debug, Clone, PartialEq, Eq)]
 pub struct DataSchema {
     pub(crate) fields: Vec<DataField>,
-    pub(crate) metadata: HashMap<String, String>,
+    pub(crate) metadata: BTreeMap<String, String>,
 }
 
 impl DataSchema {
     pub fn empty() -> Self {
         Self {
             fields: vec![],
-            metadata: HashMap::new(),
+            metadata: BTreeMap::new(),
         }
     }
 
     pub fn new(fields: Vec<DataField>) -> Self {
         Self {
             fields,
-            metadata: HashMap::new(),
+            metadata: BTreeMap::new(),
         }
     }
 
-    pub fn new_from(fields: Vec<DataField>, metadata: HashMap<String, String>) -> Self {
+    pub fn new_from(fields: Vec<DataField>, metadata: BTreeMap<String, String>) -> Self {
         Self { fields, metadata }
     }
@@ -84,7 +83,7 @@
     /// Returns an immutable reference to field `metadata`.
     #[inline]
-    pub const fn meta(&self) -> &HashMap<String, String> {
+    pub const fn meta(&self) -> &BTreeMap<String, String> {
         &self.metadata
     }
 
@@ -148,7 +147,7 @@ impl DataSchema {
             .map(|f| f.to_arrow())
             .collect::<Vec<_>>();
 
-        ArrowSchema::new_from(fields, self.metadata.clone())
+        ArrowSchema::from(fields).with_metadata(self.metadata.clone())
     }
 }
@@ -165,7 +164,7 @@ impl DataSchemaRefExt {
 impl From<&ArrowSchema> for DataSchema {
     fn from(a_schema: &ArrowSchema) -> Self {
         let fields = a_schema
-            .fields()
+            .fields
             .iter()
             .map(|arrow_f| arrow_f.into())
             .collect::<Vec<DataField>>();
diff --git a/common/datavalues/src/types/data_type.rs b/common/datavalues/src/types/data_type.rs
index 76fcd8bad28c5..adb37aa3a7f2c 100644
--- a/common/datavalues/src/types/data_type.rs
+++ b/common/datavalues/src/types/data_type.rs
@@ -131,7 +131,7 @@ pub fn from_arrow_type(dt: &ArrowType) -> DataTypePtr {
         ArrowType::Date64 => Arc::new(Date32Type::default()),
 
         ArrowType::Struct(fields) => {
-            let names = fields.iter().map(|f| f.name().to_string()).collect();
+            let names = fields.iter().map(|f| f.name.clone()).collect();
             let types = fields.iter().map(from_arrow_field).collect();
 
             Arc::new(StructType::create(names, types))
@@ -145,32 +145,30 @@ pub fn from_arrow_type(dt: &ArrowType) -> DataTypePtr {
 }
 
 pub fn from_arrow_field(f: &ArrowField) -> DataTypePtr {
-    if let Some(m) = f.metadata() {
-        if let Some(custom_name) = m.get(ARROW_EXTENSION_NAME) {
-            let metadata = m.get(ARROW_EXTENSION_META).cloned();
-            match custom_name.as_str() {
-                "Date" | "Date16" => return Date16Type::arc(),
-                "Date32" => return Date32Type::arc(),
-                "DateTime" | "DateTime32" => return DateTime32Type::arc(metadata),
-                "DateTime64" => match metadata {
-                    Some(meta) => {
-                        let mut chars = meta.chars();
-                        let precision = chars.next().unwrap().to_digit(10).unwrap();
-                        let tz = chars.collect::<String>();
-                        return DateTime64Type::arc(precision as usize, Some(tz));
-                    }
-                    None => return DateTime64Type::arc(3, None),
-                },
-                "Interval" => return IntervalType::arc(metadata.unwrap().into()),
-                _ => {}
-            }
+    if let Some(custom_name) = f.metadata.get(ARROW_EXTENSION_NAME) {
+        let metadata = f.metadata.get(ARROW_EXTENSION_META).cloned();
+        match custom_name.as_str() {
+            "Date" | "Date16" => return Date16Type::arc(),
+            "Date32" => return Date32Type::arc(),
+            "DateTime" | "DateTime32" => return DateTime32Type::arc(metadata),
+            "DateTime64" => match metadata {
+                Some(meta) => {
+                    let mut chars = meta.chars();
+                    let precision = chars.next().unwrap().to_digit(10).unwrap();
+                    let tz = chars.collect::<String>();
+                    return DateTime64Type::arc(precision as usize, Some(tz));
+                }
+                None => return DateTime64Type::arc(3, None),
+            },
+            "Interval" => return IntervalType::arc(metadata.unwrap().into()),
+            _ => {}
         }
     }
 
     let dt = f.data_type();
     let ty = from_arrow_type(dt);
 
-    let is_nullable = f.is_nullable();
+    let is_nullable = f.is_nullable;
     if is_nullable && ty.can_inside_nullable() {
         Arc::new(NullableType::create(ty))
     } else {
diff --git a/common/streams/src/sources/source_parquet.rs b/common/streams/src/sources/source_parquet.rs
index ede3a27723a1d..c516df9f857f7 100644
--- a/common/streams/src/sources/source_parquet.rs
+++ b/common/streams/src/sources/source_parquet.rs
@@ -94,7 +94,7 @@ where R: AsyncRead + AsyncSeek + Unpin + Send
             return Ok(None);
         }
 
-        let fields = self.arrow_table_schema.fields();
+        let fields = &self.arrow_table_schema.fields;
         let row_grp = &metadata.row_groups[self.current_row_group];
         let cols = self
             .projection
@@ -114,7 +114,7 @@ where R: AsyncRead + AsyncSeek + Unpin + Send
             .await?;
 
             let array: Arc<dyn Array> = array.into();
-            let column = match fields[idx].nullable {
+            let column = match fields[idx].is_nullable {
                 false => array.into_column(),
                 true => array.into_nullable_column(),
             };
diff --git a/common/streams/src/stream_limit_by.rs b/common/streams/src/stream_limit_by.rs
index bc9da1c7d28c7..5967e065bc4ae 100644
--- a/common/streams/src/stream_limit_by.rs
+++ b/common/streams/src/stream_limit_by.rs
@@ -21,6 +21,7 @@ use common_arrow::arrow;
 use common_arrow::arrow::array::BooleanArray;
 use common_arrow::arrow::bitmap::MutableBitmap;
 use common_arrow::arrow::datatypes::DataType as ArrowType;
+use common_datablocks::box_chunk_to_arc_chunk;
 use common_datablocks::DataBlock;
 use common_datablocks::HashMethod;
 use common_datablocks::HashMethodSerializer;
@@ -72,9 +73,10 @@ impl LimitByStream {
         }
 
         let array = BooleanArray::from_data(ArrowType::Boolean, filter.into(), None);
-        let batch = block.clone().try_into()?;
-        let batch = arrow::compute::filter::filter_record_batch(&batch, &array)?;
-        Some(batch.try_into()).transpose()
+        let chunk = block.clone().try_into()?;
+        let chunk = arrow::compute::filter::filter_chunk(&chunk, &array)?;
+        let chunk = box_chunk_to_arc_chunk(chunk);
+        Some(DataBlock::from_chunk(block.schema(), &chunk)).transpose()
     }
 }
diff --git a/common/streams/tests/it/source.rs b/common/streams/tests/it/source.rs
index 5b64e9817c0a4..4f75dec05dc5c 100644
--- a/common/streams/tests/it/source.rs
+++ b/common/streams/tests/it/source.rs
@@ -203,8 +203,8 @@ async fn test_source_parquet() -> Result<()> {
     let col_b = Series::from_data(vec!["1", "1", "2", "1", "2", "3"]);
     let sample_block = DataBlock::create(schema.clone(), vec![col_a, col_b]);
 
-    use common_arrow::arrow::record_batch::RecordBatch;
-    let batch = RecordBatch::try_from(sample_block)?;
+    use common_arrow::arrow::chunk::Chunk;
+    let batch = Chunk::try_from(sample_block)?;
     use common_arrow::parquet::encoding::Encoding;
     let encodings = std::iter::repeat(Encoding::Plain)
         .take(arrow_schema.fields.len())
diff --git a/metasrv/src/api/grpc/grpc_service.rs b/metasrv/src/api/grpc/grpc_service.rs
index e7975c6a07276..ea0c3be3371f7 100644
--- a/metasrv/src/api/grpc/grpc_service.rs
+++ b/metasrv/src/api/grpc/grpc_service.rs
@@ -14,10 +14,10 @@
 
 use std::pin::Pin;
 use std::sync::Arc;
+use std::task::Context;
+use std::task::Poll;
 
 use common_arrow::arrow_format::flight::data::BasicAuth;
-use common_arrow::arrow_format::ipc::flatbuffers::bitflags::_core::task::Context;
-use common_arrow::arrow_format::ipc::flatbuffers::bitflags::_core::task::Poll;
 use common_grpc::GrpcClaim;
 use common_grpc::GrpcToken;
 use common_meta_grpc::MetaGrpcReadReq;
diff --git a/query/src/api/rpc/flight_client_stream.rs b/query/src/api/rpc/flight_client_stream.rs
index e94bef65f7d88..0f7ca64e56062 100644
--- a/query/src/api/rpc/flight_client_stream.rs
+++ b/query/src/api/rpc/flight_client_stream.rs
@@ -48,11 +48,11 @@ impl FlightDataStream {
                     };
                     let batch = deserialize_batch(
                         &flight_data,
-                        arrow_schema,
+                        &arrow_schema.fields,
                         &ipc_schema,
                         &Default::default(),
                     )?;
-                    batch.try_into()
+                    DataBlock::from_chunk(&schema, &batch)
                 }
             }
         })
@@ -78,11 +78,11 @@ impl FlightDataStream {
 
                 let batch = deserialize_batch(
                     &flight_data,
-                    arrow_schema,
+                    &arrow_schema.fields,
                     &ipc_schema,
                     &Default::default(),
                 )?;
-                batch.try_into()
+                DataBlock::from_chunk(&schema, &batch)
             }
         })
     }
diff --git a/query/src/api/rpc/flight_service.rs b/query/src/api/rpc/flight_service.rs
index 083500f684bc7..ee45e72f5d1fc 100644
--- a/query/src/api/rpc/flight_service.rs
+++ b/query/src/api/rpc/flight_service.rs
@@ -110,9 +110,9 @@ impl FlightService for DatabendQueryFlightService {
             FlightTicket::StreamTicket(steam_ticket) => {
                 let (receiver, data_schema) = self.dispatcher.get_stream(&steam_ticket)?;
                 let arrow_schema = data_schema.to_arrow();
-                let ipc_fields = default_ipc_fields(arrow_schema.fields());
+                let ipc_fields = default_ipc_fields(&arrow_schema.fields);
 
-                serialize_schema(&arrow_schema, &ipc_fields);
+                serialize_schema(&arrow_schema, Some(&ipc_fields));
 
                 Ok(RawResponse::new(
                     Box::pin(FlightDataStream::create(receiver, ipc_fields))
diff --git a/query/src/pipelines/new/executor/executor_worker_context.rs b/query/src/pipelines/new/executor/executor_worker_context.rs
index 52f883c823ad6..f286196f62734 100644
--- a/query/src/pipelines/new/executor/executor_worker_context.rs
+++ b/query/src/pipelines/new/executor/executor_worker_context.rs
@@ -13,13 +13,13 @@
 // limitations under the License.
 
 use std::fmt::Debug;
+use std::fmt::Formatter;
 use std::sync::atomic::AtomicBool;
 use std::sync::atomic::Ordering;
 use std::sync::Arc;
 use std::task::Context;
 use std::task::Poll;
 
-use common_arrow::arrow_format::ipc::flatbuffers::bitflags::_core::fmt::Formatter;
 use common_exception::ErrorCode;
 use common_exception::Result;
 use futures::task::ArcWake;
diff --git a/query/src/pipelines/new/processors/transforms/transform_limit_by.rs b/query/src/pipelines/new/processors/transforms/transform_limit_by.rs
index 4ae6ff7d294ae..bf2afc7032b2c 100644
--- a/query/src/pipelines/new/processors/transforms/transform_limit_by.rs
+++ b/query/src/pipelines/new/processors/transforms/transform_limit_by.rs
@@ -19,6 +19,7 @@ use common_arrow::arrow;
 use common_arrow::arrow::array::BooleanArray;
 use common_arrow::arrow::bitmap::MutableBitmap;
 use common_arrow::arrow::datatypes::DataType as ArrowType;
+use common_datablocks::box_chunk_to_arc_chunk;
 use common_datablocks::DataBlock;
 use common_datablocks::HashMethod;
 use common_datablocks::HashMethodSerializer;
@@ -79,8 +80,10 @@ impl Transform for TransformLimitBy {
         }
 
         let array = BooleanArray::from_data(ArrowType::Boolean, filter.into(), None);
-        let batch = block.try_into()?;
-        let batch = arrow::compute::filter::filter_record_batch(&batch, &array)?;
-        batch.try_into()
+        let schema = block.schema().clone();
+        let chunk = block.try_into()?;
+        let chunk = arrow::compute::filter::filter_chunk(&chunk, &array)?;
+        let chunk = box_chunk_to_arc_chunk(chunk);
+        DataBlock::from_chunk(&schema, &chunk)
     }
 }
diff --git a/query/src/sql/optimizer/group.rs b/query/src/sql/optimizer/group.rs
index 50a518eddcd15..0833f31a11d25 100644
--- a/query/src/sql/optimizer/group.rs
+++ b/query/src/sql/optimizer/group.rs
@@ -13,9 +13,9 @@
 // limitations under the License.
 
 use std::fmt::Debug;
+use std::fmt::Formatter;
 use std::iter::Iterator;
 
-use common_arrow::arrow_format::ipc::flatbuffers::bitflags::_core::fmt::Formatter;
 use common_exception::Result;
 
 use crate::sql::optimizer::m_expr::MExpr;
diff --git a/query/src/sql/statements/analyzer_statement.rs b/query/src/sql/statements/analyzer_statement.rs
index 4268305f04076..bb5bccaae9f2f 100644
--- a/query/src/sql/statements/analyzer_statement.rs
+++ b/query/src/sql/statements/analyzer_statement.rs
@@ -13,9 +13,9 @@
 // limitations under the License.
 
 use std::fmt::Debug;
+use std::fmt::Formatter;
 use std::sync::Arc;
 
-use common_arrow::arrow_format::ipc::flatbuffers::bitflags::_core::fmt::Formatter;
 use common_datavalues::DataSchema;
 use common_datavalues::DataSchemaRef;
 use common_exception::Result;
diff --git a/query/src/storages/fuse/io/block_reader.rs b/query/src/storages/fuse/io/block_reader.rs
index 80b8d00cc7604..de8bc5baad782 100644
--- a/query/src/storages/fuse/io/block_reader.rs
+++ b/query/src/storages/fuse/io/block_reader.rs
@@ -96,7 +96,7 @@ impl BlockReader {
             .map(|idx| (row_group.column(idx).clone(), idx));
 
         let fields = self.table_schema.fields();
-        let arrow_fields = self.arrow_table_schema.fields();
+        let arrow_fields = &self.arrow_table_schema.fields;
         let stream_len = self.file_len;
         let read_buffer_size = self.read_buffer_size;
diff --git a/query/src/storages/fuse/io/block_writer.rs b/query/src/storages/fuse/io/block_writer.rs
index 52353ff4239a2..48aefcdff67f6 100644
--- a/query/src/storages/fuse/io/block_writer.rs
+++ b/query/src/storages/fuse/io/block_writer.rs
@@ -13,11 +13,11 @@
 // limitations under the License.
 //
 
+use common_arrow::arrow::chunk::Chunk;
 use common_arrow::arrow::datatypes::DataType as ArrowDataType;
 use common_arrow::arrow::datatypes::Schema as ArrowSchema;
 use common_arrow::arrow::io::parquet::write::WriteOptions;
 use common_arrow::arrow::io::parquet::write::*;
-use common_arrow::arrow::record_batch::RecordBatch;
 use common_arrow::parquet::encoding::Encoding;
 use common_datablocks::DataBlock;
 use common_exception::ErrorCode;
@@ -36,9 +36,9 @@ pub async fn write_block(
         compression: Compression::Lz4, // let's begin with lz4
         version: Version::V2,
     };
-    let batch = RecordBatch::try_from(block)?;
+    let batch = Chunk::try_from(block)?;
     let encodings: Vec<_> = arrow_schema
-        .fields()
+        .fields
         .iter()
         .map(|f| col_encoding(&f.data_type))
         .collect();
diff --git a/query/tests/it/tests/parquet.rs b/query/tests/it/tests/parquet.rs
index 6784e8ebcc605..b72eff8dd91d0 100644
--- a/query/tests/it/tests/parquet.rs
+++ b/query/tests/it/tests/parquet.rs
@@ -14,13 +14,13 @@
 
 use std::fs::File;
 
+use common_arrow::arrow::chunk::Chunk;
 use common_arrow::arrow::io::parquet::write::write_file;
 use common_arrow::arrow::io::parquet::write::Compression;
 use common_arrow::arrow::io::parquet::write::Encoding;
 use common_arrow::arrow::io::parquet::write::RowGroupIterator;
 use common_arrow::arrow::io::parquet::write::Version;
 use common_arrow::arrow::io::parquet::write::WriteOptions;
-use common_arrow::arrow::record_batch::RecordBatch;
 use common_datablocks::DataBlock;
 use common_datavalues::prelude::*;
 
@@ -62,7 +62,7 @@ impl ParquetTestData {
         let mut batches = vec![];
         let mut encodings = vec![];
         for block in blocks {
-            batches.push(Ok(RecordBatch::try_from(block.clone()).unwrap()));
+            batches.push(Ok(Chunk::try_from(block.clone()).unwrap()));
             encodings.push(Encoding::Plain);
         }
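
----
Editor's note (not part of the patch): the same Chunk round-trip now appears
in both stream_limit_by.rs and transform_limit_by.rs. Below is a minimal
sketch of that pattern for reviewers, assuming only the APIs introduced above
(the TryFrom<DataBlock> impl, box_chunk_to_arc_chunk, DataBlock::from_chunk)
and arrow2 0.9's filter_chunk kernel; the helper name filter_block is
hypothetical.

    // Hypothetical helper mirroring the limit-by call sites in this patch.
    use std::convert::TryInto;

    use common_arrow::arrow::array::ArrayRef;
    use common_arrow::arrow::array::BooleanArray;
    use common_arrow::arrow::chunk::Chunk;
    use common_arrow::arrow::compute::filter::filter_chunk;
    use common_datablocks::box_chunk_to_arc_chunk;
    use common_datablocks::DataBlock;
    use common_exception::Result;

    fn filter_block(block: DataBlock, mask: &BooleanArray) -> Result<DataBlock> {
        // Keep the schema: unlike RecordBatch, a Chunk carries only arrays,
        // so field names and nullability must be reattached afterwards.
        let schema = block.schema().clone();
        // DataBlock -> Chunk<ArrayRef>, via the TryFrom impl added in data_block.rs.
        let chunk: Chunk<ArrayRef> = block.try_into()?;
        // The filter kernel yields boxed arrays (Chunk<Box<dyn Array>>)...
        let filtered = filter_chunk(&chunk, mask)?;
        // ...so convert Box -> Arc before rebuilding the DataBlock.
        let filtered = box_chunk_to_arc_chunk(filtered);
        DataBlock::from_chunk(&schema, &filtered)
    }

This is the reason box_chunk_to_arc_chunk exists: DataBlock columns are
Arc-based (ArrayRef), while the compute kernels in arrow2 0.9 return boxed
arrays.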