diff --git a/src/datatypes/mod.rs b/src/datatypes/mod.rs
index b9c4a4994c7..de3a3494226 100644
--- a/src/datatypes/mod.rs
+++ b/src/datatypes/mod.rs
@@ -9,26 +9,17 @@ pub use schema::Schema;
 pub(crate) use field::{get_extension, Extension, Metadata};
 
-/// The set of datatypes that are supported by this implementation of Apache Arrow.
-///
-/// The Arrow specification on data types includes some more types.
-/// See also [`Schema.fbs`](https://github.com/apache/arrow/blob/master/format/Schema.fbs)
-/// for Arrow's specification.
-///
-/// The variants of this enum include primitive fixed size types as well as parametric or
-/// nested types.
-/// Currently the Rust implementation supports the following nested types:
-/// - `List`
-/// - `Struct`
-///
-/// Nested types can themselves be nested within other arrays.
-/// For more information on these types please see
-/// [the physical memory layout of Apache Arrow](https://arrow.apache.org/docs/format/Columnar.html#physical-memory-layout).
+/// The set of supported logical types.
+/// Each variant uniquely identifies a logical type, which defines specific semantics for the data (e.g. how it should be represented).
+/// A [`DataType`] has a unique corresponding [`PhysicalType`], obtained via [`DataType::to_physical_type`],
+/// which uniquely identifies an in-memory representation of data.
+/// The [`DataType::Extension`] variant is special in that it augments a [`DataType`] with metadata to support custom types.
+/// Use `to_logical_type` to desugar such a type and return its corresponding logical type.
 #[derive(Debug, Clone, PartialEq, Eq, Hash)]
 pub enum DataType {
-    /// Null type, representing an array without values or validity, only a length.
+    /// Null type
     Null,
-    /// A boolean datatype representing the values `true` and `false`.
+    /// `true` and `false`.
     Boolean,
     /// A signed 8-bit integer.
     Int8,
@@ -223,6 +214,17 @@ impl DataType {
             Extension(_, key, _) => key.to_physical_type(),
         }
     }
+
+    /// Returns `&self` for all but [`DataType::Extension`]. For [`DataType::Extension`],
+    /// (recursively) returns the inner [`DataType`].
+    /// Never returns the variant [`DataType::Extension`].
+    pub fn to_logical_type(&self) -> &DataType {
+        use DataType::*;
+        match self {
+            Extension(_, key, _) => key.to_logical_type(),
+            _ => self,
+        }
+    }
 }
 
 fn to_dictionary_index_type(data_type: &DataType) -> DictionaryIndexType {
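// Illustrative sketch (not part of the patch) of the `to_logical_type`
// contract documented above; `arrow2` is this crate, and the variants are
// those of the enum shown in the diff.
use arrow2::datatypes::DataType;

fn example_to_logical_type() {
    // an extension type augmenting a plain Int32 with a name and no metadata
    let ext = DataType::Extension("my-type".to_string(), Box::new(DataType::Int32), None);
    // desugaring strips the Extension wrapper...
    assert_eq!(ext.to_logical_type(), &DataType::Int32);
    // ...recursively, for nested extensions...
    let nested = DataType::Extension("outer".to_string(), Box::new(ext), None);
    assert_eq!(nested.to_logical_type(), &DataType::Int32);
    // ...while non-extension types are returned as-is.
    assert_eq!(DataType::Int32.to_logical_type(), &DataType::Int32);
}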
diff --git a/src/io/ipc/convert.rs b/src/io/ipc/convert.rs
index b0765bf5fa8..c42b72e041f 100644
--- a/src/io/ipc/convert.rs
+++ b/src/io/ipc/convert.rs
@@ -333,6 +333,29 @@ fn write_metadata<'a>(
     }
 }
 
+fn write_extension<'a>(
+    fbb: &mut FlatBufferBuilder<'a>,
+    name: &str,
+    metadata: &Option<String>,
+    kv_vec: &mut Vec<WIPOffset<ipc::KeyValue<'a>>>,
+) {
+    // metadata
+    if let Some(metadata) = metadata {
+        let kv_args = ipc::KeyValueArgs {
+            key: Some(fbb.create_string("ARROW:extension:metadata")),
+            value: Some(fbb.create_string(metadata.as_str())),
+        };
+        kv_vec.push(ipc::KeyValue::create(fbb, &kv_args));
+    }
+
+    // name
+    let kv_args = ipc::KeyValueArgs {
+        key: Some(fbb.create_string("ARROW:extension:name")),
+        value: Some(fbb.create_string(name)),
+    };
+    kv_vec.push(ipc::KeyValue::create(fbb, &kv_args));
+}
+
 /// Create an IPC Field from an Arrow Field
 pub(crate) fn build_field<'a>(
     fbb: &mut FlatBufferBuilder<'a>,
@@ -341,39 +364,16 @@ pub(crate) fn build_field<'a>(
     // custom metadata.
     let mut kv_vec = vec![];
     if let DataType::Extension(name, _, metadata) = field.data_type() {
-        // append extension information.
-
-        // metadata
-        if let Some(metadata) = metadata {
-            let kv_args = ipc::KeyValueArgs {
-                key: Some(fbb.create_string("ARROW:extension:metadata")),
-                value: Some(fbb.create_string(metadata.as_str())),
-            };
-            kv_vec.push(ipc::KeyValue::create(fbb, &kv_args));
-        }
-
-        // name
-        let kv_args = ipc::KeyValueArgs {
-            key: Some(fbb.create_string("ARROW:extension:name")),
-            value: Some(fbb.create_string(name.as_str())),
-        };
-        kv_vec.push(ipc::KeyValue::create(fbb, &kv_args));
+        write_extension(fbb, name, metadata, &mut kv_vec);
     }
-    if let Some(metadata) = field.metadata() {
-        if !metadata.is_empty() {
-            write_metadata(fbb, metadata, &mut kv_vec);
-        }
-    };
-    let fb_metadata = if !kv_vec.is_empty() {
-        Some(fbb.create_vector(&kv_vec))
-    } else {
-        None
-    };
 
     let fb_field_name = fbb.create_string(field.name().as_str());
     let field_type = get_fb_field_type(field.data_type(), field.is_nullable(), fbb);
 
-    let fb_dictionary = if let Dictionary(index_type, _) = field.data_type() {
+    let fb_dictionary = if let Dictionary(index_type, inner) = field.data_type() {
+        if let DataType::Extension(name, _, metadata) = inner.as_ref() {
+            write_extension(fbb, name, metadata, &mut kv_vec);
+        }
         Some(get_fb_dictionary(
             index_type,
             field
@@ -388,6 +388,17 @@ pub(crate) fn build_field<'a>(
         None
     };
 
+    if let Some(metadata) = field.metadata() {
+        if !metadata.is_empty() {
+            write_metadata(fbb, metadata, &mut kv_vec);
+        }
+    };
+    let fb_metadata = if !kv_vec.is_empty() {
+        Some(fbb.create_vector(&kv_vec))
+    } else {
+        None
+    };
+
     let mut field_builder = ipc::FieldBuilder::new(fbb);
     field_builder.add_name(fb_field_name);
     if let Some(dictionary) = fb_dictionary {
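// Sketch (assumed field shapes, not part of the patch) of what `build_field`
// above now covers: extension information is written as the key-value pairs
// "ARROW:extension:name" / "ARROW:extension:metadata", both for top-level
// Extension types and, newly, for the inner type of a Dictionary.
use arrow2::datatypes::{DataType, Field};

fn example_extension_fields() -> (Field, Field) {
    let ext = DataType::Extension(
        "uuid".to_string(),                      // extension name
        Box::new(DataType::FixedSizeBinary(16)), // storage type
        Some("{}".to_string()),                  // optional serialized metadata
    );
    // a top-level extension column
    let plain = Field::new("id", ext.clone(), true);
    // a dictionary-encoded extension column, whose extension information was
    // previously lost on write (assuming a Dictionary(keys, values) layout)
    let dict = Field::new(
        "id_dict",
        DataType::Dictionary(Box::new(DataType::Int32), Box::new(ext)),
        true,
    );
    (plain, dict)
}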
diff --git a/src/io/parquet/read/binary/dictionary.rs b/src/io/parquet/read/binary/dictionary.rs
index 31a64683cbe..3a8ce73eaf2 100644
--- a/src/io/parquet/read/binary/dictionary.rs
+++ b/src/io/parquet/read/binary/dictionary.rs
@@ -32,7 +32,7 @@ fn read_dict_optional<K, O>(
 {
     let length = indices.len() + additional;
     values.extend_from_slice(dict.values());
-    offsets.extend(
+    offsets.extend_from_trusted_len_iter(
         dict.offsets()
             .iter()
             .map(|x| O::from_usize(*x as usize).unwrap()),
@@ -152,6 +152,10 @@ where
         )?
     }
 
+    if offsets.len() == 0 {
+        // the array is empty and thus we need to push the first offset ourselves.
+        offsets.push(O::zero());
+    };
     let keys = PrimitiveArray::from_data(K::DATA_TYPE, indices.into(), validity.into());
     let data_type = DictionaryArray::<K>::get_child(&data_type).clone();
     let values = Arc::new(Utf8Array::from_data(
diff --git a/src/io/parquet/read/fixed_size_binary.rs b/src/io/parquet/read/fixed_size_binary.rs
index 43a4e3d1ff1..2a02524785b 100644
--- a/src/io/parquet/read/fixed_size_binary.rs
+++ b/src/io/parquet/read/fixed_size_binary.rs
@@ -22,12 +22,11 @@ pub(crate) fn read_dict_buffer(
     validity_buffer: &[u8],
     indices_buffer: &[u8],
     additional: usize,
-    size: i32,
+    size: usize,
     dict: &FixedLenByteArrayPageDict,
     values: &mut MutableBuffer<u8>,
     validity: &mut MutableBitmap,
 ) {
-    let size = size as usize;
     let length = values.len() * size + additional;
 
     let dict_values = dict.values();
@@ -75,11 +74,10 @@ pub(crate) fn read_optional(
     validity_buffer: &[u8],
     values_buffer: &[u8],
     additional: usize,
-    size: i32,
+    size: usize,
     values: &mut MutableBuffer<u8>,
     validity: &mut MutableBitmap,
 ) {
-    let size = size as usize;
     let length = values.len() * size + additional;
 
     assert_eq!(values_buffer.len() % size, 0);
@@ -122,16 +120,16 @@ pub(crate) fn read_optional(
 pub(crate) fn read_required(
     buffer: &[u8],
     additional: usize,
-    size: i32,
+    size: usize,
     values: &mut MutableBuffer<u8>,
 ) {
-    assert_eq!(buffer.len(), additional * size as usize);
+    assert_eq!(buffer.len(), additional * size);
     values.extend_from_slice(buffer);
 }
 
 pub fn iter_to_array<I, E>(
     mut iter: I,
-    size: i32,
+    data_type: DataType,
     metadata: &ColumnChunkMetaData,
 ) -> Result<FixedSizeBinaryArray>
 where
     E: Clone,
     I: StreamingIterator<Item = std::result::Result<DataPage, E>>,
 {
+    let size = *FixedSizeBinaryArray::get_size(&data_type) as usize;
+
     let capacity = metadata.num_values() as usize;
-    let mut values = MutableBuffer::<u8>::with_capacity(capacity * size as usize);
+    let mut values = MutableBuffer::<u8>::with_capacity(capacity * size);
     let mut validity = MutableBitmap::with_capacity(capacity);
     while let Some(page) = iter.next() {
         extend_from_page(
@@ -153,7 +153,7 @@ where
     }
 
     Ok(FixedSizeBinaryArray::from_data(
-        DataType::FixedSizeBinary(size),
+        data_type,
         values.into(),
         validity.into(),
     ))
@@ -161,7 +161,7 @@ where
 
 pub async fn stream_to_array<I, E>(
     pages: I,
-    size: i32,
+    data_type: DataType,
     metadata: &ColumnChunkMetaData,
 ) -> Result<FixedSizeBinaryArray>
 where
     E: Clone,
     I: Stream<Item = std::result::Result<DataPage, E>>,
 {
+    let size = *FixedSizeBinaryArray::get_size(&data_type) as usize;
+
     let capacity = metadata.num_values() as usize;
-    let mut values = MutableBuffer::<u8>::with_capacity(capacity * size as usize);
+    let mut values = MutableBuffer::<u8>::with_capacity(capacity * size);
     let mut validity = MutableBitmap::with_capacity(capacity);
 
     pin_mut!(pages); // needed for iteration
@@ -186,7 +188,7 @@ where
     }
 
     Ok(FixedSizeBinaryArray::from_data(
-        DataType::FixedSizeBinary(size),
+        data_type,
         values.into(),
         validity.into(),
     ))
@@ -194,7 +196,7 @@ where
 
 pub(crate) fn extend_from_page(
     page: &DataPage,
-    size: i32,
+    size: usize,
     descriptor: &ColumnDescriptor,
     values: &mut MutableBuffer<u8>,
     validity: &mut MutableBitmap,
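// Standalone illustration (not part of the patch) of the offsets invariant
// behind the empty-dictionary fix above: an offsets-based array with `n`
// values carries `n + 1` offsets, so even an empty array needs the leading 0.
fn offsets_from_lengths(lengths: &[usize]) -> Vec<i64> {
    let mut offsets = Vec::with_capacity(lengths.len() + 1);
    let mut last: i64 = 0;
    offsets.push(last); // the first offset exists even when `lengths` is empty
    for &len in lengths {
        last += len as i64;
        offsets.push(last);
    }
    offsets
}

#[test]
fn empty_offsets_still_have_one_entry() {
    assert_eq!(offsets_from_lengths(&[]), vec![0]);
    assert_eq!(offsets_from_lengths(&[3, 0, 2]), vec![0, 3, 3, 5]);
}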
diff --git a/src/io/parquet/read/mod.rs b/src/io/parquet/read/mod.rs
index 212677e0f52..d3855847d72 100644
--- a/src/io/parquet/read/mod.rs
+++ b/src/io/parquet/read/mod.rs
@@ -93,7 +93,7 @@ fn dict_read<
         panic!()
     };
 
-    match values_data_type {
+    match values_data_type.to_logical_type() {
         UInt8 => primitive::iter_to_dict_array::<K, _, _>(
             iter,
             metadata,
@@ -169,7 +169,7 @@ pub fn page_iter_to_array<
     data_type: DataType,
 ) -> Result<Box<dyn Array>> {
     use DataType::*;
-    match data_type {
+    match data_type.to_logical_type() {
         // INT32
         UInt8 => primitive::iter_to_array(iter, metadata, data_type, |x: i32| x as u8),
         UInt16 => primitive::iter_to_array(iter, metadata, data_type, |x: i32| x as u16),
@@ -207,7 +207,7 @@ pub fn page_iter_to_array<
         Binary | Utf8 => binary::iter_to_array::<i32, _, _>(iter, metadata, &data_type),
         LargeBinary | LargeUtf8 => binary::iter_to_array::<i64, _, _>(iter, metadata, &data_type),
         FixedSizeBinary(size) => Ok(Box::new(fixed_size_binary::iter_to_array(
-            iter, size, metadata,
+            iter, data_type, metadata,
         )?)),
 
         List(ref inner) => match inner.data_type() {
@@ -247,7 +247,7 @@ pub fn page_iter_to_array<
                 binary::iter_to_array_nested::<i64, _, _>(iter, metadata, data_type)
             }
             other => Err(ArrowError::NotYetImplemented(format!(
-                "The conversion of {:?} to arrow is still not implemented",
+                "Reading {:?} from parquet is still not implemented",
                 other
             ))),
         },
@@ -265,7 +265,7 @@ pub fn page_iter_to_array<
         },
 
         other => Err(ArrowError::NotYetImplemented(format!(
-            "The conversion of {:?} to arrow is still not implemented",
+            "Reading {:?} from parquet is still not implemented",
            other
         ))),
    }
@@ -278,7 +278,7 @@ pub async fn page_stream_to_array<
 ) -> Result<Box<dyn Array>> {
     use DataType::*;
-    match data_type {
+    match data_type.to_logical_type() {
         // INT32
         UInt8 => primitive::stream_to_array(pages, metadata, data_type, |x: i32| x as u8).await,
         UInt16 => primitive::stream_to_array(pages, metadata, data_type, |x: i32| x as u16).await,
@@ -321,7 +321,7 @@ pub async fn page_stream_to_array<
             binary::stream_to_array::<i64, _, _>(pages, metadata, &data_type).await
         }
         FixedSizeBinary(size) => Ok(Box::new(
-            fixed_size_binary::stream_to_array(pages, size, metadata).await?,
+            fixed_size_binary::stream_to_array(pages, data_type, metadata).await?,
         )),
         other => Err(ArrowError::NotYetImplemented(format!(
             "Async conversion of {:?}",
diff --git a/src/io/parquet/write/dictionary.rs b/src/io/parquet/write/dictionary.rs
index c379040d8e4..7fa69a0a951 100644
--- a/src/io/parquet/write/dictionary.rs
+++ b/src/io/parquet/write/dictionary.rs
@@ -136,7 +136,7 @@ where
     match encoding {
         Encoding::PlainDictionary | Encoding::RleDictionary => {
             // write DictPage
-            let dict_page = match array.values().data_type() {
+            let dict_page = match array.values().data_type().to_logical_type() {
                 DataType::Int8 => dyn_prim!(i8, i32, array),
                 DataType::Int16 => dyn_prim!(i16, i32, array),
                 DataType::Int32 | DataType::Date32 | DataType::Time32(_) => {
diff --git a/src/io/parquet/write/mod.rs b/src/io/parquet/write/mod.rs
index b8102aed8c4..b22e3e6db7a 100644
--- a/src/io/parquet/write/mod.rs
+++ b/src/io/parquet/write/mod.rs
@@ -141,7 +141,7 @@ pub fn array_to_page(
         )));
     }
 
-    match data_type {
+    match data_type.to_logical_type() {
         DataType::Boolean => {
             boolean::array_to_page(array.as_any().downcast_ref().unwrap(), options, descriptor)
         }
diff --git a/src/io/parquet/write/schema.rs b/src/io/parquet/write/schema.rs
index efd03955cf4..0781f7e1eb3 100644
--- a/src/io/parquet/write/schema.rs
+++ b/src/io/parquet/write/schema.rs
@@ -45,7 +45,7 @@ pub fn to_parquet_type(field: &Field) -> Result<ParquetType> {
         Repetition::Required
     };
     // create type from field
-    match field.data_type() {
+    match field.data_type().to_logical_type() {
         DataType::Null => Ok(ParquetType::try_from_primitive(
             name,
             PhysicalType::Int32,
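// The dispatch pattern shared by the parquet read/write paths above
// (minimal sketch; the `describe` helper is hypothetical): match on the
// desugared logical type to pick an implementation, but pass the original
// `data_type` through so any Extension wrapper survives the roundtrip.
use arrow2::datatypes::DataType;

fn describe(data_type: &DataType) -> &'static str {
    match data_type.to_logical_type() {
        DataType::Boolean => "read/written as parquet BOOLEAN",
        DataType::FixedSizeBinary(_) => "read/written as FIXED_LEN_BYTE_ARRAY",
        // an Extension never reaches here: `to_logical_type` desugars it
        _ => "other",
    }
}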
test_file("1.0.0-littleendian", "generated_extension")?; + test_file("1.0.0-bigendian", "generated_extension") +} + #[test] fn write_100_union() -> Result<()> { test_file("1.0.0-littleendian", "generated_union")?; diff --git a/tests/it/io/parquet/mod.rs b/tests/it/io/parquet/mod.rs index 742fcfdcbf9..a84219316dd 100644 --- a/tests/it/io/parquet/mod.rs +++ b/tests/it/io/parquet/mod.rs @@ -458,6 +458,12 @@ fn roundtrip_100_dict() -> Result<()> { test_file("1.0.0-bigendian", "generated_dictionary") } +#[test] +fn roundtrip_100_extension() -> Result<()> { + test_file("1.0.0-littleendian", "generated_extension")?; + test_file("1.0.0-bigendian", "generated_extension") +} + /// Tests that when arrow-specific types (Duration and LargeUtf8) are written to parquet, we can rountrip its /// logical types. #[test]