Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Commit

Permalink
Allow reading other logical types from Parquet
Browse files (browse the repository at this point in the history)
  • Loading branch information
sundy-li committed Jul 19, 2022
1 parent 7a5a3f9 commit 4cdf6ff
Showing 1 changed file with 76 additions and 69 deletions.
145 changes: 76 additions & 69 deletions src/io/parquet/read/deserialize/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,9 @@ fn columns_to_iter_recursive<'a, I: 'a>(
where
I: DataPages,
{
use DataType::*;
use crate::datatypes::PhysicalType::*;
use crate::datatypes::PrimitiveType::*;

if init.is_empty() && is_primitive(&field.data_type) {
return Ok(Box::new(
page_iter_to_arrays(
Expand All @@ -114,13 +116,13 @@ where
));
}

Ok(match field.data_type().to_logical_type() {
Ok(match field.data_type().to_physical_type() {
Boolean => {
init.push(InitNested::Primitive(field.is_nullable));
types.pop();
boolean::iter_to_arrays_nested(columns.pop().unwrap(), init, chunk_size)
}
Int8 => {
Primitive(Int8) => {
init.push(InitNested::Primitive(field.is_nullable));
types.pop();
primitive::iter_to_arrays_nested(
Expand All @@ -131,7 +133,7 @@ where
|x: i32| x as i8,
)
}
Int16 => {
Primitive(Int16) => {
init.push(InitNested::Primitive(field.is_nullable));
types.pop();
primitive::iter_to_arrays_nested(
Expand All @@ -142,7 +144,7 @@ where
|x: i32| x as i16,
)
}
Int32 => {
Primitive(Int32) => {
init.push(InitNested::Primitive(field.is_nullable));
types.pop();
primitive::iter_to_arrays_nested(
Expand All @@ -153,7 +155,7 @@ where
|x: i32| x,
)
}
Int64 => {
Primitive(Int64) => {
init.push(InitNested::Primitive(field.is_nullable));
types.pop();
primitive::iter_to_arrays_nested(
Expand All @@ -164,7 +166,7 @@ where
|x: i64| x,
)
}
UInt8 => {
Primitive(UInt8) => {
init.push(InitNested::Primitive(field.is_nullable));
types.pop();
primitive::iter_to_arrays_nested(
Expand All @@ -175,7 +177,7 @@ where
|x: i32| x as u8,
)
}
UInt16 => {
Primitive(UInt16) => {
init.push(InitNested::Primitive(field.is_nullable));
types.pop();
primitive::iter_to_arrays_nested(
Expand All @@ -186,7 +188,7 @@ where
|x: i32| x as u16,
)
}
UInt32 => {
Primitive(UInt32) => {
init.push(InitNested::Primitive(field.is_nullable));
let type_ = types.pop().unwrap();
match type_.physical_type {
Expand All @@ -212,7 +214,7 @@ where
}
}
}
UInt64 => {
Primitive(UInt64) => {
init.push(InitNested::Primitive(field.is_nullable));
types.pop();
primitive::iter_to_arrays_nested(
Expand All @@ -223,7 +225,7 @@ where
|x: i64| x as u64,
)
}
Float32 => {
Primitive(Float32) => {
init.push(InitNested::Primitive(field.is_nullable));
types.pop();
primitive::iter_to_arrays_nested(
Expand All @@ -234,7 +236,7 @@ where
|x: f32| x,
)
}
Float64 => {
Primitive(Float64) => {
init.push(InitNested::Primitive(field.is_nullable));
types.pop();
primitive::iter_to_arrays_nested(
Expand Down Expand Up @@ -285,63 +287,68 @@ where
chunk_size,
)
}
List(inner) | LargeList(inner) | FixedSizeList(inner, _) => {
init.push(InitNested::List(field.is_nullable));
let iter = columns_to_iter_recursive(
columns,
types,
inner.as_ref().clone(),
init,
chunk_size,
)?;
let iter = iter.map(move |x| {
let (mut nested, array) = x?;
let array = create_list(field.data_type().clone(), &mut nested, array);
Ok((nested, array))
});
Box::new(iter) as _
}
Struct(fields) => {
let columns = fields
.iter()
.rev()
.map(|f| {
let mut init = init.clone();
init.push(InitNested::Struct(field.is_nullable));
let n = n_columns(&f.data_type);
let columns = columns.drain(columns.len() - n..).collect();
let types = types.drain(types.len() - n..).collect();
columns_to_iter_recursive(columns, types, f.clone(), init, chunk_size)
})
.collect::<Result<Vec<_>>>()?;
let columns = columns.into_iter().rev().collect();
Box::new(struct_::StructIterator::new(columns, fields.clone()))
}
Map(inner, _) => {
init.push(InitNested::List(field.is_nullable));
let iter = columns_to_iter_recursive(
columns,
types,
inner.as_ref().clone(),
init,
chunk_size,
)?;
Box::new(iter.map(move |x| {
let (nested, inner) = x?;
let array = MapArray::new(
field.data_type().clone(),
vec![0, inner.len() as i32].into(),
inner,
None,
);
Ok((nested, array.boxed()))
}))
}
other => {
return Err(Error::nyi(format!(
"Deserializing type {other:?} from parquet"
)))
}

_ => match field.data_type().to_logical_type() {
DataType::List(inner)
| DataType::LargeList(inner)
| DataType::FixedSizeList(inner, _) => {
init.push(InitNested::List(field.is_nullable));
let iter = columns_to_iter_recursive(
columns,
types,
inner.as_ref().clone(),
init,
chunk_size,
)?;
let iter = iter.map(move |x| {
let (mut nested, array) = x?;
let array = create_list(field.data_type().clone(), &mut nested, array);
Ok((nested, array))
});
Box::new(iter) as _
}
DataType::Struct(fields) => {
let columns = fields
.iter()
.rev()
.map(|f| {
let mut init = init.clone();
init.push(InitNested::Struct(field.is_nullable));
let n = n_columns(&f.data_type);
let columns = columns.drain(columns.len() - n..).collect();
let types = types.drain(types.len() - n..).collect();
columns_to_iter_recursive(columns, types, f.clone(), init, chunk_size)
})
.collect::<Result<Vec<_>>>()?;
let columns = columns.into_iter().rev().collect();
Box::new(struct_::StructIterator::new(columns, fields.clone()))
}
DataType::Map(inner, _) => {
init.push(InitNested::List(field.is_nullable));
let iter = columns_to_iter_recursive(
columns,
types,
inner.as_ref().clone(),
init,
chunk_size,
)?;
Box::new(iter.map(move |x| {
let (nested, inner) = x?;
let array = MapArray::new(
field.data_type().clone(),
vec![0, inner.len() as i32].into(),
inner,
None,
);
Ok((nested, array.boxed()))
}))
}
other => {
return Err(Error::nyi(format!(
"Deserializing type {other:?} from parquet"
)))
}
},
})
}

Expand Down

0 comments on commit 4cdf6ff

Please sign in to comment.