Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Commit

Permalink
Add UnionArray (#283)
Browse files Browse the repository at this point in the history
  • Loading branch information
jorgecarleitao authored Aug 14, 2021
1 parent 8237800 commit 959d549
Show file tree
Hide file tree
Showing 38 changed files with 886 additions and 57 deletions.
7 changes: 4 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,8 @@ venv/bin/python parquet_integration/write_parquet.py
* `MutableArray` API to work in-memory in-place.
* faster IPC reader (different design that avoids an extra copy of all data)
* IPC supports 2.0 (compression)
* FFI support for dictionary-encoded arrays
* FFI support for dictionary-encoded arrays and union array
* All implemented arrow types pass IPC integration tests against other implementations

### Parquet

Expand All @@ -81,7 +82,7 @@ venv/bin/python parquet_integration/write_parquet.py
## Features in the original not available in this crate

* Parquet read and write of struct and nested lists.
* Union and Map types
* Map types

## Features in this crate not in pyarrow

Expand All @@ -90,7 +91,7 @@ venv/bin/python parquet_integration/write_parquet.py

## Features in pyarrow not in this crate

Too many to enumerate; e.g. nested dictionary arrays, union, map, nested parquet.
Too many to enumerate; e.g. nested dictionary arrays, map, nested parquet.

## How to develop

Expand Down
2 changes: 2 additions & 0 deletions arrow-flight/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use arrow2::{
datatypes::*,
error::{ArrowError, Result},
io::ipc,
io::ipc::gen::Schema::MetadataVersion,
io::ipc::read::read_record_batch,
io::ipc::write,
io::ipc::write::common::{encoded_batch, DictionaryTracker, EncodedData, IpcWriteOptions},
Expand Down Expand Up @@ -168,6 +169,7 @@ pub fn flight_data_to_arrow_batch(
None,
is_little_endian,
&dictionaries_by_field,
MetadataVersion::V5,
&mut reader,
0,
)
Expand Down
35 changes: 35 additions & 0 deletions arrow-pyarrow-integration-testing/tests/test_sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -193,3 +193,38 @@ def test_dict(self):
b.validate(full=True)
assert a.to_pylist() == b.to_pylist()
assert a.type == b.type

def test_sparse_union(self):
"""
Python -> Rust -> Python
"""
a = pyarrow.UnionArray.from_sparse(
pyarrow.array([0, 1, 1, 0, 1], pyarrow.int8()),
[
pyarrow.array(["a", "", "", "", "c"], pyarrow.utf8()),
pyarrow.array([0, 1, 2, None, 0], pyarrow.int64()),
],
)
b = arrow_pyarrow_integration_testing.round_trip(a)

b.validate(full=True)
assert a.to_pylist() == b.to_pylist()
assert a.type == b.type

def test_dense_union(self):
"""
Python -> Rust -> Python
"""
a = pyarrow.UnionArray.from_dense(
pyarrow.array([0, 1, 1, 0, 1], pyarrow.int8()),
pyarrow.array([0, 1, 2, 3, 4], type=pyarrow.int32()),
[
pyarrow.array(["a", "", "", "", "c"], pyarrow.utf8()),
pyarrow.array([0, 1, 2, None, 0], pyarrow.int64()),
],
)
b = arrow_pyarrow_integration_testing.round_trip(a)

b.validate(full=True)
assert a.to_pylist() == b.to_pylist()
assert a.type == b.type
1 change: 1 addition & 0 deletions guide/src/high_level.md
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ There is a many-to-one relationship between `DataType` and an Array (i.e. a phys
| `FixedSizeBinary(_)` | `FixedSizeBinaryArray` |
| `FixedSizeList(_,_)` | `FixedSizeListArray` |
| `Struct(_)` | `StructArray` |
| `Union(_,_,_)` | `UnionArray` |
| `Dictionary(UInt8,_)` | `DictionaryArray<u8>` |
| `Dictionary(UInt16,_)`| `DictionaryArray<u16>` |
| `Dictionary(UInt32,_)`| `DictionaryArray<u32>` |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use arrow2::{
datatypes::*,
io::ipc,
io::ipc::gen::Message::{Message, MessageHeader},
io::ipc::gen::Schema::MetadataVersion,
record_batch::RecordBatch,
};
use arrow_flight::flight_descriptor::*;
Expand Down Expand Up @@ -295,6 +296,7 @@ async fn record_batch_from_message(
None,
true,
&dictionaries_by_field,
MetadataVersion::V5,
&mut reader,
0,
);
Expand Down
13 changes: 12 additions & 1 deletion src/array/display.rs
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,18 @@ pub fn get_value_display<'a>(array: &'a dyn Array) -> Box<dyn Fn(usize) -> Strin
string
})
}
Union(_) => todo!(),
Union(_, _, _) => {
let array = array.as_any().downcast_ref::<UnionArray>().unwrap();
let displays = array
.fields()
.iter()
.map(|x| get_display(x.as_ref()))
.collect::<Vec<_>>();
Box::new(move |row: usize| {
let (field, index) = array.index(row);
displays[field](index)
})
}
}
}

Expand Down
26 changes: 19 additions & 7 deletions src/array/equal/mod.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,9 @@
use std::unimplemented;

use crate::{
datatypes::{DataType, IntervalUnit},
types::{days_ms, NativeType},
};

use super::{
primitive::PrimitiveArray, Array, BinaryArray, BooleanArray, DictionaryArray, DictionaryKey,
FixedSizeBinaryArray, FixedSizeListArray, ListArray, NullArray, Offset, StructArray, Utf8Array,
};
use super::*;

mod binary;
mod boolean;
Expand All @@ -19,6 +14,7 @@ mod list;
mod null;
mod primitive;
mod struct_;
mod union;
mod utf8;

impl PartialEq for dyn Array {
Expand Down Expand Up @@ -147,6 +143,18 @@ impl<K: DictionaryKey> PartialEq<&dyn Array> for DictionaryArray<K> {
}
}

impl PartialEq<UnionArray> for UnionArray {
fn eq(&self, other: &Self) -> bool {
equal(self, other)
}
}

impl PartialEq<&dyn Array> for UnionArray {
fn eq(&self, other: &&dyn Array) -> bool {
equal(self, *other)
}
}

/// Logically compares two [`Array`]s.
/// Two arrays are logically equal if and only if:
/// * their data types are equal
Expand Down Expand Up @@ -323,7 +331,11 @@ pub fn equal(lhs: &dyn Array, rhs: &dyn Array) -> bool {
let rhs = rhs.as_any().downcast_ref().unwrap();
fixed_size_list::equal(lhs, rhs)
}
DataType::Union(_) => unimplemented!(),
DataType::Union(_, _, _) => {
let lhs = lhs.as_any().downcast_ref().unwrap();
let rhs = rhs.as_any().downcast_ref().unwrap();
union::equal(lhs, rhs)
}
}
}

Expand Down
5 changes: 5 additions & 0 deletions src/array/equal/union.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
use crate::array::{Array, UnionArray};

pub(super) fn equal(lhs: &UnionArray, rhs: &UnionArray) -> bool {
lhs.data_type() == rhs.data_type() && lhs.len() == rhs.len() && lhs.iter().eq(rhs.iter())
}
2 changes: 1 addition & 1 deletion src/array/ffi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ pub fn buffers_children_dictionary(array: &dyn Array) -> BuffersChildren {
DataType::LargeList(_) => ffi_dyn!(array, ListArray::<i64>),
DataType::FixedSizeList(_, _) => ffi_dyn!(array, FixedSizeListArray),
DataType::Struct(_) => ffi_dyn!(array, StructArray),
DataType::Union(_) => unimplemented!(),
DataType::Union(_, _, _) => ffi_dyn!(array, UnionArray),
DataType::Dictionary(key_type, _) => match key_type.as_ref() {
DataType::Int8 => ffi_dict_dyn!(array, DictionaryArray::<i8>),
DataType::Int16 => ffi_dict_dyn!(array, DictionaryArray::<i16>),
Expand Down
2 changes: 1 addition & 1 deletion src/array/growable/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ pub fn make_growable<'a>(
))
}
DataType::FixedSizeList(_, _) => todo!(),
DataType::Union(_) => todo!(),
DataType::Union(_, _, _) => todo!(),
DataType::Dictionary(key, _) => match key.as_ref() {
DataType::UInt8 => dyn_dict_growable!(u8, arrays, use_validity, capacity),
DataType::UInt16 => dyn_dict_growable!(u16, arrays, use_validity, capacity),
Expand Down
15 changes: 11 additions & 4 deletions src/array/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,9 @@ pub trait Array: std::fmt::Debug + Send + Sync {
/// This is `O(1)`.
#[inline]
fn null_count(&self) -> usize {
if self.data_type() == &DataType::Null {
return self.len();
};
self.validity()
.as_ref()
.map(|x| x.null_count())
Expand Down Expand Up @@ -185,7 +188,7 @@ impl Display for dyn Array {
DataType::LargeList(_) => fmt_dyn!(self, ListArray::<i64>, f),
DataType::FixedSizeList(_, _) => fmt_dyn!(self, FixedSizeListArray, f),
DataType::Struct(_) => fmt_dyn!(self, StructArray, f),
DataType::Union(_) => unimplemented!(),
DataType::Union(_, _, _) => fmt_dyn!(self, UnionArray, f),
DataType::Dictionary(key_type, _) => match key_type.as_ref() {
DataType::Int8 => fmt_dyn!(self, DictionaryArray::<i8>, f),
DataType::Int16 => fmt_dyn!(self, DictionaryArray::<i16>, f),
Expand Down Expand Up @@ -239,7 +242,7 @@ pub fn new_empty_array(data_type: DataType) -> Box<dyn Array> {
DataType::LargeList(_) => Box::new(ListArray::<i64>::new_empty(data_type)),
DataType::FixedSizeList(_, _) => Box::new(FixedSizeListArray::new_empty(data_type)),
DataType::Struct(fields) => Box::new(StructArray::new_empty(&fields)),
DataType::Union(_) => unimplemented!(),
DataType::Union(_, _, _) => Box::new(UnionArray::new_empty(data_type)),
DataType::Dictionary(key_type, value_type) => match key_type.as_ref() {
DataType::Int8 => Box::new(DictionaryArray::<i8>::new_empty(*value_type)),
DataType::Int16 => Box::new(DictionaryArray::<i16>::new_empty(*value_type)),
Expand Down Expand Up @@ -293,7 +296,7 @@ pub fn new_null_array(data_type: DataType, length: usize) -> Box<dyn Array> {
DataType::LargeList(_) => Box::new(ListArray::<i64>::new_null(data_type, length)),
DataType::FixedSizeList(_, _) => Box::new(FixedSizeListArray::new_null(data_type, length)),
DataType::Struct(fields) => Box::new(StructArray::new_null(&fields, length)),
DataType::Union(_) => unimplemented!(),
DataType::Union(_, _, _) => Box::new(UnionArray::new_null(data_type, length)),
DataType::Dictionary(key_type, value_type) => match key_type.as_ref() {
DataType::Int8 => Box::new(DictionaryArray::<i8>::new_null(*value_type, length)),
DataType::Int16 => Box::new(DictionaryArray::<i16>::new_null(*value_type, length)),
Expand Down Expand Up @@ -354,7 +357,7 @@ pub fn clone(array: &dyn Array) -> Box<dyn Array> {
DataType::LargeList(_) => clone_dyn!(array, ListArray::<i64>),
DataType::FixedSizeList(_, _) => clone_dyn!(array, FixedSizeListArray),
DataType::Struct(_) => clone_dyn!(array, StructArray),
DataType::Union(_) => unimplemented!(),
DataType::Union(_, _, _) => clone_dyn!(array, UnionArray),
DataType::Dictionary(key_type, _) => match key_type.as_ref() {
DataType::Int8 => clone_dyn!(array, DictionaryArray::<i8>),
DataType::Int16 => clone_dyn!(array, DictionaryArray::<i16>),
Expand All @@ -380,6 +383,7 @@ mod null;
mod primitive;
mod specification;
mod struct_;
mod union;
mod utf8;

mod equal;
Expand All @@ -399,6 +403,7 @@ pub use null::NullArray;
pub use primitive::*;
pub use specification::Offset;
pub use struct_::StructArray;
pub use union::UnionArray;
pub use utf8::{MutableUtf8Array, Utf8Array, Utf8ValuesIter};

pub(crate) use self::ffi::buffers_children_dictionary;
Expand Down Expand Up @@ -498,6 +503,8 @@ mod tests {
DataType::Utf8,
DataType::Binary,
DataType::List(Box::new(Field::new("a", DataType::Binary, true))),
DataType::Union(vec![Field::new("a", DataType::Binary, true)], None, true),
DataType::Union(vec![Field::new("a", DataType::Binary, true)], None, false),
];
let a = datatypes.into_iter().all(|x| new_empty_array(x).len() == 0);
assert!(a);
Expand Down
58 changes: 58 additions & 0 deletions src/array/union/ffi.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
use std::sync::Arc;

use crate::{array::FromFfi, error::Result, ffi};

use super::super::{ffi::ToFfi, Array};
use super::UnionArray;

unsafe impl ToFfi for UnionArray {
fn buffers(&self) -> Vec<Option<std::ptr::NonNull<u8>>> {
if let Some(offsets) = &self.offsets {
vec![
None,
std::ptr::NonNull::new(self.types.as_ptr() as *mut u8),
std::ptr::NonNull::new(offsets.as_ptr() as *mut u8),
]
} else {
vec![None, std::ptr::NonNull::new(self.types.as_ptr() as *mut u8)]
}
}

fn offset(&self) -> usize {
self.offset
}

fn children(&self) -> Vec<Arc<dyn Array>> {
self.fields.clone()
}
}

unsafe impl<A: ffi::ArrowArrayRef> FromFfi<A> for UnionArray {
fn try_from_ffi(array: A) -> Result<Self> {
let field = array.field()?;
let data_type = field.data_type().clone();
let fields = Self::get_fields(field.data_type());

let mut types = unsafe { array.buffer::<i8>(0) }?;
let offsets = if Self::is_sparse(&data_type) {
None
} else {
Some(unsafe { array.buffer::<i32>(1) }?)
};

let length = array.array().len();
let offset = array.array().offset();
let fields = (0..fields.len())
.map(|index| {
let child = array.child(index)?;
Ok(ffi::try_from(child)?.into())
})
.collect::<Result<Vec<Arc<dyn Array>>>>()?;

if offset > 0 {
types = types.slice(offset, length);
};

Ok(Self::from_data(data_type, types, fields, offsets))
}
}
55 changes: 55 additions & 0 deletions src/array/union/iterator.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
use super::{Array, UnionArray};
use crate::{scalar::Scalar, trusted_len::TrustedLen};

#[derive(Debug, Clone)]
pub struct UnionIter<'a> {
array: &'a UnionArray,
current: usize,
}

impl<'a> UnionIter<'a> {
pub fn new(array: &'a UnionArray) -> Self {
Self { array, current: 0 }
}
}

impl<'a> Iterator for UnionIter<'a> {
type Item = Box<dyn Scalar>;

fn next(&mut self) -> Option<Self::Item> {
if self.current == self.array.len() {
None
} else {
let old = self.current;
self.current += 1;
Some(self.array.value(old))
}
}

fn size_hint(&self) -> (usize, Option<usize>) {
let len = self.array.len() - self.current;
(len, Some(len))
}
}

impl<'a> IntoIterator for &'a UnionArray {
type Item = Box<dyn Scalar>;
type IntoIter = UnionIter<'a>;

#[inline]
fn into_iter(self) -> Self::IntoIter {
self.iter()
}
}

impl<'a> UnionArray {
/// constructs a new iterator
#[inline]
pub fn iter(&'a self) -> UnionIter<'a> {
UnionIter::new(self)
}
}

impl<'a> std::iter::ExactSizeIterator for UnionIter<'a> {}

unsafe impl<'a> TrustedLen for UnionIter<'a> {}
Loading

0 comments on commit 959d549

Please sign in to comment.