From e623200ac73088647f28b994d0571dad61eb9e92 Mon Sep 17 00:00:00 2001 From: Frank Murphy Date: Thu, 4 Aug 2022 17:15:59 -0400 Subject: [PATCH 01/21] Support for (pandas) record-oriented serialization --- src/io/json/write/mod.rs | 78 ++++++++++++++++++++++++++++++++++++++- tests/it/io/json/mod.rs | 10 +++++ tests/it/io/json/write.rs | 45 +++++++++++++++++++++- 3 files changed, 131 insertions(+), 2 deletions(-) diff --git a/src/io/json/write/mod.rs b/src/io/json/write/mod.rs index 2278a636f2c..53cc3f20e68 100644 --- a/src/io/json/write/mod.rs +++ b/src/io/json/write/mod.rs @@ -5,8 +5,11 @@ mod utf8; pub use fallible_streaming_iterator::*; pub(crate) use serialize::new_serializer; use serialize::serialize; +use std::io::Write; -use crate::{array::Array, error::Error}; +use crate::{ + array::Array, chunk::Chunk, datatypes::Schema, error::Error, io::iterator::StreamingIterator, +}; /// [`FallibleStreamingIterator`] that serializes an [`Array`] to bytes of valid JSON /// # Implementation @@ -59,6 +62,79 @@ where } } +/// [`FallibleStreamingIterator`] that serializes a [`Chunk`] into bytes of JSON +/// in a (pandas-compatible) record-oriented format. +/// +/// # Implementation +/// Advancing this iterator is CPU-bounded. +pub struct RecordSerializer<'a> { + schema: Schema, + index: usize, + end: usize, + iterators: Vec + Send + Sync + 'a>>, + buffer: Vec, +} + +impl<'a> RecordSerializer<'a> { + /// Creates a new [`RecordSerializer`]. + pub fn new(schema: Schema, chunk: &'a Chunk, buffer: Vec) -> Self + where + A: AsRef, + { + let end = chunk.len(); + let iterators = chunk + .arrays() + .iter() + .map(|arr| new_serializer(arr.as_ref())) + .collect(); + + Self { + schema, + index: 0, + end, + iterators, + buffer, + } + } +} + +impl<'a> FallibleStreamingIterator for RecordSerializer<'a> { + type Item = [u8]; + + type Error = Error; + + fn advance(&mut self) -> Result<(), Error> { + self.buffer.clear(); + if self.index == self.end { + return Ok(()); + } + + let mut is_first_row = true; + write!(&mut self.buffer, "{{")?; + for (f, ref mut it) in self.schema.fields.iter().zip(self.iterators.iter_mut()) { + if !is_first_row { + write!(&mut self.buffer, ",")?; + } + write!(&mut self.buffer, "\"{}\":", f.name)?; + + self.buffer.extend_from_slice(it.next().unwrap()); + is_first_row = false; + } + write!(&mut self.buffer, "}}")?; + + self.index += 1; + Ok(()) + } + + fn get(&self) -> Option<&Self::Item> { + if !self.buffer.is_empty() { + Some(&self.buffer) + } else { + None + } + } +} + /// Writes valid JSON from an iterator of (assumed JSON-encoded) bytes to `writer` pub fn write(writer: &mut W, mut blocks: I) -> Result<(), Error> where diff --git a/tests/it/io/json/mod.rs b/tests/it/io/json/mod.rs index e84971431ea..59a68f8dd86 100644 --- a/tests/it/io/json/mod.rs +++ b/tests/it/io/json/mod.rs @@ -2,6 +2,8 @@ mod read; mod write; use arrow2::array::*; +use arrow2::chunk::Chunk; +use arrow2::datatypes::Schema; use arrow2::error::Result; use arrow2::io::json::write as json_write; @@ -12,3 +14,11 @@ fn write_batch(array: Box) -> Result> { json_write::write(&mut buf, &mut serializer)?; Ok(buf) } + +fn write_record_batch>(schema: Schema, chunk: Chunk) -> Result> { + let mut serializer = json_write::RecordSerializer::new(schema, &chunk, vec![]); + + let mut buf = vec![]; + json_write::write(&mut buf, &mut serializer)?; + Ok(buf) +} diff --git a/tests/it/io/json/write.rs b/tests/it/io/json/write.rs index f98ef1fc71f..f224a612173 100644 --- a/tests/it/io/json/write.rs +++ b/tests/it/io/json/write.rs @@ -2,7 +2,7 @@ use arrow2::{ array::*, bitmap::Bitmap, buffer::Buffer, - datatypes::{DataType, Field, TimeUnit}, + datatypes::{DataType, Field, Metadata, Schema, TimeUnit}, error::Result, }; @@ -211,6 +211,49 @@ fn nested_list() -> Result<()> { test!(array, expected) } +#[test] +fn nested_list_records() -> Result<()> { + let iter = vec![ + vec![Some(vec![Some(1), Some(2)]), Some(vec![Some(3)])], + vec![], + vec![Some(vec![Some(4), Some(5), Some(6)])], + ]; + + let iter = iter.into_iter().map(Some); + + let inner = MutableListArray::>::new_with_field( + MutablePrimitiveArray::::new(), + "b", + false, + ); + let mut c1 = + MutableListArray::>>::new_with_field( + inner, "c1", false, + ); + c1.try_extend(iter).unwrap(); + let c1: ListArray = c1.into(); + + let c2 = Utf8Array::::from(&vec![Some("foo"), Some("bar"), None]); + + let schema = Schema { + fields: vec![ + Field::new("c1", c1.data_type().clone(), true), + Field::new("c2", c2.data_type().clone(), true), + ], + metadata: Metadata::default(), + }; + + let arrays: Vec> = vec![Box::new(c1), Box::new(c2)]; + let chunk = Chunk::new(arrays); + + let expected = + r#"[{"c1":[[1,2],[3]],"c2":"foo"},{"c1":[],"c2":"bar"},{"c1":[[4,5,6]],"c2":null}]"#; + + let buf = write_record_batch(schema, chunk)?; + assert_eq!(String::from_utf8(buf).unwrap(), expected); + Ok(()) +} + #[test] fn list_of_struct() -> Result<()> { let inner = vec![Field::new("c121", DataType::Utf8, false)]; From 1efef6cfdde474e53551d186841cf8c88a61dd3b Mon Sep 17 00:00:00 2001 From: Frank Murphy Date: Thu, 4 Aug 2022 17:26:36 -0400 Subject: [PATCH 02/21] Define Preallocate for mutable arrays --- src/array/binary/mutable.rs | 10 +++++++++- src/array/boolean/mutable.rs | 8 +++++++- src/array/fixed_size_binary/mutable.rs | 8 +++++++- src/array/list/mutable.rs | 8 +++++++- src/array/mod.rs | 9 +++++++++ src/array/primitive/mutable.rs | 8 +++++++- src/array/utf8/mutable.rs | 8 +++++++- 7 files changed, 53 insertions(+), 6 deletions(-) diff --git a/src/array/binary/mutable.rs b/src/array/binary/mutable.rs index c1ee4ba8a6e..936f9d074bf 100644 --- a/src/array/binary/mutable.rs +++ b/src/array/binary/mutable.rs @@ -1,7 +1,9 @@ use std::{iter::FromIterator, sync::Arc}; use crate::{ - array::{specification::check_offsets, Array, MutableArray, Offset, TryExtend, TryPush}, + array::{ + specification::check_offsets, Array, MutableArray, Offset, Preallocate, TryExtend, TryPush, + }, bitmap::MutableBitmap, datatypes::DataType, error::{Error, Result}, @@ -186,6 +188,12 @@ impl MutableBinaryArray { } } +impl Preallocate for MutableBinaryArray { + fn with_capacity(capacity: usize) -> Self { + MutableBinaryArray::with_capacity(capacity) + } +} + impl MutableArray for MutableBinaryArray { fn len(&self) -> usize { self.offsets.len() - 1 diff --git a/src/array/boolean/mutable.rs b/src/array/boolean/mutable.rs index 83fac4b7ef5..d40522a961e 100644 --- a/src/array/boolean/mutable.rs +++ b/src/array/boolean/mutable.rs @@ -2,7 +2,7 @@ use std::iter::FromIterator; use std::sync::Arc; use crate::{ - array::{Array, MutableArray, TryExtend, TryPush}, + array::{Array, MutableArray, Preallocate, TryExtend, TryPush}, bitmap::MutableBitmap, datatypes::{DataType, PhysicalType}, error::Result, @@ -453,6 +453,12 @@ impl>> FromIterator for MutableBoolea } } +impl Preallocate for MutableBooleanArray { + fn with_capacity(capacity: usize) -> Self { + MutableBooleanArray::with_capacity(capacity) + } +} + impl MutableArray for MutableBooleanArray { fn len(&self) -> usize { self.values.len() diff --git a/src/array/fixed_size_binary/mutable.rs b/src/array/fixed_size_binary/mutable.rs index 3a02158cc88..c5ef7176b79 100644 --- a/src/array/fixed_size_binary/mutable.rs +++ b/src/array/fixed_size_binary/mutable.rs @@ -1,7 +1,7 @@ use std::sync::Arc; use crate::{ - array::{Array, MutableArray}, + array::{Array, MutableArray, Preallocate}, bitmap::MutableBitmap, datatypes::DataType, error::{Error, Result}, @@ -211,6 +211,12 @@ impl MutableFixedSizeBinaryArray { } } +impl Preallocate for MutableFixedSizeBinaryArray { + fn with_capacity(capacity: usize) -> Self { + MutableFixedSizeBinaryArray::with_capacity(capacity, 0) + } +} + impl MutableArray for MutableFixedSizeBinaryArray { fn len(&self) -> usize { self.values.len() / self.size diff --git a/src/array/list/mutable.rs b/src/array/list/mutable.rs index 71283104fbc..373f676153b 100644 --- a/src/array/list/mutable.rs +++ b/src/array/list/mutable.rs @@ -1,7 +1,7 @@ use std::sync::Arc; use crate::{ - array::{Array, MutableArray, Offset, TryExtend, TryPush}, + array::{Array, MutableArray, Offset, Preallocate, TryExtend, TryPush}, bitmap::MutableBitmap, datatypes::{DataType, Field}, error::{Error, Result}, @@ -211,6 +211,12 @@ impl MutableListArray { } } +impl Preallocate for MutableListArray { + fn with_capacity(capacity: usize) -> Self { + MutableListArray::with_capacity(capacity) + } +} + impl MutableArray for MutableListArray { fn len(&self) -> usize { self.offsets.len() - 1 diff --git a/src/array/mod.rs b/src/array/mod.rs index 6a053932589..2de03c02359 100644 --- a/src/array/mod.rs +++ b/src/array/mod.rs @@ -113,6 +113,15 @@ pub trait Array: Send + Sync + dyn_clone::DynClone + 'static { dyn_clone::clone_trait_object!(Array); +/// A trait describing an array with a backing store that can be preallocated to +/// a given size. +pub trait Preallocate { + /// Create this array with a given capacity. + fn with_capacity(capacity: usize) -> Self + where + Self: Sized; +} + /// A trait describing a mutable array; i.e. an array whose values can be changed. /// Mutable arrays cannot be cloned but can be mutated in place, /// thereby making them useful to perform numeric operations without allocations. diff --git a/src/array/primitive/mutable.rs b/src/array/primitive/mutable.rs index 5f2014db319..3b238b857a1 100644 --- a/src/array/primitive/mutable.rs +++ b/src/array/primitive/mutable.rs @@ -2,7 +2,7 @@ use std::{iter::FromIterator, sync::Arc}; use crate::bitmap::Bitmap; use crate::{ - array::{Array, MutableArray, TryExtend, TryPush}, + array::{Array, MutableArray, Preallocate, TryExtend, TryPush}, bitmap::MutableBitmap, datatypes::DataType, error::{Error, Result}, @@ -378,6 +378,12 @@ impl TryPush> for MutablePrimitiveArray { } } +impl Preallocate for MutablePrimitiveArray { + fn with_capacity(capacity: usize) -> Self { + MutablePrimitiveArray::with_capacity(capacity) + } +} + impl MutableArray for MutablePrimitiveArray { fn len(&self) -> usize { self.values.len() diff --git a/src/array/utf8/mutable.rs b/src/array/utf8/mutable.rs index 4cc83da9011..fcff8204685 100644 --- a/src/array/utf8/mutable.rs +++ b/src/array/utf8/mutable.rs @@ -2,7 +2,7 @@ use std::{iter::FromIterator, sync::Arc}; use crate::array::physical_binary::*; use crate::{ - array::{Array, MutableArray, Offset, TryExtend, TryPush}, + array::{Array, MutableArray, Offset, Preallocate, TryExtend, TryPush}, bitmap::{ utils::{BitmapIter, ZipValidity}, Bitmap, MutableBitmap, @@ -247,6 +247,12 @@ impl MutableUtf8Array { } } +impl Preallocate for MutableUtf8Array { + fn with_capacity(capacity: usize) -> Self { + MutableUtf8Array::with_capacity(capacity) + } +} + impl MutableArray for MutableUtf8Array { fn len(&self) -> usize { self.len() From fa2b740602b91821680c4345197cccf2d09db07e Mon Sep 17 00:00:00 2001 From: Frank Murphy Date: Thu, 4 Aug 2022 17:30:35 -0400 Subject: [PATCH 03/21] Add expand and unsafe_expand to MutableListArray --- src/array/list/mutable.rs | 75 +++++++++++++++++++++++++++++++++++++-- 1 file changed, 73 insertions(+), 2 deletions(-) diff --git a/src/array/list/mutable.rs b/src/array/list/mutable.rs index 373f676153b..6c9a2fc894f 100644 --- a/src/array/list/mutable.rs +++ b/src/array/list/mutable.rs @@ -1,10 +1,14 @@ use std::sync::Arc; use crate::{ - array::{Array, MutableArray, Offset, Preallocate, TryExtend, TryPush}, + array::{ + specification::try_check_offsets, Array, MutableArray, Offset, Preallocate, TryExtend, + TryPush, + }, bitmap::MutableBitmap, datatypes::{DataType, Field}, error::{Error, Result}, + trusted_len::TrustedLen, }; use super::ListArray; @@ -152,6 +156,69 @@ impl MutableListArray { } } + /// Expand this array, using elements from the underlying backing array. + /// Assumes the expansion begins at the highest previous offset, or zero if + /// this [MutableListArray] is currently empty. + /// + /// Panics if: + /// - the new offsets are not in monotonic increasing order. + /// - any new offset is not in bounds of the backing array. + /// - the passed iterator has no upper bound. + pub fn expand(&mut self, expansion: II) + where + II: IntoIterator> + TrustedLen, + { + let current_len = self.offsets.len(); + let (_, upper) = expansion.size_hint(); + let upper = upper.expect("iterator must have upper bound"); + if current_len == 0 && upper > 0 { + self.offsets.push(O::zero()); + } + // safety: checked below + unsafe { self.unsafe_expand(expansion) }; + if self.offsets.len() > current_len { + // check all inserted offsets + try_check_offsets(&self.offsets[current_len..], self.values.len()) + .expect("invalid offsets"); + } + // else expansion is empty, and this is trivially safe. + } + + /// Expand this array. Assumes that `offsets` are in order, and do not + /// overrun the underlying `values` backing array. + /// + /// Also assumes the expansion begins at the highest previous offset, or + /// zero if the array is currently empty. + /// + /// Panics if the passed iterator has no upper bound. + pub unsafe fn unsafe_expand(&mut self, expansion: II) + where + II: IntoIterator> + TrustedLen, + { + let (_, upper) = expansion.size_hint(); + let upper = upper.expect("iterator must have upper bound"); + let final_size = self.len() + upper; + self.offsets.reserve(upper); + + for item in expansion { + match item { + Some(offset) => { + self.offsets.push(offset); + if let Some(validity) = &mut self.validity { + validity.push(true); + } + } + None => self.push_null(), + } + + if let Some(validity) = &mut self.validity { + if validity.capacity() < final_size { + validity.reserve(final_size - validity.capacity()); + } + } + } + } + /// The values pub fn mut_values(&mut self) -> &mut M { &mut self.values @@ -209,6 +276,10 @@ impl MutableListArray { validity.shrink_to_fit() } } + + fn len(&self) -> usize { + self.offsets.len() - 1 + } } impl Preallocate for MutableListArray { @@ -219,7 +290,7 @@ impl Preallocate for MutableList impl MutableArray for MutableListArray { fn len(&self) -> usize { - self.offsets.len() - 1 + MutableListArray::len(self) } fn validity(&self) -> Option<&MutableBitmap> { From 06f3511c73e6e39805a541483e04e02bc0bd357c Mon Sep 17 00:00:00 2001 From: Frank Murphy Date: Thu, 4 Aug 2022 17:31:37 -0400 Subject: [PATCH 04/21] Trivial but useful impl of MutableArray for Box --- src/array/mod.rs | 40 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 40 insertions(+) diff --git a/src/array/mod.rs b/src/array/mod.rs index 2de03c02359..c0e9f141e45 100644 --- a/src/array/mod.rs +++ b/src/array/mod.rs @@ -17,6 +17,7 @@ //! Most arrays contain a [`MutableArray`] counterpart that is neither clonable nor slicable, but //! can be operated in-place. use std::any::Any; +use std::sync::Arc; use crate::error::Result; use crate::{ @@ -179,6 +180,45 @@ pub trait MutableArray: std::fmt::Debug + Send + Sync { fn shrink_to_fit(&mut self); } +impl MutableArray for Box { + fn len(&self) -> usize { + self.as_ref().len() + } + + fn validity(&self) -> Option<&MutableBitmap> { + self.as_ref().validity() + } + + fn as_box(&mut self) -> Box { + self.as_mut().as_box() + } + + fn as_arc(&mut self) -> Arc { + self.as_mut().as_arc() + } + + fn data_type(&self) -> &DataType { + self.as_ref().data_type() + } + + fn as_any(&self) -> &dyn std::any::Any { + self.as_ref().as_any() + } + + fn as_mut_any(&mut self) -> &mut dyn std::any::Any { + self.as_mut().as_mut_any() + } + + #[inline] + fn push_null(&mut self) { + self.as_mut().push_null() + } + + fn shrink_to_fit(&mut self) { + self.as_mut().shrink_to_fit(); + } +} + macro_rules! general_dyn { ($array:expr, $ty:ty, $f:expr) => {{ let array = $array.as_any().downcast_ref::<$ty>().unwrap(); From 5ba0ddb966ed3bf028e8dbe66d9388a436e1f35c Mon Sep 17 00:00:00 2001 From: Frank Murphy Date: Thu, 4 Aug 2022 17:34:23 -0400 Subject: [PATCH 05/21] JSON (pandas) record-oriented deserializer --- src/io/json/read/deserialize.rs | 303 +++++++++++++++++++++++++++---- src/io/json/read/infer_schema.rs | 42 ++++- src/io/json/read/mod.rs | 4 +- tests/it/io/json/read.rs | 83 +++++++++ 4 files changed, 398 insertions(+), 34 deletions(-) diff --git a/src/io/json/read/deserialize.rs b/src/io/json/read/deserialize.rs index b7add12bc97..ddc07e6c70c 100644 --- a/src/io/json/read/deserialize.rs +++ b/src/io/json/read/deserialize.rs @@ -9,9 +9,10 @@ use json_deserializer::{Number, Value}; use crate::{ array::*, bitmap::MutableBitmap, - datatypes::{DataType, IntervalUnit}, + chunk::Chunk, + datatypes::{DataType, Field, IntervalUnit, PhysicalType, Schema}, error::Error, - types::NativeType, + types::{f16, NativeType, PrimitiveType}, }; /// A function that converts a &Value into an optional tuple of a byte slice and a Value. @@ -55,12 +56,15 @@ fn build_extract(data_type: &DataType) -> Extract { } } -fn deserialize_boolean<'a, A: Borrow>>(rows: &[A]) -> BooleanArray { +fn deserialize_boolean_into<'a, A: Borrow>>( + target: &mut MutableBooleanArray, + rows: &[A], +) { let iter = rows.iter().map(|row| match row.borrow() { Value::Bool(v) => Some(v), _ => None, }); - BooleanArray::from_trusted_len_iter(iter) + target.extend_trusted_len(iter); } fn deserialize_int_single(number: Number) -> T @@ -153,32 +157,36 @@ where } } -fn deserialize_int<'a, T: NativeType + lexical_core::FromLexical + Pow10, A: Borrow>>( +fn deserialize_int_into< + 'a, + T: NativeType + lexical_core::FromLexical + Pow10, + A: Borrow>, +>( + target: &mut MutablePrimitiveArray, rows: &[A], - data_type: DataType, -) -> PrimitiveArray { +) { let iter = rows.iter().map(|row| match row.borrow() { Value::Number(number) => Some(deserialize_int_single(*number)), Value::Bool(number) => Some(if *number { T::one() } else { T::default() }), _ => None, }); - PrimitiveArray::from_trusted_len_iter(iter).to(data_type) + target.extend_trusted_len(iter); } -fn deserialize_float< +fn deserialize_float_into< 'a, T: NativeType + lexical_core::FromLexical + Powi10, A: Borrow>, >( + target: &mut MutablePrimitiveArray, rows: &[A], - data_type: DataType, -) -> PrimitiveArray { +) { let iter = rows.iter().map(|row| match row.borrow() { Value::Number(number) => Some(deserialize_float_single(number)), Value::Bool(number) => Some(if *number { T::one() } else { T::default() }), _ => None, }); - PrimitiveArray::from_trusted_len_iter(iter).to(data_type) + target.extend_trusted_len(iter); } fn deserialize_binary<'a, O: Offset, A: Borrow>>(rows: &[A]) -> BinaryArray { @@ -189,12 +197,14 @@ fn deserialize_binary<'a, O: Offset, A: Borrow>>(rows: &[A]) -> Binary BinaryArray::from_trusted_len_iter(iter) } -fn deserialize_utf8<'a, O: Offset, A: Borrow>>(rows: &[A]) -> Utf8Array { - let mut array = MutableUtf8Array::::with_capacity(rows.len()); +fn deserialize_utf8_into<'a, O: Offset, A: Borrow>>( + target: &mut MutableUtf8Array, + rows: &[A], +) { let mut scratch = vec![]; for row in rows { match row.borrow() { - Value::String(v) => array.push(Some(v.as_ref())), + Value::String(v) => target.push(Some(v.as_ref())), Value::Number(number) => match number { Number::Integer(number, exponent) | Number::Float(number, exponent) => { scratch.clear(); @@ -203,11 +213,10 @@ fn deserialize_utf8<'a, O: Offset, A: Borrow>>(rows: &[A]) -> Utf8Arra scratch.extend_from_slice(*exponent); } }, - Value::Bool(v) => array.push(Some(if *v { "true" } else { "false" })), - _ => array.push_null(), + Value::Bool(v) => target.push(Some(if *v { "true" } else { "false" })), + _ => target.push_null(), } } - array.into() } fn deserialize_list<'a, O: Offset, A: Borrow>>( @@ -243,6 +252,107 @@ fn deserialize_list<'a, O: Offset, A: Borrow>>( ListArray::::new(data_type, offsets.into(), values, validity.into()) } +// TODO: due to nesting, deduplicating this from the above is trickier than +// other `deserialize_xxx_into` functions. Punting on that for now. +fn deserialize_list_into<'a, O: Offset, A: Borrow>>( + target: &mut MutableListArray>, + rows: &[A], +) { + let start = { + let empty = vec![]; + let inner: Vec<_> = rows + .iter() + .flat_map(|row| match row.borrow() { + Value::Array(value) => value.iter(), + _ => empty.iter(), + }) + .collect(); + + let child = target.mut_values(); + let start_len = child.len(); + deserialize_into(child, &inner); + + // todo make this an Err + O::from_usize(start_len).expect("Child list size too large") + }; + + let mut position = start; + let arrays = rows.iter().map(|row| { + match row.borrow() { + Value::Array(value) => { + // todo make this an Err + position += O::from_usize(value.len()).expect("List offset is too large :/"); + Some(position.clone()) + } + _ => None, + } + }); + + // though this will always be safe, we cannot use unsafe_expand here due to + // `#![forbid(unsafe_code)]` on the io module + target.expand(arrays); +} + +fn try_deserialize_into<'a, A: Borrow>, T: NativeType>( + target: &mut Box, + rows: &[A], + deserialize_into: fn(&mut MutablePrimitiveArray, &[A]) -> (), +) -> bool { + try_generic_deserialize_into(target, rows, deserialize_into) +} + +fn try_generic_deserialize_into<'a, A: Borrow>, M: 'static>( + target: &mut Box, + rows: &[A], + deserialize_into: fn(&mut M, &[A]) -> (), +) -> bool { + if let Some(array) = target.as_mut_any().downcast_mut::() { + deserialize_into(array, rows); + true + } else { + false + } +} + +/// Deserialize `rows` by extending them into the given `target` +fn deserialize_into<'a, A: Borrow>>(target: &mut Box, rows: &[A]) { + // It'd be nice to have something like pattern matching for downcasting from Any + // I'm not aware of anything like that, which leads to this ... ugliness + if let Some(list_array) = target + .as_mut_any() + .downcast_mut::>>() + { + deserialize_list_into(list_array, rows); + } else if try_generic_deserialize_into::<_, MutableBooleanArray>( + target, + rows, + deserialize_boolean_into, + ) { + } else if try_deserialize_into::<_, f32>(target, rows, deserialize_float_into) { + } else if try_deserialize_into::<_, f64>(target, rows, deserialize_float_into) { + } else if try_deserialize_into::<_, i8>(target, rows, deserialize_int_into) { + } else if try_deserialize_into::<_, i16>(target, rows, deserialize_int_into) { + } else if try_deserialize_into::<_, i32>(target, rows, deserialize_int_into) { + } else if try_deserialize_into::<_, i64>(target, rows, deserialize_int_into) { + } else if try_deserialize_into::<_, u8>(target, rows, deserialize_int_into) { + } else if try_deserialize_into::<_, u16>(target, rows, deserialize_int_into) { + } else if try_deserialize_into::<_, u32>(target, rows, deserialize_int_into) { + } else if try_deserialize_into::<_, u64>(target, rows, deserialize_int_into) { + } else if try_generic_deserialize_into::<_, MutableUtf8Array>( + target, + rows, + deserialize_utf8_into, + ) { + } else if try_generic_deserialize_into::<_, MutableUtf8Array>( + target, + rows, + deserialize_utf8_into, + ) { + } else { + todo!(); + } +} + fn deserialize_struct<'a, A: Borrow>>(rows: &[A], data_type: DataType) -> StructArray { let fields = StructArray::get_fields(&data_type); @@ -315,20 +425,50 @@ fn deserialize_dictionary<'a, K: DictionaryKey, A: Borrow>>( DictionaryArray::::try_new(data_type, keys, values).unwrap() } +fn fill_array_from( + f: fn(&mut MutablePrimitiveArray, &[B]), + data_type: DataType, + rows: &[B], +) -> Box +where + T: NativeType, + A: From> + Array, +{ + let mut array = MutablePrimitiveArray::::with_capacity(rows.len()).to(data_type); + f(&mut array, rows); + Box::new(A::from(array)) +} + +fn fill_generic_array_from(f: fn(&mut M, &[B]), rows: &[B]) -> Box +where + M: Preallocate, + A: From + Array, +{ + let mut array = M::with_capacity(rows.len()); + f(&mut array, rows); + Box::new(A::from(array)) +} + pub(crate) fn _deserialize<'a, A: Borrow>>( rows: &[A], data_type: DataType, ) -> Box { match &data_type { DataType::Null => Box::new(NullArray::new(data_type, rows.len())), - DataType::Boolean => Box::new(deserialize_boolean(rows)), - DataType::Int8 => Box::new(deserialize_int::(rows, data_type)), - DataType::Int16 => Box::new(deserialize_int::(rows, data_type)), + DataType::Boolean => { + fill_generic_array_from::<_, _, BooleanArray>(deserialize_boolean_into, rows) + } + DataType::Int8 => { + fill_array_from::<_, _, PrimitiveArray>(deserialize_int_into, data_type, rows) + } + DataType::Int16 => { + fill_array_from::<_, _, PrimitiveArray>(deserialize_int_into, data_type, rows) + } DataType::Int32 | DataType::Date32 | DataType::Time32(_) | DataType::Interval(IntervalUnit::YearMonth) => { - Box::new(deserialize_int::(rows, data_type)) + fill_array_from::<_, _, PrimitiveArray>(deserialize_int_into, data_type, rows) } DataType::Interval(IntervalUnit::DayTime) => { unimplemented!("There is no natural representation of DayTime in JSON.") @@ -337,16 +477,34 @@ pub(crate) fn _deserialize<'a, A: Borrow>>( | DataType::Date64 | DataType::Time64(_) | DataType::Timestamp(_, _) - | DataType::Duration(_) => Box::new(deserialize_int::(rows, data_type)), - DataType::UInt8 => Box::new(deserialize_int::(rows, data_type)), - DataType::UInt16 => Box::new(deserialize_int::(rows, data_type)), - DataType::UInt32 => Box::new(deserialize_int::(rows, data_type)), - DataType::UInt64 => Box::new(deserialize_int::(rows, data_type)), + | DataType::Duration(_) => { + fill_array_from::<_, _, PrimitiveArray>(deserialize_int_into, data_type, rows) + } + DataType::UInt8 => { + fill_array_from::<_, _, PrimitiveArray>(deserialize_int_into, data_type, rows) + } + DataType::UInt16 => { + fill_array_from::<_, _, PrimitiveArray>(deserialize_int_into, data_type, rows) + } + DataType::UInt32 => { + fill_array_from::<_, _, PrimitiveArray>(deserialize_int_into, data_type, rows) + } + DataType::UInt64 => { + fill_array_from::<_, _, PrimitiveArray>(deserialize_int_into, data_type, rows) + } DataType::Float16 => unreachable!(), - DataType::Float32 => Box::new(deserialize_float::(rows, data_type)), - DataType::Float64 => Box::new(deserialize_float::(rows, data_type)), - DataType::Utf8 => Box::new(deserialize_utf8::(rows)), - DataType::LargeUtf8 => Box::new(deserialize_utf8::(rows)), + DataType::Float32 => { + fill_array_from::<_, _, PrimitiveArray>(deserialize_float_into, data_type, rows) + } + DataType::Float64 => { + fill_array_from::<_, _, PrimitiveArray>(deserialize_float_into, data_type, rows) + } + DataType::Utf8 => { + fill_generic_array_from::<_, _, Utf8Array>(deserialize_utf8_into, rows) + } + DataType::LargeUtf8 => { + fill_generic_array_from::<_, _, Utf8Array>(deserialize_utf8_into, rows) + } DataType::List(_) => Box::new(deserialize_list::(rows, data_type)), DataType::LargeList(_) => Box::new(deserialize_list::(rows, data_type)), DataType::Binary => Box::new(deserialize_binary::(rows)), @@ -383,3 +541,86 @@ pub fn deserialize(json: &Value, data_type: DataType) -> Result, _ => Err(Error::nyi("read an Array from a non-Array JSON")), } } + +fn allocate_array(f: &Field) -> Box { + use PrimitiveType::*; + match f.data_type() { + DataType::List(inner) => match inner.data_type().to_physical_type() { + PhysicalType::Primitive(Int8) => Box::new(MutablePrimitiveArray::::new()), + PhysicalType::Primitive(Int16) => Box::new(MutablePrimitiveArray::::new()), + PhysicalType::Primitive(Int32) => Box::new(MutablePrimitiveArray::::new()), + PhysicalType::Primitive(Int64) => Box::new(MutablePrimitiveArray::::new()), + PhysicalType::Primitive(Int128) => Box::new(MutablePrimitiveArray::::new()), + PhysicalType::Primitive(UInt8) => Box::new(MutablePrimitiveArray::::new()), + PhysicalType::Primitive(UInt16) => Box::new(MutablePrimitiveArray::::new()), + PhysicalType::Primitive(UInt32) => Box::new(MutablePrimitiveArray::::new()), + PhysicalType::Primitive(UInt64) => Box::new(MutablePrimitiveArray::::new()), + PhysicalType::Primitive(Float16) => Box::new(MutablePrimitiveArray::::new()), + PhysicalType::Primitive(Float32) => Box::new(MutablePrimitiveArray::::new()), + PhysicalType::Primitive(Float64) => Box::new(MutablePrimitiveArray::::new()), + PhysicalType::List => Box::new(MutableListArray::::new_from( + allocate_array(inner), + inner.data_type().clone(), + 0, + )), + _ => todo!(), + }, + _ => todo!(), + } +} + +/// Deserializes a `json` [`Value`] serialized in Pandas record format into +/// a [`Chunk`]. +/// +/// Uses the `Schema` provided, which can be inferred from arbitrary JSON with +/// [`infer_records_schema`]. +/// +/// This is CPU-bounded. +/// +/// # Errors +/// +/// This function errors iff either: +/// +/// * `json` is not a [`Value::Array`] +/// * `data_type` contains any incompatible types: +/// * [`DataType::Struct`] +/// * [`DataType::Dictionary`] +/// * [`DataType::LargeList`] +pub fn deserialize_records(json: &Value, schema: &Schema) -> Result>, Error> { + let mut results = schema + .fields + .iter() + .map(|f| (&f.name, allocate_array(&f))) + .collect::>(); + + match json { + Value::Array(rows) => { + for row in rows.iter() { + match row { + Value::Object(record) => { + for (key, value) in record.iter() { + let arr = results.get_mut(key).ok_or_else(|| { + Error::ExternalFormat(format!("unexpected key: '{}'", key)) + })?; + deserialize_into(arr, &[value]); + } + } + _ => { + return Err(Error::ExternalFormat( + "each row must be an Object".to_string(), + )) + } + } + } + } + _ => { + return Err(Error::ExternalFormat( + "outer type must be an Array".to_string(), + )) + } + } + + Ok(Chunk::new( + results.into_values().map(|mut ma| ma.as_box()).collect(), + )) +} diff --git a/src/io/json/read/infer_schema.rs b/src/io/json/read/infer_schema.rs index 38c147728a1..1bab0251ac9 100644 --- a/src/io/json/read/infer_schema.rs +++ b/src/io/json/read/infer_schema.rs @@ -5,7 +5,7 @@ use indexmap::set::IndexSet as HashSet; use json_deserializer::{Number, Value}; use crate::datatypes::*; -use crate::error::Result; +use crate::error::{Error, Result}; const ITEM_NAME: &str = "item"; @@ -21,6 +21,46 @@ pub fn infer(json: &Value) -> Result { }) } +/// Infers [`Schema`] from JSON [`Value`] in (pandas-compatible) records format. +pub fn infer_records_schema(json: &Value) -> Result { + let outer_array = match json { + Value::Array(array) => Ok(array), + _ => Err(Error::ExternalFormat( + "outer type is not an array".to_string(), + )), + }?; + + let fields = match outer_array.iter().next() { + Some(Value::Object(record)) => record + .iter() + .map(|(name, json)| { + let data_type = infer(json)?; + + Ok(Field { + name: name.clone(), + data_type: DataType::List(Box::new(Field { + name: format!("{}-records", name), + data_type, + is_nullable: true, + metadata: Metadata::default(), + })), + is_nullable: true, + metadata: Metadata::default(), + }) + }) + .collect::>>(), + None => Ok(vec![]), + _ => Err(Error::ExternalFormat( + "first element in array is not a record".to_string(), + )), + }?; + + Ok(Schema { + fields, + metadata: Metadata::default(), + }) +} + fn filter_map_nulls(dt: DataType) -> Option { if dt == DataType::Null { None diff --git a/src/io/json/read/mod.rs b/src/io/json/read/mod.rs index ed1ad17f103..686390df2b3 100644 --- a/src/io/json/read/mod.rs +++ b/src/io/json/read/mod.rs @@ -3,8 +3,8 @@ mod deserialize; mod infer_schema; pub(crate) use deserialize::_deserialize; -pub use deserialize::deserialize; +pub use deserialize::{deserialize, deserialize_records}; pub(crate) use infer_schema::coerce_data_type; -pub use infer_schema::infer; +pub use infer_schema::{infer, infer_records_schema}; pub use json_deserializer; diff --git a/tests/it/io/json/read.rs b/tests/it/io/json/read.rs index 11bd93cd264..25e41669cee 100644 --- a/tests/it/io/json/read.rs +++ b/tests/it/io/json/read.rs @@ -33,3 +33,86 @@ fn read_json() -> Result<()> { Ok(()) } + +#[test] +fn read_json_records() -> Result<()> { + let data = br#"[ + { + "a": [ + [1.1, 2, 3], + [2, 3], + [4, 5, 6] + ], + "b": [1, 2, 3] + }, + { + "a": [ + [3, 2, 1], + [3, 2], + [6, 5, 4] + ] + }, + { + "b": [7, 8, 9] + } + ]"#; + + let a_iter = vec![ + vec![ + Some(vec![Some(1.1), Some(2.), Some(3.)]), + Some(vec![Some(2.), Some(3.)]), + Some(vec![Some(4.), Some(5.), Some(6.)]), + ], + vec![ + Some(vec![Some(3.), Some(2.), Some(1.)]), + Some(vec![Some(3.), Some(2.)]), + Some(vec![Some(6.), Some(5.), Some(4.)]), + ], + ]; + + let a_iter = a_iter.into_iter().map(Some); + let a_inner = MutableListArray::>::new_with_field( + MutablePrimitiveArray::::new(), + "inner", + false, + ); + let mut a_outer = + MutableListArray::>>::new_with_field( + a_inner, "a", false, + ); + a_outer.try_extend(a_iter).unwrap(); + let a_expected: ListArray = a_outer.into(); + + let b_iter = vec![ + vec![Some(1), Some(2), Some(3)], + vec![Some(7), Some(8), Some(9)], + ]; + let b_iter = b_iter.into_iter().map(Some); + let mut b = MutableListArray::>::new_with_field( + MutablePrimitiveArray::::new(), + "b", + false, + ); + b.try_extend(b_iter).unwrap(); + let b_expected: ListArray = b.into(); + + let json = json_deserializer::parse(data)?; + + let schema = read::infer_records_schema(&json)?; + let actual = read::deserialize_records(&json, &schema)?; + + for (f, arr) in schema.fields.iter().zip(actual.arrays().iter()) { + let (expected, actual) = if f.name == "a" { + (&a_expected, arr.as_ref()) + } else if f.name == "b" { + (&b_expected, arr.as_ref()) + } else { + panic!("unexpected field found: {}", f.name); + }; + + // No idea why assert_eq! doesn't work here, but this does. + assert_eq!(format!("{:?}", expected), format!("{:?}", actual)); + } + + Ok(()) +} From 20d58b4cc9965cf77004a47c92125812f2807cfe Mon Sep 17 00:00:00 2001 From: Frank Murphy Date: Fri, 19 Aug 2022 15:03:39 -0400 Subject: [PATCH 06/21] Support fixed-size list ser/de for json records --- src/array/fixed_size_list/mutable.rs | 5 ++ src/io/json/read/deserialize.rs | 78 ++++++++++++++++++++-------- src/io/json/write/serialize.rs | 31 +++++++++++ tests/it/io/json/read.rs | 52 +++++++++++++++++++ tests/it/io/json/write.rs | 33 ++++++++++++ 5 files changed, 178 insertions(+), 21 deletions(-) diff --git a/src/array/fixed_size_list/mutable.rs b/src/array/fixed_size_list/mutable.rs index c638e84033b..0cba41a1e90 100644 --- a/src/array/fixed_size_list/mutable.rs +++ b/src/array/fixed_size_list/mutable.rs @@ -59,6 +59,11 @@ impl MutableFixedSizeListArray { } } + /// Returns the size (number of elements per slot) of this [`FixedSizeListArray`]. + pub const fn size(&self) -> usize { + self.size + } + /// The inner values pub fn values(&self) -> &M { &self.values diff --git a/src/io/json/read/deserialize.rs b/src/io/json/read/deserialize.rs index ddc07e6c70c..97b68098914 100644 --- a/src/io/json/read/deserialize.rs +++ b/src/io/json/read/deserialize.rs @@ -293,6 +293,30 @@ fn deserialize_list_into<'a, O: Offset, A: Borrow>>( target.expand(arrays); } +fn deserialize_fixed_size_list_into<'a, A: Borrow>>( + target: &mut MutableFixedSizeListArray>, + rows: &[A], +) { + for row in rows { + match row.borrow() { + Value::Array(value) => { + if value.len() == target.size() { + { + let child = target.mut_values(); + deserialize_into(child, &value); + } + // unless alignment is already off, the if above should + // prevent this from ever happening. + target.try_push_valid().expect("unaligned backing array"); + } else { + target.push_null(); + } + } + _ => target.push_null(), + } + } +} + fn try_deserialize_into<'a, A: Borrow>, T: NativeType>( target: &mut Box, rows: &[A], @@ -323,6 +347,11 @@ fn deserialize_into<'a, A: Borrow>>(target: &mut Box .downcast_mut::>>() { deserialize_list_into(list_array, rows); + } else if let Some(fixed_size_list_array) = target + .as_mut_any() + .downcast_mut::>>() + { + deserialize_fixed_size_list_into(fixed_size_list_array, rows); } else if try_generic_deserialize_into::<_, MutableBooleanArray>( target, rows, @@ -543,28 +572,35 @@ pub fn deserialize(json: &Value, data_type: DataType) -> Result, } fn allocate_array(f: &Field) -> Box { - use PrimitiveType::*; - match f.data_type() { - DataType::List(inner) => match inner.data_type().to_physical_type() { - PhysicalType::Primitive(Int8) => Box::new(MutablePrimitiveArray::::new()), - PhysicalType::Primitive(Int16) => Box::new(MutablePrimitiveArray::::new()), - PhysicalType::Primitive(Int32) => Box::new(MutablePrimitiveArray::::new()), - PhysicalType::Primitive(Int64) => Box::new(MutablePrimitiveArray::::new()), - PhysicalType::Primitive(Int128) => Box::new(MutablePrimitiveArray::::new()), - PhysicalType::Primitive(UInt8) => Box::new(MutablePrimitiveArray::::new()), - PhysicalType::Primitive(UInt16) => Box::new(MutablePrimitiveArray::::new()), - PhysicalType::Primitive(UInt32) => Box::new(MutablePrimitiveArray::::new()), - PhysicalType::Primitive(UInt64) => Box::new(MutablePrimitiveArray::::new()), - PhysicalType::Primitive(Float16) => Box::new(MutablePrimitiveArray::::new()), - PhysicalType::Primitive(Float32) => Box::new(MutablePrimitiveArray::::new()), - PhysicalType::Primitive(Float64) => Box::new(MutablePrimitiveArray::::new()), - PhysicalType::List => Box::new(MutableListArray::::new_from( - allocate_array(inner), - inner.data_type().clone(), - 0, - )), + fn allocate_inner(f: &Field) -> Box { + match f.data_type() { + DataType::Int8 => Box::new(MutablePrimitiveArray::::new()), + DataType::Int16 => Box::new(MutablePrimitiveArray::::new()), + DataType::Int32 => Box::new(MutablePrimitiveArray::::new()), + DataType::Int64 => Box::new(MutablePrimitiveArray::::new()), + DataType::UInt8 => Box::new(MutablePrimitiveArray::::new()), + DataType::UInt16 => Box::new(MutablePrimitiveArray::::new()), + DataType::UInt32 => Box::new(MutablePrimitiveArray::::new()), + DataType::UInt64 => Box::new(MutablePrimitiveArray::::new()), + DataType::Float16 => Box::new(MutablePrimitiveArray::::new()), + DataType::Float32 => Box::new(MutablePrimitiveArray::::new()), + DataType::Float64 => Box::new(MutablePrimitiveArray::::new()), + DataType::List(..) => allocate_array(f), + DataType::FixedSizeList(..) => allocate_array(f), _ => todo!(), - }, + } + } + match f.data_type() { + DataType::List(inner) => Box::new(MutableListArray::::new_from( + allocate_array(inner), + f.data_type().clone(), + 0, + )), + DataType::FixedSizeList(child, size) => Box::new(MutableFixedSizeListArray::<_>::new_from( + allocate_inner(child), + f.data_type().clone(), + *size, + )), _ => todo!(), } } diff --git a/src/io/json/write/serialize.rs b/src/io/json/write/serialize.rs index 1e2c8445eb2..9e7b54b5c43 100644 --- a/src/io/json/write/serialize.rs +++ b/src/io/json/write/serialize.rs @@ -165,6 +165,34 @@ fn list_serializer<'a, O: Offset>( )) } +fn fixed_size_list_serializer<'a>( + array: &'a FixedSizeListArray, +) -> Box + 'a + Send + Sync> { + let mut serializer = new_serializer(array.values().as_ref()); + + Box::new(BufStreamingIterator::new( + zip_validity(0..array.len(), array.validity().map(|x| x.iter())), + move |ix, buf| { + if let Some(_) = ix { + let length = array.size(); + buf.push(b'['); + let mut is_first_row = true; + for _ in 0..length { + if !is_first_row { + buf.push(b','); + } + is_first_row = false; + buf.extend(serializer.next().unwrap()); + } + buf.push(b']'); + } else { + buf.extend(b"null"); + } + }, + vec![], + )) +} + fn date_serializer<'a, T, F>( array: &'a PrimitiveArray, convert: F, @@ -226,6 +254,9 @@ pub(crate) fn new_serializer<'a>( DataType::Utf8 => utf8_serializer::(array.as_any().downcast_ref().unwrap()), DataType::LargeUtf8 => utf8_serializer::(array.as_any().downcast_ref().unwrap()), DataType::Struct(_) => struct_serializer(array.as_any().downcast_ref().unwrap()), + DataType::FixedSizeList(_, _) => { + fixed_size_list_serializer(array.as_any().downcast_ref().unwrap()) + } DataType::List(_) => list_serializer::(array.as_any().downcast_ref().unwrap()), DataType::LargeList(_) => list_serializer::(array.as_any().downcast_ref().unwrap()), DataType::Date32 => date_serializer(array.as_any().downcast_ref().unwrap(), date32_to_date), diff --git a/tests/it/io/json/read.rs b/tests/it/io/json/read.rs index 25e41669cee..e540503d0d4 100644 --- a/tests/it/io/json/read.rs +++ b/tests/it/io/json/read.rs @@ -116,3 +116,55 @@ fn read_json_records() -> Result<()> { Ok(()) } + +#[test] +fn read_json_fixed_size_records() -> Result<()> { + let data = br#"[ + { + "a": [1, 2.2, 3, 4] + }, + { + "a": [5, 6, 7, 8] + }, + { + "a": [7, 8, 9] + } + ]"#; + + let a_iter = vec![ + Some(vec![Some(1.), Some(2.2), Some(3.), Some(4.)]), + Some(vec![Some(5.), Some(6.), Some(7.), Some(8.)]), + None, + ]; + + let a_iter = a_iter.into_iter(); + let mut a = MutableFixedSizeListArray::>::new_with_field( + MutablePrimitiveArray::::new(), + "inner", + false, + 4, + ); + a.try_extend(a_iter).unwrap(); + let a_expected: FixedSizeListArray = a.into(); + + let json = json_deserializer::parse(data)?; + + let schema = Schema { + fields: vec![Field::new("a", a_expected.data_type().clone(), true)], + metadata: Metadata::default(), + }; + let actual = read::deserialize_records(&json, &schema)?; + + for (f, arr) in schema.fields.iter().zip(actual.arrays().iter()) { + let (expected, actual) = if f.name == "a" { + (&a_expected, arr.as_ref()) + } else { + panic!("unexpected field found: {}", f.name); + }; + + // No idea why assert_eq! doesn't work here, but this does. + assert_eq!(format!("{:?}", expected), format!("{:?}", actual)); + } + + Ok(()) +} diff --git a/tests/it/io/json/write.rs b/tests/it/io/json/write.rs index f224a612173..19785d385ff 100644 --- a/tests/it/io/json/write.rs +++ b/tests/it/io/json/write.rs @@ -254,6 +254,39 @@ fn nested_list_records() -> Result<()> { Ok(()) } +#[test] +fn fixed_size_list_records() -> Result<()> { + let iter = vec![ + vec![Some(1), Some(2), Some(3)], + vec![Some(4), Some(5), Some(6)], + ]; + + let iter = iter.into_iter().map(Some); + + let mut inner = MutableFixedSizeListArray::>::new_with_field( + MutablePrimitiveArray::new(), + "vs", + false, + 3, + ); + inner.try_extend(iter).unwrap(); + let inner: FixedSizeListArray = inner.into(); + + let schema = Schema { + fields: vec![Field::new("vs", inner.data_type().clone(), true)], + metadata: Metadata::default(), + }; + + let arrays: Vec> = vec![Box::new(inner)]; + let chunk = Chunk::new(arrays); + + let expected = r#"[{"vs":[1,2,3]},{"vs":[4,5,6]}]"#; + + let buf = write_record_batch(schema, chunk)?; + assert_eq!(String::from_utf8(buf).unwrap(), expected); + Ok(()) +} + #[test] fn list_of_struct() -> Result<()> { let inner = vec![Field::new("c121", DataType::Utf8, false)]; From 2ed1e55af0bb4489ebdd5a1a7fdb68a449767379 Mon Sep 17 00:00:00 2001 From: Frank Murphy Date: Fri, 19 Aug 2022 16:02:26 -0400 Subject: [PATCH 07/21] Fix issue with nested lists --- src/io/json/read/deserialize.rs | 44 +++++++++++++++------------------ 1 file changed, 20 insertions(+), 24 deletions(-) diff --git a/src/io/json/read/deserialize.rs b/src/io/json/read/deserialize.rs index 97b68098914..dab81f9392a 100644 --- a/src/io/json/read/deserialize.rs +++ b/src/io/json/read/deserialize.rs @@ -572,35 +572,31 @@ pub fn deserialize(json: &Value, data_type: DataType) -> Result, } fn allocate_array(f: &Field) -> Box { - fn allocate_inner(f: &Field) -> Box { - match f.data_type() { - DataType::Int8 => Box::new(MutablePrimitiveArray::::new()), - DataType::Int16 => Box::new(MutablePrimitiveArray::::new()), - DataType::Int32 => Box::new(MutablePrimitiveArray::::new()), - DataType::Int64 => Box::new(MutablePrimitiveArray::::new()), - DataType::UInt8 => Box::new(MutablePrimitiveArray::::new()), - DataType::UInt16 => Box::new(MutablePrimitiveArray::::new()), - DataType::UInt32 => Box::new(MutablePrimitiveArray::::new()), - DataType::UInt64 => Box::new(MutablePrimitiveArray::::new()), - DataType::Float16 => Box::new(MutablePrimitiveArray::::new()), - DataType::Float32 => Box::new(MutablePrimitiveArray::::new()), - DataType::Float64 => Box::new(MutablePrimitiveArray::::new()), - DataType::List(..) => allocate_array(f), - DataType::FixedSizeList(..) => allocate_array(f), - _ => todo!(), - } - } match f.data_type() { - DataType::List(inner) => Box::new(MutableListArray::::new_from( + DataType::Int8 => Box::new(MutablePrimitiveArray::::new()), + DataType::Int16 => Box::new(MutablePrimitiveArray::::new()), + DataType::Int32 => Box::new(MutablePrimitiveArray::::new()), + DataType::Int64 => Box::new(MutablePrimitiveArray::::new()), + DataType::UInt8 => Box::new(MutablePrimitiveArray::::new()), + DataType::UInt16 => Box::new(MutablePrimitiveArray::::new()), + DataType::UInt32 => Box::new(MutablePrimitiveArray::::new()), + DataType::UInt64 => Box::new(MutablePrimitiveArray::::new()), + DataType::Float16 => Box::new(MutablePrimitiveArray::::new()), + DataType::Float32 => Box::new(MutablePrimitiveArray::::new()), + DataType::Float64 => Box::new(MutablePrimitiveArray::::new()), + DataType::FixedSizeList(inner, size) => Box::new(MutableFixedSizeListArray::<_>::new_from( allocate_array(inner), f.data_type().clone(), - 0, - )), - DataType::FixedSizeList(child, size) => Box::new(MutableFixedSizeListArray::<_>::new_from( - allocate_inner(child), - f.data_type().clone(), *size, )), + DataType::List(inner) => match inner.data_type() { + DataType::List(_) => Box::new(MutableListArray::::new_from( + allocate_array(inner), + inner.data_type().clone(), + 0, + )), + _ => allocate_array(inner), + }, _ => todo!(), } } From d10a8de9848b81c498348af9804d59c2be0d1372 Mon Sep 17 00:00:00 2001 From: Frank Murphy Date: Thu, 29 Sep 2022 16:05:01 -0400 Subject: [PATCH 08/21] Fix MutableArray impl --- src/array/mod.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/array/mod.rs b/src/array/mod.rs index c0e9f141e45..b03e44f5bce 100644 --- a/src/array/mod.rs +++ b/src/array/mod.rs @@ -217,6 +217,10 @@ impl MutableArray for Box { fn shrink_to_fit(&mut self) { self.as_mut().shrink_to_fit(); } + + fn reserve(&mut self, additional: usize) { + self.as_mut().reserve(additional); + } } macro_rules! general_dyn { From 4e1d1bb93a21a2bee94724671b9ed6ee12d01dc9 Mon Sep 17 00:00:00 2001 From: Frank Murphy Date: Thu, 13 Oct 2022 16:23:55 -0400 Subject: [PATCH 09/21] s/expand/extend_offsets/ --- src/array/list/mutable.rs | 6 +++--- src/io/json/read/deserialize.rs | 6 +++--- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/src/array/list/mutable.rs b/src/array/list/mutable.rs index 6c9a2fc894f..f654c47c9a0 100644 --- a/src/array/list/mutable.rs +++ b/src/array/list/mutable.rs @@ -164,7 +164,7 @@ impl MutableListArray { /// - the new offsets are not in monotonic increasing order. /// - any new offset is not in bounds of the backing array. /// - the passed iterator has no upper bound. - pub fn expand(&mut self, expansion: II) + pub fn extend_offsets(&mut self, expansion: II) where II: IntoIterator> + TrustedLen, { @@ -175,7 +175,7 @@ impl MutableListArray { self.offsets.push(O::zero()); } // safety: checked below - unsafe { self.unsafe_expand(expansion) }; + unsafe { self.unsafe_extend_offsets(expansion) }; if self.offsets.len() > current_len { // check all inserted offsets try_check_offsets(&self.offsets[current_len..], self.values.len()) @@ -191,7 +191,7 @@ impl MutableListArray { /// zero if the array is currently empty. /// /// Panics if the passed iterator has no upper bound. - pub unsafe fn unsafe_expand(&mut self, expansion: II) + pub unsafe fn unsafe_extend_offsets(&mut self, expansion: II) where II: IntoIterator> + TrustedLen, { diff --git a/src/io/json/read/deserialize.rs b/src/io/json/read/deserialize.rs index dab81f9392a..878be5dd1a6 100644 --- a/src/io/json/read/deserialize.rs +++ b/src/io/json/read/deserialize.rs @@ -288,9 +288,9 @@ fn deserialize_list_into<'a, O: Offset, A: Borrow>>( } }); - // though this will always be safe, we cannot use unsafe_expand here due to - // `#![forbid(unsafe_code)]` on the io module - target.expand(arrays); + // though this will always be safe, we cannot use unsafe_extend_offsets here + // due to `#![forbid(unsafe_code)]` on the io module + target.extend_offsets(arrays); } fn deserialize_fixed_size_list_into<'a, A: Borrow>>( From a24d74313520d304bde7f2d137c0115989403932 Mon Sep 17 00:00:00 2001 From: Frank Murphy Date: Thu, 13 Oct 2022 16:30:16 -0400 Subject: [PATCH 10/21] s/Preallocate/Container --- src/array/binary/mutable.rs | 4 ++-- src/array/boolean/mutable.rs | 4 ++-- src/array/fixed_size_binary/mutable.rs | 4 ++-- src/array/list/mutable.rs | 4 ++-- src/array/mod.rs | 2 +- src/array/primitive/mutable.rs | 4 ++-- src/array/utf8/mutable.rs | 4 ++-- src/io/json/read/deserialize.rs | 2 +- 8 files changed, 14 insertions(+), 14 deletions(-) diff --git a/src/array/binary/mutable.rs b/src/array/binary/mutable.rs index 936f9d074bf..20c0bfe0bc5 100644 --- a/src/array/binary/mutable.rs +++ b/src/array/binary/mutable.rs @@ -2,7 +2,7 @@ use std::{iter::FromIterator, sync::Arc}; use crate::{ array::{ - specification::check_offsets, Array, MutableArray, Offset, Preallocate, TryExtend, TryPush, + specification::check_offsets, Array, Container, MutableArray, Offset, TryExtend, TryPush, }, bitmap::MutableBitmap, datatypes::DataType, @@ -188,7 +188,7 @@ impl MutableBinaryArray { } } -impl Preallocate for MutableBinaryArray { +impl Container for MutableBinaryArray { fn with_capacity(capacity: usize) -> Self { MutableBinaryArray::with_capacity(capacity) } diff --git a/src/array/boolean/mutable.rs b/src/array/boolean/mutable.rs index d40522a961e..0f05b1a8f21 100644 --- a/src/array/boolean/mutable.rs +++ b/src/array/boolean/mutable.rs @@ -2,7 +2,7 @@ use std::iter::FromIterator; use std::sync::Arc; use crate::{ - array::{Array, MutableArray, Preallocate, TryExtend, TryPush}, + array::{Array, Container, MutableArray, TryExtend, TryPush}, bitmap::MutableBitmap, datatypes::{DataType, PhysicalType}, error::Result, @@ -453,7 +453,7 @@ impl>> FromIterator for MutableBoolea } } -impl Preallocate for MutableBooleanArray { +impl Container for MutableBooleanArray { fn with_capacity(capacity: usize) -> Self { MutableBooleanArray::with_capacity(capacity) } diff --git a/src/array/fixed_size_binary/mutable.rs b/src/array/fixed_size_binary/mutable.rs index c5ef7176b79..0077be382e5 100644 --- a/src/array/fixed_size_binary/mutable.rs +++ b/src/array/fixed_size_binary/mutable.rs @@ -1,7 +1,7 @@ use std::sync::Arc; use crate::{ - array::{Array, MutableArray, Preallocate}, + array::{Array, Container, MutableArray}, bitmap::MutableBitmap, datatypes::DataType, error::{Error, Result}, @@ -211,7 +211,7 @@ impl MutableFixedSizeBinaryArray { } } -impl Preallocate for MutableFixedSizeBinaryArray { +impl Container for MutableFixedSizeBinaryArray { fn with_capacity(capacity: usize) -> Self { MutableFixedSizeBinaryArray::with_capacity(capacity, 0) } diff --git a/src/array/list/mutable.rs b/src/array/list/mutable.rs index f654c47c9a0..c0fb107b947 100644 --- a/src/array/list/mutable.rs +++ b/src/array/list/mutable.rs @@ -2,7 +2,7 @@ use std::sync::Arc; use crate::{ array::{ - specification::try_check_offsets, Array, MutableArray, Offset, Preallocate, TryExtend, + specification::try_check_offsets, Array, Container, MutableArray, Offset, TryExtend, TryPush, }, bitmap::MutableBitmap, @@ -282,7 +282,7 @@ impl MutableListArray { } } -impl Preallocate for MutableListArray { +impl Container for MutableListArray { fn with_capacity(capacity: usize) -> Self { MutableListArray::with_capacity(capacity) } diff --git a/src/array/mod.rs b/src/array/mod.rs index b03e44f5bce..e0751cba226 100644 --- a/src/array/mod.rs +++ b/src/array/mod.rs @@ -116,7 +116,7 @@ dyn_clone::clone_trait_object!(Array); /// A trait describing an array with a backing store that can be preallocated to /// a given size. -pub trait Preallocate { +pub trait Container { /// Create this array with a given capacity. fn with_capacity(capacity: usize) -> Self where diff --git a/src/array/primitive/mutable.rs b/src/array/primitive/mutable.rs index 3b238b857a1..d385520cccf 100644 --- a/src/array/primitive/mutable.rs +++ b/src/array/primitive/mutable.rs @@ -2,7 +2,7 @@ use std::{iter::FromIterator, sync::Arc}; use crate::bitmap::Bitmap; use crate::{ - array::{Array, MutableArray, Preallocate, TryExtend, TryPush}, + array::{Array, Container, MutableArray, TryExtend, TryPush}, bitmap::MutableBitmap, datatypes::DataType, error::{Error, Result}, @@ -378,7 +378,7 @@ impl TryPush> for MutablePrimitiveArray { } } -impl Preallocate for MutablePrimitiveArray { +impl Container for MutablePrimitiveArray { fn with_capacity(capacity: usize) -> Self { MutablePrimitiveArray::with_capacity(capacity) } diff --git a/src/array/utf8/mutable.rs b/src/array/utf8/mutable.rs index fcff8204685..b81303091e6 100644 --- a/src/array/utf8/mutable.rs +++ b/src/array/utf8/mutable.rs @@ -2,7 +2,7 @@ use std::{iter::FromIterator, sync::Arc}; use crate::array::physical_binary::*; use crate::{ - array::{Array, MutableArray, Offset, Preallocate, TryExtend, TryPush}, + array::{Array, Container, MutableArray, Offset, TryExtend, TryPush}, bitmap::{ utils::{BitmapIter, ZipValidity}, Bitmap, MutableBitmap, @@ -247,7 +247,7 @@ impl MutableUtf8Array { } } -impl Preallocate for MutableUtf8Array { +impl Container for MutableUtf8Array { fn with_capacity(capacity: usize) -> Self { MutableUtf8Array::with_capacity(capacity) } diff --git a/src/io/json/read/deserialize.rs b/src/io/json/read/deserialize.rs index 878be5dd1a6..bf6f3886e4f 100644 --- a/src/io/json/read/deserialize.rs +++ b/src/io/json/read/deserialize.rs @@ -470,7 +470,7 @@ where fn fill_generic_array_from(f: fn(&mut M, &[B]), rows: &[B]) -> Box where - M: Preallocate, + M: Container, A: From + Array, { let mut array = M::with_capacity(rows.len()); From 33fde655b6b652b274f91de56c6e270c75bea9ed Mon Sep 17 00:00:00 2001 From: Frank Murphy Date: Thu, 13 Oct 2022 16:31:18 -0400 Subject: [PATCH 11/21] Remove unused --- src/io/json/read/deserialize.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/io/json/read/deserialize.rs b/src/io/json/read/deserialize.rs index bf6f3886e4f..e4aeabce2ac 100644 --- a/src/io/json/read/deserialize.rs +++ b/src/io/json/read/deserialize.rs @@ -10,9 +10,9 @@ use crate::{ array::*, bitmap::MutableBitmap, chunk::Chunk, - datatypes::{DataType, Field, IntervalUnit, PhysicalType, Schema}, + datatypes::{DataType, Field, IntervalUnit, Schema}, error::Error, - types::{f16, NativeType, PrimitiveType}, + types::{f16, NativeType}, }; /// A function that converts a &Value into an optional tuple of a byte slice and a Value. From 31994320a4bae05f8d30703b901c9947b2bd7b2d Mon Sep 17 00:00:00 2001 From: Frank Murphy Date: Thu, 20 Oct 2022 12:14:15 -0400 Subject: [PATCH 12/21] s/zip_validity/ZipValidity --- src/io/json/write/serialize.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/io/json/write/serialize.rs b/src/io/json/write/serialize.rs index 9e7b54b5c43..7320094fdce 100644 --- a/src/io/json/write/serialize.rs +++ b/src/io/json/write/serialize.rs @@ -171,7 +171,7 @@ fn fixed_size_list_serializer<'a>( let mut serializer = new_serializer(array.values().as_ref()); Box::new(BufStreamingIterator::new( - zip_validity(0..array.len(), array.validity().map(|x| x.iter())), + ZipValidity::new(0..array.len(), array.validity().map(|x| x.iter())), move |ix, buf| { if let Some(_) = ix { let length = array.size(); From 0c167c895e8dccf4bffccfe0e3d627b9f2ecc8d9 Mon Sep 17 00:00:00 2001 From: Frank Murphy Date: Thu, 20 Oct 2022 14:50:20 -0400 Subject: [PATCH 13/21] Clippy fixes --- src/array/list/mutable.rs | 10 ++++++++-- src/io/json/read/deserialize.rs | 7 ++++--- src/io/json/write/serialize.rs | 2 +- 3 files changed, 13 insertions(+), 6 deletions(-) diff --git a/src/array/list/mutable.rs b/src/array/list/mutable.rs index c0fb107b947..a4f9f74b752 100644 --- a/src/array/list/mutable.rs +++ b/src/array/list/mutable.rs @@ -184,8 +184,14 @@ impl MutableListArray { // else expansion is empty, and this is trivially safe. } - /// Expand this array. Assumes that `offsets` are in order, and do not - /// overrun the underlying `values` backing array. + /// Expand this array, using elements from the underlying backing array. + /// Assumes the expansion begins at the highest previous offset, or zero if + /// this [MutableListArray] is currently empty. + /// + /// # Safety + /// + /// Assumes that `offsets` are in order, and do not overrun the underlying + /// `values` backing array. /// /// Also assumes the expansion begins at the highest previous offset, or /// zero if the array is currently empty. diff --git a/src/io/json/read/deserialize.rs b/src/io/json/read/deserialize.rs index e4aeabce2ac..74711204103 100644 --- a/src/io/json/read/deserialize.rs +++ b/src/io/json/read/deserialize.rs @@ -282,7 +282,7 @@ fn deserialize_list_into<'a, O: Offset, A: Borrow>>( Value::Array(value) => { // todo make this an Err position += O::from_usize(value.len()).expect("List offset is too large :/"); - Some(position.clone()) + Some(position) } _ => None, } @@ -303,7 +303,7 @@ fn deserialize_fixed_size_list_into<'a, A: Borrow>>( if value.len() == target.size() { { let child = target.mut_values(); - deserialize_into(child, &value); + deserialize_into(child, value); } // unless alignment is already off, the if above should // prevent this from ever happening. @@ -342,6 +342,7 @@ fn try_generic_deserialize_into<'a, A: Borrow>, M: 'static>( fn deserialize_into<'a, A: Borrow>>(target: &mut Box, rows: &[A]) { // It'd be nice to have something like pattern matching for downcasting from Any // I'm not aware of anything like that, which leads to this ... ugliness + #[allow(clippy::if_same_then_else)] if let Some(list_array) = target .as_mut_any() .downcast_mut::>>() @@ -622,7 +623,7 @@ pub fn deserialize_records(json: &Value, schema: &Schema) -> Result>(); match json { diff --git a/src/io/json/write/serialize.rs b/src/io/json/write/serialize.rs index 7320094fdce..ebdb2f3b81b 100644 --- a/src/io/json/write/serialize.rs +++ b/src/io/json/write/serialize.rs @@ -173,7 +173,7 @@ fn fixed_size_list_serializer<'a>( Box::new(BufStreamingIterator::new( ZipValidity::new(0..array.len(), array.validity().map(|x| x.iter())), move |ix, buf| { - if let Some(_) = ix { + if ix.is_some() { let length = array.size(); buf.push(b'['); let mut is_first_row = true; From d11836583cb5dc0b95494da56cdf3bf3ade711e4 Mon Sep 17 00:00:00 2001 From: Frank Murphy Date: Fri, 21 Oct 2022 12:07:35 -0400 Subject: [PATCH 14/21] Use .into() Co-authored-by: Jorge Leitao --- tests/it/io/json/read.rs | 5 +---- tests/it/io/json/write.rs | 7 ++----- 2 files changed, 3 insertions(+), 9 deletions(-) diff --git a/tests/it/io/json/read.rs b/tests/it/io/json/read.rs index e540503d0d4..3fb9960a299 100644 --- a/tests/it/io/json/read.rs +++ b/tests/it/io/json/read.rs @@ -149,10 +149,7 @@ fn read_json_fixed_size_records() -> Result<()> { let json = json_deserializer::parse(data)?; - let schema = Schema { - fields: vec![Field::new("a", a_expected.data_type().clone(), true)], - metadata: Metadata::default(), - }; + let schema: Schema = vec![Field::new("a", a_expected.data_type().clone(), true)].into(); let actual = read::deserialize_records(&json, &schema)?; for (f, arr) in schema.fields.iter().zip(actual.arrays().iter()) { diff --git a/tests/it/io/json/write.rs b/tests/it/io/json/write.rs index 19785d385ff..7f57f7c9c5a 100644 --- a/tests/it/io/json/write.rs +++ b/tests/it/io/json/write.rs @@ -235,13 +235,10 @@ fn nested_list_records() -> Result<()> { let c2 = Utf8Array::::from(&vec![Some("foo"), Some("bar"), None]); - let schema = Schema { - fields: vec![ + let schema: Schema = vec![ Field::new("c1", c1.data_type().clone(), true), Field::new("c2", c2.data_type().clone(), true), - ], - metadata: Metadata::default(), - }; + ].into(); let arrays: Vec> = vec![Box::new(c1), Box::new(c2)]; let chunk = Chunk::new(arrays); From 10159503b426b1a108fe1a768d933e73cf21c51a Mon Sep 17 00:00:00 2001 From: Frank Murphy Date: Fri, 21 Oct 2022 12:08:07 -0400 Subject: [PATCH 15/21] Simplify Co-authored-by: Jorge Leitao --- src/array/list/mutable.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/array/list/mutable.rs b/src/array/list/mutable.rs index a4f9f74b752..16a779b064b 100644 --- a/src/array/list/mutable.rs +++ b/src/array/list/mutable.rs @@ -166,7 +166,7 @@ impl MutableListArray { /// - the passed iterator has no upper bound. pub fn extend_offsets(&mut self, expansion: II) where - II: IntoIterator> + TrustedLen, + II: TrustedLen>, { let current_len = self.offsets.len(); let (_, upper) = expansion.size_hint(); @@ -199,7 +199,7 @@ impl MutableListArray { /// Panics if the passed iterator has no upper bound. pub unsafe fn unsafe_extend_offsets(&mut self, expansion: II) where - II: IntoIterator> + TrustedLen, + II: TrustedLen>, { let (_, upper) = expansion.size_hint(); let upper = upper.expect("iterator must have upper bound"); From 1fb85ec11513b38eaf6277bb7fcb6de1721e1f1d Mon Sep 17 00:00:00 2001 From: Frank Murphy Date: Fri, 21 Oct 2022 12:10:32 -0400 Subject: [PATCH 16/21] Make crate-private Co-authored-by: Jorge Leitao --- src/array/list/mutable.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/array/list/mutable.rs b/src/array/list/mutable.rs index 16a779b064b..b87c0761519 100644 --- a/src/array/list/mutable.rs +++ b/src/array/list/mutable.rs @@ -164,7 +164,7 @@ impl MutableListArray { /// - the new offsets are not in monotonic increasing order. /// - any new offset is not in bounds of the backing array. /// - the passed iterator has no upper bound. - pub fn extend_offsets(&mut self, expansion: II) + pub(crate) fn extend_offsets(&mut self, expansion: II) where II: TrustedLen>, { @@ -197,7 +197,7 @@ impl MutableListArray { /// zero if the array is currently empty. /// /// Panics if the passed iterator has no upper bound. - pub unsafe fn unsafe_extend_offsets(&mut self, expansion: II) + pub(crate) unsafe fn unsafe_extend_offsets(&mut self, expansion: II) where II: TrustedLen>, { From b399166e923c33d1f76770a6567bc2e520a3c68c Mon Sep 17 00:00:00 2001 From: Frank Murphy Date: Fri, 21 Oct 2022 13:47:23 -0400 Subject: [PATCH 17/21] cargo fmt --- tests/it/io/json/write.rs | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/tests/it/io/json/write.rs b/tests/it/io/json/write.rs index 7f57f7c9c5a..8c7830f8ef7 100644 --- a/tests/it/io/json/write.rs +++ b/tests/it/io/json/write.rs @@ -236,9 +236,10 @@ fn nested_list_records() -> Result<()> { let c2 = Utf8Array::::from(&vec![Some("foo"), Some("bar"), None]); let schema: Schema = vec![ - Field::new("c1", c1.data_type().clone(), true), - Field::new("c2", c2.data_type().clone(), true), - ].into(); + Field::new("c1", c1.data_type().clone(), true), + Field::new("c2", c2.data_type().clone(), true), + ] + .into(); let arrays: Vec> = vec![Box::new(c1), Box::new(c2)]; let chunk = Chunk::new(arrays); From 5ceee98574b4c3fb795627287d2338ee968e5ace Mon Sep 17 00:00:00 2001 From: Frank Murphy Date: Thu, 27 Oct 2022 16:59:47 -0400 Subject: [PATCH 18/21] Update src/array/mod.rs Co-authored-by: Jorge Leitao --- src/array/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/array/mod.rs b/src/array/mod.rs index e0751cba226..a9c58ab0a87 100644 --- a/src/array/mod.rs +++ b/src/array/mod.rs @@ -116,7 +116,7 @@ dyn_clone::clone_trait_object!(Array); /// A trait describing an array with a backing store that can be preallocated to /// a given size. -pub trait Container { +pub(crate) trait Container { /// Create this array with a given capacity. fn with_capacity(capacity: usize) -> Self where From 757385a5cfdd20f7dc035ae851b58042d014c4e3 Mon Sep 17 00:00:00 2001 From: Frank Murphy Date: Thu, 27 Oct 2022 17:08:24 -0400 Subject: [PATCH 19/21] Move Container to json --- src/array/binary/mutable.rs | 10 +----- src/array/boolean/mutable.rs | 8 +---- src/array/fixed_size_binary/mutable.rs | 8 +---- src/array/list/mutable.rs | 11 +------ src/array/primitive/mutable.rs | 8 +---- src/array/utf8/mutable.rs | 8 +---- src/io/json/read/deserialize.rs | 45 ++++++++++++++++++++++++++ 7 files changed, 51 insertions(+), 47 deletions(-) diff --git a/src/array/binary/mutable.rs b/src/array/binary/mutable.rs index 20c0bfe0bc5..c1ee4ba8a6e 100644 --- a/src/array/binary/mutable.rs +++ b/src/array/binary/mutable.rs @@ -1,9 +1,7 @@ use std::{iter::FromIterator, sync::Arc}; use crate::{ - array::{ - specification::check_offsets, Array, Container, MutableArray, Offset, TryExtend, TryPush, - }, + array::{specification::check_offsets, Array, MutableArray, Offset, TryExtend, TryPush}, bitmap::MutableBitmap, datatypes::DataType, error::{Error, Result}, @@ -188,12 +186,6 @@ impl MutableBinaryArray { } } -impl Container for MutableBinaryArray { - fn with_capacity(capacity: usize) -> Self { - MutableBinaryArray::with_capacity(capacity) - } -} - impl MutableArray for MutableBinaryArray { fn len(&self) -> usize { self.offsets.len() - 1 diff --git a/src/array/boolean/mutable.rs b/src/array/boolean/mutable.rs index 0f05b1a8f21..83fac4b7ef5 100644 --- a/src/array/boolean/mutable.rs +++ b/src/array/boolean/mutable.rs @@ -2,7 +2,7 @@ use std::iter::FromIterator; use std::sync::Arc; use crate::{ - array::{Array, Container, MutableArray, TryExtend, TryPush}, + array::{Array, MutableArray, TryExtend, TryPush}, bitmap::MutableBitmap, datatypes::{DataType, PhysicalType}, error::Result, @@ -453,12 +453,6 @@ impl>> FromIterator for MutableBoolea } } -impl Container for MutableBooleanArray { - fn with_capacity(capacity: usize) -> Self { - MutableBooleanArray::with_capacity(capacity) - } -} - impl MutableArray for MutableBooleanArray { fn len(&self) -> usize { self.values.len() diff --git a/src/array/fixed_size_binary/mutable.rs b/src/array/fixed_size_binary/mutable.rs index 0077be382e5..3a02158cc88 100644 --- a/src/array/fixed_size_binary/mutable.rs +++ b/src/array/fixed_size_binary/mutable.rs @@ -1,7 +1,7 @@ use std::sync::Arc; use crate::{ - array::{Array, Container, MutableArray}, + array::{Array, MutableArray}, bitmap::MutableBitmap, datatypes::DataType, error::{Error, Result}, @@ -211,12 +211,6 @@ impl MutableFixedSizeBinaryArray { } } -impl Container for MutableFixedSizeBinaryArray { - fn with_capacity(capacity: usize) -> Self { - MutableFixedSizeBinaryArray::with_capacity(capacity, 0) - } -} - impl MutableArray for MutableFixedSizeBinaryArray { fn len(&self) -> usize { self.values.len() / self.size diff --git a/src/array/list/mutable.rs b/src/array/list/mutable.rs index b87c0761519..b4f3c9d1d3b 100644 --- a/src/array/list/mutable.rs +++ b/src/array/list/mutable.rs @@ -1,10 +1,7 @@ use std::sync::Arc; use crate::{ - array::{ - specification::try_check_offsets, Array, Container, MutableArray, Offset, TryExtend, - TryPush, - }, + array::{specification::try_check_offsets, Array, MutableArray, Offset, TryExtend, TryPush}, bitmap::MutableBitmap, datatypes::{DataType, Field}, error::{Error, Result}, @@ -288,12 +285,6 @@ impl MutableListArray { } } -impl Container for MutableListArray { - fn with_capacity(capacity: usize) -> Self { - MutableListArray::with_capacity(capacity) - } -} - impl MutableArray for MutableListArray { fn len(&self) -> usize { MutableListArray::len(self) diff --git a/src/array/primitive/mutable.rs b/src/array/primitive/mutable.rs index d385520cccf..5f2014db319 100644 --- a/src/array/primitive/mutable.rs +++ b/src/array/primitive/mutable.rs @@ -2,7 +2,7 @@ use std::{iter::FromIterator, sync::Arc}; use crate::bitmap::Bitmap; use crate::{ - array::{Array, Container, MutableArray, TryExtend, TryPush}, + array::{Array, MutableArray, TryExtend, TryPush}, bitmap::MutableBitmap, datatypes::DataType, error::{Error, Result}, @@ -378,12 +378,6 @@ impl TryPush> for MutablePrimitiveArray { } } -impl Container for MutablePrimitiveArray { - fn with_capacity(capacity: usize) -> Self { - MutablePrimitiveArray::with_capacity(capacity) - } -} - impl MutableArray for MutablePrimitiveArray { fn len(&self) -> usize { self.values.len() diff --git a/src/array/utf8/mutable.rs b/src/array/utf8/mutable.rs index b81303091e6..4cc83da9011 100644 --- a/src/array/utf8/mutable.rs +++ b/src/array/utf8/mutable.rs @@ -2,7 +2,7 @@ use std::{iter::FromIterator, sync::Arc}; use crate::array::physical_binary::*; use crate::{ - array::{Array, Container, MutableArray, Offset, TryExtend, TryPush}, + array::{Array, MutableArray, Offset, TryExtend, TryPush}, bitmap::{ utils::{BitmapIter, ZipValidity}, Bitmap, MutableBitmap, @@ -247,12 +247,6 @@ impl MutableUtf8Array { } } -impl Container for MutableUtf8Array { - fn with_capacity(capacity: usize) -> Self { - MutableUtf8Array::with_capacity(capacity) - } -} - impl MutableArray for MutableUtf8Array { fn len(&self) -> usize { self.len() diff --git a/src/io/json/read/deserialize.rs b/src/io/json/read/deserialize.rs index 74711204103..04d6e6ed599 100644 --- a/src/io/json/read/deserialize.rs +++ b/src/io/json/read/deserialize.rs @@ -469,6 +469,51 @@ where Box::new(A::from(array)) } +/// A trait describing an array with a backing store that can be preallocated to +/// a given size. +pub(crate) trait Container { + /// Create this array with a given capacity. + fn with_capacity(capacity: usize) -> Self + where + Self: Sized; +} + +impl Container for MutableBinaryArray { + fn with_capacity(capacity: usize) -> Self { + MutableBinaryArray::with_capacity(capacity) + } +} + +impl Container for MutableBooleanArray { + fn with_capacity(capacity: usize) -> Self { + MutableBooleanArray::with_capacity(capacity) + } +} + +impl Container for MutableFixedSizeBinaryArray { + fn with_capacity(capacity: usize) -> Self { + MutableFixedSizeBinaryArray::with_capacity(capacity, 0) + } +} + +impl Container for MutableListArray { + fn with_capacity(capacity: usize) -> Self { + MutableListArray::with_capacity(capacity) + } +} + +impl Container for MutablePrimitiveArray { + fn with_capacity(capacity: usize) -> Self { + MutablePrimitiveArray::with_capacity(capacity) + } +} + +impl Container for MutableUtf8Array { + fn with_capacity(capacity: usize) -> Self { + MutableUtf8Array::with_capacity(capacity) + } +} + fn fill_generic_array_from(f: fn(&mut M, &[B]), rows: &[B]) -> Box where M: Container, From 5b3ef81f776a234e4f3a5a5570544b95021efbcd Mon Sep 17 00:00:00 2001 From: Frank Murphy Date: Thu, 27 Oct 2022 17:28:08 -0400 Subject: [PATCH 20/21] Use match for deserialize_into --- src/io/json/read/deserialize.rs | 102 ++++++++++++++++---------------- 1 file changed, 51 insertions(+), 51 deletions(-) diff --git a/src/io/json/read/deserialize.rs b/src/io/json/read/deserialize.rs index 04d6e6ed599..84ad5bea984 100644 --- a/src/io/json/read/deserialize.rs +++ b/src/io/json/read/deserialize.rs @@ -317,69 +317,69 @@ fn deserialize_fixed_size_list_into<'a, A: Borrow>>( } } -fn try_deserialize_into<'a, A: Borrow>, T: NativeType>( +fn deserialize_primitive_into<'a, A: Borrow>, T: NativeType>( target: &mut Box, rows: &[A], deserialize_into: fn(&mut MutablePrimitiveArray, &[A]) -> (), -) -> bool { - try_generic_deserialize_into(target, rows, deserialize_into) +) { + generic_deserialize_into(target, rows, deserialize_into) } -fn try_generic_deserialize_into<'a, A: Borrow>, M: 'static>( +fn generic_deserialize_into<'a, A: Borrow>, M: 'static>( target: &mut Box, rows: &[A], deserialize_into: fn(&mut M, &[A]) -> (), -) -> bool { - if let Some(array) = target.as_mut_any().downcast_mut::() { - deserialize_into(array, rows); - true - } else { - false - } +) { + deserialize_into(target.as_mut_any().downcast_mut::().unwrap(), rows); } /// Deserialize `rows` by extending them into the given `target` fn deserialize_into<'a, A: Borrow>>(target: &mut Box, rows: &[A]) { - // It'd be nice to have something like pattern matching for downcasting from Any - // I'm not aware of anything like that, which leads to this ... ugliness - #[allow(clippy::if_same_then_else)] - if let Some(list_array) = target - .as_mut_any() - .downcast_mut::>>() - { - deserialize_list_into(list_array, rows); - } else if let Some(fixed_size_list_array) = target - .as_mut_any() - .downcast_mut::>>() - { - deserialize_fixed_size_list_into(fixed_size_list_array, rows); - } else if try_generic_deserialize_into::<_, MutableBooleanArray>( - target, - rows, - deserialize_boolean_into, - ) { - } else if try_deserialize_into::<_, f32>(target, rows, deserialize_float_into) { - } else if try_deserialize_into::<_, f64>(target, rows, deserialize_float_into) { - } else if try_deserialize_into::<_, i8>(target, rows, deserialize_int_into) { - } else if try_deserialize_into::<_, i16>(target, rows, deserialize_int_into) { - } else if try_deserialize_into::<_, i32>(target, rows, deserialize_int_into) { - } else if try_deserialize_into::<_, i64>(target, rows, deserialize_int_into) { - } else if try_deserialize_into::<_, u8>(target, rows, deserialize_int_into) { - } else if try_deserialize_into::<_, u16>(target, rows, deserialize_int_into) { - } else if try_deserialize_into::<_, u32>(target, rows, deserialize_int_into) { - } else if try_deserialize_into::<_, u64>(target, rows, deserialize_int_into) { - } else if try_generic_deserialize_into::<_, MutableUtf8Array>( - target, - rows, - deserialize_utf8_into, - ) { - } else if try_generic_deserialize_into::<_, MutableUtf8Array>( - target, - rows, - deserialize_utf8_into, - ) { - } else { - todo!(); + match target.data_type() { + DataType::Boolean => generic_deserialize_into(target, rows, deserialize_boolean_into), + DataType::Float32 => { + deserialize_primitive_into::<_, f32>(target, rows, deserialize_float_into) + } + DataType::Float64 => { + deserialize_primitive_into::<_, f64>(target, rows, deserialize_float_into) + } + DataType::Int8 => deserialize_primitive_into::<_, i8>(target, rows, deserialize_int_into), + DataType::Int16 => deserialize_primitive_into::<_, i16>(target, rows, deserialize_int_into), + DataType::Int32 => deserialize_primitive_into::<_, i32>(target, rows, deserialize_int_into), + DataType::Int64 => deserialize_primitive_into::<_, i64>(target, rows, deserialize_int_into), + DataType::UInt8 => deserialize_primitive_into::<_, u8>(target, rows, deserialize_int_into), + DataType::UInt16 => { + deserialize_primitive_into::<_, u16>(target, rows, deserialize_int_into) + } + DataType::UInt32 => { + deserialize_primitive_into::<_, u32>(target, rows, deserialize_int_into) + } + DataType::UInt64 => { + deserialize_primitive_into::<_, u64>(target, rows, deserialize_int_into) + } + DataType::Utf8 => generic_deserialize_into::<_, MutableUtf8Array>( + target, + rows, + deserialize_utf8_into, + ), + DataType::LargeUtf8 => generic_deserialize_into::<_, MutableUtf8Array>( + target, + rows, + deserialize_utf8_into, + ), + DataType::FixedSizeList(_, _) => { + generic_deserialize_into(target, rows, deserialize_fixed_size_list_into) + } + DataType::List(_) => deserialize_list_into( + target + .as_mut_any() + .downcast_mut::>>() + .unwrap(), + rows, + ), + _ => { + todo!() + } } } From 06757dba874a2479ef2b7bb02d628072cbcedf9a Mon Sep 17 00:00:00 2001 From: Frank Murphy Date: Thu, 27 Oct 2022 17:41:47 -0400 Subject: [PATCH 21/21] Solve remaining test mysteries --- tests/it/io/json/read.rs | 18 ++++++++---------- 1 file changed, 8 insertions(+), 10 deletions(-) diff --git a/tests/it/io/json/read.rs b/tests/it/io/json/read.rs index 3fb9960a299..f2fb8570b8c 100644 --- a/tests/it/io/json/read.rs +++ b/tests/it/io/json/read.rs @@ -73,12 +73,12 @@ fn read_json_records() -> Result<()> { let a_iter = a_iter.into_iter().map(Some); let a_inner = MutableListArray::>::new_with_field( MutablePrimitiveArray::::new(), - "inner", - false, + "item", + true, ); let mut a_outer = MutableListArray::>>::new_with_field( - a_inner, "a", false, + a_inner, "item", true, ); a_outer.try_extend(a_iter).unwrap(); let a_expected: ListArray = a_outer.into(); @@ -90,8 +90,8 @@ fn read_json_records() -> Result<()> { let b_iter = b_iter.into_iter().map(Some); let mut b = MutableListArray::>::new_with_field( MutablePrimitiveArray::::new(), - "b", - false, + "item", + true, ); b.try_extend(b_iter).unwrap(); let b_expected: ListArray = b.into(); @@ -110,8 +110,7 @@ fn read_json_records() -> Result<()> { panic!("unexpected field found: {}", f.name); }; - // No idea why assert_eq! doesn't work here, but this does. - assert_eq!(format!("{:?}", expected), format!("{:?}", actual)); + assert_eq!(expected.to_boxed().as_ref(), actual); } Ok(()) @@ -141,7 +140,7 @@ fn read_json_fixed_size_records() -> Result<()> { let mut a = MutableFixedSizeListArray::>::new_with_field( MutablePrimitiveArray::::new(), "inner", - false, + true, 4, ); a.try_extend(a_iter).unwrap(); @@ -159,8 +158,7 @@ fn read_json_fixed_size_records() -> Result<()> { panic!("unexpected field found: {}", f.name); }; - // No idea why assert_eq! doesn't work here, but this does. - assert_eq!(format!("{:?}", expected), format!("{:?}", actual)); + assert_eq!(expected.to_boxed().as_ref(), actual); } Ok(())