diff --git a/src/io/flight/mod.rs b/src/io/flight/mod.rs
index ac4f0773f34..75b270c7c68 100644
--- a/src/io/flight/mod.rs
+++ b/src/io/flight/mod.rs
@@ -1,3 +1,4 @@
+use std::collections::HashMap;
 use std::convert::TryFrom;
 use std::sync::Arc;
 
@@ -122,7 +123,7 @@ pub fn deserialize_batch(
     data: &FlightData,
     schema: Arc<Schema>,
     is_little_endian: bool,
-    dictionaries_by_field: &[Option<Arc<dyn Array>>],
+    dictionaries: &HashMap<usize, Arc<dyn Array>>,
 ) -> Result<RecordBatch> {
     // check that the data_header is a record batch message
     let message = ipc::Message::root_as_message(&data.data_header[..])
@@ -141,7 +142,7 @@ pub fn deserialize_batch(
         schema.clone(),
         None,
         is_little_endian,
-        dictionaries_by_field,
+        dictionaries,
         ipc::Schema::MetadataVersion::V5,
         &mut reader,
         0,
diff --git a/src/io/ipc/read/array/binary.rs b/src/io/ipc/read/array/binary.rs
index cae97b1a1eb..485e2451637 100644
--- a/src/io/ipc/read/array/binary.rs
+++ b/src/io/ipc/read/array/binary.rs
@@ -25,7 +25,7 @@ pub fn read_binary<O: Offset, R: Read + Seek>(
 where
     Vec<u8>: TryInto<O::Bytes> + TryInto<<u8 as NativeType>::Bytes>,
 {
-    let field_node = field_nodes.pop_front().unwrap().0;
+    let field_node = field_nodes.pop_front().unwrap();
 
     let validity = read_validity(
         buffers,
diff --git a/src/io/ipc/read/array/boolean.rs b/src/io/ipc/read/array/boolean.rs
index d9ea780d170..bab7b4250ee 100644
--- a/src/io/ipc/read/array/boolean.rs
+++ b/src/io/ipc/read/array/boolean.rs
@@ -19,7 +19,7 @@ pub fn read_boolean<R: Read + Seek>(
     is_little_endian: bool,
     compression: Option<ipc::Message::BodyCompression>,
 ) -> Result<BooleanArray> {
-    let field_node = field_nodes.pop_front().unwrap().0;
+    let field_node = field_nodes.pop_front().unwrap();
     let length = field_node.length() as usize;
 
     let validity = read_validity(
diff --git a/src/io/ipc/read/array/dictionary.rs b/src/io/ipc/read/array/dictionary.rs
index 0357457d014..021ca65913a 100644
--- a/src/io/ipc/read/array/dictionary.rs
+++ b/src/io/ipc/read/array/dictionary.rs
@@ -1,19 +1,24 @@
-use std::collections::VecDeque;
+use std::collections::{HashMap, VecDeque};
 use std::convert::TryInto;
 use std::io::{Read, Seek};
+use std::sync::Arc;
 
 use arrow_format::ipc;
 
-use crate::array::{DictionaryArray, DictionaryKey};
+use crate::array::{Array, DictionaryArray, DictionaryKey};
+use crate::datatypes::Field;
 use crate::error::Result;
 
 use super::super::deserialize::Node;
 use super::{read_primitive, skip_primitive};
 
+#[allow(clippy::too_many_arguments)]
 pub fn read_dictionary<T: DictionaryKey, R: Read + Seek>(
     field_nodes: &mut VecDeque<Node>,
+    field: &Field,
     buffers: &mut VecDeque<&ipc::Schema::Buffer>,
     reader: &mut R,
+    dictionaries: &HashMap<usize, Arc<dyn Array>>,
     block_offset: u64,
     compression: Option<ipc::Message::BodyCompression>,
     is_little_endian: bool,
@@ -21,7 +26,10 @@ where
     Vec<u8>: TryInto<T::Bytes>,
 {
-    let values = field_nodes.front().unwrap().1.as_ref().unwrap();
+    let values = dictionaries
+        .get(&(field.dict_id().unwrap() as usize))
+        .unwrap()
+        .clone();
 
     let keys = read_primitive(
@@ -33,7 +41,7 @@ where
         compression,
     )?;
 
-    Ok(DictionaryArray::<T>::from_data(keys, values.clone()))
+    Ok(DictionaryArray::<T>::from_data(keys, values))
 }
 
 pub fn skip_dictionary(
diff --git a/src/io/ipc/read/array/fixed_size_binary.rs b/src/io/ipc/read/array/fixed_size_binary.rs
index 094b34d2f78..b2547016e4b 100644
--- a/src/io/ipc/read/array/fixed_size_binary.rs
+++ b/src/io/ipc/read/array/fixed_size_binary.rs
@@ -19,7 +19,7 @@ pub fn read_fixed_size_binary<R: Read + Seek>(
     is_little_endian: bool,
     compression: Option<ipc::Message::BodyCompression>,
 ) -> Result<FixedSizeBinaryArray> {
-    let field_node = field_nodes.pop_front().unwrap().0;
+    let field_node = field_nodes.pop_front().unwrap();
 
     let validity = read_validity(
         buffers,
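The core of this change is visible in `read_dictionary` above: dictionary values are no longer carried alongside each field node but resolved from a shared map keyed by the schema's `dict_id`. A minimal sketch of the two shapes, using a stand-in for arrow2's `Arc<dyn Array>` (the type aliases and names here are illustrative, not the crate's API):

```rust
use std::collections::HashMap;
use std::sync::Arc;

// Stand-in for `Arc<dyn Array>`; illustrative only.
type Values = Arc<Vec<&'static str>>;

// Old shape: one optional slot per *top-level* schema field. A dictionary
// nested inside a list or struct owns no slot, so it can never be found.
type DictionariesByField = Vec<Option<Values>>;

// New shape: values keyed by `dict_id`, so any field at any nesting depth
// can resolve its dictionary.
type Dictionaries = HashMap<usize, Values>;

fn main() {
    let mut dictionaries: Dictionaries = HashMap::new();
    dictionaries.insert(42, Arc::new(vec!["a", "b", "c"]));

    // Mirrors the lookup in `read_dictionary`: the field's declared
    // dict_id is all that is needed to find its values.
    let values = dictionaries.get(&42).cloned().unwrap();
    assert_eq!(values.len(), 3);

    let _old: DictionariesByField = vec![None]; // shown only for contrast
}
```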
diff --git a/src/io/ipc/read/array/fixed_size_list.rs b/src/io/ipc/read/array/fixed_size_list.rs
index 3213eecc296..274b50fe490 100644
--- a/src/io/ipc/read/array/fixed_size_list.rs
+++ b/src/io/ipc/read/array/fixed_size_list.rs
@@ -1,9 +1,10 @@
-use std::collections::VecDeque;
+use std::collections::{HashMap, VecDeque};
 use std::io::{Read, Seek};
+use std::sync::Arc;
 
 use arrow_format::ipc;
 
-use crate::array::FixedSizeListArray;
+use crate::array::{Array, FixedSizeListArray};
 use crate::datatypes::DataType;
 use crate::error::Result;
 
@@ -16,12 +17,13 @@ pub fn read_fixed_size_list<R: Read + Seek>(
     data_type: DataType,
     buffers: &mut VecDeque<&ipc::Schema::Buffer>,
     reader: &mut R,
+    dictionaries: &HashMap<usize, Arc<dyn Array>>,
     block_offset: u64,
     is_little_endian: bool,
     compression: Option<ipc::Message::BodyCompression>,
     version: ipc::Schema::MetadataVersion,
 ) -> Result<FixedSizeListArray> {
-    let field_node = field_nodes.pop_front().unwrap().0;
+    let field_node = field_nodes.pop_front().unwrap();
 
     let validity = read_validity(
         buffers,
@@ -32,13 +34,14 @@ pub fn read_fixed_size_list<R: Read + Seek>(
         compression,
     )?;
 
-    let (value_data_type, _) = FixedSizeListArray::get_child_and_size(&data_type);
+    let (field, _) = FixedSizeListArray::get_child_and_size(&data_type);
 
     let values = read(
         field_nodes,
-        value_data_type.data_type().clone(),
+        field,
         buffers,
         reader,
+        dictionaries,
         block_offset,
         is_little_endian,
         compression,
diff --git a/src/io/ipc/read/array/list.rs b/src/io/ipc/read/array/list.rs
index ab55bb7bdba..65495cd3aa3 100644
--- a/src/io/ipc/read/array/list.rs
+++ b/src/io/ipc/read/array/list.rs
@@ -1,10 +1,11 @@
-use std::collections::VecDeque;
+use std::collections::{HashMap, VecDeque};
 use std::convert::TryInto;
 use std::io::{Read, Seek};
+use std::sync::Arc;
 
 use arrow_format::ipc;
 
-use crate::array::{ListArray, Offset};
+use crate::array::{Array, ListArray, Offset};
 use crate::buffer::Buffer;
 use crate::datatypes::DataType;
 use crate::error::Result;
@@ -18,6 +19,7 @@ pub fn read_list<O: Offset, R: Read + Seek>(
     data_type: DataType,
     buffers: &mut VecDeque<&ipc::Schema::Buffer>,
     reader: &mut R,
+    dictionaries: &HashMap<usize, Arc<dyn Array>>,
     block_offset: u64,
     is_little_endian: bool,
     compression: Option<ipc::Message::BodyCompression>,
@@ -26,7 +28,7 @@ where
     Vec<u8>: TryInto<O::Bytes>,
 {
-    let field_node = field_nodes.pop_front().unwrap().0;
+    let field_node = field_nodes.pop_front().unwrap();
 
     let validity = read_validity(
         buffers,
@@ -48,13 +50,14 @@ where
     // Older versions of the IPC format sometimes do not report an offset
     .or_else(|_| Result::Ok(Buffer::<O>::from(&[O::default()])))?;
 
-    let value_data_type = ListArray::<O>::get_child_type(&data_type).clone();
+    let field = ListArray::<O>::get_child_field(&data_type);
 
     let values = read(
         field_nodes,
-        value_data_type,
+        field,
         buffers,
         reader,
+        dictionaries,
         block_offset,
         is_little_endian,
         compression,
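`read_fixed_size_list` and `read_list` now receive the child `Field` rather than a bare `DataType`, and thread the dictionary map through the recursive `read` call. A hedged sketch of that recursion shape, with simplified stand-ins for `Field` and the map (the real readers also thread nodes, buffers, offsets and compression through each call):

```rust
use std::collections::HashMap;

// Simplified stand-in for arrow2's `Field`.
struct Field {
    dict_id: Option<i64>,
    children: Vec<Field>,
}

fn read(field: &Field, dictionaries: &HashMap<usize, Vec<String>>) {
    if let Some(id) = field.dict_id {
        // A dictionary-encoded node resolves its values by id, however
        // deep it sits in the type tree.
        let _values = dictionaries.get(&(id as usize));
    }
    // Passing the whole `Field` (not just its `DataType`) keeps `dict_id`
    // available all the way down the recursion.
    for child in &field.children {
        read(child, dictionaries);
    }
}

fn main() {
    let list_of_dict = Field {
        dict_id: None,
        children: vec![Field { dict_id: Some(7), children: vec![] }],
    };
    let dictionaries = HashMap::from([(7, vec!["x".to_string()])]);
    read(&list_of_dict, &dictionaries);
}
```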
diff --git a/src/io/ipc/read/array/map.rs b/src/io/ipc/read/array/map.rs
index a11151d6ec9..bbd518e8d4b 100644
--- a/src/io/ipc/read/array/map.rs
+++ b/src/io/ipc/read/array/map.rs
@@ -1,9 +1,10 @@
-use std::collections::VecDeque;
+use std::collections::{HashMap, VecDeque};
 use std::io::{Read, Seek};
+use std::sync::Arc;
 
 use arrow_format::ipc;
 
-use crate::array::MapArray;
+use crate::array::{Array, MapArray};
 use crate::buffer::Buffer;
 use crate::datatypes::DataType;
 use crate::error::Result;
@@ -17,12 +18,13 @@ pub fn read_map<R: Read + Seek>(
     data_type: DataType,
     buffers: &mut VecDeque<&ipc::Schema::Buffer>,
     reader: &mut R,
+    dictionaries: &HashMap<usize, Arc<dyn Array>>,
     block_offset: u64,
     is_little_endian: bool,
     compression: Option<ipc::Message::BodyCompression>,
     version: ipc::Schema::MetadataVersion,
 ) -> Result<MapArray> {
-    let field_node = field_nodes.pop_front().unwrap().0;
+    let field_node = field_nodes.pop_front().unwrap();
 
     let validity = read_validity(
         buffers,
@@ -44,13 +46,14 @@ pub fn read_map<R: Read + Seek>(
     // Older versions of the IPC format sometimes do not report an offset
     .or_else(|_| Result::Ok(Buffer::<i32>::from(&[0i32])))?;
 
-    let value_data_type = MapArray::get_field(&data_type).data_type().clone();
+    let field = MapArray::get_field(&data_type);
 
     let field = read(
         field_nodes,
-        value_data_type,
+        field,
         buffers,
         reader,
+        dictionaries,
         block_offset,
         is_little_endian,
         compression,
diff --git a/src/io/ipc/read/array/null.rs b/src/io/ipc/read/array/null.rs
index d1cdcb0e499..2885a620ffd 100644
--- a/src/io/ipc/read/array/null.rs
+++ b/src/io/ipc/read/array/null.rs
@@ -7,7 +7,7 @@ use super::super::deserialize::Node;
 pub fn read_null(field_nodes: &mut VecDeque<Node>, data_type: DataType) -> NullArray {
     NullArray::from_data(
         data_type,
-        field_nodes.pop_front().unwrap().0.length() as usize,
+        field_nodes.pop_front().unwrap().length() as usize,
     )
 }
diff --git a/src/io/ipc/read/array/primitive.rs b/src/io/ipc/read/array/primitive.rs
index df851e2c4ff..f43eb8f2703 100644
--- a/src/io/ipc/read/array/primitive.rs
+++ b/src/io/ipc/read/array/primitive.rs
@@ -22,7 +22,7 @@ pub fn read_primitive<T: NativeType, R: Read + Seek>(
 where
     Vec<u8>: TryInto<T::Bytes>,
 {
-    let field_node = field_nodes.pop_front().unwrap().0;
+    let field_node = field_nodes.pop_front().unwrap();
 
     let validity = read_validity(
         buffers,
diff --git a/src/io/ipc/read/array/struct_.rs b/src/io/ipc/read/array/struct_.rs
index 312a68ea262..775774291d3 100644
--- a/src/io/ipc/read/array/struct_.rs
+++ b/src/io/ipc/read/array/struct_.rs
@@ -1,9 +1,10 @@
-use std::collections::VecDeque;
+use std::collections::{HashMap, VecDeque};
 use std::io::{Read, Seek};
+use std::sync::Arc;
 
 use arrow_format::ipc;
 
-use crate::array::StructArray;
+use crate::array::{Array, StructArray};
 use crate::datatypes::DataType;
 use crate::error::Result;
 
@@ -16,12 +17,13 @@ pub fn read_struct<R: Read + Seek>(
     data_type: DataType,
     buffers: &mut VecDeque<&ipc::Schema::Buffer>,
     reader: &mut R,
+    dictionaries: &HashMap<usize, Arc<dyn Array>>,
     block_offset: u64,
     is_little_endian: bool,
     compression: Option<ipc::Message::BodyCompression>,
     version: ipc::Schema::MetadataVersion,
 ) -> Result<StructArray> {
-    let field_node = field_nodes.pop_front().unwrap().0;
+    let field_node = field_nodes.pop_front().unwrap();
 
     let validity = read_validity(
         buffers,
@@ -39,9 +41,10 @@ pub fn read_struct<R: Read + Seek>(
         .map(|field| {
             read(
                 field_nodes,
-                field.data_type().clone(),
+                field,
                 buffers,
                 reader,
+                dictionaries,
                 block_offset,
                 is_little_endian,
                 compression,
diff --git a/src/io/ipc/read/array/union.rs b/src/io/ipc/read/array/union.rs
index 66cab751c42..87afdfb582e 100644
--- a/src/io/ipc/read/array/union.rs
+++ b/src/io/ipc/read/array/union.rs
@@ -1,9 +1,10 @@
-use std::collections::VecDeque;
+use std::collections::{HashMap, VecDeque};
 use std::io::{Read, Seek};
+use std::sync::Arc;
 
 use arrow_format::ipc;
 
-use crate::array::UnionArray;
+use crate::array::{Array, UnionArray};
 use crate::datatypes::DataType;
 use crate::datatypes::UnionMode::Dense;
 use crate::error::Result;
@@ -17,12 +18,13 @@ pub fn read_union<R: Read + Seek>(
     data_type: DataType,
     buffers: &mut VecDeque<&ipc::Schema::Buffer>,
     reader: &mut R,
+    dictionaries: &HashMap<usize, Arc<dyn Array>>,
     block_offset: u64,
     is_little_endian: bool,
     compression: Option<ipc::Message::BodyCompression>,
     version: ipc::Schema::MetadataVersion,
 ) -> Result<UnionArray> {
-    let field_node = field_nodes.pop_front().unwrap().0;
+    let field_node = field_nodes.pop_front().unwrap();
 
     if version != ipc::Schema::MetadataVersion::V5 {
         let _ = buffers.pop_front().unwrap();
@@ -61,9 +63,10 @@ pub fn read_union<R: Read + Seek>(
         .map(|field| {
             read(
                 field_nodes,
-                field.data_type().clone(),
+                field,
                 buffers,
                 reader,
+                dictionaries,
                 block_offset,
                 is_little_endian,
                 compression,
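With dictionaries gone from the per-node state, `Node` (see `deserialize.rs` below) shrinks from a `(FieldNode, Option<dictionary>)` pair to a plain `&FieldNode`, which is why every reader above drops the `.0` projection. A small sketch of the consumption pattern, where `FieldNode` is a stand-in for `ipc::Message::FieldNode`:

```rust
use std::collections::VecDeque;

// Stand-in for `ipc::Message::FieldNode`: per-array length and null count,
// emitted by the writer in depth-first order.
#[derive(Clone, Copy)]
struct FieldNode {
    length: i64,
    null_count: i64,
}

// Before: type Node<'a> = (&'a FieldNode, &'a Option<Dictionary>), so every
// reader had to take `.0`. After: type Node<'a> = &'a FieldNode.
type Node<'a> = &'a FieldNode;

fn pop_node<'a>(nodes: &mut VecDeque<Node<'a>>) -> Node<'a> {
    // Like the readers above, assume the writer emitted one node per array;
    // a malformed file panics on the `unwrap`.
    nodes.pop_front().unwrap()
}

fn main() {
    let node = FieldNode { length: 10, null_count: 2 };
    let mut nodes: VecDeque<Node<'_>> = VecDeque::from([&node]);
    let first = pop_node(&mut nodes);
    assert_eq!((first.length, first.null_count), (10, 2));
}
```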
diff --git a/src/io/ipc/read/array/utf8.rs b/src/io/ipc/read/array/utf8.rs
index d11ae7a12fa..93d024b9f13 100644
--- a/src/io/ipc/read/array/utf8.rs
+++ b/src/io/ipc/read/array/utf8.rs
@@ -25,7 +25,7 @@ pub fn read_utf8<O: Offset, R: Read + Seek>(
 where
     Vec<u8>: TryInto<O::Bytes> + TryInto<<u8 as NativeType>::Bytes>,
 {
-    let field_node = field_nodes.pop_front().unwrap().0;
+    let field_node = field_nodes.pop_front().unwrap();
 
     let validity = read_validity(
         buffers,
diff --git a/src/io/ipc/read/common.rs b/src/io/ipc/read/common.rs
index aa0fa7b6490..b8846636168 100644
--- a/src/io/ipc/read/common.rs
+++ b/src/io/ipc/read/common.rs
@@ -98,7 +98,7 @@ pub fn read_record_batch<R: Read + Seek>(
     schema: Arc<Schema>,
     projection: Option<(&[usize], Arc<Schema>)>,
     is_little_endian: bool,
-    dictionaries: &[Option<ArrayRef>],
+    dictionaries: &HashMap<usize, Arc<dyn Array>>,
     version: MetadataVersion,
     reader: &mut R,
     block_offset: u64,
@@ -111,13 +111,7 @@ pub fn read_record_batch<R: Read + Seek>(
         ArrowError::Ipc("Unable to get field nodes from IPC RecordBatch".to_string())
     })?;
 
-    // This is a bug fix: we should have one dictionary per node, not schema field
-    let dictionaries = dictionaries.iter().chain(std::iter::repeat(&None));
-
-    let mut field_nodes = field_nodes
-        .iter()
-        .zip(dictionaries)
-        .collect::<VecDeque<_>>();
+    let mut field_nodes = field_nodes.iter().collect::<VecDeque<_>>();
 
     let (schema, columns) = if let Some(projection) = projection {
         let projected_schema = projection.1.clone();
@@ -128,9 +122,10 @@
             .map(|maybe_field| match maybe_field {
                 ProjectionResult::Selected(field) => Some(read(
                     &mut field_nodes,
-                    field.data_type().clone(),
+                    field,
                     &mut buffers,
                     reader,
+                    dictionaries,
                     block_offset,
                     is_little_endian,
                     batch.compression(),
@@ -151,9 +146,10 @@
             .map(|field| {
                 read(
                     &mut field_nodes,
-                    field.data_type().clone(),
+                    field,
                     &mut buffers,
                     reader,
+                    dictionaries,
                     block_offset,
                     is_little_endian,
                     batch.compression(),
@@ -167,12 +163,12 @@
 }
 
 /// Read the dictionary from the buffer and provided metadata,
-/// updating the `dictionaries_by_field` with the resulting dictionary
+/// updating the `dictionaries` with the resulting dictionary
 pub fn read_dictionary<R: Read + Seek>(
     batch: ipc::Message::DictionaryBatch,
     schema: &Schema,
     is_little_endian: bool,
-    dictionaries_by_field: &mut [Option<ArrayRef>],
+    dictionaries: &mut HashMap<usize, Arc<dyn Array>>,
     reader: &mut R,
     block_offset: u64,
 ) -> Result<()> {
@@ -204,7 +200,7 @@ pub fn read_dictionary<R: Read + Seek>(
                 schema,
                 None,
                 is_little_endian,
-                dictionaries_by_field,
+                dictionaries,
                 MetadataVersion::V5,
                 reader,
                 block_offset,
@@ -217,16 +213,7 @@ pub fn read_dictionary<R: Read + Seek>(
         ArrowError::InvalidArgumentError("dictionary id not found in schema".to_string())
     })?;
 
-    // for all fields with this dictionary id, update the dictionaries vector
-    // in the reader. Note that a dictionary batch may be shared between many fields.
-    // We don't currently record the isOrdered field. This could be general
-    // attributes of arrays.
-    for (i, field) in schema.fields().iter().enumerate() {
-        if field.dict_id() == Some(id) {
-            // Add (possibly multiple) array refs to the dictionaries array.
-            dictionaries_by_field[i] = Some(dictionary_values.clone());
-        }
-    }
+    dictionaries.insert(id as usize, dictionary_values);
 
     Ok(())
 }
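In `common.rs`, registering a dictionary batch becomes a single map insert keyed by the batch's id, replacing the scan over all schema fields with a matching `dict_id`. One consequence, sketched below with stand-in types (not the crate's API), is that a later batch with the same id simply overwrites the earlier entry, matching the replacement behaviour of non-delta dictionary batches in the IPC format:

```rust
use std::collections::HashMap;
use std::sync::Arc;

// Stand-in for `Arc<dyn Array>`.
type Values = Arc<Vec<i32>>;

// Mirrors `read_dictionary` in common.rs: one insert keyed by the batch id
// instead of copying the values into every field slot that declared it.
fn register(dictionaries: &mut HashMap<usize, Values>, id: i64, values: Values) {
    // `insert` overwrites any previous entry for `id`, so a later
    // dictionary batch with the same id replaces the earlier one.
    dictionaries.insert(id as usize, values);
}

fn main() {
    let mut dictionaries = HashMap::new();
    register(&mut dictionaries, 1, Arc::new(vec![10, 20]));
    register(&mut dictionaries, 1, Arc::new(vec![10, 20, 30])); // replacement
    assert_eq!(dictionaries[&1].len(), 3);
}
```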
diff --git a/src/io/ipc/read/deserialize.rs b/src/io/ipc/read/deserialize.rs
index 1d243e5256d..f533a63af40 100644
--- a/src/io/ipc/read/deserialize.rs
+++ b/src/io/ipc/read/deserialize.rs
@@ -1,9 +1,4 @@
-//! Arrow IPC File and Stream Readers
-//!
-//! The `FileReader` and `StreamReader` have similar interfaces,
-//! however the `FileReader` expects a reader that supports `Seek`ing
-
-use std::collections::VecDeque;
+use std::collections::{HashMap, VecDeque};
 use std::{
     io::{Read, Seek},
     sync::Arc,
@@ -13,25 +8,28 @@ use arrow_format::ipc;
 use arrow_format::ipc::{Message::BodyCompression, Schema::MetadataVersion};
 
 use crate::array::*;
-use crate::datatypes::{DataType, PhysicalType};
+use crate::datatypes::{DataType, Field, PhysicalType};
 use crate::error::Result;
 
 use super::array::*;
 
-pub type Node<'a> = (&'a ipc::Message::FieldNode, &'a Option<Arc<dyn Array>>);
+pub type Node<'a> = &'a ipc::Message::FieldNode;
 
 #[allow(clippy::too_many_arguments)]
 pub fn read<R: Read + Seek>(
     field_nodes: &mut VecDeque<Node>,
-    data_type: DataType,
+    field: &Field,
     buffers: &mut VecDeque<&ipc::Schema::Buffer>,
     reader: &mut R,
+    dictionaries: &HashMap<usize, Arc<dyn Array>>,
     block_offset: u64,
     is_little_endian: bool,
     compression: Option<BodyCompression>,
     version: MetadataVersion,
 ) -> Result<Arc<dyn Array>> {
     use PhysicalType::*;
+    let data_type = field.data_type().clone();
+
     match data_type.to_physical_type() {
         Null => {
             let array = read_null(field_nodes, data_type);
@@ -124,6 +122,7 @@
             data_type,
             buffers,
             reader,
+            dictionaries,
             block_offset,
             is_little_endian,
             compression,
@@ -135,6 +134,7 @@
             data_type,
             buffers,
             reader,
+            dictionaries,
             block_offset,
             is_little_endian,
             compression,
@@ -146,6 +146,7 @@
             data_type,
             buffers,
             reader,
+            dictionaries,
             block_offset,
             is_little_endian,
             compression,
@@ -157,6 +158,7 @@
             data_type,
             buffers,
             reader,
+            dictionaries,
             block_offset,
             is_little_endian,
             compression,
@@ -167,8 +169,10 @@
             match_integer_type!(key_type, |$T| {
                 read_dictionary::<$T, _>(
                     field_nodes,
+                    field,
                     buffers,
                     reader,
+                    dictionaries,
                     block_offset,
                     compression,
                     is_little_endian,
@@ -181,6 +185,7 @@
             data_type,
             buffers,
             reader,
+            dictionaries,
             block_offset,
             is_little_endian,
             compression,
@@ -192,6 +197,7 @@
             data_type,
             buffers,
             reader,
+            dictionaries,
             block_offset,
             is_little_endian,
             compression,
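`deserialize::read` now takes the `Field` itself, derives the `DataType` internally, and forwards the dictionary map only to the branches that need it. A toy model of that dispatch shape (the real branches also receive nodes, buffers, the reader, offsets, compression and version):

```rust
use std::collections::HashMap;

// Toy stand-ins for the dispatch in `deserialize::read`.
enum DataType {
    Int32,
    List(Box<Field>),
    Dictionary(usize),
}

struct Field {
    data_type: DataType,
}

fn read(field: &Field, dictionaries: &HashMap<usize, Vec<String>>) {
    match &field.data_type {
        // Flat arrays never touch the dictionary map...
        DataType::Int32 => {}
        // ...nested readers forward it to their children...
        DataType::List(child) => read(child, dictionaries),
        // ...and dictionary readers consume it.
        DataType::Dictionary(id) => {
            let _values = dictionaries.get(id);
        }
    }
}

fn main() {
    let field = Field { data_type: DataType::Dictionary(7) };
    read(&field, &HashMap::from([(7, vec!["a".to_string()])]));
}
```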
diff --git a/src/io/ipc/read/reader.rs b/src/io/ipc/read/reader.rs
index 6f2707a6412..0f025e4451c 100644
--- a/src/io/ipc/read/reader.rs
+++ b/src/io/ipc/read/reader.rs
@@ -15,6 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use std::collections::HashMap;
 use std::io::{Read, Seek, SeekFrom};
 use std::sync::Arc;
 
@@ -30,8 +31,6 @@ use super::super::convert;
 use super::super::{ARROW_MAGIC, CONTINUATION_MARKER};
 use super::common::*;
 
-type ArrayRef = Arc<dyn Array>;
-
 #[derive(Debug, Clone)]
 pub struct FileMetadata {
     /// The schema that is read from the file header
@@ -45,10 +44,8 @@ pub struct FileMetadata {
     /// The total number of blocks, which may contain record batches and other types
     total_blocks: usize,
 
-    /// Optional dictionaries for each schema field.
-    ///
-    /// Dictionaries may be appended to in the streaming format.
-    dictionaries_by_field: Vec<Option<ArrayRef>>,
+    /// Dictionaries associated to each dict_id
+    dictionaries: HashMap<usize, Arc<dyn Array>>,
 
     /// FileMetadata version
     version: ipc::Schema::MetadataVersion,
@@ -122,8 +119,8 @@ pub fn read_file_metadata<R: Read + Seek>(reader: &mut R) -> Result<FileMetadata
     let schema = Arc::new(schema);
 
-    let mut dictionaries_by_field = vec![None; schema.fields().len()];
+    let mut dictionaries = Default::default();
 
     for block in footer.dictionaries().unwrap() {
@@ -134,7 +131,7 @@ pub fn read_file_metadata<R: Read + Seek>(reader: &mut R) -> Result<FileMetadata
             batch,
             &schema,
             is_little_endian,
-            &mut dictionaries_by_field,
+            &mut dictionaries,
             reader,
             block_offset,
         )?;
@@ -150,7 +147,7 @@ pub fn read_file_metadata<R: Read + Seek>(reader: &mut R) -> Result<FileMetadata
         schema,
         is_little_endian,
         blocks,
         total_blocks,
-        dictionaries_by_field,
+        dictionaries,
         version,
     })
@@ -217,7 +214,7 @@ fn read_batch<R: Read + Seek>(
         metadata.schema.clone(),
         projection,
         metadata.is_little_endian,
-        &metadata.dictionaries_by_field,
+        &metadata.dictionaries,
         metadata.version,
         reader,
         block.offset() as u64 + block.metaDataLength() as u64,
diff --git a/src/io/ipc/read/stream.rs b/src/io/ipc/read/stream.rs
index bd412e7b213..d915d9fa79a 100644
--- a/src/io/ipc/read/stream.rs
+++ b/src/io/ipc/read/stream.rs
@@ -15,6 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use std::collections::HashMap;
 use std::io::Read;
 use std::sync::Arc;
 
@@ -30,8 +31,6 @@ use super::super::convert;
 use super::super::CONTINUATION_MARKER;
 use super::common::*;
 
-type ArrayRef = Arc<dyn Array>;
-
 #[derive(Debug)]
 pub struct StreamMetadata {
     /// The schema that is read from the stream's first message
@@ -113,7 +112,7 @@ impl StreamState {
 pub fn read_next<R: Read>(
     reader: &mut R,
     metadata: &StreamMetadata,
-    dictionaries_by_field: &mut Vec<Option<ArrayRef>>,
+    dictionaries: &mut HashMap<usize, Arc<dyn Array>>,
 ) -> Result<Option<StreamState>> {
     // determine metadata length
     let mut meta_size: [u8; 4] = [0; 4];
@@ -172,7 +171,7 @@
                 metadata.schema.clone(),
                 None,
                 metadata.is_little_endian,
-                dictionaries_by_field,
+                dictionaries,
                 metadata.version,
                 &mut reader,
                 0,
@@ -193,13 +192,13 @@
                 batch,
                 &metadata.schema,
                 metadata.is_little_endian,
-                dictionaries_by_field,
+                dictionaries,
                 &mut dict_reader,
                 0,
             )?;
 
             // read the next message until we encounter a RecordBatch
-            read_next(reader, metadata, dictionaries_by_field)
+            read_next(reader, metadata, dictionaries)
         }
         ipc::Message::MessageHeader::NONE => Ok(Some(StreamState::Waiting)),
         t => Err(ArrowError::Ipc(format!(
@@ -218,7 +217,7 @@
 pub struct StreamReader<R: Read> {
     reader: R,
     metadata: StreamMetadata,
-    dictionaries_by_field: Vec<Option<ArrayRef>>,
+    dictionaries: HashMap<usize, Arc<dyn Array>>,
     finished: bool,
 }
 
@@ -229,11 +228,10 @@ impl<R: Read> StreamReader<R> {
     /// encounter a schema.
     /// To check if the reader is done, use `is_finished(self)`
     pub fn new(reader: R, metadata: StreamMetadata) -> Self {
-        let fields = metadata.schema.fields().len();
         Self {
             reader,
             metadata,
-            dictionaries_by_field: vec![None; fields],
+            dictionaries: Default::default(),
             finished: false,
         }
     }
@@ -252,11 +250,7 @@ impl<R: Read> StreamReader<R> {
         if self.finished {
             return Ok(None);
         }
-        let batch = read_next(
-            &mut self.reader,
-            &self.metadata,
-            &mut self.dictionaries_by_field,
-        )?;
+        let batch = read_next(&mut self.reader, &self.metadata, &mut self.dictionaries)?;
         if batch.is_none() {
             self.finished = true;
         }
diff --git a/tests/it/io/ipc/read/file.rs b/tests/it/io/ipc/read/file.rs
index c869b385857..602c4151a9a 100644
--- a/tests/it/io/ipc/read/file.rs
+++ b/tests/it/io/ipc/read/file.rs
@@ -134,6 +134,12 @@ fn read_generated_100_non_canonical_map() -> Result<()> {
     test_file("1.0.0-bigendian", "generated_map_non_canonical")
 }
 
+#[test]
+fn read_generated_100_nested_dictionary() -> Result<()> {
+    test_file("1.0.0-littleendian", "generated_nested_dictionary")?;
+    test_file("1.0.0-bigendian", "generated_nested_dictionary")
+}
+
 #[test]
 fn read_generated_017_union() -> Result<()> {
     test_file("0.17.1", "generated_union")
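Finally, the stream reader owns the dictionary state: `StreamReader::new` starts from an empty map instead of one slot per schema field, and fills it as dictionary batches arrive. A hedged end-to-end usage sketch follows; the constructor and `StreamState` match this diff, while `read_stream_metadata`, the `Iterator` implementation and the file name are assumptions about the surrounding crate:

```rust
use std::fs::File;
use std::io::BufReader;

use arrow2::error::Result;
use arrow2::io::ipc::read::{read_stream_metadata, StreamReader, StreamState};

fn main() -> Result<()> {
    let mut file = BufReader::new(File::open("batches.arrows")?);
    let metadata = read_stream_metadata(&mut file)?;
    let reader = StreamReader::new(file, metadata);
    for state in reader {
        match state? {
            // Dictionary batches are consumed internally, filling the
            // reader's private `dictionaries` map; only record batches
            // surface here.
            StreamState::Some(batch) => println!("read {} rows", batch.num_rows()),
            StreamState::Waiting => break, // no new data yet (partial stream)
        }
    }
    Ok(())
}
```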