From fbd0d0f173c1bf27a7ce63ab76948b0dd00962ab Mon Sep 17 00:00:00 2001 From: "Jorge C. Leitao" Date: Sun, 2 Jan 2022 21:55:05 +0100 Subject: [PATCH] Migrated to planus --- .cargo/audit.toml | 4 - .github/workflows/security.yml | 26 - Cargo.toml | 2 +- integration-testing/Cargo.toml | 2 +- .../integration_test.rs | 34 +- .../integration_test.rs | 44 +- src/io/flight/mod.rs | 60 +- src/io/ipc/mod.rs | 8 + src/io/ipc/read/array/binary.rs | 10 +- src/io/ipc/read/array/boolean.rs | 10 +- src/io/ipc/read/array/dictionary.rs | 10 +- src/io/ipc/read/array/fixed_size_binary.rs | 10 +- src/io/ipc/read/array/fixed_size_list.rs | 13 +- src/io/ipc/read/array/list.rs | 13 +- src/io/ipc/read/array/map.rs | 13 +- src/io/ipc/read/array/null.rs | 2 +- src/io/ipc/read/array/primitive.rs | 10 +- src/io/ipc/read/array/struct_.rs | 13 +- src/io/ipc/read/array/union.rs | 15 +- src/io/ipc/read/array/utf8.rs | 10 +- src/io/ipc/read/common.rs | 29 +- src/io/ipc/read/deserialize.rs | 13 +- src/io/ipc/read/mod.rs | 7 +- src/io/ipc/read/read_basic.rs | 40 +- src/io/ipc/read/reader.rs | 80 +- src/io/ipc/read/schema.rs | 349 ++++---- src/io/ipc/read/stream.rs | 56 +- src/io/ipc/write/common.rs | 151 ++-- src/io/ipc/write/schema.rs | 845 ++++++------------ src/io/ipc/write/serialize.rs | 84 +- src/io/ipc/write/writer.rs | 48 +- src/io/parquet/read/schema/metadata.rs | 21 +- 32 files changed, 773 insertions(+), 1259 deletions(-) diff --git a/.cargo/audit.toml b/.cargo/audit.toml index 414802516a5..aa5492c1beb 100644 --- a/.cargo/audit.toml +++ b/.cargo/audit.toml @@ -10,8 +10,4 @@ ignore = [ # Therefore, this advisory does not affect us. "RUSTSEC-2020-0071", "RUSTSEC-2020-0159", # same as previous - - # this cannot be addressed, only mitigated. - # See [.github/workflows/security.yml] for details on how we mitigate this. - "RUSTSEC-2021-0122", ] diff --git a/.github/workflows/security.yml b/.github/workflows/security.yml index eca8e67a8a4..a1d19e6f8c6 100644 --- a/.github/workflows/security.yml +++ b/.github/workflows/security.yml @@ -8,29 +8,3 @@ jobs: - uses: actions-rs/audit-check@v1 with: token: ${{ secrets.GITHUB_TOKEN }} - - # mitigation for RUSTSEC-2021-0122 - # flatbuffers' usage of `unsafe` is problematic and a risk. - # This performs a round-trip over IPC (that uses flatbuffers) for some arrow types - # using miri, which hits much of `flatbuffers` usage in this crate. 
- miri-checks: - name: RUSTSEC-2021-0122 mitigation - runs-on: ubuntu-latest - steps: - - uses: actions/checkout@v2 - with: - submodules: true # needed to test IPC, which are located in a submodule - - uses: actions-rs/toolchain@v1 - with: - toolchain: nightly-2021-12-10 - override: true - - uses: Swatinem/rust-cache@v1 - with: - key: key1 - - name: Install Miri - run: | - rustup component add miri - cargo miri setup - - - name: Run - run: MIRIFLAGS="-Zmiri-disable-stacked-borrows -Zmiri-disable-isolation" cargo miri test --tests --features io_ipc,io_ipc_compression,io_json_integration io::ipc::write::file::write_100_nested diff --git a/Cargo.toml b/Cargo.toml index f3a3f4096f6..5ec6516fcc1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -45,7 +45,7 @@ indexmap = { version = "^1.6", optional = true } # used to print columns in a nice columnar format comfy-table = { version = "5.0", optional = true, default-features = false } -arrow-format = { version = "0.3.0", optional = true, features = ["ipc"] } +arrow-format = { git = "https://github.com/DataEngineeringLabs/arrow-format", branch = "planus", optional = true, features = ["ipc"] } hex = { version = "^0.4", optional = true } diff --git a/integration-testing/Cargo.toml b/integration-testing/Cargo.toml index a9487cb37bb..3aa800146b5 100644 --- a/integration-testing/Cargo.toml +++ b/integration-testing/Cargo.toml @@ -29,7 +29,7 @@ logging = ["tracing-subscriber"] [dependencies] arrow2 = { path = "../", features = ["io_ipc", "io_ipc_compression", "io_flight", "io_json_integration"] } -arrow-format = { version = "0.3.0", features = ["full"] } +arrow-format = { git = "https://github.com/DataEngineeringLabs/arrow-format", branch = "planus", features = ["full"] } async-trait = "0.1.41" clap = "2.33" futures = "0.3" diff --git a/integration-testing/src/flight_client_scenarios/integration_test.rs b/integration-testing/src/flight_client_scenarios/integration_test.rs index ae548eb4837..ce015f3ae7a 100644 --- a/integration-testing/src/flight_client_scenarios/integration_test.rs +++ b/integration-testing/src/flight_client_scenarios/integration_test.rs @@ -30,12 +30,14 @@ use arrow2::{ ipc::IpcField, }, }; -use arrow_format::flight::data::{ - flight_descriptor::DescriptorType, FlightData, FlightDescriptor, Location, Ticket, -}; use arrow_format::flight::service::flight_service_client::FlightServiceClient; use arrow_format::ipc; -use arrow_format::ipc::Message::MessageHeader; +use arrow_format::{ + flight::data::{ + flight_descriptor::DescriptorType, FlightData, FlightDescriptor, Location, Ticket, + }, + ipc::planus::ReadAsRoot, +}; use futures::{channel::mpsc, sink::SinkExt, stream, StreamExt}; use tonic::{Request, Streaming}; @@ -267,25 +269,19 @@ async fn receive_batch_flight_data( ) -> Option { let mut data = resp.next().await?.ok()?; let mut message = - ipc::Message::root_as_message(&data.data_header[..]).expect("Error parsing first message"); + ipc::MessageRef::read_as_root(&data.data_header).expect("Error parsing first message"); - while message.header_type() == MessageHeader::DictionaryBatch { + while let ipc::MessageHeaderRef::DictionaryBatch(batch) = message + .header() + .expect("Header to be valid flatbuffers") + .expect("Header to be present") + { let mut reader = std::io::Cursor::new(&data.data_body); - read::read_dictionary( - message - .header_as_dictionary_batch() - .expect("Error parsing dictionary"), - fields, - ipc_schema, - dictionaries, - &mut reader, - 0, - ) - .expect("Error reading dictionary"); + read::read_dictionary(batch, fields, 
ipc_schema, dictionaries, &mut reader, 0) + .expect("Error reading dictionary"); data = resp.next().await?.ok()?; - message = - ipc::Message::root_as_message(&data.data_header[..]).expect("Error parsing message"); + message = ipc::MessageRef::read_as_root(&data.data_header).expect("Error parsing message"); } Some(data) diff --git a/integration-testing/src/flight_server_scenarios/integration_test.rs b/integration-testing/src/flight_server_scenarios/integration_test.rs index 4c0c4683b6e..98c26a56c4f 100644 --- a/integration-testing/src/flight_server_scenarios/integration_test.rs +++ b/integration-testing/src/flight_server_scenarios/integration_test.rs @@ -27,8 +27,8 @@ use arrow2::io::ipc::IpcSchema; use arrow_format::flight::data::flight_descriptor::*; use arrow_format::flight::data::*; use arrow_format::flight::service::flight_service_server::*; -use arrow_format::ipc::Message::{root_as_message, Message, MessageHeader}; -use arrow_format::ipc::Schema as ArrowSchema; +use arrow_format::ipc::planus::ReadAsRoot; +use arrow_format::ipc::MessageHeaderRef; use arrow2::{datatypes::*, io::flight::serialize_schema_to_info, io::ipc}; @@ -276,25 +276,21 @@ async fn send_app_metadata( } async fn record_batch_from_message( - message: Message<'_>, + batch: arrow_format::ipc::RecordBatchRef<'_>, data_body: &[u8], fields: &[Field], ipc_schema: &IpcSchema, dictionaries: &mut Dictionaries, ) -> Result>, Status> { - let ipc_batch = message - .header_as_record_batch() - .ok_or_else(|| Status::internal("Could not parse message header as record batch"))?; - let mut reader = std::io::Cursor::new(data_body); let arrow_batch_result = ipc::read::read_record_batch( - ipc_batch, + batch, fields, ipc_schema, None, dictionaries, - ArrowSchema::MetadataVersion::V5, + arrow_format::ipc::MetadataVersion::V5, &mut reader, 0, ); @@ -303,20 +299,16 @@ async fn record_batch_from_message( } async fn dictionary_from_message( - message: Message<'_>, + dict_batch: arrow_format::ipc::DictionaryBatchRef<'_>, data_body: &[u8], fields: &[Field], ipc_schema: &IpcSchema, dictionaries: &mut Dictionaries, ) -> Result<(), Status> { - let ipc_batch = message - .header_as_dictionary_batch() - .ok_or_else(|| Status::internal("Could not parse message header as dictionary batch"))?; - let mut reader = std::io::Cursor::new(data_body); let dictionary_batch_result = - ipc::read::read_dictionary(ipc_batch, fields, ipc_schema, dictionaries, &mut reader, 0); + ipc::read::read_dictionary(dict_batch, fields, ipc_schema, dictionaries, &mut reader, 0); dictionary_batch_result .map_err(|e| Status::internal(format!("Could not convert to Dictionary: {:?}", e))) } @@ -335,20 +327,28 @@ async fn save_uploaded_chunks( let mut dictionaries = Default::default(); while let Some(Ok(data)) = input_stream.next().await { - let message = root_as_message(&data.data_header[..]) + let message = arrow_format::ipc::MessageRef::read_as_root(&data.data_header) .map_err(|e| Status::internal(format!("Could not parse message: {:?}", e)))?; + let header = message + .header() + .map_err(|x| Status::internal(x.to_string()))? 
+ .ok_or_else(|| { + Status::internal( + "Unable to convert flight data header to a record batch".to_string(), + ) + })?; - match message.header_type() { - MessageHeader::Schema => { + match header { + MessageHeaderRef::Schema(_) => { return Err(Status::internal( "Not expecting a schema when messages are read", )) } - MessageHeader::RecordBatch => { + MessageHeaderRef::RecordBatch(batch) => { send_app_metadata(&mut response_tx, &data.app_metadata).await?; let batch = record_batch_from_message( - message, + batch, &data.data_body, &schema.fields, &ipc_schema, @@ -358,9 +358,9 @@ async fn save_uploaded_chunks( chunks.push(batch); } - MessageHeader::DictionaryBatch => { + MessageHeaderRef::DictionaryBatch(dict_batch) => { dictionary_from_message( - message, + dict_batch, &data.data_body, &schema.fields, &ipc_schema, diff --git a/src/io/flight/mod.rs b/src/io/flight/mod.rs index d1e35d32920..a9a02b87792 100644 --- a/src/io/flight/mod.rs +++ b/src/io/flight/mod.rs @@ -2,6 +2,7 @@ use std::sync::Arc; use arrow_format::flight::data::{FlightData, SchemaResult}; use arrow_format::ipc; +use arrow_format::ipc::planus::ReadAsRoot; use crate::{ array::Array, @@ -84,19 +85,7 @@ fn schema_as_encoded_data(schema: &Schema, ipc_fields: &[IpcField]) -> EncodedDa /// Deserialize an IPC message into [`Schema`], [`IpcSchema`]. /// Use to deserialize [`FlightData::data_header`] and [`SchemaResult::schema`]. pub fn deserialize_schemas(bytes: &[u8]) -> Result<(Schema, IpcSchema)> { - if let Ok(ipc) = ipc::Message::root_as_message(bytes) { - if let Some(schemas) = ipc.header_as_schema().map(read::fb_to_schema) { - schemas - } else { - Err(ArrowError::OutOfSpec( - "Unable to get head as schema".to_string(), - )) - } - } else { - Err(ArrowError::OutOfSpec( - "Unable to get root as message".to_string(), - )) - } + read::deserialize_schema(bytes) } /// Deserializes [`FlightData`] to [`Chunk`]. @@ -107,29 +96,30 @@ pub fn deserialize_batch( dictionaries: &read::Dictionaries, ) -> Result>> { // check that the data_header is a record batch message - let message = ipc::Message::root_as_message(&data.data_header[..]).map_err(|err| { - ArrowError::OutOfSpec(format!("Unable to get root as message: {:?}", err)) - })?; + let message = + arrow_format::ipc::MessageRef::read_as_root(&data.data_header).map_err(|err| { + ArrowError::OutOfSpec(format!("Unable to get root as message: {:?}", err)) + })?; let mut reader = std::io::Cursor::new(&data.data_body); - message - .header_as_record_batch() - .ok_or_else(|| { - ArrowError::OutOfSpec( - "Unable to convert flight data header to a record batch".to_string(), - ) - }) - .map(|batch| { - read::read_record_batch( - batch, - fields, - ipc_schema, - None, - dictionaries, - ipc::Schema::MetadataVersion::V5, - &mut reader, - 0, - ) - })? + let version = message.version()?; + + match message.header()?.ok_or_else(|| { + ArrowError::oos("Unable to convert flight data header to a record batch".to_string()) + })? { + ipc::MessageHeaderRef::RecordBatch(batch) => read::read_record_batch( + batch, + fields, + ipc_schema, + None, + dictionaries, + version, + &mut reader, + 0, + ), + _ => Err(ArrowError::nyi( + "flight currently only supports reading RecordBatch messages", + )), + } } diff --git a/src/io/ipc/mod.rs b/src/io/ipc/mod.rs index 6505bb9fb8b..64443afc002 100644 --- a/src/io/ipc/mod.rs +++ b/src/io/ipc/mod.rs @@ -76,6 +76,8 @@ //! [2](https://github.com/jorgecarleitao/arrow2/blob/main/examples/ipc_file_write.rs), //! 
[3](https://github.com/jorgecarleitao/arrow2/tree/main/examples/ipc_pyarrow)). +use crate::error::ArrowError; + mod compression; mod endianess; @@ -100,3 +102,9 @@ pub struct IpcSchema { pub fields: Vec, pub is_little_endian: bool, } + +impl From for ArrowError { + fn from(error: arrow_format::ipc::planus::Error) -> Self { + ArrowError::OutOfSpec(error.to_string()) + } +} diff --git a/src/io/ipc/read/array/binary.rs b/src/io/ipc/read/array/binary.rs index b758f43550a..f19069fd351 100644 --- a/src/io/ipc/read/array/binary.rs +++ b/src/io/ipc/read/array/binary.rs @@ -1,24 +1,22 @@ use std::collections::VecDeque; use std::io::{Read, Seek}; -use arrow_format::ipc; - use crate::array::{BinaryArray, Offset}; use crate::buffer::Buffer; use crate::datatypes::DataType; use crate::error::{ArrowError, Result}; -use super::super::deserialize::Node; use super::super::read_basic::*; +use super::super::{Compression, IpcBuffer, Node}; pub fn read_binary( field_nodes: &mut VecDeque, data_type: DataType, - buffers: &mut VecDeque<&ipc::Schema::Buffer>, + buffers: &mut VecDeque, reader: &mut R, block_offset: u64, is_little_endian: bool, - compression: Option, + compression: Option, ) -> Result> { let field_node = field_nodes.pop_front().ok_or_else(|| { ArrowError::oos(format!( @@ -64,7 +62,7 @@ pub fn read_binary( pub fn skip_binary( field_nodes: &mut VecDeque, - buffers: &mut VecDeque<&ipc::Schema::Buffer>, + buffers: &mut VecDeque, ) -> Result<()> { let _ = field_nodes.pop_front().ok_or_else(|| { ArrowError::oos( diff --git a/src/io/ipc/read/array/boolean.rs b/src/io/ipc/read/array/boolean.rs index 2c9db797647..33c49694dad 100644 --- a/src/io/ipc/read/array/boolean.rs +++ b/src/io/ipc/read/array/boolean.rs @@ -1,23 +1,21 @@ use std::collections::VecDeque; use std::io::{Read, Seek}; -use arrow_format::ipc; - use crate::array::BooleanArray; use crate::datatypes::DataType; use crate::error::{ArrowError, Result}; -use super::super::deserialize::Node; use super::super::read_basic::*; +use super::super::{Compression, IpcBuffer, Node}; pub fn read_boolean( field_nodes: &mut VecDeque, data_type: DataType, - buffers: &mut VecDeque<&ipc::Schema::Buffer>, + buffers: &mut VecDeque, reader: &mut R, block_offset: u64, is_little_endian: bool, - compression: Option, + compression: Option, ) -> Result { let field_node = field_nodes.pop_front().ok_or_else(|| { ArrowError::oos(format!( @@ -49,7 +47,7 @@ pub fn read_boolean( pub fn skip_boolean( field_nodes: &mut VecDeque, - buffers: &mut VecDeque<&ipc::Schema::Buffer>, + buffers: &mut VecDeque, ) -> Result<()> { let _ = field_nodes.pop_front().ok_or_else(|| { ArrowError::oos( diff --git a/src/io/ipc/read/array/dictionary.rs b/src/io/ipc/read/array/dictionary.rs index f083741a8f8..9d8f04e064d 100644 --- a/src/io/ipc/read/array/dictionary.rs +++ b/src/io/ipc/read/array/dictionary.rs @@ -2,24 +2,22 @@ use std::collections::{HashSet, VecDeque}; use std::convert::TryInto; use std::io::{Read, Seek}; -use arrow_format::ipc; - use crate::array::{DictionaryArray, DictionaryKey}; use crate::error::{ArrowError, Result}; -use super::super::deserialize::Node; use super::super::Dictionaries; +use super::super::{Compression, IpcBuffer, Node}; use super::{read_primitive, skip_primitive}; #[allow(clippy::too_many_arguments)] pub fn read_dictionary( field_nodes: &mut VecDeque, id: Option, - buffers: &mut VecDeque<&ipc::Schema::Buffer>, + buffers: &mut VecDeque, reader: &mut R, dictionaries: &Dictionaries, block_offset: u64, - compression: Option, + compression: Option, is_little_endian: bool, 
) -> Result> where @@ -56,7 +54,7 @@ where pub fn skip_dictionary( field_nodes: &mut VecDeque, - buffers: &mut VecDeque<&ipc::Schema::Buffer>, + buffers: &mut VecDeque, ) -> Result<()> { skip_primitive(field_nodes, buffers) } diff --git a/src/io/ipc/read/array/fixed_size_binary.rs b/src/io/ipc/read/array/fixed_size_binary.rs index ce7e1adf026..0d5a84c65c7 100644 --- a/src/io/ipc/read/array/fixed_size_binary.rs +++ b/src/io/ipc/read/array/fixed_size_binary.rs @@ -1,23 +1,21 @@ use std::collections::VecDeque; use std::io::{Read, Seek}; -use arrow_format::ipc; - use crate::array::FixedSizeBinaryArray; use crate::datatypes::DataType; use crate::error::{ArrowError, Result}; -use super::super::deserialize::Node; use super::super::read_basic::*; +use super::super::{Compression, IpcBuffer, Node}; pub fn read_fixed_size_binary( field_nodes: &mut VecDeque, data_type: DataType, - buffers: &mut VecDeque<&ipc::Schema::Buffer>, + buffers: &mut VecDeque, reader: &mut R, block_offset: u64, is_little_endian: bool, - compression: Option, + compression: Option, ) -> Result { let field_node = field_nodes.pop_front().ok_or_else(|| { ArrowError::oos(format!( @@ -50,7 +48,7 @@ pub fn read_fixed_size_binary( pub fn skip_fixed_size_binary( field_nodes: &mut VecDeque, - buffers: &mut VecDeque<&ipc::Schema::Buffer>, + buffers: &mut VecDeque, ) -> Result<()> { let _ = field_nodes.pop_front().ok_or_else(|| { ArrowError::oos( diff --git a/src/io/ipc/read/array/fixed_size_list.rs b/src/io/ipc/read/array/fixed_size_list.rs index 02e0d986b07..0247b1e6ade 100644 --- a/src/io/ipc/read/array/fixed_size_list.rs +++ b/src/io/ipc/read/array/fixed_size_list.rs @@ -1,29 +1,28 @@ use std::collections::VecDeque; use std::io::{Read, Seek}; -use arrow_format::ipc; - use crate::array::FixedSizeListArray; use crate::datatypes::DataType; use crate::error::{ArrowError, Result}; use super::super::super::IpcField; -use super::super::deserialize::{read, skip, Node}; +use super::super::deserialize::{read, skip}; use super::super::read_basic::*; use super::super::Dictionaries; +use super::super::{Compression, IpcBuffer, Node, Version}; #[allow(clippy::too_many_arguments)] pub fn read_fixed_size_list( field_nodes: &mut VecDeque, data_type: DataType, ipc_field: &IpcField, - buffers: &mut VecDeque<&ipc::Schema::Buffer>, + buffers: &mut VecDeque, reader: &mut R, dictionaries: &Dictionaries, block_offset: u64, is_little_endian: bool, - compression: Option, - version: ipc::Schema::MetadataVersion, + compression: Option, + version: Version, ) -> Result { let field_node = field_nodes.pop_front().ok_or_else(|| { ArrowError::oos(format!( @@ -61,7 +60,7 @@ pub fn read_fixed_size_list( pub fn skip_fixed_size_list( field_nodes: &mut VecDeque, data_type: &DataType, - buffers: &mut VecDeque<&ipc::Schema::Buffer>, + buffers: &mut VecDeque, ) -> Result<()> { let _ = field_nodes.pop_front().ok_or_else(|| { ArrowError::oos( diff --git a/src/io/ipc/read/array/list.rs b/src/io/ipc/read/array/list.rs index 666417b1c4a..46d2fdc5768 100644 --- a/src/io/ipc/read/array/list.rs +++ b/src/io/ipc/read/array/list.rs @@ -2,30 +2,29 @@ use std::collections::VecDeque; use std::convert::TryInto; use std::io::{Read, Seek}; -use arrow_format::ipc; - use crate::array::{ListArray, Offset}; use crate::buffer::Buffer; use crate::datatypes::DataType; use crate::error::{ArrowError, Result}; use super::super::super::IpcField; -use super::super::deserialize::{read, skip, Node}; +use super::super::deserialize::{read, skip}; use super::super::read_basic::*; use 
super::super::Dictionaries; +use super::super::{Compression, IpcBuffer, Node, Version}; #[allow(clippy::too_many_arguments)] pub fn read_list( field_nodes: &mut VecDeque, data_type: DataType, ipc_field: &IpcField, - buffers: &mut VecDeque<&ipc::Schema::Buffer>, + buffers: &mut VecDeque, reader: &mut R, dictionaries: &Dictionaries, block_offset: u64, is_little_endian: bool, - compression: Option, - version: ipc::Schema::MetadataVersion, + compression: Option, + version: Version, ) -> Result> where Vec: TryInto, @@ -77,7 +76,7 @@ where pub fn skip_list( field_nodes: &mut VecDeque, data_type: &DataType, - buffers: &mut VecDeque<&ipc::Schema::Buffer>, + buffers: &mut VecDeque, ) -> Result<()> { let _ = field_nodes.pop_front().ok_or_else(|| { ArrowError::oos("IPC: unable to fetch the field for list. The file or stream is corrupted.") diff --git a/src/io/ipc/read/array/map.rs b/src/io/ipc/read/array/map.rs index 0163f61bb75..c22394d0fb5 100644 --- a/src/io/ipc/read/array/map.rs +++ b/src/io/ipc/read/array/map.rs @@ -1,30 +1,29 @@ use std::collections::VecDeque; use std::io::{Read, Seek}; -use arrow_format::ipc; - use crate::array::MapArray; use crate::buffer::Buffer; use crate::datatypes::DataType; use crate::error::{ArrowError, Result}; use super::super::super::IpcField; -use super::super::deserialize::{read, skip, Node}; +use super::super::deserialize::{read, skip}; use super::super::read_basic::*; use super::super::Dictionaries; +use super::super::{Compression, IpcBuffer, Node, Version}; #[allow(clippy::too_many_arguments)] pub fn read_map( field_nodes: &mut VecDeque, data_type: DataType, ipc_field: &IpcField, - buffers: &mut VecDeque<&ipc::Schema::Buffer>, + buffers: &mut VecDeque, reader: &mut R, dictionaries: &Dictionaries, block_offset: u64, is_little_endian: bool, - compression: Option, - version: ipc::Schema::MetadataVersion, + compression: Option, + version: Version, ) -> Result { let field_node = field_nodes.pop_front().ok_or_else(|| { ArrowError::oos(format!( @@ -73,7 +72,7 @@ pub fn read_map( pub fn skip_map( field_nodes: &mut VecDeque, data_type: &DataType, - buffers: &mut VecDeque<&ipc::Schema::Buffer>, + buffers: &mut VecDeque, ) -> Result<()> { let _ = field_nodes.pop_front().ok_or_else(|| { ArrowError::oos("IPC: unable to fetch the field for map. 
The file or stream is corrupted.") diff --git a/src/io/ipc/read/array/null.rs b/src/io/ipc/read/array/null.rs index 06bdc5f9725..9ff7da6f775 100644 --- a/src/io/ipc/read/array/null.rs +++ b/src/io/ipc/read/array/null.rs @@ -6,7 +6,7 @@ use crate::{ error::{ArrowError, Result}, }; -use super::super::deserialize::Node; +use super::super::Node; pub fn read_null(field_nodes: &mut VecDeque, data_type: DataType) -> Result { let field_node = field_nodes.pop_front().ok_or_else(|| { diff --git a/src/io/ipc/read/array/primitive.rs b/src/io/ipc/read/array/primitive.rs index 039c853496b..a80905d3859 100644 --- a/src/io/ipc/read/array/primitive.rs +++ b/src/io/ipc/read/array/primitive.rs @@ -1,23 +1,21 @@ use std::io::{Read, Seek}; use std::{collections::VecDeque, convert::TryInto}; -use arrow_format::ipc; - use crate::datatypes::DataType; use crate::error::{ArrowError, Result}; use crate::{array::PrimitiveArray, types::NativeType}; -use super::super::deserialize::Node; use super::super::read_basic::*; +use super::super::{Compression, IpcBuffer, Node}; pub fn read_primitive( field_nodes: &mut VecDeque, data_type: DataType, - buffers: &mut VecDeque<&ipc::Schema::Buffer>, + buffers: &mut VecDeque, reader: &mut R, block_offset: u64, is_little_endian: bool, - compression: Option, + compression: Option, ) -> Result> where Vec: TryInto, @@ -51,7 +49,7 @@ where pub fn skip_primitive( field_nodes: &mut VecDeque, - buffers: &mut VecDeque<&ipc::Schema::Buffer>, + buffers: &mut VecDeque, ) -> Result<()> { let _ = field_nodes.pop_front().ok_or_else(|| { ArrowError::oos( diff --git a/src/io/ipc/read/array/struct_.rs b/src/io/ipc/read/array/struct_.rs index 435534a38d7..6594b331008 100644 --- a/src/io/ipc/read/array/struct_.rs +++ b/src/io/ipc/read/array/struct_.rs @@ -1,29 +1,28 @@ use std::collections::VecDeque; use std::io::{Read, Seek}; -use arrow_format::ipc; - use crate::array::StructArray; use crate::datatypes::DataType; use crate::error::{ArrowError, Result}; use super::super::super::IpcField; -use super::super::deserialize::{read, skip, Node}; +use super::super::deserialize::{read, skip}; use super::super::read_basic::*; use super::super::Dictionaries; +use super::super::{Compression, IpcBuffer, Node, Version}; #[allow(clippy::too_many_arguments)] pub fn read_struct( field_nodes: &mut VecDeque, data_type: DataType, ipc_field: &IpcField, - buffers: &mut VecDeque<&ipc::Schema::Buffer>, + buffers: &mut VecDeque, reader: &mut R, dictionaries: &Dictionaries, block_offset: u64, is_little_endian: bool, - compression: Option, - version: ipc::Schema::MetadataVersion, + compression: Option, + version: Version, ) -> Result { let field_node = field_nodes.pop_front().ok_or_else(|| { ArrowError::oos(format!( @@ -68,7 +67,7 @@ pub fn read_struct( pub fn skip_struct( field_nodes: &mut VecDeque, data_type: &DataType, - buffers: &mut VecDeque<&ipc::Schema::Buffer>, + buffers: &mut VecDeque, ) -> Result<()> { let _ = field_nodes.pop_front().ok_or_else(|| { ArrowError::oos( diff --git a/src/io/ipc/read/array/union.rs b/src/io/ipc/read/array/union.rs index bb270f053fe..b23f3ec9a7a 100644 --- a/src/io/ipc/read/array/union.rs +++ b/src/io/ipc/read/array/union.rs @@ -1,30 +1,29 @@ use std::collections::VecDeque; use std::io::{Read, Seek}; -use arrow_format::ipc; - use crate::array::UnionArray; use crate::datatypes::DataType; use crate::datatypes::UnionMode::Dense; use crate::error::{ArrowError, Result}; use super::super::super::IpcField; -use super::super::deserialize::{read, skip, Node}; +use super::super::deserialize::{read, 
skip}; use super::super::read_basic::*; use super::super::Dictionaries; +use super::super::{Compression, IpcBuffer, Node, Version}; #[allow(clippy::too_many_arguments)] pub fn read_union( field_nodes: &mut VecDeque, data_type: DataType, ipc_field: &IpcField, - buffers: &mut VecDeque<&ipc::Schema::Buffer>, + buffers: &mut VecDeque, reader: &mut R, dictionaries: &Dictionaries, block_offset: u64, is_little_endian: bool, - compression: Option, - version: ipc::Schema::MetadataVersion, + compression: Option, + version: Version, ) -> Result { let field_node = field_nodes.pop_front().ok_or_else(|| { ArrowError::oos(format!( @@ -33,7 +32,7 @@ pub fn read_union( )) })?; - if version != ipc::Schema::MetadataVersion::V5 { + if version != Version::V5 { let _ = buffers .pop_front() .ok_or_else(|| ArrowError::oos("IPC: missing validity buffer."))?; @@ -92,7 +91,7 @@ pub fn read_union( pub fn skip_union( field_nodes: &mut VecDeque, data_type: &DataType, - buffers: &mut VecDeque<&ipc::Schema::Buffer>, + buffers: &mut VecDeque, ) -> Result<()> { let _ = field_nodes.pop_front().ok_or_else(|| { ArrowError::oos( diff --git a/src/io/ipc/read/array/utf8.rs b/src/io/ipc/read/array/utf8.rs index 8c5cee4fea7..3d9a6b4e641 100644 --- a/src/io/ipc/read/array/utf8.rs +++ b/src/io/ipc/read/array/utf8.rs @@ -1,24 +1,22 @@ use std::collections::VecDeque; use std::io::{Read, Seek}; -use arrow_format::ipc; - use crate::array::{Offset, Utf8Array}; use crate::buffer::Buffer; use crate::datatypes::DataType; use crate::error::{ArrowError, Result}; -use super::super::deserialize::Node; use super::super::read_basic::*; +use super::super::{Compression, IpcBuffer, Node}; pub fn read_utf8( field_nodes: &mut VecDeque, data_type: DataType, - buffers: &mut VecDeque<&ipc::Schema::Buffer>, + buffers: &mut VecDeque, reader: &mut R, block_offset: u64, is_little_endian: bool, - compression: Option, + compression: Option, ) -> Result> { let field_node = field_nodes.pop_front().ok_or_else(|| { ArrowError::oos(format!( @@ -64,7 +62,7 @@ pub fn read_utf8( pub fn skip_utf8( field_nodes: &mut VecDeque, - buffers: &mut VecDeque<&ipc::Schema::Buffer>, + buffers: &mut VecDeque, ) -> Result<()> { let _ = field_nodes.pop_front().ok_or_else(|| { ArrowError::oos("IPC: unable to fetch the field for utf8. The file or stream is corrupted.") diff --git a/src/io/ipc/read/common.rs b/src/io/ipc/read/common.rs index c0c7c5ab2b3..94090b7a1a1 100644 --- a/src/io/ipc/read/common.rs +++ b/src/io/ipc/read/common.rs @@ -2,8 +2,7 @@ use std::collections::VecDeque; use std::io::{Read, Seek}; use std::sync::Arc; -use arrow_format::ipc; -use arrow_format::ipc::Schema::MetadataVersion; +use arrow_format; use crate::array::*; use crate::chunk::Chunk; @@ -79,23 +78,23 @@ impl<'a, A, I: Iterator> Iterator for ProjectionIter<'a, A, I> { /// Panics iff the projection is not in increasing order (e.g. `[1, 0]` nor `[0, 1, 1]` are valid) #[allow(clippy::too_many_arguments)] pub fn read_record_batch( - batch: ipc::Message::RecordBatch, + batch: arrow_format::ipc::RecordBatchRef, fields: &[Field], ipc_schema: &IpcSchema, projection: Option<&[usize]>, dictionaries: &Dictionaries, - version: MetadataVersion, + version: arrow_format::ipc::MetadataVersion, reader: &mut R, block_offset: u64, ) -> Result>> { assert_eq!(fields.len(), ipc_schema.fields.len()); let buffers = batch - .buffers() - .ok_or_else(|| ArrowError::oos("Unable to get buffers from IPC RecordBatch"))?; - let mut buffers: VecDeque<&ipc::Schema::Buffer> = buffers.iter().collect(); + .buffers()? 
+ .ok_or_else(|| ArrowError::oos("IPC RecordBatch must contain buffers"))?; + let mut buffers: VecDeque = buffers.iter().collect(); let field_nodes = batch - .nodes() + .nodes()? .ok_or_else(|| ArrowError::oos("IPC RecordBatch must contain field nodes"))?; let mut field_nodes = field_nodes.iter().collect::>(); @@ -114,7 +113,7 @@ pub fn read_record_batch( dictionaries, block_offset, ipc_schema.is_little_endian, - batch.compression(), + batch.compression()?, version, )?)), ProjectionResult::NotSelected((field, _)) => { @@ -139,7 +138,7 @@ pub fn read_record_batch( dictionaries, block_offset, ipc_schema.is_little_endian, - batch.compression(), + batch.compression()?, version, ) }) @@ -204,20 +203,20 @@ fn first_dict_field<'a>( /// Read the dictionary from the buffer and provided metadata, /// updating the `dictionaries` with the resulting dictionary pub fn read_dictionary( - batch: ipc::Message::DictionaryBatch, + batch: arrow_format::ipc::DictionaryBatchRef, fields: &[Field], ipc_schema: &IpcSchema, dictionaries: &mut Dictionaries, reader: &mut R, block_offset: u64, ) -> Result<()> { - if batch.isDelta() { + if batch.is_delta()? { return Err(ArrowError::NotYetImplemented( "delta dictionary batches not supported".to_string(), )); } - let id = batch.id(); + let id = batch.id()?; let (first_field, first_ipc_field) = first_dict_field(id, fields, &ipc_schema.fields)?; // As the dictionary batch does not contain the type of the @@ -233,13 +232,13 @@ pub fn read_dictionary( }; let columns = read_record_batch( batch - .data() + .data()? .ok_or_else(|| ArrowError::oos("The dictionary batch must have data."))?, &fields, &ipc_schema, None, dictionaries, - MetadataVersion::V5, + arrow_format::ipc::MetadataVersion::V5, reader, block_offset, )?; diff --git a/src/io/ipc/read/deserialize.rs b/src/io/ipc/read/deserialize.rs index 434817eb296..56bd0632128 100644 --- a/src/io/ipc/read/deserialize.rs +++ b/src/io/ipc/read/deserialize.rs @@ -4,8 +4,8 @@ use std::{ sync::Arc, }; -use arrow_format::ipc; -use arrow_format::ipc::{Message::BodyCompression, Schema::MetadataVersion}; +use arrow_format::ipc::BodyCompressionRef; +use arrow_format::ipc::MetadataVersion; use crate::array::*; use crate::datatypes::{DataType, Field, PhysicalType}; @@ -13,20 +13,19 @@ use crate::error::Result; use crate::io::ipc::IpcField; use super::{array::*, Dictionaries}; - -pub type Node<'a> = &'a ipc::Message::FieldNode; +use super::{IpcBuffer, Node}; #[allow(clippy::too_many_arguments)] pub fn read( field_nodes: &mut VecDeque, field: &Field, ipc_field: &IpcField, - buffers: &mut VecDeque<&ipc::Schema::Buffer>, + buffers: &mut VecDeque, reader: &mut R, dictionaries: &Dictionaries, block_offset: u64, is_little_endian: bool, - compression: Option, + compression: Option, version: MetadataVersion, ) -> Result> { use PhysicalType::*; @@ -215,7 +214,7 @@ pub fn read( pub fn skip( field_nodes: &mut VecDeque, data_type: &DataType, - buffers: &mut VecDeque<&ipc::Schema::Buffer>, + buffers: &mut VecDeque, ) -> Result<()> { use PhysicalType::*; match data_type.to_physical_type() { diff --git a/src/io/ipc/read/mod.rs b/src/io/ipc/read/mod.rs index f36fbb6283e..710376a8d95 100644 --- a/src/io/ipc/read/mod.rs +++ b/src/io/ipc/read/mod.rs @@ -19,8 +19,13 @@ mod stream; pub use common::{read_dictionary, read_record_batch}; pub use reader::{read_file_metadata, FileMetadata, FileReader}; -pub use schema::fb_to_schema; +pub use schema::deserialize_schema; pub use stream::{read_stream_metadata, StreamMetadata, StreamReader, StreamState}; // how 
dictionaries are tracked in this crate pub type Dictionaries = HashMap>; + +pub(crate) type Node<'a> = arrow_format::ipc::FieldNodeRef<'a>; +pub(crate) type IpcBuffer<'a> = arrow_format::ipc::BufferRef<'a>; +pub(crate) type Compression<'a> = arrow_format::ipc::BodyCompressionRef<'a>; +pub(crate) type Version = arrow_format::ipc::MetadataVersion; diff --git a/src/io/ipc/read/read_basic.rs b/src/io/ipc/read/read_basic.rs index d634e2f6e52..ccddcc25e0a 100644 --- a/src/io/ipc/read/read_basic.rs +++ b/src/io/ipc/read/read_basic.rs @@ -1,15 +1,13 @@ use std::io::{Read, Seek, SeekFrom}; use std::{collections::VecDeque, convert::TryInto}; -use arrow_format::ipc; -use arrow_format::ipc::Message::{BodyCompression, CompressionType}; - use crate::buffer::Buffer; use crate::error::{ArrowError, Result}; use crate::{bitmap::Bitmap, types::NativeType}; use super::super::compression; use super::super::endianess::is_native_little_endian; +use super::{Compression, IpcBuffer, Node}; fn read_swapped( reader: &mut R, @@ -85,7 +83,7 @@ fn read_compressed_buffer( buffer_length: usize, length: usize, is_little_endian: bool, - compression: BodyCompression, + compression: Compression, ) -> Result> { if is_little_endian != is_native_little_endian() { return Err(ArrowError::NotYetImplemented( @@ -104,28 +102,25 @@ fn read_compressed_buffer( let out_slice = bytemuck::cast_slice_mut(&mut buffer); - match compression.codec() { - CompressionType::LZ4_FRAME => { + match compression.codec()? { + arrow_format::ipc::CompressionType::Lz4Frame => { compression::decompress_lz4(&slice[8..], out_slice)?; Ok(buffer) } - CompressionType::ZSTD => { + arrow_format::ipc::CompressionType::Zstd => { compression::decompress_zstd(&slice[8..], out_slice)?; Ok(buffer) } - _ => Err(ArrowError::NotYetImplemented( - "Compression format".to_string(), - )), } } pub fn read_buffer( - buf: &mut VecDeque<&ipc::Schema::Buffer>, + buf: &mut VecDeque, length: usize, // in slots reader: &mut R, block_offset: u64, is_little_endian: bool, - compression: Option, + compression: Option, ) -> Result> { let buf = buf .pop_front() @@ -170,7 +165,7 @@ fn read_uncompressed_bitmap( fn read_compressed_bitmap( length: usize, bytes: usize, - compression: BodyCompression, + compression: Compression, reader: &mut R, ) -> Result> { let mut buffer = vec![0; (length + 7) / 8]; @@ -180,28 +175,25 @@ fn read_compressed_bitmap( let mut slice = vec![0u8; bytes]; reader.read_exact(&mut slice)?; - match compression.codec() { - CompressionType::LZ4_FRAME => { + match compression.codec()? 
{
+        arrow_format::ipc::CompressionType::Lz4Frame => {
             compression::decompress_lz4(&slice[8..], &mut buffer)?;
             Ok(buffer)
         }
-        CompressionType::ZSTD => {
+        arrow_format::ipc::CompressionType::Zstd => {
             compression::decompress_zstd(&slice[8..], &mut buffer)?;
             Ok(buffer)
         }
-        _ => Err(ArrowError::NotYetImplemented(
-            "Non LZ4 compressed IPC".to_string(),
-        )),
     }
 }
 
 pub fn read_bitmap<R: Read + Seek>(
-    buf: &mut VecDeque<&ipc::Schema::Buffer>,
+    buf: &mut VecDeque<IpcBuffer>,
     length: usize,
     reader: &mut R,
     block_offset: u64,
     _: bool,
-    compression: Option<BodyCompression>,
+    compression: Option<Compression>,
 ) -> Result<Bitmap> {
     let buf = buf
         .pop_front()
@@ -221,12 +213,12 @@ pub fn read_validity<R: Read + Seek>(
-    buffers: &mut VecDeque<&ipc::Schema::Buffer>,
-    field_node: &ipc::Message::FieldNode,
+    buffers: &mut VecDeque<IpcBuffer>,
+    field_node: Node,
     reader: &mut R,
     block_offset: u64,
     is_little_endian: bool,
-    compression: Option<BodyCompression>,
+    compression: Option<Compression>,
 ) -> Result<Option<Bitmap>> {
     Ok(if field_node.null_count() > 0 {
         Some(read_bitmap(
diff --git a/src/io/ipc/read/reader.rs b/src/io/ipc/read/reader.rs
index f20dfba31e2..2fdf9df6c57 100644
--- a/src/io/ipc/read/reader.rs
+++ b/src/io/ipc/read/reader.rs
@@ -1,10 +1,6 @@
 use std::io::{Read, Seek, SeekFrom};
 use std::sync::Arc;
 
-use arrow_format::ipc;
-use arrow_format::ipc::flatbuffers::VerifierOptions;
-use arrow_format::ipc::File::Block;
-
 use crate::array::Array;
 use crate::chunk::Chunk;
 use crate::datatypes::{Field, Schema};
@@ -15,6 +11,7 @@ use super::super::{ARROW_MAGIC, CONTINUATION_MARKER};
 use super::common::*;
 use super::schema::fb_to_schema;
 use super::Dictionaries;
+use arrow_format::ipc::planus::{ReadAsRoot, ToOwned, Vector};
 
 #[derive(Debug, Clone)]
 pub struct FileMetadata {
@@ -27,13 +24,10 @@ pub struct FileMetadata {
     /// The blocks in the file
     ///
     /// A block indicates the regions in the file to read to get data
-    blocks: Vec<Block>,
+    blocks: Vec<arrow_format::ipc::Block>,
 
     /// Dictionaries associated to each dict_id
     dictionaries: Dictionaries,
-
-    /// FileMetadata version
-    version: ipc::Schema::MetadataVersion,
 }
 
 /// Arrow File reader
@@ -70,28 +64,27 @@ fn read_dictionaries<R: Read + Seek>(
     reader: &mut R,
     fields: &[Field],
     ipc_schema: &IpcSchema,
-    blocks: &[Block],
+    blocks: Vector<arrow_format::ipc::BlockRef>,
 ) -> Result<Dictionaries> {
     let mut dictionaries = Default::default();
     let mut data = vec![];
 
     for block in blocks {
         let offset = block.offset() as u64;
-        let length = block.metaDataLength() as u64;
+        let length = block.meta_data_length() as u64;
         read_dictionary_message(reader, offset, &mut data)?;
 
-        let message = ipc::Message::root_as_message(&data).map_err(|err| {
+        let message = arrow_format::ipc::MessageRef::read_as_root(&data).map_err(|err| {
             ArrowError::OutOfSpec(format!("Unable to get root as message: {:?}", err))
         })?;
 
-        match message.header_type() {
-            ipc::Message::MessageHeader::DictionaryBatch => {
+        let header = message
+            .header()?
+            .ok_or_else(|| ArrowError::oos("Message must have a header"))?;
+
+        match header {
+            arrow_format::ipc::MessageHeaderRef::DictionaryBatch(batch) => {
                 let block_offset = offset + length;
-                let batch = message.header_as_dictionary_batch().ok_or_else(|| {
-                    ArrowError::OutOfSpec(
-                        "The dictionary message does not have a dictionary header. The file is corrupted.".to_string(),
-                    )
-                })?;
                 read_dictionary(
                     batch,
                     fields,
@@ -140,29 +133,19 @@ pub fn read_file_metadata<R: Read + Seek>(reader: &mut R) -> Result<FileMetadata>
            .collect::<Result<Vec<_>>>()?,
         dictionaries,
-        version: footer.version(),
     })
 }
 
 fn get_serialized_batch<'a>(
-    message: &'a ipc::Message::Message,
-) -> Result<ipc::Message::RecordBatch<'a>> {
-    match message.header_type() {
-        ipc::Message::MessageHeader::Schema => Err(ArrowError::OutOfSpec(
+    message: &'a arrow_format::ipc::MessageRef,
+) -> Result<arrow_format::ipc::RecordBatchRef<'a>> {
+    let header = message.header()?.ok_or_else(|| {
+        ArrowError::oos("IPC: unable to fetch the message header. The file or stream is corrupted.")
+    })?;
+    match header {
+        arrow_format::ipc::MessageHeaderRef::Schema(_) => Err(ArrowError::OutOfSpec(
             "Not expecting a schema when messages are read".to_string(),
         )),
-        ipc::Message::MessageHeader::RecordBatch => {
-            message.header_as_record_batch().ok_or_else(|| {
-                ArrowError::OutOfSpec("Unable to read IPC message as record batch".to_string())
-            })
-        }
+        arrow_format::ipc::MessageHeaderRef::RecordBatch(batch) => Ok(batch),
         t => Err(ArrowError::OutOfSpec(format!(
             "Reading types other than record batches not yet supported, unable to read {:?}",
             t
@@ -209,7 +193,7 @@ pub fn read_batch<R: Read + Seek>(
     let block = metadata.blocks[block];
 
     // read length
-    reader.seek(SeekFrom::Start(block.offset() as u64))?;
+    reader.seek(SeekFrom::Start(block.offset as u64))?;
     let mut meta_buf = [0; 4];
     reader.read_exact(&mut meta_buf)?;
     if meta_buf == CONTINUATION_MARKER {
@@ -222,8 +206,8 @@ pub fn read_batch<R: Read + Seek>(
     block_data.resize(meta_len, 0);
     reader.read_exact(block_data)?;
 
-    let message = ipc::Message::root_as_message(&block_data[..])
-        .map_err(|err| ArrowError::OutOfSpec(format!("Unable to get root as footer: {:?}", err)))?;
+    let message = arrow_format::ipc::MessageRef::read_as_root(&block_data[..])
+        .map_err(|err| ArrowError::oos(format!("Unable to parse message: {:?}", err)))?;
 
     let batch = get_serialized_batch(&message)?;
 
@@ -233,9 +217,9 @@ pub fn read_batch<R: Read + Seek>(
         &metadata.ipc_schema,
         projection,
         &metadata.dictionaries,
-        metadata.version,
+        message.version()?,
         reader,
-        block.offset() as u64 + block.metaDataLength() as u64,
+        block.offset as u64 + block.meta_data_length as u64,
     )
 }
 
diff --git a/src/io/ipc/read/schema.rs b/src/io/ipc/read/schema.rs
index 47ae2c83ac9..7dbeeedfd73 100644
--- a/src/io/ipc/read/schema.rs
+++ b/src/io/ipc/read/schema.rs
@@ -1,8 +1,4 @@
-mod ipc {
-    pub use arrow_format::ipc::File::*;
-    pub use arrow_format::ipc::Message::*;
-    pub use arrow_format::ipc::Schema::*;
-}
+use arrow_format::ipc::planus::ReadAsRoot;
 
 use crate::{
     datatypes::{
@@ -26,8 +22,8 @@ fn try_unzip_vec<A, B, I: Iterator<Item = Result<(A, B)>>>(iter: I) -> Result<(V
     Ok((a, b))
 }
 
-fn deserialize_field(ipc_field: ipc::Field) -> Result<(Field, IpcField)> {
-    let metadata = read_metadata(&ipc_field);
+fn deserialize_field(ipc_field: arrow_format::ipc::FieldRef) -> Result<(Field, IpcField)> {
+    let metadata = read_metadata(&ipc_field)?;
 
     let extension = get_extension(&metadata);
 
@@ -35,33 +31,34 @@ fn deserialize_field(ipc_field: ipc::Field) -> Result<(Field, IpcField)> {
     let field = Field {
         name: ipc_field
-            .name()
+            .name()?
             .ok_or_else(|| ArrowError::oos("Every field in IPC must have a name"))?
             .to_string(),
         data_type,
-        is_nullable: ipc_field.nullable(),
+        is_nullable: ipc_field.nullable()?,
         metadata,
     };
 
     Ok((field, ipc_field_))
}
 
-fn read_metadata(field: &ipc::Field) -> Metadata {
-    if let Some(list) = field.custom_metadata() {
+fn read_metadata(field: &arrow_format::ipc::FieldRef) -> Result<Metadata> {
+    Ok(if let Some(list) = field.custom_metadata()?
{ let mut metadata_map = Metadata::new(); for kv in list { - if let (Some(k), Some(v)) = (kv.key(), kv.value()) { + let kv = kv?; + if let (Some(k), Some(v)) = (kv.key()?, kv.value()?) { metadata_map.insert(k.to_string(), v.to_string()); } } metadata_map } else { Metadata::default() - } + }) } -fn deserialize_integer(int: ipc::Int) -> Result { - Ok(match (int.bitWidth(), int.is_signed()) { +fn deserialize_integer(int: arrow_format::ipc::IntRef) -> Result { + Ok(match (int.bit_width()?, int.is_signed()?) { (8, true) => IntegerType::Int8, (8, false) => IntegerType::UInt8, (16, true) => IntegerType::Int16, @@ -78,22 +75,32 @@ fn deserialize_integer(int: ipc::Int) -> Result { }) } +fn deserialize_timeunit(time_unit: arrow_format::ipc::TimeUnit) -> Result { + use arrow_format::ipc::TimeUnit::*; + Ok(match time_unit { + Second => TimeUnit::Second, + Millisecond => TimeUnit::Millisecond, + Microsecond => TimeUnit::Microsecond, + Nanosecond => TimeUnit::Nanosecond, + }) +} + /// Get the Arrow data type from the flatbuffer Field table fn get_data_type( - field: ipc::Field, + field: arrow_format::ipc::FieldRef, extension: Extension, may_be_dictionary: bool, ) -> Result<(DataType, IpcField)> { - if let Some(dictionary) = field.dictionary() { + if let Some(dictionary) = field.dictionary()? { if may_be_dictionary { let int = dictionary - .indexType() + .index_type()? .ok_or_else(|| ArrowError::oos("indexType is mandatory in Dictionary."))?; let index_type = deserialize_integer(int)?; let (inner, mut ipc_field) = get_data_type(field, extension, false)?; - ipc_field.dictionary_id = Some(dictionary.id()); + ipc_field.dictionary_id = Some(dictionary.id()?); return Ok(( - DataType::Dictionary(index_type, Box::new(inner), dictionary.isOrdered()), + DataType::Dictionary(index_type, Box::new(inner), dictionary.is_ordered()?), ipc_field, )); } @@ -108,66 +115,49 @@ fn get_data_type( )); } - Ok(match field.type_type() { - ipc::Type::Null => (DataType::Null, IpcField::default()), - ipc::Type::Bool => (DataType::Boolean, IpcField::default()), - ipc::Type::Int => { - let int = field - .type_as_int() - .ok_or_else(|| ArrowError::oos("IPC: Integer type must be an integer"))?; + let type_ = field + .type_()? 
+ .ok_or_else(|| ArrowError::oos("IPC: field type is mandatory"))?; + + use arrow_format::ipc::TypeRef::*; + Ok(match type_ { + Null(_) => (DataType::Null, IpcField::default()), + Bool(_) => (DataType::Boolean, IpcField::default()), + Int(int) => { let data_type = deserialize_integer(int)?.into(); (data_type, IpcField::default()) } - ipc::Type::Binary => (DataType::Binary, IpcField::default()), - ipc::Type::LargeBinary => (DataType::LargeBinary, IpcField::default()), - ipc::Type::Utf8 => (DataType::Utf8, IpcField::default()), - ipc::Type::LargeUtf8 => (DataType::LargeUtf8, IpcField::default()), - ipc::Type::FixedSizeBinary => { - let fsb = field.type_as_fixed_size_binary().ok_or_else(|| { - ArrowError::oos("IPC: FixedSizeBinary type must be a FixedSizeBinary") - })?; - ( - DataType::FixedSizeBinary(fsb.byteWidth() as usize), - IpcField::default(), - ) - } - ipc::Type::FloatingPoint => { - let float = field.type_as_floating_point().ok_or_else(|| { - ArrowError::oos("IPC: FloatingPoint type must be a FloatingPoint") - })?; - let data_type = match float.precision() { - ipc::Precision::HALF => DataType::Float16, - ipc::Precision::SINGLE => DataType::Float32, - ipc::Precision::DOUBLE => DataType::Float64, - z => return Err(ArrowError::nyi(format!("IPC: float of precision {:?}", z))), + Binary(_) => (DataType::Binary, IpcField::default()), + LargeBinary(_) => (DataType::LargeBinary, IpcField::default()), + Utf8(_) => (DataType::Utf8, IpcField::default()), + LargeUtf8(_) => (DataType::LargeUtf8, IpcField::default()), + FixedSizeBinary(fixed) => ( + DataType::FixedSizeBinary(fixed.byte_width()? as usize), + IpcField::default(), + ), + FloatingPoint(float) => { + let data_type = match float.precision()? { + arrow_format::ipc::Precision::Half => DataType::Float16, + arrow_format::ipc::Precision::Single => DataType::Float32, + arrow_format::ipc::Precision::Double => DataType::Float64, }; (data_type, IpcField::default()) } - ipc::Type::Date => { - let date = field - .type_as_date() - .ok_or_else(|| ArrowError::oos("IPC: Date type must be a Date"))?; - let data_type = match date.unit() { - ipc::DateUnit::DAY => DataType::Date32, - ipc::DateUnit::MILLISECOND => DataType::Date64, - z => { - return Err(ArrowError::nyi(format!( - "IPC: date unit of precision {:?}", - z - ))) - } + Date(date) => { + let data_type = match date.unit()? 
{ + arrow_format::ipc::DateUnit::Day => DataType::Date32, + arrow_format::ipc::DateUnit::Millisecond => DataType::Date64, }; (data_type, IpcField::default()) } - ipc::Type::Time => { - let time = field - .type_as_time() - .ok_or_else(|| ArrowError::oos("IPC: Time type must be a Time"))?; - let data_type = match (time.bitWidth(), time.unit()) { - (32, ipc::TimeUnit::SECOND) => DataType::Time32(TimeUnit::Second), - (32, ipc::TimeUnit::MILLISECOND) => DataType::Time32(TimeUnit::Millisecond), - (64, ipc::TimeUnit::MICROSECOND) => DataType::Time64(TimeUnit::Microsecond), - (64, ipc::TimeUnit::NANOSECOND) => DataType::Time64(TimeUnit::Nanosecond), + Time(time) => { + let unit = deserialize_timeunit(time.unit()?)?; + + let data_type = match (time.bit_width()?, unit) { + (32, TimeUnit::Second) => DataType::Time32(TimeUnit::Second), + (32, TimeUnit::Millisecond) => DataType::Time32(TimeUnit::Millisecond), + (64, TimeUnit::Microsecond) => DataType::Time64(TimeUnit::Microsecond), + (64, TimeUnit::Nanosecond) => DataType::Time64(TimeUnit::Nanosecond), (bits, precision) => { return Err(ArrowError::nyi(format!( "Time type with bit width of {} and unit of {:?}", @@ -177,75 +167,45 @@ fn get_data_type( }; (data_type, IpcField::default()) } - ipc::Type::Timestamp => { - let timestamp = field - .type_as_timestamp() - .ok_or_else(|| ArrowError::oos("IPC: Timestamp type must be a Timestamp"))?; - let timezone: Option = timestamp.timezone().map(|tz| tz.to_string()); - let data_type = match timestamp.unit() { - ipc::TimeUnit::SECOND => DataType::Timestamp(TimeUnit::Second, timezone), - ipc::TimeUnit::MILLISECOND => DataType::Timestamp(TimeUnit::Millisecond, timezone), - ipc::TimeUnit::MICROSECOND => DataType::Timestamp(TimeUnit::Microsecond, timezone), - ipc::TimeUnit::NANOSECOND => DataType::Timestamp(TimeUnit::Nanosecond, timezone), - z => { - return Err(ArrowError::nyi(format!( - "Timestamp type with unit of {:?}", - z - ))) - } - }; - (data_type, IpcField::default()) + Timestamp(timestamp) => { + let timezone = timestamp.timezone()?.map(|tz| tz.to_string()); + let time_unit = deserialize_timeunit(timestamp.unit()?)?; + ( + DataType::Timestamp(time_unit, timezone), + IpcField::default(), + ) } - ipc::Type::Interval => { - let interval = field - .type_as_interval() - .ok_or_else(|| ArrowError::oos("IPC: Interval type must be a Interval"))?; - let data_type = match interval.unit() { - ipc::IntervalUnit::YEAR_MONTH => DataType::Interval(IntervalUnit::YearMonth), - ipc::IntervalUnit::DAY_TIME => DataType::Interval(IntervalUnit::DayTime), - ipc::IntervalUnit::MONTH_DAY_NANO => DataType::Interval(IntervalUnit::MonthDayNano), - z => { - return Err(ArrowError::nyi(format!( - "Interval type with unit of {:?}", - z - ))) + Interval(interval) => { + let data_type = match interval.unit()? 
{ + arrow_format::ipc::IntervalUnit::YearMonth => { + DataType::Interval(IntervalUnit::YearMonth) } - }; - (data_type, IpcField::default()) - } - ipc::Type::Duration => { - let duration = field - .type_as_duration() - .ok_or_else(|| ArrowError::oos("IPC: Duration type must be a Duration"))?; - let data_type = match duration.unit() { - ipc::TimeUnit::SECOND => DataType::Duration(TimeUnit::Second), - ipc::TimeUnit::MILLISECOND => DataType::Duration(TimeUnit::Millisecond), - ipc::TimeUnit::MICROSECOND => DataType::Duration(TimeUnit::Microsecond), - ipc::TimeUnit::NANOSECOND => DataType::Duration(TimeUnit::Nanosecond), - z => { - return Err(ArrowError::nyi(format!( - "Duration type with unit of {:?}", - z - ))) + arrow_format::ipc::IntervalUnit::DayTime => { + DataType::Interval(IntervalUnit::DayTime) + } + arrow_format::ipc::IntervalUnit::MonthDayNano => { + DataType::Interval(IntervalUnit::MonthDayNano) } }; (data_type, IpcField::default()) } - ipc::Type::Decimal => { - let fsb = field - .type_as_decimal() - .ok_or_else(|| ArrowError::oos("IPC: Decimal type must be a Decimal"))?; - let data_type = DataType::Decimal(fsb.precision() as usize, fsb.scale() as usize); + Duration(duration) => { + let time_unit = deserialize_timeunit(duration.unit()?)?; + (DataType::Duration(time_unit), IpcField::default()) + } + Decimal(decimal) => { + let data_type = + DataType::Decimal(decimal.precision()? as usize, decimal.scale()? as usize); (data_type, IpcField::default()) } - ipc::Type::List => { + List(_) => { let children = field - .children() + .children()? .ok_or_else(|| ArrowError::oos("IPC: List must contain children"))?; - if children.len() != 1 { - return Err(ArrowError::oos("IPC: List must contain one child")); - } - let (field, ipc_field) = deserialize_field(children.get(0))?; + let inner = children + .get(0) + .ok_or_else(|| ArrowError::oos("IPC: List must contain one child"))??; + let (field, ipc_field) = deserialize_field(inner)?; ( DataType::List(Box::new(field)), @@ -255,14 +215,14 @@ fn get_data_type( }, ) } - ipc::Type::LargeList => { + LargeList(_) => { let children = field - .children() - .ok_or_else(|| ArrowError::oos("IPC: LargeList must contain children"))?; - if children.len() != 1 { - return Err(ArrowError::oos("IPC: LargeList must contain one child")); - } - let (field, ipc_field) = deserialize_field(children.get(0))?; + .children()? + .ok_or_else(|| ArrowError::oos("IPC: List must contain children"))?; + let inner = children + .get(0) + .ok_or_else(|| ArrowError::oos("IPC: List must contain one child"))??; + let (field, ipc_field) = deserialize_field(inner)?; ( DataType::LargeList(Box::new(field)), @@ -272,18 +232,16 @@ fn get_data_type( }, ) } - ipc::Type::FixedSizeList => { - let fsl = field.type_as_fixed_size_list().ok_or_else(|| { - ArrowError::oos("IPC: FixedSizeList type must be a FixedSizeList") - })?; - let size = fsl.listSize() as usize; + FixedSizeList(list) => { let children = field - .children() + .children()? .ok_or_else(|| ArrowError::oos("IPC: FixedSizeList must contain children"))?; - if children.len() != 1 { - return Err(ArrowError::oos("IPC: FixedSizeList must contain one child")); - } - let (field, ipc_field) = deserialize_field(children.get(0))?; + let inner = children + .get(0) + .ok_or_else(|| ArrowError::oos("IPC: FixedSizeList must contain one child"))??; + let (field, ipc_field) = deserialize_field(inner)?; + + let size = list.list_size()? 
as usize;
 
             (
                 DataType::FixedSizeList(Box::new(field), size),
@@ -293,9 +251,9 @@ fn get_data_type(
                 },
             )
         }
-        ipc::Type::Struct_ => {
+        Struct(_) => {
             let fields = field
-                .children()
+                .children()?
                 .ok_or_else(|| ArrowError::oos("IPC: Struct must contain children"))?;
             if fields.is_empty() {
                 return Err(ArrowError::oos(
                     "IPC: Struct must contain at least one child",
                 ));
             }
             let (fields, ipc_fields) = try_unzip_vec(fields.iter().map(|field| {
-                let (field, fields) = deserialize_field(field)?;
+                let (field, fields) = deserialize_field(field?)?;
                 Ok((field, fields))
             }))?;
             let ipc_field = IpcField {
@@ -312,15 +270,12 @@ fn get_data_type(
             };
             (DataType::Struct(fields), ipc_field)
         }
-        ipc::Type::Union => {
-            let type_ = field
-                .type_as_union()
-                .ok_or_else(|| ArrowError::oos("IPC: Union type must be a Union"))?;
-            let mode = UnionMode::sparse(type_.mode() == ipc::UnionMode::Sparse);
-            let ids = type_.typeIds().map(|x| x.iter().collect());
+        Union(union_) => {
+            let mode = UnionMode::sparse(union_.mode()? == arrow_format::ipc::UnionMode::Sparse);
+            let ids = union_.type_ids()?.map(|x| x.iter().collect());
 
             let fields = field
-                .children()
+                .children()?
                 .ok_or_else(|| ArrowError::oos("IPC: Union must contain children"))?;
             if fields.is_empty() {
                 return Err(ArrowError::oos(
                     "IPC: Union must contain at least one child",
                 ));
             }
 
             let (fields, ipc_fields) = try_unzip_vec(fields.iter().map(|field| {
-                let (field, fields) = deserialize_field(field)?;
+                let (field, fields) = deserialize_field(field?)?;
                 Ok((field, fields))
             }))?;
             let ipc_field = IpcField {
@@ -338,19 +293,16 @@ fn get_data_type(
             };
             (DataType::Union(fields, ids, mode), ipc_field)
         }
-        ipc::Type::Map => {
-            let map = field
-                .type_as_map()
-                .ok_or_else(|| ArrowError::oos("IPC: Map type must be a Map"))?;
-            let is_sorted = map.keysSorted();
+        Map(map) => {
+            let is_sorted = map.keys_sorted()?;
 
             let children = field
-                .children()
+                .children()?
                 .ok_or_else(|| ArrowError::oos("IPC: Map must contain children"))?;
-            if children.len() != 1 {
-                return Err(ArrowError::oos("IPC: Map must contain one child"));
-            }
-            let (field, ipc_field) = deserialize_field(children.get(0))?;
+            let inner = children
+                .get(0)
+                .ok_or_else(|| ArrowError::oos("IPC: Map must contain one child"))??;
+            let (field, ipc_field) = deserialize_field(inner)?;
 
             let data_type = DataType::Map(Box::new(field), is_sorted);
             (
@@ -361,40 +313,55 @@ fn get_data_type(
                 },
             )
         }
-        t => {
-            return Err(ArrowError::NotYetImplemented(format!(
-                "Reading {:?} from IPC",
-                t
-            )))
-        }
     })
 }
 
+/// Deserialize a flatbuffers-encoded Schema message into [`Schema`] and [`IpcSchema`].
+pub fn deserialize_schema(bytes: &[u8]) -> Result<(Schema, IpcSchema)> {
+    let message = arrow_format::ipc::MessageRef::read_as_root(bytes)
+        .map_err(|err| ArrowError::oos(format!("Unable to deserialize message: {:?}", err)))?;
+
+    let schema = match message.header()?.ok_or_else(|| {
+        ArrowError::oos("Unable to read the message header".to_string())
+    })? {
+        arrow_format::ipc::MessageHeaderRef::Schema(schema) => Ok(schema),
+        _ => Err(ArrowError::nyi(
+            "deserialize_schema only supports reading Schema messages",
+        )),
+    }?;
+
+    fb_to_schema(schema)
+}
+
 /// Deserialize the raw Schema table from IPC format to Schema data type
-pub fn fb_to_schema(fb: ipc::Schema) -> Result<(Schema, IpcSchema)> {
-    let fields = fb
-        .fields()
+pub(super) fn fb_to_schema(schema: arrow_format::ipc::SchemaRef) -> Result<(Schema, IpcSchema)> {
+    let fields = schema
+        .fields()?
.ok_or_else(|| ArrowError::oos("IPC: Schema must contain fields"))?; let (fields, ipc_fields) = try_unzip_vec(fields.iter().map(|field| { - let (field, fields) = deserialize_field(field)?; + let (field, fields) = deserialize_field(field?)?; Ok((field, fields)) }))?; - let is_little_endian = fb.endianness().variant_name().unwrap_or("Little") == "Little"; - - let metadata = if let Some(md_fields) = fb.custom_metadata() { - md_fields - .iter() - .filter_map(|f| { - let k = f.key(); - let v = f.value(); - k.and_then(|k| v.map(|v| (k.to_string(), v.to_string()))) - }) - .collect() - } else { - Default::default() + let is_little_endian = match schema.endianness()? { + arrow_format::ipc::Endianness::Little => true, + arrow_format::ipc::Endianness::Big => false, }; + let mut metadata = Metadata::default(); + if let Some(md_fields) = schema.custom_metadata()? { + for kv in md_fields { + let kv = kv?; + let k_str = kv.key()?; + let v_str = kv.value()?; + if let Some(k) = k_str { + if let Some(v) = v_str { + metadata.insert(k.to_string(), v.to_string()); + } + } + } + } + Ok(( Schema { fields, metadata }, IpcSchema { diff --git a/src/io/ipc/read/stream.rs b/src/io/ipc/read/stream.rs index 76f156ab4e8..3439afc24ec 100644 --- a/src/io/ipc/read/stream.rs +++ b/src/io/ipc/read/stream.rs @@ -1,8 +1,8 @@ use std::io::Read; use std::sync::Arc; -use arrow_format::ipc; -use arrow_format::ipc::Schema::MetadataVersion; +use arrow_format; +use arrow_format::ipc::planus::ReadAsRoot; use crate::array::Array; use crate::chunk::Chunk; @@ -20,7 +20,7 @@ pub struct StreamMetadata { /// The schema that is read from the stream's first message pub schema: Schema, - pub version: MetadataVersion, + pub version: arrow_format::ipc::MetadataVersion, pub ipc_schema: IpcSchema, } @@ -42,15 +42,23 @@ pub fn read_stream_metadata(reader: &mut R) -> Result { let mut meta_buffer = vec![0; meta_len as usize]; reader.read_exact(&mut meta_buffer)?; - let message = ipc::Message::root_as_message(meta_buffer.as_slice()).map_err(|err| { - ArrowError::OutOfSpec(format!("Unable to get root as message: {:?}", err)) - })?; - let version = message.version(); + let message = + arrow_format::ipc::MessageRef::read_as_root(meta_buffer.as_slice()).map_err(|err| { + ArrowError::OutOfSpec(format!("Unable to get root as message: {:?}", err)) + })?; + let version = message.version()?; // message header is a Schema, so read it - let ipc_schema: ipc::Schema::Schema = message - .header_as_schema() - .ok_or_else(|| ArrowError::OutOfSpec("Unable to read IPC message as schema".to_string()))?; - let (schema, ipc_schema) = fb_to_schema(ipc_schema)?; + let header = message + .header()? + .ok_or_else(|| ArrowError::oos("Unable to read the first IPC message"))?; + let schema = if let arrow_format::ipc::MessageHeaderRef::Schema(schema) = header { + schema + } else { + return Err(ArrowError::oos( + "The first IPC message of the stream must be a schema", + )); + }; + let (schema, ipc_schema) = fb_to_schema(schema)?; Ok(StreamMetadata { schema, @@ -134,21 +142,19 @@ fn read_next( message_buffer.resize(meta_length, 0); reader.read_exact(message_buffer)?; - let message = ipc::Message::root_as_message(message_buffer).map_err(|err| { + let message = arrow_format::ipc::MessageRef::read_as_root(message_buffer).map_err(|err| { ArrowError::OutOfSpec(format!("Unable to get root as message: {:?}", err)) })?; + let header = message.header()?.ok_or_else(|| { + ArrowError::oos("IPC: unable to fetch the message header. 
diff --git a/src/io/ipc/read/stream.rs b/src/io/ipc/read/stream.rs
index 76f156ab4e8..3439afc24ec 100644
--- a/src/io/ipc/read/stream.rs
+++ b/src/io/ipc/read/stream.rs
@@ -1,8 +1,8 @@
 use std::io::Read;
 use std::sync::Arc;
 
-use arrow_format::ipc;
-use arrow_format::ipc::Schema::MetadataVersion;
+use arrow_format;
+use arrow_format::ipc::planus::ReadAsRoot;
 
 use crate::array::Array;
 use crate::chunk::Chunk;
@@ -20,7 +20,7 @@ pub struct StreamMetadata {
     /// The schema that is read from the stream's first message
     pub schema: Schema,
 
-    pub version: MetadataVersion,
+    pub version: arrow_format::ipc::MetadataVersion,
 
     pub ipc_schema: IpcSchema,
 }
@@ -42,15 +42,23 @@ pub fn read_stream_metadata<R: Read>(reader: &mut R) -> Result<StreamMetadata> {
     let mut meta_buffer = vec![0; meta_len as usize];
     reader.read_exact(&mut meta_buffer)?;
 
-    let message = ipc::Message::root_as_message(meta_buffer.as_slice()).map_err(|err| {
-        ArrowError::OutOfSpec(format!("Unable to get root as message: {:?}", err))
-    })?;
-    let version = message.version();
+    let message =
+        arrow_format::ipc::MessageRef::read_as_root(meta_buffer.as_slice()).map_err(|err| {
+            ArrowError::OutOfSpec(format!("Unable to get root as message: {:?}", err))
+        })?;
+    let version = message.version()?;
     // message header is a Schema, so read it
-    let ipc_schema: ipc::Schema::Schema = message
-        .header_as_schema()
-        .ok_or_else(|| ArrowError::OutOfSpec("Unable to read IPC message as schema".to_string()))?;
-    let (schema, ipc_schema) = fb_to_schema(ipc_schema)?;
+    let header = message
+        .header()?
+        .ok_or_else(|| ArrowError::oos("Unable to read the first IPC message"))?;
+    let schema = if let arrow_format::ipc::MessageHeaderRef::Schema(schema) = header {
+        schema
+    } else {
+        return Err(ArrowError::oos(
+            "The first IPC message of the stream must be a schema",
+        ));
+    };
+    let (schema, ipc_schema) = fb_to_schema(schema)?;
 
     Ok(StreamMetadata {
         schema,
@@ -134,21 +142,19 @@ fn read_next(
     message_buffer.resize(meta_length, 0);
     reader.read_exact(message_buffer)?;
 
-    let message = ipc::Message::root_as_message(message_buffer).map_err(|err| {
+    let message = arrow_format::ipc::MessageRef::read_as_root(message_buffer).map_err(|err| {
         ArrowError::OutOfSpec(format!("Unable to get root as message: {:?}", err))
     })?;
+    let header = message.header()?.ok_or_else(|| {
+        ArrowError::oos("IPC: unable to fetch the message header. The file or stream is corrupted.")
+    })?;
 
-    match message.header_type() {
-        ipc::Message::MessageHeader::Schema => Err(ArrowError::OutOfSpec(
-            "Not expecting a schema when messages are read".to_string(),
-        )),
-        ipc::Message::MessageHeader::RecordBatch => {
-            let batch = message.header_as_record_batch().ok_or_else(|| {
-                ArrowError::OutOfSpec("Unable to read IPC message as record batch".to_string())
-            })?;
+    match header {
+        arrow_format::ipc::MessageHeaderRef::Schema(_) => Err(ArrowError::oos(
+            "A stream must not contain a schema message after the first message",
+        )),
+        arrow_format::ipc::MessageHeaderRef::RecordBatch(batch) => {
             // read the block that makes up the record batch into a buffer
             data_buffer.clear();
-            data_buffer.resize(message.bodyLength() as usize, 0);
+            data_buffer.resize(message.body_length()? as usize, 0);
             reader.read_exact(data_buffer)?;
 
             let mut reader = std::io::Cursor::new(data_buffer);
@@ -165,12 +171,9 @@ fn read_next(
             )
             .map(|x| Some(StreamState::Some(x)))
         }
-        ipc::Message::MessageHeader::DictionaryBatch => {
-            let batch = message.header_as_dictionary_batch().ok_or_else(|| {
-                ArrowError::OutOfSpec("Unable to read IPC message as dictionary batch".to_string())
-            })?;
+        arrow_format::ipc::MessageHeaderRef::DictionaryBatch(batch) => {
             // read the block that makes up the dictionary batch into a buffer
-            let mut buf = vec![0; message.bodyLength() as usize];
+            let mut buf = vec![0; message.body_length()? as usize];
             reader.read_exact(&mut buf)?;
 
             let mut dict_reader = std::io::Cursor::new(buf);
@@ -187,7 +190,6 @@ fn read_next(
             // read the next message until we encounter a RecordBatch message
             read_next(reader, metadata, dictionaries, message_buffer, data_buffer)
         }
-        ipc::Message::MessageHeader::NONE => Ok(Some(StreamState::Waiting)),
         t => Err(ArrowError::OutOfSpec(format!(
             "Reading types other than record batches not yet supported, unable to read {:?} ",
             t
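The stream changes above are mechanical; the consumption pattern is unchanged. A usage sketch, assuming the crate's `StreamReader` wrapper (not touched by this patch) and a hypothetical `data.arrows` file:

    use arrow2::io::ipc::read::{read_stream_metadata, StreamReader, StreamState};

    let mut file = std::fs::File::open("data.arrows")?;
    let metadata = read_stream_metadata(&mut file)?;
    for state in StreamReader::new(file, metadata) {
        match state? {
            StreamState::Some(chunk) => println!("read {} rows", chunk.len()),
            StreamState::Waiting => break, // stream exhausted for now; more bytes may arrive
        }
    }
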
diff --git a/src/io/ipc/write/common.rs b/src/io/ipc/write/common.rs
index 54af2ec1f7a..2db05ee4afc 100644
--- a/src/io/ipc/write/common.rs
+++ b/src/io/ipc/write/common.rs
@@ -1,8 +1,6 @@
 use std::sync::Arc;
 
-use arrow_format::ipc;
-use arrow_format::ipc::flatbuffers::FlatBufferBuilder;
-use arrow_format::ipc::Message::CompressionType;
+use arrow_format::ipc::planus::Builder;
 
 use crate::array::*;
 use crate::chunk::Chunk;
@@ -195,13 +193,28 @@ pub fn encode_chunk(
     Ok((encoded_dictionaries, encoded_message))
 }
 
+fn serialize_compression(
+    compression: Option<Compression>,
+) -> Option<Box<arrow_format::ipc::BodyCompression>> {
+    if let Some(compression) = compression {
+        let codec = match compression {
+            Compression::LZ4 => arrow_format::ipc::CompressionType::Lz4Frame,
+            Compression::ZSTD => arrow_format::ipc::CompressionType::Zstd,
+        };
+        Some(Box::new(arrow_format::ipc::BodyCompression {
+            codec,
+            method: arrow_format::ipc::BodyCompressionMethod::Buffer,
+        }))
+    } else {
+        None
+    }
+}
+
 /// Write [`Chunk`] into two sets of bytes, one for the header (ipc::Schema::Message) and the
 /// other for the batch's data
 fn columns_to_bytes(columns: &Chunk<Arc<dyn Array>>, options: &WriteOptions) -> EncodedData {
-    let mut fbb = FlatBufferBuilder::new();
-
-    let mut nodes: Vec<ipc::Message::FieldNode> = vec![];
-    let mut buffers: Vec<ipc::Schema::Buffer> = vec![];
+    let mut nodes: Vec<arrow_format::ipc::FieldNode> = vec![];
+    let mut buffers: Vec<arrow_format::ipc::Buffer> = vec![];
     let mut arrow_data: Vec<u8> = vec![];
     let mut offset = 0;
     for array in columns.arrays() {
@@ -216,45 +229,27 @@ fn columns_to_bytes(columns: &Chunk<Arc<dyn Array>>, options: &WriteOptions) ->
         )
     }
 
-    // write data
-    let buffers = fbb.create_vector(&buffers);
-    let nodes = fbb.create_vector(&nodes);
-
-    let compression = if let Some(compression) = options.compression {
-        let compression = match compression {
-            Compression::LZ4 => CompressionType::LZ4_FRAME,
-            Compression::ZSTD => CompressionType::ZSTD,
-        };
-        let mut compression_builder = ipc::Message::BodyCompressionBuilder::new(&mut fbb);
-        compression_builder.add_codec(compression);
-        Some(compression_builder.finish())
-    } else {
-        None
+    let compression = serialize_compression(options.compression);
+
+    let message = arrow_format::ipc::Message {
+        version: arrow_format::ipc::MetadataVersion::V5,
+        header: Some(arrow_format::ipc::MessageHeader::RecordBatch(Box::new(
+            arrow_format::ipc::RecordBatch {
+                length: columns.len() as i64,
+                nodes: Some(nodes),
+                buffers: Some(buffers),
+                compression,
+            },
+        ))),
+        body_length: arrow_data.len() as i64,
+        custom_metadata: None,
     };
 
-    let root = {
-        let mut batch_builder = ipc::Message::RecordBatchBuilder::new(&mut fbb);
-        batch_builder.add_length(columns.len() as i64);
-        batch_builder.add_nodes(nodes);
-        batch_builder.add_buffers(buffers);
-        if let Some(compression) = compression {
-            batch_builder.add_compression(compression)
-        }
-        let b = batch_builder.finish();
-        b.as_union_value()
-    };
-    // create an ipc::Schema::Message
-    let mut message = ipc::Message::MessageBuilder::new(&mut fbb);
-    message.add_version(ipc::Schema::MetadataVersion::V5);
-    message.add_header_type(ipc::Message::MessageHeader::RecordBatch);
-    message.add_bodyLength(arrow_data.len() as i64);
-    message.add_header(root);
-    let root = message.finish();
-    fbb.finish(root, None);
-    let finished_data = fbb.finished_data();
+    let mut builder = Builder::new();
+    let ipc_message = builder.finish(&message, None);
 
     EncodedData {
-        ipc_message: finished_data.to_vec(),
+        ipc_message: ipc_message.to_vec(),
        arrow_data,
    }
 }
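The planus `Builder` consumes plain owned structs, which is what makes the incremental flatbuffers builder choreography above removable. A standalone sketch of the pattern (an empty header keeps the example small; real messages carry a `RecordBatch`, `DictionaryBatch`, or `Schema`):

    use arrow_format::ipc::planus::Builder;

    let message = arrow_format::ipc::Message {
        version: arrow_format::ipc::MetadataVersion::V5,
        header: None, // e.g. Some(arrow_format::ipc::MessageHeader::RecordBatch(...))
        body_length: 0,
        custom_metadata: None,
    };
    let mut builder = Builder::new();
    let bytes: Vec<u8> = builder.finish(&message, None).to_vec();
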
@@ -267,10 +262,8 @@ fn dictionary_batch_to_bytes(
     options: &WriteOptions,
     is_little_endian: bool,
 ) -> EncodedData {
-    let mut fbb = FlatBufferBuilder::new();
-
-    let mut nodes: Vec<ipc::Message::FieldNode> = vec![];
-    let mut buffers: Vec<ipc::Schema::Buffer> = vec![];
+    let mut nodes: Vec<arrow_format::ipc::FieldNode> = vec![];
+    let mut buffers: Vec<arrow_format::ipc::Buffer> = vec![];
     let mut arrow_data: Vec<u8> = vec![];
 
     let length = write_dictionary(
@@ -284,54 +277,31 @@ fn dictionary_batch_to_bytes(
         false,
     );
 
-    // write data
-    let buffers = fbb.create_vector(&buffers);
-    let nodes = fbb.create_vector(&nodes);
-
-    let compression = if let Some(compression) = options.compression {
-        let compression = match compression {
-            Compression::LZ4 => CompressionType::LZ4_FRAME,
-            Compression::ZSTD => CompressionType::ZSTD,
-        };
-        let mut compression_builder = ipc::Message::BodyCompressionBuilder::new(&mut fbb);
-        compression_builder.add_codec(compression);
-        Some(compression_builder.finish())
-    } else {
-        None
-    };
-
-    let root = {
-        let mut batch_builder = ipc::Message::RecordBatchBuilder::new(&mut fbb);
-        batch_builder.add_length(length as i64);
-        batch_builder.add_nodes(nodes);
-        batch_builder.add_buffers(buffers);
-        if let Some(compression) = compression {
-            batch_builder.add_compression(compression)
-        }
-        batch_builder.finish()
-    };
-
-    let root = {
-        let mut batch_builder = ipc::Message::DictionaryBatchBuilder::new(&mut fbb);
-        batch_builder.add_id(dict_id);
-        batch_builder.add_data(root);
-        batch_builder.finish().as_union_value()
-    };
-
-    let root = {
-        let mut message_builder = ipc::Message::MessageBuilder::new(&mut fbb);
-        message_builder.add_version(ipc::Schema::MetadataVersion::V5);
-        message_builder.add_header_type(ipc::Message::MessageHeader::DictionaryBatch);
-        message_builder.add_bodyLength(arrow_data.len() as i64);
-        message_builder.add_header(root);
-        message_builder.finish()
+    let compression = serialize_compression(options.compression);
+
+    let message = arrow_format::ipc::Message {
+        version: arrow_format::ipc::MetadataVersion::V5,
+        header: Some(arrow_format::ipc::MessageHeader::DictionaryBatch(Box::new(
+            arrow_format::ipc::DictionaryBatch {
+                id: dict_id,
+                data: Some(Box::new(arrow_format::ipc::RecordBatch {
+                    length: length as i64,
+                    nodes: Some(nodes),
+                    buffers: Some(buffers),
+                    compression,
+                })),
+                is_delta: false,
+            },
+        ))),
+        body_length: arrow_data.len() as i64,
+        custom_metadata: None,
     };
 
-    fbb.finish(root, None);
-    let finished_data = fbb.finished_data();
+    let mut builder = Builder::new();
+    let ipc_message = builder.finish(&message, None);
 
     EncodedData {
-        ipc_message: finished_data.to_vec(),
+        ipc_message: ipc_message.to_vec(),
         arrow_data,
     }
 }
@@ -396,6 +366,7 @@ impl DictionaryTracker {
 }
 
 /// Stores the encoded data, which is an ipc::Schema::Message, and optional Arrow data
+#[derive(Debug)]
 pub struct EncodedData {
     /// An encoded ipc::Schema::Message
     pub ipc_message: Vec<u8>,
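A quick sanity check of the `serialize_compression` helper introduced above (field and variant names as defined in this patch):

    assert!(serialize_compression(None).is_none());

    let lz4 = serialize_compression(Some(Compression::LZ4)).unwrap();
    assert_eq!(lz4.codec, arrow_format::ipc::CompressionType::Lz4Frame);
    assert_eq!(lz4.method, arrow_format::ipc::BodyCompressionMethod::Buffer);
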
diff --git a/src/io/ipc/write/schema.rs b/src/io/ipc/write/schema.rs
index 30f347a7bdf..0636cb9e6c0 100644
--- a/src/io/ipc/write/schema.rs
+++ b/src/io/ipc/write/schema.rs
@@ -1,675 +1,328 @@
-use arrow_format::ipc::flatbuffers::{
-    FlatBufferBuilder, ForwardsUOffset, UnionWIPOffset, Vector, WIPOffset,
-};
-mod ipc {
-    pub use arrow_format::ipc::File::*;
-    pub use arrow_format::ipc::Message::*;
-    pub use arrow_format::ipc::Schema::*;
-}
+use arrow_format::ipc::planus::Builder;
 
-use crate::datatypes::{DataType, Field, IntegerType, IntervalUnit, Metadata, Schema, TimeUnit};
+use crate::datatypes::{
+    DataType, Field, IntegerType, IntervalUnit, Metadata, Schema, TimeUnit, UnionMode,
+};
 use crate::io::ipc::endianess::is_native_little_endian;
 
 use super::super::IpcField;
 
-/// Converts
+/// Converts a [Schema] and [IpcField]s to a flatbuffers-encoded [arrow_format::ipc::Message].
 pub fn schema_to_bytes(schema: &Schema, ipc_fields: &[IpcField]) -> Vec<u8> {
-    let mut fbb = FlatBufferBuilder::new();
-    let schema = {
-        let fb = schema_to_fb_offset(&mut fbb, schema, ipc_fields);
-        fb.as_union_value()
-    };
-
-    let mut message = ipc::MessageBuilder::new(&mut fbb);
-    message.add_version(ipc::MetadataVersion::V5);
-    message.add_header_type(ipc::MessageHeader::Schema);
-    message.add_bodyLength(0);
-    message.add_header(schema);
-    // TODO: custom metadata
-    let data = message.finish();
-    fbb.finish(data, None);
+    let schema = serialize_schema(schema, ipc_fields);
 
-    fbb.finished_data().to_vec()
+    let message = arrow_format::ipc::Message {
+        version: arrow_format::ipc::MetadataVersion::V5,
+        header: Some(arrow_format::ipc::MessageHeader::Schema(Box::new(schema))),
+        body_length: 0,
+        custom_metadata: None, // todo: allow writing custom metadata
+    };
+    let mut builder = Builder::new();
+    let message_data = builder.finish(&message, None);
+    message_data.to_vec()
 }
 
-pub fn schema_to_fb_offset<'a>(
-    fbb: &mut FlatBufferBuilder<'a>,
-    schema: &Schema,
-    ipc_fields: &[IpcField],
-) -> WIPOffset<ipc::Schema<'a>> {
+pub fn serialize_schema(schema: &Schema, ipc_fields: &[IpcField]) -> arrow_format::ipc::Schema {
+    let endianness = if is_native_little_endian() {
+        arrow_format::ipc::Endianness::Little
+    } else {
+        arrow_format::ipc::Endianness::Big
+    };
+
     let fields = schema
         .fields
         .iter()
         .zip(ipc_fields.iter())
-        .map(|(field, ipc_field)| build_field(fbb, field, ipc_field))
+        .map(|(field, ipc_field)| serialize_field(field, ipc_field))
         .collect::<Vec<_>>();
 
     let mut custom_metadata = vec![];
-    for (k, v) in &schema.metadata {
-        let fb_key_name = fbb.create_string(k.as_str());
-        let fb_val_name = fbb.create_string(v.as_str());
-
-        let mut kv_builder = ipc::KeyValueBuilder::new(fbb);
-        kv_builder.add_key(fb_key_name);
-        kv_builder.add_value(fb_val_name);
-        custom_metadata.push(kv_builder.finish());
+    for (key, value) in &schema.metadata {
+        custom_metadata.push(arrow_format::ipc::KeyValue {
+            key: Some(key.clone()),
+            value: Some(value.clone()),
+        });
     }
-
-    let fb_field_list = fbb.create_vector(&fields);
-    let fb_metadata_list = fbb.create_vector(&custom_metadata);
-
-    let mut builder = ipc::SchemaBuilder::new(fbb);
-    builder.add_fields(fb_field_list);
-    builder.add_custom_metadata(fb_metadata_list);
-    builder.add_endianness(if is_native_little_endian() {
-        ipc::Endianness::Little
+    let custom_metadata = if custom_metadata.is_empty() {
+        None
     } else {
-        ipc::Endianness::Big
-    });
-    builder.finish()
-}
+        Some(custom_metadata)
+    };
 
-pub(crate) struct FbFieldType<'b> {
-    pub(crate) type_type: ipc::Type,
-    pub(crate) type_: WIPOffset<UnionWIPOffset>,
-    pub(crate) children: Option<WIPOffset<Vector<'b, ForwardsUOffset<ipc::Field<'b>>>>>,
+    arrow_format::ipc::Schema {
+        endianness,
+        fields: Some(fields),
+        custom_metadata,
+        features: None, // todo add this one
+    }
 }
 
-fn write_metadata<'a>(
-    fbb: &mut FlatBufferBuilder<'a>,
-    metadata: &Metadata,
-    kv_vec: &mut Vec<WIPOffset<ipc::KeyValue<'a>>>,
-) {
+fn write_metadata(metadata: &Metadata, kv_vec: &mut Vec<arrow_format::ipc::KeyValue>) {
     for (k, v) in metadata {
         if k != "ARROW:extension:name" && k != "ARROW:extension:metadata" {
-            let kv_args = ipc::KeyValueArgs {
-                key: Some(fbb.create_string(k.as_str())),
-                value: Some(fbb.create_string(v.as_str())),
+            let entry = arrow_format::ipc::KeyValue {
+                key: Some(k.clone()),
+                value: Some(v.clone()),
             };
-            kv_vec.push(ipc::KeyValue::create(fbb, &kv_args));
+            kv_vec.push(entry);
         }
     }
 }
 
-fn write_extension<'a>(
-    fbb: &mut FlatBufferBuilder<'a>,
+fn write_extension(
     name: &str,
     metadata: &Option<String>,
-    kv_vec: &mut Vec<WIPOffset<ipc::KeyValue<'a>>>,
+    kv_vec: &mut Vec<arrow_format::ipc::KeyValue>,
 ) {
    //
metadata if let Some(metadata) = metadata { - let kv_args = ipc::KeyValueArgs { - key: Some(fbb.create_string("ARROW:extension:metadata")), - value: Some(fbb.create_string(metadata.as_str())), + let entry = arrow_format::ipc::KeyValue { + key: Some("ARROW:extension:metadata".to_string()), + value: Some(metadata.clone()), }; - kv_vec.push(ipc::KeyValue::create(fbb, &kv_args)); + kv_vec.push(entry); } // name - let kv_args = ipc::KeyValueArgs { - key: Some(fbb.create_string("ARROW:extension:name")), - value: Some(fbb.create_string(name)), + let entry = arrow_format::ipc::KeyValue { + key: Some("ARROW:extension:name".to_string()), + value: Some(name.to_string()), }; - kv_vec.push(ipc::KeyValue::create(fbb, &kv_args)); + kv_vec.push(entry); } /// Create an IPC Field from an Arrow Field -pub(crate) fn build_field<'a>( - fbb: &mut FlatBufferBuilder<'a>, - field: &Field, - ipc_field: &IpcField, -) -> WIPOffset> { +pub(crate) fn serialize_field(field: &Field, ipc_field: &IpcField) -> arrow_format::ipc::Field { // custom metadata. let mut kv_vec = vec![]; if let DataType::Extension(name, _, metadata) = field.data_type() { - write_extension(fbb, name, metadata, &mut kv_vec); + write_extension(name, metadata, &mut kv_vec); } - let fb_field_name = fbb.create_string(field.name.as_str()); - let field_type = get_fb_field_type(field.data_type(), ipc_field, field.is_nullable, fbb); + let type_ = serialize_type(field.data_type()); + let children = serialize_children(field.data_type(), ipc_field); - let fb_dictionary = - if let DataType::Dictionary(index_type, inner, is_ordered) = field.data_type() { - if let DataType::Extension(name, _, metadata) = inner.as_ref() { - write_extension(fbb, name, metadata, &mut kv_vec); - } - Some(get_fb_dictionary( - index_type, - ipc_field - .dictionary_id - .expect("All Dictionary types have `dict_id`"), - *is_ordered, - fbb, - )) - } else { - None - }; + let dictionary = if let DataType::Dictionary(index_type, inner, is_ordered) = field.data_type() + { + if let DataType::Extension(name, _, metadata) = inner.as_ref() { + write_extension(name, metadata, &mut kv_vec); + } + Some(serialize_dictionary( + index_type, + ipc_field + .dictionary_id + .expect("All Dictionary types have `dict_id`"), + *is_ordered, + )) + } else { + None + }; - write_metadata(fbb, &field.metadata, &mut kv_vec); + write_metadata(&field.metadata, &mut kv_vec); - let fb_metadata = if !kv_vec.is_empty() { - Some(fbb.create_vector(&kv_vec)) + let custom_metadata = if !kv_vec.is_empty() { + Some(kv_vec) } else { None }; - let mut field_builder = ipc::FieldBuilder::new(fbb); - field_builder.add_name(fb_field_name); - if let Some(dictionary) = fb_dictionary { - field_builder.add_dictionary(dictionary) + arrow_format::ipc::Field { + name: Some(field.name.clone()), + nullable: field.is_nullable, + type_: Some(type_), + dictionary: dictionary.map(Box::new), + children: Some(children), + custom_metadata, } - field_builder.add_type_type(field_type.type_type); - field_builder.add_nullable(field.is_nullable); - match field_type.children { - None => {} - Some(children) => field_builder.add_children(children), - }; - field_builder.add_type_(field_type.type_); +} - if let Some(fb_metadata) = fb_metadata { - field_builder.add_custom_metadata(fb_metadata); +fn serialize_time_unit(unit: &TimeUnit) -> arrow_format::ipc::TimeUnit { + match unit { + TimeUnit::Second => arrow_format::ipc::TimeUnit::Second, + TimeUnit::Millisecond => arrow_format::ipc::TimeUnit::Millisecond, + TimeUnit::Microsecond => 
arrow_format::ipc::TimeUnit::Microsecond, + TimeUnit::Nanosecond => arrow_format::ipc::TimeUnit::Nanosecond, } - - field_builder.finish() } -fn type_to_field_type(data_type: &DataType) -> ipc::Type { +fn serialize_type(data_type: &DataType) -> arrow_format::ipc::Type { + use arrow_format::ipc; use DataType::*; match data_type { - Null => ipc::Type::Null, - Boolean => ipc::Type::Bool, - UInt8 | UInt16 | UInt32 | UInt64 | Int8 | Int16 | Int32 | Int64 => ipc::Type::Int, - Float16 | Float32 | Float64 => ipc::Type::FloatingPoint, - Decimal(_, _) => ipc::Type::Decimal, - Binary => ipc::Type::Binary, - LargeBinary => ipc::Type::LargeBinary, - Utf8 => ipc::Type::Utf8, - LargeUtf8 => ipc::Type::LargeUtf8, - FixedSizeBinary(_) => ipc::Type::FixedSizeBinary, - Date32 | Date64 => ipc::Type::Date, - Duration(_) => ipc::Type::Duration, - Time32(_) | Time64(_) => ipc::Type::Time, - Timestamp(_, _) => ipc::Type::Timestamp, - Interval(_) => ipc::Type::Interval, - List(_) => ipc::Type::List, - LargeList(_) => ipc::Type::LargeList, - FixedSizeList(_, _) => ipc::Type::FixedSizeList, - Union(_, _, _) => ipc::Type::Union, - Map(_, _) => ipc::Type::Map, - Struct(_) => ipc::Type::Struct_, - Dictionary(_, v, _) => type_to_field_type(v), - Extension(_, v, _) => type_to_field_type(v), + Null => ipc::Type::Null(Box::new(ipc::Null {})), + Boolean => ipc::Type::Bool(Box::new(ipc::Bool {})), + UInt8 => ipc::Type::Int(Box::new(ipc::Int { + bit_width: 8, + is_signed: false, + })), + UInt16 => ipc::Type::Int(Box::new(ipc::Int { + bit_width: 16, + is_signed: false, + })), + UInt32 => ipc::Type::Int(Box::new(ipc::Int { + bit_width: 32, + is_signed: false, + })), + UInt64 => ipc::Type::Int(Box::new(ipc::Int { + bit_width: 64, + is_signed: false, + })), + Int8 => ipc::Type::Int(Box::new(ipc::Int { + bit_width: 8, + is_signed: true, + })), + Int16 => ipc::Type::Int(Box::new(ipc::Int { + bit_width: 16, + is_signed: true, + })), + Int32 => ipc::Type::Int(Box::new(ipc::Int { + bit_width: 32, + is_signed: true, + })), + Int64 => ipc::Type::Int(Box::new(ipc::Int { + bit_width: 64, + is_signed: true, + })), + Float16 => ipc::Type::FloatingPoint(Box::new(ipc::FloatingPoint { + precision: ipc::Precision::Half, + })), + Float32 => ipc::Type::FloatingPoint(Box::new(ipc::FloatingPoint { + precision: ipc::Precision::Single, + })), + Float64 => ipc::Type::FloatingPoint(Box::new(ipc::FloatingPoint { + precision: ipc::Precision::Double, + })), + Decimal(precision, scale) => ipc::Type::Decimal(Box::new(ipc::Decimal { + precision: *precision as i32, + scale: *scale as i32, + bit_width: 128, + })), + Binary => ipc::Type::Binary(Box::new(ipc::Binary {})), + LargeBinary => ipc::Type::LargeBinary(Box::new(ipc::LargeBinary {})), + Utf8 => ipc::Type::Utf8(Box::new(ipc::Utf8 {})), + LargeUtf8 => ipc::Type::LargeUtf8(Box::new(ipc::LargeUtf8 {})), + FixedSizeBinary(size) => ipc::Type::FixedSizeBinary(Box::new(ipc::FixedSizeBinary { + byte_width: *size as i32, + })), + Date32 => ipc::Type::Date(Box::new(ipc::Date { + unit: ipc::DateUnit::Day, + })), + Date64 => ipc::Type::Date(Box::new(ipc::Date { + unit: ipc::DateUnit::Millisecond, + })), + Duration(unit) => ipc::Type::Duration(Box::new(ipc::Duration { + unit: serialize_time_unit(unit), + })), + Time32(unit) => ipc::Type::Time(Box::new(ipc::Time { + unit: serialize_time_unit(unit), + bit_width: 32, + })), + Time64(unit) => ipc::Type::Time(Box::new(ipc::Time { + unit: serialize_time_unit(unit), + bit_width: 64, + })), + Timestamp(unit, tz) => ipc::Type::Timestamp(Box::new(ipc::Timestamp { + unit: 
serialize_time_unit(unit), + timezone: tz.as_ref().cloned(), + })), + Interval(unit) => ipc::Type::Interval(Box::new(ipc::Interval { + unit: match unit { + IntervalUnit::YearMonth => ipc::IntervalUnit::YearMonth, + IntervalUnit::DayTime => ipc::IntervalUnit::DayTime, + IntervalUnit::MonthDayNano => ipc::IntervalUnit::MonthDayNano, + }, + })), + List(_) => ipc::Type::List(Box::new(ipc::List {})), + LargeList(_) => ipc::Type::LargeList(Box::new(ipc::LargeList {})), + FixedSizeList(_, size) => ipc::Type::FixedSizeList(Box::new(ipc::FixedSizeList { + list_size: *size as i32, + })), + Union(_, type_ids, mode) => ipc::Type::Union(Box::new(ipc::Union { + mode: match mode { + UnionMode::Dense => ipc::UnionMode::Dense, + UnionMode::Sparse => ipc::UnionMode::Sparse, + }, + type_ids: type_ids.clone(), + })), + Map(_, keys_sorted) => ipc::Type::Map(Box::new(ipc::Map { + keys_sorted: *keys_sorted, + })), + Struct(_) => ipc::Type::Struct(Box::new(ipc::Struct {})), + Dictionary(_, v, _) => serialize_type(v), + Extension(_, v, _) => serialize_type(v), } } -/// Get the IPC type of a data type -pub(crate) fn get_fb_field_type<'a>( - data_type: &DataType, - ipc_field: &IpcField, - is_nullable: bool, - fbb: &mut FlatBufferBuilder<'a>, -) -> FbFieldType<'a> { +fn serialize_children(data_type: &DataType, ipc_field: &IpcField) -> Vec { use DataType::*; - let type_type = type_to_field_type(data_type); - - // some IPC implementations expect an empty list for child data, instead of a null value. - // An empty field list is thus returned for primitive types - let empty_fields: Vec> = vec![]; match data_type { - Null => FbFieldType { - type_type, - type_: ipc::NullBuilder::new(fbb).finish().as_union_value(), - children: Some(fbb.create_vector(&empty_fields[..])), - }, - Boolean => FbFieldType { - type_type, - type_: ipc::BoolBuilder::new(fbb).finish().as_union_value(), - children: Some(fbb.create_vector(&empty_fields[..])), - }, - Int8 | Int16 | Int32 | Int64 | UInt8 | UInt16 | UInt32 | UInt64 => { - let children = fbb.create_vector(&empty_fields[..]); - let mut builder = ipc::IntBuilder::new(fbb); - if matches!(data_type, UInt8 | UInt16 | UInt32 | UInt64) { - builder.add_is_signed(false); - } else { - builder.add_is_signed(true); - } - match data_type { - Int8 | UInt8 => builder.add_bitWidth(8), - Int16 | UInt16 => builder.add_bitWidth(16), - Int32 | UInt32 => builder.add_bitWidth(32), - Int64 | UInt64 => builder.add_bitWidth(64), - _ => {} - }; - FbFieldType { - type_type, - type_: builder.finish().as_union_value(), - children: Some(children), - } - } - Float16 | Float32 | Float64 => { - let children = fbb.create_vector(&empty_fields[..]); - let mut builder = ipc::FloatingPointBuilder::new(fbb); - match data_type { - Float16 => builder.add_precision(ipc::Precision::HALF), - Float32 => builder.add_precision(ipc::Precision::SINGLE), - Float64 => builder.add_precision(ipc::Precision::DOUBLE), - _ => {} - }; - FbFieldType { - type_type, - type_: builder.finish().as_union_value(), - children: Some(children), - } - } - Binary => FbFieldType { - type_type, - type_: ipc::BinaryBuilder::new(fbb).finish().as_union_value(), - children: Some(fbb.create_vector(&empty_fields[..])), - }, - LargeBinary => FbFieldType { - type_type, - type_: ipc::LargeBinaryBuilder::new(fbb).finish().as_union_value(), - children: Some(fbb.create_vector(&empty_fields[..])), - }, - Utf8 => FbFieldType { - type_type, - type_: ipc::Utf8Builder::new(fbb).finish().as_union_value(), - children: Some(fbb.create_vector(&empty_fields[..])), - }, - LargeUtf8 
=> FbFieldType { - type_type, - type_: ipc::LargeUtf8Builder::new(fbb).finish().as_union_value(), - children: Some(fbb.create_vector(&empty_fields[..])), - }, - FixedSizeBinary(len) => { - let mut builder = ipc::FixedSizeBinaryBuilder::new(fbb); - builder.add_byteWidth(*len as i32); - FbFieldType { - type_type, - type_: builder.finish().as_union_value(), - children: Some(fbb.create_vector(&empty_fields[..])), - } - } - Date32 => { - let mut builder = ipc::DateBuilder::new(fbb); - builder.add_unit(ipc::DateUnit::DAY); - FbFieldType { - type_type, - type_: builder.finish().as_union_value(), - children: Some(fbb.create_vector(&empty_fields[..])), - } - } - Date64 => { - let mut builder = ipc::DateBuilder::new(fbb); - builder.add_unit(ipc::DateUnit::MILLISECOND); - FbFieldType { - type_type, - type_: builder.finish().as_union_value(), - children: Some(fbb.create_vector(&empty_fields[..])), - } - } - Time32(unit) | Time64(unit) => { - let mut builder = ipc::TimeBuilder::new(fbb); - match unit { - TimeUnit::Second => { - builder.add_bitWidth(32); - builder.add_unit(ipc::TimeUnit::SECOND); - } - TimeUnit::Millisecond => { - builder.add_bitWidth(32); - builder.add_unit(ipc::TimeUnit::MILLISECOND); - } - TimeUnit::Microsecond => { - builder.add_bitWidth(64); - builder.add_unit(ipc::TimeUnit::MICROSECOND); - } - TimeUnit::Nanosecond => { - builder.add_bitWidth(64); - builder.add_unit(ipc::TimeUnit::NANOSECOND); - } - } - FbFieldType { - type_type, - type_: builder.finish().as_union_value(), - children: Some(fbb.create_vector(&empty_fields[..])), - } - } - Timestamp(unit, tz) => { - let tz = tz.clone().unwrap_or_else(String::new); - let tz_str = fbb.create_string(tz.as_str()); - let mut builder = ipc::TimestampBuilder::new(fbb); - let time_unit = match unit { - TimeUnit::Second => ipc::TimeUnit::SECOND, - TimeUnit::Millisecond => ipc::TimeUnit::MILLISECOND, - TimeUnit::Microsecond => ipc::TimeUnit::MICROSECOND, - TimeUnit::Nanosecond => ipc::TimeUnit::NANOSECOND, - }; - builder.add_unit(time_unit); - if !tz.is_empty() { - builder.add_timezone(tz_str); - } - FbFieldType { - type_type, - type_: builder.finish().as_union_value(), - children: Some(fbb.create_vector(&empty_fields[..])), - } - } - Interval(unit) => { - let mut builder = ipc::IntervalBuilder::new(fbb); - let interval_unit = match unit { - IntervalUnit::YearMonth => ipc::IntervalUnit::YEAR_MONTH, - IntervalUnit::DayTime => ipc::IntervalUnit::DAY_TIME, - IntervalUnit::MonthDayNano => ipc::IntervalUnit::MONTH_DAY_NANO, - }; - builder.add_unit(interval_unit); - FbFieldType { - type_type, - type_: builder.finish().as_union_value(), - children: Some(fbb.create_vector(&empty_fields[..])), - } - } - Duration(unit) => { - let mut builder = ipc::DurationBuilder::new(fbb); - let time_unit = match unit { - TimeUnit::Second => ipc::TimeUnit::SECOND, - TimeUnit::Millisecond => ipc::TimeUnit::MILLISECOND, - TimeUnit::Microsecond => ipc::TimeUnit::MICROSECOND, - TimeUnit::Nanosecond => ipc::TimeUnit::NANOSECOND, - }; - builder.add_unit(time_unit); - FbFieldType { - type_type, - type_: builder.finish().as_union_value(), - children: Some(fbb.create_vector(&empty_fields[..])), - } - } - List(ref list_type) => { - let child = build_field(fbb, list_type, &ipc_field.fields[0]); - FbFieldType { - type_type, - type_: ipc::ListBuilder::new(fbb).finish().as_union_value(), - children: Some(fbb.create_vector(&[child])), - } - } - LargeList(ref list_type) => { - let child = build_field(fbb, list_type, &ipc_field.fields[0]); - FbFieldType { - type_type, - type_: 
ipc::LargeListBuilder::new(fbb).finish().as_union_value(), - children: Some(fbb.create_vector(&[child])), - } - } - FixedSizeList(ref list_type, len) => { - let child = build_field(fbb, list_type, &ipc_field.fields[0]); - let mut builder = ipc::FixedSizeListBuilder::new(fbb); - builder.add_listSize(*len as i32); - FbFieldType { - type_type, - type_: builder.finish().as_union_value(), - children: Some(fbb.create_vector(&[child])), - } - } - Struct(fields) => { - let children: Vec<_> = fields - .iter() - .zip(ipc_field.fields.iter()) - .map(|(field, ipc_field)| build_field(fbb, field, ipc_field)) - .collect(); - - FbFieldType { - type_type, - type_: ipc::Struct_Builder::new(fbb).finish().as_union_value(), - children: Some(fbb.create_vector(&children[..])), - } - } - Dictionary(_, value_type, _) => { - // In this library, the dictionary "type" is a logical construct. Here we - // pass through to the value type, as we've already captured the index - // type in the DictionaryEncoding metadata in the parent field - get_fb_field_type(value_type, ipc_field, is_nullable, fbb) - } - Extension(_, value_type, _) => get_fb_field_type(value_type, ipc_field, is_nullable, fbb), - Decimal(precision, scale) => { - let mut builder = ipc::DecimalBuilder::new(fbb); - builder.add_precision(*precision as i32); - builder.add_scale(*scale as i32); - builder.add_bitWidth(128); - FbFieldType { - type_type, - type_: builder.finish().as_union_value(), - children: Some(fbb.create_vector(&empty_fields[..])), - } - } - Union(fields, ids, mode) => { - let children: Vec<_> = fields - .iter() - .zip(ipc_field.fields.iter()) - .map(|(field, ipc_field)| build_field(fbb, field, ipc_field)) - .collect(); - - let ids = ids.as_ref().map(|ids| fbb.create_vector(ids)); - - let mut builder = ipc::UnionBuilder::new(fbb); - builder.add_mode(if mode.is_sparse() { - ipc::UnionMode::Sparse - } else { - ipc::UnionMode::Dense - }); - - if let Some(ids) = ids { - builder.add_typeIds(ids); - } - FbFieldType { - type_type, - type_: builder.finish().as_union_value(), - children: Some(fbb.create_vector(&children)), - } - } - Map(field, keys_sorted) => { - let child = build_field(fbb, field, &ipc_field.fields[0]); - let mut field_type = ipc::MapBuilder::new(fbb); - field_type.add_keysSorted(*keys_sorted); - FbFieldType { - type_type: ipc::Type::Map, - type_: field_type.finish().as_union_value(), - children: Some(fbb.create_vector(&[child])), - } + Null + | Boolean + | Int8 + | Int16 + | Int32 + | Int64 + | UInt8 + | UInt16 + | UInt32 + | UInt64 + | Float16 + | Float32 + | Float64 + | Timestamp(_, _) + | Date32 + | Date64 + | Time32(_) + | Time64(_) + | Duration(_) + | Interval(_) + | Binary + | FixedSizeBinary(_) + | LargeBinary + | Utf8 + | LargeUtf8 + | Decimal(_, _) => vec![], + FixedSizeList(inner, _) | LargeList(inner) | List(inner) | Map(inner, _) => { + vec![serialize_field(inner, &ipc_field.fields[0])] } + Union(fields, _, _) | Struct(fields) => fields + .iter() + .zip(ipc_field.fields.iter()) + .map(|(field, ipc)| serialize_field(field, ipc)) + .collect(), + Dictionary(_, inner, _) => serialize_children(inner, ipc_field), + Extension(_, inner, _) => serialize_children(inner, ipc_field), } } /// Create an IPC dictionary encoding -pub(crate) fn get_fb_dictionary<'a>( +pub(crate) fn serialize_dictionary( index_type: &IntegerType, dict_id: i64, dict_is_ordered: bool, - fbb: &mut FlatBufferBuilder<'a>, -) -> WIPOffset> { +) -> arrow_format::ipc::DictionaryEncoding { use IntegerType::*; - // We assume that the dictionary index type (as an 
integer) has already been - // validated elsewhere, and can safely assume we are dealing with integers - let mut index_builder = ipc::IntBuilder::new(fbb); - - match index_type { - Int8 | Int16 | Int32 | Int64 => index_builder.add_is_signed(true), - UInt8 | UInt16 | UInt32 | UInt64 => index_builder.add_is_signed(false), - } - - match index_type { - Int8 | UInt8 => index_builder.add_bitWidth(8), - Int16 | UInt16 => index_builder.add_bitWidth(16), - Int32 | UInt32 => index_builder.add_bitWidth(32), - Int64 | UInt64 => index_builder.add_bitWidth(64), - } - - let index_builder = index_builder.finish(); - - let mut builder = ipc::DictionaryEncodingBuilder::new(fbb); - builder.add_id(dict_id); - builder.add_indexType(index_builder); - builder.add_isOrdered(dict_is_ordered); - - builder.finish() -} - -/* -#[cfg(test)] -mod tests { - use super::*; - use crate::datatypes::{DataType, Field, Schema}; - - /// Serialize a schema in IPC format - fn schema_to_fb(schema: &Schema) -> FlatBufferBuilder { - let mut fbb = FlatBufferBuilder::new(); - - let root = schema_to_fb_offset(&mut fbb, schema); - - fbb.finish(root, None); - - fbb - } + let is_signed = match index_type { + Int8 | Int16 | Int32 | Int64 => true, + UInt8 | UInt16 | UInt32 | UInt64 => false, + }; - #[test] - fn convert_schema_round_trip() { - let md: HashMap = [("Key".to_string(), "value".to_string())] - .iter() - .cloned() - .collect(); - let field_md: BTreeMap = [("k".to_string(), "v".to_string())] - .iter() - .cloned() - .collect(); - let schema = Schema::new_from( - vec![ - { - let mut f = Field::new("uint8", DataType::UInt8, false); - f.set_metadata(Some(field_md)); - f - }, - Field::new("uint16", DataType::UInt16, true), - Field::new("uint32", DataType::UInt32, false), - Field::new("uint64", DataType::UInt64, true), - Field::new("int8", DataType::Int8, true), - Field::new("int16", DataType::Int16, false), - Field::new("int32", DataType::Int32, true), - Field::new("int64", DataType::Int64, false), - Field::new("float16", DataType::Float16, true), - Field::new("float32", DataType::Float32, false), - Field::new("float64", DataType::Float64, true), - Field::new("null", DataType::Null, false), - Field::new("bool", DataType::Boolean, false), - Field::new("date32", DataType::Date32, false), - Field::new("date64", DataType::Date64, true), - Field::new("time32[s]", DataType::Time32(TimeUnit::Second), true), - Field::new("time32[ms]", DataType::Time32(TimeUnit::Millisecond), false), - Field::new("time64[us]", DataType::Time64(TimeUnit::Microsecond), false), - Field::new("time64[ns]", DataType::Time64(TimeUnit::Nanosecond), true), - Field::new( - "timestamp[s]", - DataType::Timestamp(TimeUnit::Second, None), - false, - ), - Field::new( - "timestamp[ms]", - DataType::Timestamp(TimeUnit::Millisecond, None), - true, - ), - Field::new( - "timestamp[us]", - DataType::Timestamp( - TimeUnit::Microsecond, - Some("Africa/Johannesburg".to_string()), - ), - false, - ), - Field::new( - "timestamp[ns]", - DataType::Timestamp(TimeUnit::Nanosecond, None), - true, - ), - Field::new( - "interval[ym]", - DataType::Interval(IntervalUnit::YearMonth), - true, - ), - Field::new( - "interval[dt]", - DataType::Interval(IntervalUnit::DayTime), - true, - ), - Field::new("utf8", DataType::Utf8, false), - Field::new("binary", DataType::Binary, false), - Field::new( - "list[u8]", - DataType::List(Box::new(Field::new("item", DataType::UInt8, false))), - true, - ), - Field::new( - "list[struct]", - DataType::List(Box::new(Field::new( - "struct", - DataType::Struct(vec![ - 
Field::new("float32", DataType::UInt8, false), - Field::new("int32", DataType::Int32, true), - Field::new("bool", DataType::Boolean, true), - ]), - true, - ))), - false, - ), - Field::new( - "struct]>]>", - DataType::Struct(vec![ - Field::new("int64", DataType::Int64, true), - Field::new( - "list[struct]>]", - DataType::List(Box::new(Field::new( - "struct", - DataType::Struct(vec![ - Field::new("date32", DataType::Date32, true), - Field::new( - "list[struct<>]", - DataType::List(Box::new(Field::new( - "struct", - DataType::Struct(vec![]), - false, - ))), - false, - ), - ]), - false, - ))), - false, - ), - ]), - false, - ), - Field::new("struct<>", DataType::Struct(vec![]), true), - Field::new_dict( - "dictionary", - DataType::Dictionary(IntegerType::Int32, Box::new(DataType::Utf8), true), - true, - 123, - ), - Field::new_dict( - "dictionary", - DataType::Dictionary(IntegerType::UInt8, Box::new(DataType::UInt32), true), - true, - 123, - ), - Field::new("decimal", DataType::Decimal(10, 6), false), - ], - md, - ); + let bit_width = match index_type { + Int8 | UInt8 => 8, + Int16 | UInt16 => 16, + Int32 | UInt32 => 32, + Int64 | UInt64 => 64, + }; - let fb = schema_to_fb(&schema); + let index_type = arrow_format::ipc::Int { + bit_width, + is_signed, + }; - // read back fields - let ipc = ipc::root_as_schema(fb.finished_data()).unwrap(); - let (schema2, _) = fb_to_schema(ipc); - assert_eq!(schema, schema2); + arrow_format::ipc::DictionaryEncoding { + id: dict_id, + index_type: Some(Box::new(index_type)), + is_ordered: dict_is_ordered, + dictionary_kind: arrow_format::ipc::DictionaryKind::DenseArray, } } - */ diff --git a/src/io/ipc/write/serialize.rs b/src/io/ipc/write/serialize.rs index aa6d3d31f26..c8f535c850f 100644 --- a/src/io/ipc/write/serialize.rs +++ b/src/io/ipc/write/serialize.rs @@ -1,4 +1,4 @@ -use arrow_format::ipc::{Message, Schema}; +use arrow_format::ipc; use crate::{ array::*, @@ -14,7 +14,7 @@ use super::common::{pad_to_8, Compression}; fn _write_primitive( array: &PrimitiveArray, - buffers: &mut Vec, + buffers: &mut Vec, arrow_data: &mut Vec, offset: &mut i64, is_little_endian: bool, @@ -41,7 +41,7 @@ fn _write_primitive( fn write_primitive( array: &dyn Array, - buffers: &mut Vec, + buffers: &mut Vec, arrow_data: &mut Vec, offset: &mut i64, is_little_endian: bool, @@ -60,7 +60,7 @@ fn write_primitive( fn write_boolean( array: &dyn Array, - buffers: &mut Vec, + buffers: &mut Vec, arrow_data: &mut Vec, offset: &mut i64, _: bool, @@ -91,7 +91,7 @@ fn write_generic_binary( validity: Option<&Bitmap>, offsets: &[O], values: &[u8], - buffers: &mut Vec, + buffers: &mut Vec, arrow_data: &mut Vec, offset: &mut i64, is_little_endian: bool, @@ -139,7 +139,7 @@ fn write_generic_binary( fn write_binary( array: &dyn Array, - buffers: &mut Vec, + buffers: &mut Vec, arrow_data: &mut Vec, offset: &mut i64, is_little_endian: bool, @@ -160,7 +160,7 @@ fn write_binary( fn write_utf8( array: &dyn Array, - buffers: &mut Vec, + buffers: &mut Vec, arrow_data: &mut Vec, offset: &mut i64, is_little_endian: bool, @@ -181,7 +181,7 @@ fn write_utf8( fn write_fixed_size_binary( array: &dyn Array, - buffers: &mut Vec, + buffers: &mut Vec, arrow_data: &mut Vec, offset: &mut i64, _is_little_endian: bool, @@ -204,9 +204,9 @@ fn write_fixed_size_binary( fn write_list( array: &dyn Array, - buffers: &mut Vec, + buffers: &mut Vec, arrow_data: &mut Vec, - nodes: &mut Vec, + nodes: &mut Vec, offset: &mut i64, is_little_endian: bool, compression: Option, @@ -262,9 +262,9 @@ fn write_list( pub fn write_struct( array: 
&dyn Array,
-    buffers: &mut Vec<Schema::Buffer>,
+    buffers: &mut Vec<ipc::Buffer>,
     arrow_data: &mut Vec<u8>,
-    nodes: &mut Vec<Message::FieldNode>,
+    nodes: &mut Vec<ipc::FieldNode>,
     offset: &mut i64,
     is_little_endian: bool,
     compression: Option<Compression>,
@@ -293,9 +293,9 @@ pub fn write_struct(
 
 pub fn write_union(
     array: &dyn Array,
-    buffers: &mut Vec<Schema::Buffer>,
+    buffers: &mut Vec<ipc::Buffer>,
     arrow_data: &mut Vec<u8>,
-    nodes: &mut Vec<Message::FieldNode>,
+    nodes: &mut Vec<ipc::FieldNode>,
     offset: &mut i64,
     is_little_endian: bool,
     compression: Option<Compression>,
@@ -336,9 +336,9 @@ pub fn write_union(
 
 fn write_map(
     array: &dyn Array,
-    buffers: &mut Vec<Schema::Buffer>,
+    buffers: &mut Vec<ipc::Buffer>,
     arrow_data: &mut Vec<u8>,
-    nodes: &mut Vec<Message::FieldNode>,
+    nodes: &mut Vec<ipc::FieldNode>,
     offset: &mut i64,
     is_little_endian: bool,
     compression: Option<Compression>,
@@ -394,9 +394,9 @@ fn write_map(
 
 fn write_fixed_size_list(
     array: &dyn Array,
-    buffers: &mut Vec<Schema::Buffer>,
+    buffers: &mut Vec<ipc::Buffer>,
     arrow_data: &mut Vec<u8>,
-    nodes: &mut Vec<Message::FieldNode>,
+    nodes: &mut Vec<ipc::FieldNode>,
     offset: &mut i64,
     is_little_endian: bool,
     compression: Option<Compression>,
@@ -425,9 +425,9 @@ fn write_fixed_size_list(
 #[allow(clippy::too_many_arguments)]
 pub fn _write_dictionary(
     array: &dyn Array,
-    buffers: &mut Vec<Schema::Buffer>,
+    buffers: &mut Vec<ipc::Buffer>,
     arrow_data: &mut Vec<u8>,
-    nodes: &mut Vec<Message::FieldNode>,
+    nodes: &mut Vec<ipc::FieldNode>,
     offset: &mut i64,
     is_little_endian: bool,
     compression: Option<Compression>,
@@ -461,9 +461,9 @@ pub fn _write_dictionary(
 #[allow(clippy::too_many_arguments)]
 pub fn write_dictionary(
     array: &dyn Array,
-    buffers: &mut Vec<Schema::Buffer>,
+    buffers: &mut Vec<ipc::Buffer>,
     arrow_data: &mut Vec<u8>,
-    nodes: &mut Vec<Message::FieldNode>,
+    nodes: &mut Vec<ipc::FieldNode>,
     offset: &mut i64,
     is_little_endian: bool,
     compression: Option<Compression>,
@@ -490,17 +490,17 @@ pub fn write_dictionary(
 
 pub fn write(
     array: &dyn Array,
-    buffers: &mut Vec<Schema::Buffer>,
+    buffers: &mut Vec<ipc::Buffer>,
     arrow_data: &mut Vec<u8>,
-    nodes: &mut Vec<Message::FieldNode>,
+    nodes: &mut Vec<ipc::FieldNode>,
     offset: &mut i64,
     is_little_endian: bool,
     compression: Option<Compression>,
 ) {
-    nodes.push(Message::FieldNode::new(
-        array.len() as i64,
-        array.null_count() as i64,
-    ));
+    nodes.push(ipc::FieldNode {
+        length: array.len() as i64,
+        null_count: array.null_count() as i64,
+    });
     use PhysicalType::*;
     match array.data_type().to_physical_type() {
         Null => (),
@@ -637,7 +637,7 @@ fn pad_buffer_to_8(buffer: &mut Vec<u8>, length: usize) {
 /// writes `bytes` to `arrow_data` updating `buffers` and `offset` and guaranteeing a 8 byte boundary.
 fn write_bytes(
     bytes: &[u8],
-    buffers: &mut Vec<Schema::Buffer>,
+    buffers: &mut Vec<ipc::Buffer>,
     arrow_data: &mut Vec<u8>,
     offset: &mut i64,
     compression: Option<Compression>,
@@ -660,14 +660,17 @@ fn write_bytes(
     pad_buffer_to_8(arrow_data, arrow_data.len() - start);
     let total_len = (arrow_data.len() - start) as i64;
-    buffers.push(Schema::Buffer::new(*offset, total_len));
+    buffers.push(ipc::Buffer {
+        offset: *offset,
+        length: total_len,
+    });
     *offset += total_len;
 }
 
 fn write_bitmap(
     bitmap: Option<&Bitmap>,
     length: usize,
-    buffers: &mut Vec<Schema::Buffer>,
+    buffers: &mut Vec<ipc::Buffer>,
     arrow_data: &mut Vec<u8>,
     offset: &mut i64,
     compression: Option<Compression>,
@@ -686,7 +689,10 @@ fn write_bitmap(
             }
         }
         None => {
-            buffers.push(Schema::Buffer::new(*offset, 0));
+            buffers.push(ipc::Buffer {
+                offset: *offset,
+                length: 0,
+            });
         }
     }
 }
@@ -694,7 +700,7 @@
 /// writes `bytes` to `arrow_data` updating `buffers` and `offset` and guaranteeing a 8 byte boundary.
fn write_buffer( buffer: &[T], - buffers: &mut Vec, + buffers: &mut Vec, arrow_data: &mut Vec, offset: &mut i64, is_little_endian: bool, @@ -710,7 +716,10 @@ fn write_buffer( pad_buffer_to_8(arrow_data, arrow_data.len() - start); let total_len = (arrow_data.len() - start) as i64; - buffers.push(Schema::Buffer::new(*offset, total_len)); + buffers.push(ipc::Buffer { + offset: *offset, + length: total_len, + }); *offset += total_len; } @@ -798,7 +807,7 @@ fn _write_compressed_buffer( #[inline] fn write_buffer_from_iter>( buffer: I, - buffers: &mut Vec, + buffers: &mut Vec, arrow_data: &mut Vec, offset: &mut i64, is_little_endian: bool, @@ -815,6 +824,9 @@ fn write_buffer_from_iter>( pad_buffer_to_8(arrow_data, arrow_data.len() - start); let total_len = (arrow_data.len() - start) as i64; - buffers.push(Schema::Buffer::new(*offset, total_len)); + buffers.push(ipc::Buffer { + offset: *offset, + length: total_len, + }); *offset += total_len; } diff --git a/src/io/ipc/write/writer.rs b/src/io/ipc/write/writer.rs index 4d6cfe8976b..19546fc74aa 100644 --- a/src/io/ipc/write/writer.rs +++ b/src/io/ipc/write/writer.rs @@ -1,7 +1,6 @@ use std::{io::Write, sync::Arc}; -use arrow_format::ipc; -use arrow_format::ipc::flatbuffers::FlatBufferBuilder; +use arrow_format::ipc::planus::Builder; use super::{ super::IpcField, @@ -28,9 +27,9 @@ pub struct FileWriter { /// The number of bytes between each block of bytes, as an offset for random access block_offsets: usize, /// Dictionary blocks that will be written as part of the IPC footer - dictionary_blocks: Vec, + dictionary_blocks: Vec, /// Record blocks that will be written as part of the IPC footer - record_blocks: Vec, + record_blocks: Vec, /// Whether the writer footer has been written, and the writer is finished finished: bool, /// Keeps track of dictionaries that have been written @@ -105,21 +104,26 @@ impl FileWriter { &self.options, )?; + // add all dictionaries for encoded_dictionary in encoded_dictionaries { let (meta, data) = write_message(&mut self.writer, encoded_dictionary)?; - let block = ipc::File::Block::new(self.block_offsets as i64, meta as i32, data as i64); + let block = arrow_format::ipc::Block { + offset: self.block_offsets as i64, + meta_data_length: meta as i32, + body_length: data as i64, + }; self.dictionary_blocks.push(block); self.block_offsets += meta + data; } let (meta, data) = write_message(&mut self.writer, encoded_message)?; // add a record block for the footer - let block = ipc::File::Block::new( - self.block_offsets as i64, - meta as i32, // TODO: is this still applicable? - data as i64, - ); + let block = arrow_format::ipc::Block { + offset: self.block_offsets as i64, + meta_data_length: meta as i32, // TODO: is this still applicable? 
+ body_length: data as i64, + }; self.record_blocks.push(block); self.block_offsets += meta + data; Ok(()) @@ -130,21 +134,17 @@ impl FileWriter { // write EOS write_continuation(&mut self.writer, 0)?; - let mut fbb = FlatBufferBuilder::new(); - let dictionaries = fbb.create_vector(&self.dictionary_blocks); - let record_batches = fbb.create_vector(&self.record_blocks); - let schema = schema::schema_to_fb_offset(&mut fbb, &self.schema, &self.ipc_fields); - - let root = { - let mut footer_builder = ipc::File::FooterBuilder::new(&mut fbb); - footer_builder.add_version(ipc::Schema::MetadataVersion::V5); - footer_builder.add_schema(schema); - footer_builder.add_dictionaries(dictionaries); - footer_builder.add_recordBatches(record_batches); - footer_builder.finish() + let schema = schema::serialize_schema(&self.schema, &self.ipc_fields); + + let root = arrow_format::ipc::Footer { + version: arrow_format::ipc::MetadataVersion::V5, + schema: Some(Box::new(schema)), + dictionaries: Some(std::mem::take(&mut self.dictionary_blocks)), + record_batches: Some(std::mem::take(&mut self.record_blocks)), + custom_metadata: None, }; - fbb.finish(root, None); - let footer_data = fbb.finished_data(); + let mut builder = Builder::new(); + let footer_data = builder.finish(&root, None); self.writer.write_all(footer_data)?; self.writer .write_all(&(footer_data.len() as i32).to_le_bytes())?; diff --git a/src/io/parquet/read/schema/metadata.rs b/src/io/parquet/read/schema/metadata.rs index 9683182def7..a54406bc6a8 100644 --- a/src/io/parquet/read/schema/metadata.rs +++ b/src/io/parquet/read/schema/metadata.rs @@ -1,12 +1,10 @@ use std::collections::HashMap; -use arrow_format::ipc; - pub use parquet2::metadata::KeyValue; use crate::datatypes::Schema; use crate::error::{ArrowError, Result}; -use crate::io::ipc::read::fb_to_schema; +use crate::io::ipc::read::deserialize_schema; use super::super::super::ARROW_SCHEMA_META_KEY; @@ -33,22 +31,7 @@ fn get_arrow_schema_from_metadata(encoded_meta: &str) -> Result { } else { bytes.as_slice() }; - match ipc::Message::root_as_message(slice) { - Ok(message) => message - .header_as_schema() - .ok_or_else(|| { - ArrowError::OutOfSpec("the message is not Arrow Schema".to_string()) - }) - .and_then(fb_to_schema) - .map(|x| x.0), - Err(err) => { - // The flatbuffers implementation returns an error on verification error. - Err(ArrowError::OutOfSpec(format!( - "Unable to get root as message stored in {}: {:?}", - ARROW_SCHEMA_META_KEY, err - ))) - } - } + deserialize_schema(slice).map(|x| x.0) } Err(err) => { // The C++ implementation returns an error if the schema can't be parsed.
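With this, the IPC readers and the parquet `ARROW:schema` path above funnel through a single entry point. A closing sketch, assuming `bytes` holds a flatbuffers `Message` whose header is a `Schema` (for example, the output of `schema_to_bytes`), and the `IpcSchema::is_little_endian` flag populated by `fb_to_schema`:

    let (schema, ipc_schema) = deserialize_schema(&bytes)?;
    println!(
        "{} field(s), little endian: {}",
        schema.fields.len(),
        ipc_schema.is_little_endian
    );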