From 4b893b71ddb73bfe9e7a0d76fbf6fe4ca0e9a930 Mon Sep 17 00:00:00 2001 From: Jorge Leitao Date: Thu, 10 Mar 2022 21:28:00 +0100 Subject: [PATCH] Growable union (#902) --- src/array/growable/mod.rs | 13 +++- src/array/growable/union.rs | 111 +++++++++++++++++++++++++++++++ src/array/union/mod.rs | 4 +- tests/it/array/growable/mod.rs | 1 + tests/it/array/growable/union.rs | 72 ++++++++++++++++++++ tests/it/array/union.rs | 25 ++++++- 6 files changed, 220 insertions(+), 6 deletions(-) create mode 100644 src/array/growable/union.rs create mode 100644 tests/it/array/growable/union.rs diff --git a/src/array/growable/mod.rs b/src/array/growable/mod.rs index 03f5d2345d5..398c4297317 100644 --- a/src/array/growable/mod.rs +++ b/src/array/growable/mod.rs @@ -6,6 +6,8 @@ use crate::datatypes::*; mod binary; pub use binary::GrowableBinary; +mod union; +pub use union::GrowableUnion; mod boolean; pub use boolean::GrowableBoolean; mod fixed_binary; @@ -28,7 +30,7 @@ pub use dictionary::GrowableDictionary; mod utils; /// Describes a struct that can be extended from slices of other pre-existing [`Array`]s. -/// This is used in operations where a new array is built out of other arrays such +/// This is used in operations where a new array is built out of other arrays, such /// as filter and concatenation. 
pub trait Growable<'a> { /// Extends this [`Growable`] with elements from the bounded [`Array`] at index `index` from @@ -110,7 +112,14 @@ pub fn make_growable<'a>( use_validity, capacity ), - Union | Map => todo!(), + Union => { + let arrays = arrays + .iter() + .map(|array| array.as_any().downcast_ref().unwrap()) + .collect::>(); + Box::new(union::GrowableUnion::new(arrays, capacity)) + } + Map => todo!(), Dictionary(key_type) => { match_integer_type!(key_type, |$T| { let arrays = arrays diff --git a/src/array/growable/union.rs b/src/array/growable/union.rs new file mode 100644 index 00000000000..c880c05ccf7 --- /dev/null +++ b/src/array/growable/union.rs @@ -0,0 +1,111 @@ +use std::sync::Arc; + +use crate::array::{Array, UnionArray}; + +use super::{make_growable, Growable}; + +/// Concrete [`Growable`] for the [`UnionArray`]. +pub struct GrowableUnion<'a> { + arrays: Vec<&'a UnionArray>, + types: Vec, + offsets: Option>, + fields: Vec + 'a>>, +} + +impl<'a> GrowableUnion<'a> { + /// Creates a new [`GrowableUnion`] bound to `arrays` with a pre-allocated `capacity`. + /// # Panics + /// Panics iff + /// * `arrays` is empty. 
+ /// * any of the arrays has a different data type than the first one + pub fn new(arrays: Vec<&'a UnionArray>, capacity: usize) -> Self { + let first = arrays[0].data_type(); + assert!(arrays.iter().all(|x| x.data_type() == first)); + + let has_offsets = arrays[0].offsets().is_some(); + + let fields = (0..arrays[0].fields().len()) + .map(|i| { + make_growable( + &arrays + .iter() + .map(|x| x.fields()[i].as_ref()) + .collect::>(), + false, + capacity, + ) + }) + .collect::>>(); + + Self { + arrays, + fields, + offsets: if has_offsets { + Some(Vec::with_capacity(capacity)) + } else { + None + }, + types: Vec::with_capacity(capacity), + } + } + + fn to(&mut self) -> UnionArray { + let types = std::mem::take(&mut self.types); + let fields = std::mem::take(&mut self.fields); + let offsets = std::mem::take(&mut self.offsets); + let fields = fields.into_iter().map(|mut x| x.as_arc()).collect(); + + UnionArray::new( + self.arrays[0].data_type().clone(), + types.into(), + fields, + offsets.map(|x| x.into()), + ) + } +} + +impl<'a> Growable<'a> for GrowableUnion<'a> { + fn extend(&mut self, index: usize, start: usize, len: usize) { + let array = self.arrays[index]; + + let types = &array.types()[start..start + len]; + self.types.extend(types); + if let Some(x) = self.offsets.as_mut() { + let offsets = &array.offsets().unwrap()[start..start + len]; + + x.extend(offsets); + // in a dense union, each slot has its own offset. We extend the fields accordingly. 
+ for (&type_, &offset) in types.iter().zip(offsets.iter()) { + self.fields[type_ as usize].extend(index, offset as usize, 1); + } + } else { + // in a sparse union, every field has the same length => extend all fields equally + self.fields + .iter_mut() + .for_each(|field| field.extend(index, start, len)) + } + } + + fn extend_validity(&mut self, _additional: usize) {} + + fn as_arc(&mut self) -> Arc { + Arc::new(self.to()) + } + + fn as_box(&mut self) -> Box { + Box::new(self.to()) + } +} + +impl<'a> From> for UnionArray { + fn from(val: GrowableUnion<'a>) -> Self { + let fields = val.fields.into_iter().map(|mut x| x.as_arc()).collect(); + + UnionArray::new( + val.arrays[0].data_type().clone(), + val.types.into(), + fields, + val.offsets.map(|x| x.into()), + ) + } +} diff --git a/src/array/union/mod.rs b/src/array/union/mod.rs index 40825705ef8..d7b7ddd3c3b 100644 --- a/src/array/union/mod.rs +++ b/src/array/union/mod.rs @@ -228,8 +228,8 @@ impl UnionArray { } /// The optional offsets. - pub fn offsets(&self) -> &Option> { - &self.offsets + pub fn offsets(&self) -> Option<&Buffer> { + self.offsets.as_ref() } /// The fields. 
diff --git a/tests/it/array/growable/mod.rs b/tests/it/array/growable/mod.rs index fcefae2c5a2..578f34485cf 100644 --- a/tests/it/array/growable/mod.rs +++ b/tests/it/array/growable/mod.rs @@ -7,6 +7,7 @@ mod list; mod null; mod primitive; mod struct_; +mod union; mod utf8; /* diff --git a/tests/it/array/growable/union.rs b/tests/it/array/growable/union.rs new file mode 100644 index 00000000000..e670bc1a750 --- /dev/null +++ b/tests/it/array/growable/union.rs @@ -0,0 +1,72 @@ +use std::sync::Arc; + +use arrow2::{ + array::{ + growable::{Growable, GrowableUnion}, + *, + }, + datatypes::*, + error::Result, +}; + +#[test] +fn sparse() -> Result<()> { + let fields = vec![ + Field::new("a", DataType::Int32, true), + Field::new("b", DataType::Utf8, true), + ]; + let data_type = DataType::Union(fields, None, UnionMode::Sparse); + let types = vec![0, 0, 1].into(); + let fields = vec![ + Arc::new(Int32Array::from(&[Some(1), None, Some(2)])) as Arc, + Arc::new(Utf8Array::::from(&[Some("a"), Some("b"), Some("c")])) as Arc, + ]; + let array = UnionArray::from_data(data_type, types, fields, None); + + for length in 1..2 { + for index in 0..(array.len() - length + 1) { + let mut a = GrowableUnion::new(vec![&array], 10); + + a.extend(0, index, length); + let expected = array.slice(index, length); + + let result: UnionArray = a.into(); + + assert_eq!(expected, result); + } + } + + Ok(()) +} + +#[test] +fn dense() -> Result<()> { + let fields = vec![ + Field::new("a", DataType::Int32, true), + Field::new("b", DataType::Utf8, true), + ]; + let data_type = DataType::Union(fields, None, UnionMode::Dense); + let types = vec![0, 0, 1].into(); + let fields = vec![ + Arc::new(Int32Array::from(&[Some(1), None, Some(2)])) as Arc, + Arc::new(Utf8Array::::from(&[Some("c")])) as Arc, + ]; + let offsets = Some(vec![0, 1, 0].into()); + + let array = UnionArray::from_data(data_type, types, fields, offsets); + + for length in 1..2 { + for index in 0..(array.len() - length + 1) { + let mut a = 
GrowableUnion::new(vec![&array], 10); + + a.extend(0, index, length); + let expected = array.slice(index, length); + + let result: UnionArray = a.into(); + + assert_eq!(expected, result); + } + } + + Ok(()) +} diff --git a/tests/it/array/union.rs b/tests/it/array/union.rs index 4066c0ad9b1..9045c560681 100644 --- a/tests/it/array/union.rs +++ b/tests/it/array/union.rs @@ -9,13 +9,13 @@ use arrow2::{ }; #[test] -fn debug() -> Result<()> { +fn sparse_debug() -> Result<()> { let fields = vec![ Field::new("a", DataType::Int32, true), Field::new("b", DataType::Utf8, true), ]; let data_type = DataType::Union(fields, None, UnionMode::Sparse); - let types = Buffer::from_slice([0, 0, 1]); + let types = vec![0, 0, 1].into(); let fields = vec![ Arc::new(Int32Array::from(&[Some(1), None, Some(2)])) as Arc, Arc::new(Utf8Array::::from(&[Some("a"), Some("b"), Some("c")])) as Arc, @@ -28,6 +28,27 @@ fn debug() -> Result<()> { Ok(()) } +#[test] +fn dense_debug() -> Result<()> { + let fields = vec![ + Field::new("a", DataType::Int32, true), + Field::new("b", DataType::Utf8, true), + ]; + let data_type = DataType::Union(fields, None, UnionMode::Dense); + let types = vec![0, 0, 1].into(); + let fields = vec![ + Arc::new(Int32Array::from(&[Some(1), None, Some(2)])) as Arc, + Arc::new(Utf8Array::::from(&[Some("c")])) as Arc, + ]; + let offsets = Some(vec![0, 1, 0].into()); + + let array = UnionArray::from_data(data_type, types, fields, offsets); + + assert_eq!(format!("{:?}", array), "UnionArray[1, None, c]"); + + Ok(()) +} + #[test] fn slice() -> Result<()> { let fields = vec![