Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Commit

Permalink
Added support for roundtrip extension types in parquet
Browse files Browse the repository at this point in the history
  • Loading branch information
jorgecarleitao committed Sep 11, 2021
1 parent 30bada9 commit 357ad2b
Show file tree
Hide file tree
Showing 8 changed files with 55 additions and 41 deletions.
36 changes: 19 additions & 17 deletions src/datatypes/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,26 +9,17 @@ pub use schema::Schema;

pub(crate) use field::{get_extension, Extension, Metadata};

/// The set of datatypes that are supported by this implementation of Apache Arrow.
///
/// The Arrow specification on data types includes some more types.
/// See also [`Schema.fbs`](https://github.com/apache/arrow/blob/master/format/Schema.fbs)
/// for Arrow's specification.
///
/// The variants of this enum include primitive fixed size types as well as parametric or
/// nested types.
/// Currently the Rust implementation supports the following nested types:
/// - `List<T>`
/// - `Struct<T, U, V, ...>`
///
/// Nested types can themselves be nested within other arrays.
/// For more information on these types please see
/// [the physical memory layout of Apache Arrow](https://arrow.apache.org/docs/format/Columnar.html#physical-memory-layout).
/// The set of supported logical types.
/// Each variant uniquely identifies a logical type, which defines specific semantics for the data (e.g. how it should be represented).
/// A [`DataType`] has a unique corresponding [`PhysicalType`], obtained via [`DataType::to_physical_type`],
/// which uniquely identifies an in-memory representation of data.
/// The [`DataType::Extension`] is special in that it augments a [`DataType`] with metadata to support custom types.
/// Use `to_logical_type` to desugar such a type and return its corresponding logical type.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum DataType {
/// Null type, representing an array without values or validity, only a length.
/// Null type
Null,
/// A boolean datatype representing the values `true` and `false`.
/// `true` and `false`.
Boolean,
/// A signed 8-bit integer.
Int8,
Expand Down Expand Up @@ -223,6 +214,17 @@ impl DataType {
Extension(_, key, _) => key.to_physical_type(),
}
}

/// Returns `&self` for all but [`DataType::Extension`]. For [`DataType::Extension`],
/// (recursively) returns the inner [`DataType`].
/// Never returns the variant [`DataType::Extension`].
pub fn to_logical_type(&self) -> &DataType {
use DataType::*;
match self {
Extension(_, key, _) => key.to_logical_type(),
_ => self,
}
}
}

fn to_dictionary_index_type(data_type: &DataType) -> DictionaryIndexType {
Expand Down
6 changes: 5 additions & 1 deletion src/io/parquet/read/binary/dictionary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ fn read_dict_optional<K, O>(
{
let length = indices.len() + additional;
values.extend_from_slice(dict.values());
offsets.extend(
offsets.extend_from_trusted_len_iter(
dict.offsets()
.iter()
.map(|x| O::from_usize(*x as usize).unwrap()),
Expand Down Expand Up @@ -152,6 +152,10 @@ where
)?
}

if offsets.len() == 0 {
// the array is empty and thus we need to push the first offset ourselves.
offsets.push(O::zero());
};
let keys = PrimitiveArray::from_data(K::DATA_TYPE, indices.into(), validity.into());
let data_type = DictionaryArray::<K>::get_child(&data_type).clone();
let values = Arc::new(Utf8Array::from_data(
Expand Down
28 changes: 15 additions & 13 deletions src/io/parquet/read/fixed_size_binary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,11 @@ pub(crate) fn read_dict_buffer(
validity_buffer: &[u8],
indices_buffer: &[u8],
additional: usize,
size: i32,
size: usize,
dict: &FixedLenByteArrayPageDict,
values: &mut MutableBuffer<u8>,
validity: &mut MutableBitmap,
) {
let size = size as usize;
let length = values.len() * size + additional;
let dict_values = dict.values();

Expand Down Expand Up @@ -75,11 +74,10 @@ pub(crate) fn read_optional(
validity_buffer: &[u8],
values_buffer: &[u8],
additional: usize,
size: i32,
size: usize,
values: &mut MutableBuffer<u8>,
validity: &mut MutableBitmap,
) {
let size = size as usize;
let length = values.len() * size + additional;

assert_eq!(values_buffer.len() % size, 0);
Expand Down Expand Up @@ -122,25 +120,27 @@ pub(crate) fn read_optional(
pub(crate) fn read_required(
buffer: &[u8],
additional: usize,
size: i32,
size: usize,
values: &mut MutableBuffer<u8>,
) {
assert_eq!(buffer.len(), additional * size as usize);
assert_eq!(buffer.len(), additional * size);
values.extend_from_slice(buffer);
}

pub fn iter_to_array<I, E>(
mut iter: I,
size: i32,
data_type: DataType,
metadata: &ColumnChunkMetaData,
) -> Result<FixedSizeBinaryArray>
where
ArrowError: From<E>,
E: Clone,
I: StreamingIterator<Item = std::result::Result<DataPage, E>>,
{
let size = *FixedSizeBinaryArray::get_size(&data_type) as usize;

let capacity = metadata.num_values() as usize;
let mut values = MutableBuffer::<u8>::with_capacity(capacity * size as usize);
let mut values = MutableBuffer::<u8>::with_capacity(capacity * size);
let mut validity = MutableBitmap::with_capacity(capacity);
while let Some(page) = iter.next() {
extend_from_page(
Expand All @@ -153,24 +153,26 @@ where
}

Ok(FixedSizeBinaryArray::from_data(
DataType::FixedSizeBinary(size),
data_type,
values.into(),
validity.into(),
))
}

pub async fn stream_to_array<I, E>(
pages: I,
size: i32,
data_type: DataType,
metadata: &ColumnChunkMetaData,
) -> Result<FixedSizeBinaryArray>
where
ArrowError: From<E>,
E: Clone,
I: Stream<Item = std::result::Result<DataPage, E>>,
{
let size = *FixedSizeBinaryArray::get_size(&data_type) as usize;

let capacity = metadata.num_values() as usize;
let mut values = MutableBuffer::<u8>::with_capacity(capacity * size as usize);
let mut values = MutableBuffer::<u8>::with_capacity(capacity * size);
let mut validity = MutableBitmap::with_capacity(capacity);

pin_mut!(pages); // needed for iteration
Expand All @@ -186,15 +188,15 @@ where
}

Ok(FixedSizeBinaryArray::from_data(
DataType::FixedSizeBinary(size),
data_type,
values.into(),
validity.into(),
))
}

pub(crate) fn extend_from_page(
page: &DataPage,
size: i32,
size: usize,
descriptor: &ColumnDescriptor,
values: &mut MutableBuffer<u8>,
validity: &mut MutableBitmap,
Expand Down
14 changes: 7 additions & 7 deletions src/io/parquet/read/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ fn dict_read<
panic!()
};

match values_data_type {
match values_data_type.to_logical_type() {
UInt8 => primitive::iter_to_dict_array::<K, _, _, _, _, _>(
iter,
metadata,
Expand Down Expand Up @@ -169,7 +169,7 @@ pub fn page_iter_to_array<
data_type: DataType,
) -> Result<Box<dyn Array>> {
use DataType::*;
match data_type {
match data_type.to_logical_type() {
// INT32
UInt8 => primitive::iter_to_array(iter, metadata, data_type, |x: i32| x as u8),
UInt16 => primitive::iter_to_array(iter, metadata, data_type, |x: i32| x as u16),
Expand Down Expand Up @@ -207,7 +207,7 @@ pub fn page_iter_to_array<
Binary | Utf8 => binary::iter_to_array::<i32, _, _>(iter, metadata, &data_type),
LargeBinary | LargeUtf8 => binary::iter_to_array::<i64, _, _>(iter, metadata, &data_type),
FixedSizeBinary(size) => Ok(Box::new(fixed_size_binary::iter_to_array(
iter, size, metadata,
iter, data_type, metadata,
)?)),

List(ref inner) => match inner.data_type() {
Expand Down Expand Up @@ -247,7 +247,7 @@ pub fn page_iter_to_array<
binary::iter_to_array_nested::<i64, _, _>(iter, metadata, data_type)
}
other => Err(ArrowError::NotYetImplemented(format!(
"The conversion of {:?} to arrow still not implemented",
"Reading {:?} from parquet still not implemented",
other
))),
},
Expand All @@ -265,7 +265,7 @@ pub fn page_iter_to_array<
},

other => Err(ArrowError::NotYetImplemented(format!(
"The conversion of {:?} to arrow still not implemented",
"Reading {:?} from parquet still not implemented",
other
))),
}
Expand All @@ -278,7 +278,7 @@ pub async fn page_stream_to_array<I: Stream<Item = std::result::Result<DataPage,
data_type: DataType,
) -> Result<Box<dyn Array>> {
use DataType::*;
match data_type {
match data_type.to_logical_type() {
// INT32
UInt8 => primitive::stream_to_array(pages, metadata, data_type, |x: i32| x as u8).await,
UInt16 => primitive::stream_to_array(pages, metadata, data_type, |x: i32| x as u16).await,
Expand Down Expand Up @@ -321,7 +321,7 @@ pub async fn page_stream_to_array<I: Stream<Item = std::result::Result<DataPage,
binary::stream_to_array::<i64, _, _>(pages, metadata, &data_type).await
}
FixedSizeBinary(size) => Ok(Box::new(
fixed_size_binary::stream_to_array(pages, size, metadata).await?,
fixed_size_binary::stream_to_array(pages, data_type, metadata).await?,
)),
other => Err(ArrowError::NotYetImplemented(format!(
"Async conversion of {:?}",
Expand Down
2 changes: 1 addition & 1 deletion src/io/parquet/write/dictionary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ where
match encoding {
Encoding::PlainDictionary | Encoding::RleDictionary => {
// write DictPage
let dict_page = match array.values().data_type() {
let dict_page = match array.values().data_type().to_logical_type() {
DataType::Int8 => dyn_prim!(i8, i32, array),
DataType::Int16 => dyn_prim!(i16, i32, array),
DataType::Int32 | DataType::Date32 | DataType::Time32(_) => {
Expand Down
2 changes: 1 addition & 1 deletion src/io/parquet/write/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ pub fn array_to_page(
)));
}

match data_type {
match data_type.to_logical_type() {
DataType::Boolean => {
boolean::array_to_page(array.as_any().downcast_ref().unwrap(), options, descriptor)
}
Expand Down
2 changes: 1 addition & 1 deletion src/io/parquet/write/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ pub fn to_parquet_type(field: &Field) -> Result<ParquetType> {
Repetition::Required
};
// create type from field
match field.data_type() {
match field.data_type().to_logical_type() {
DataType::Null => Ok(ParquetType::try_from_primitive(
name,
PhysicalType::Int32,
Expand Down
6 changes: 6 additions & 0 deletions tests/it/io/parquet/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -458,6 +458,12 @@ fn roundtrip_100_dict() -> Result<()> {
test_file("1.0.0-bigendian", "generated_dictionary")
}

#[test]
fn roundtrip_100_extension() -> Result<()> {
    // Exercise the extension-type roundtrip fixture for both byte orders,
    // in the same order as the other generated-file tests.
    for version in ["1.0.0-littleendian", "1.0.0-bigendian"] {
        test_file(version, "generated_extension")?;
    }
    Ok(())
}

/// Tests that when arrow-specific types (Duration and LargeUtf8) are written to parquet, we can round-trip
/// their logical types.
#[test]
Expand Down

0 comments on commit 357ad2b

Please sign in to comment.