Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Add UnionArray #283

Merged
merged 10 commits into from
Aug 14, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,8 @@ venv/bin/python parquet_integration/write_parquet.py
* `MutableArray` API to work in-memory in-place.
* faster IPC reader (different design that avoids an extra copy of all data)
* IPC supports 2.0 (compression)
* FFI support for dictionary-encoded arrays
* FFI support for dictionary-encoded and union arrays
* All implemented arrow types pass IPC integration tests against other implementations

### Parquet

Expand All @@ -81,7 +82,7 @@ venv/bin/python parquet_integration/write_parquet.py
## Features in the original not available in this crate

* Parquet read and write of struct and nested lists.
* Union and Map types
* Map types

## Features in this crate not in pyarrow

Expand All @@ -90,7 +91,7 @@ venv/bin/python parquet_integration/write_parquet.py

## Features in pyarrow not in this crate

Too many to enumerate; e.g. nested dictionary arrays, union, map, nested parquet.
Too many to enumerate; e.g. nested dictionary arrays, map, nested parquet.

## How to develop

Expand Down
2 changes: 2 additions & 0 deletions arrow-flight/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use arrow2::{
datatypes::*,
error::{ArrowError, Result},
io::ipc,
io::ipc::gen::Schema::MetadataVersion,
io::ipc::read::read_record_batch,
io::ipc::write,
io::ipc::write::common::{encoded_batch, DictionaryTracker, EncodedData, IpcWriteOptions},
Expand Down Expand Up @@ -168,6 +169,7 @@ pub fn flight_data_to_arrow_batch(
None,
is_little_endian,
&dictionaries_by_field,
MetadataVersion::V5,
&mut reader,
0,
)
Expand Down
35 changes: 35 additions & 0 deletions arrow-pyarrow-integration-testing/tests/test_sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -193,3 +193,38 @@ def test_dict(self):
b.validate(full=True)
assert a.to_pylist() == b.to_pylist()
assert a.type == b.type

def test_sparse_union(self):
    """
    Round-trips a sparse union array (Python -> Rust -> Python) and checks
    that the values and the type are both preserved.
    """
    type_ids = pyarrow.array([0, 1, 1, 0, 1], pyarrow.int8())
    children = [
        pyarrow.array(["a", "", "", "", "c"], pyarrow.utf8()),
        pyarrow.array([0, 1, 2, None, 0], pyarrow.int64()),
    ]
    original = pyarrow.UnionArray.from_sparse(type_ids, children)
    round_tripped = arrow_pyarrow_integration_testing.round_trip(original)

    # Full validation also checks the contents of the returned array.
    round_tripped.validate(full=True)
    assert original.to_pylist() == round_tripped.to_pylist()
    assert original.type == round_tripped.type

def test_dense_union(self):
    """
    Round-trips a dense union array (Python -> Rust -> Python) and checks
    that the values and the type are both preserved.
    """
    type_ids = pyarrow.array([0, 1, 1, 0, 1], pyarrow.int8())
    value_offsets = pyarrow.array([0, 1, 2, 3, 4], type=pyarrow.int32())
    children = [
        pyarrow.array(["a", "", "", "", "c"], pyarrow.utf8()),
        pyarrow.array([0, 1, 2, None, 0], pyarrow.int64()),
    ]
    original = pyarrow.UnionArray.from_dense(type_ids, value_offsets, children)
    round_tripped = arrow_pyarrow_integration_testing.round_trip(original)

    # Full validation also checks the contents of the returned array.
    round_tripped.validate(full=True)
    assert original.to_pylist() == round_tripped.to_pylist()
    assert original.type == round_tripped.type
1 change: 1 addition & 0 deletions guide/src/high_level.md
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ There is a many-to-one relationship between `DataType` and an Array (i.e. a phys
| `FixedSizeBinary(_)` | `FixedSizeBinaryArray` |
| `FixedSizeList(_,_)` | `FixedSizeListArray` |
| `Struct(_)` | `StructArray` |
| `Union(_,_,_)` | `UnionArray` |
| `Dictionary(UInt8,_)` | `DictionaryArray<u8>` |
| `Dictionary(UInt16,_)`| `DictionaryArray<u16>` |
| `Dictionary(UInt32,_)`| `DictionaryArray<u32>` |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use arrow2::{
datatypes::*,
io::ipc,
io::ipc::gen::Message::{Message, MessageHeader},
io::ipc::gen::Schema::MetadataVersion,
record_batch::RecordBatch,
};
use arrow_flight::flight_descriptor::*;
Expand Down Expand Up @@ -295,6 +296,7 @@ async fn record_batch_from_message(
None,
true,
&dictionaries_by_field,
MetadataVersion::V5,
&mut reader,
0,
);
Expand Down
13 changes: 12 additions & 1 deletion src/array/display.rs
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,18 @@ pub fn get_value_display<'a>(array: &'a dyn Array) -> Box<dyn Fn(usize) -> Strin
string
})
}
Union(_) => todo!(),
Union(_, _, _) => {
let array = array.as_any().downcast_ref::<UnionArray>().unwrap();
let displays = array
.fields()
.iter()
.map(|x| get_display(x.as_ref()))
.collect::<Vec<_>>();
Box::new(move |row: usize| {
let (field, index) = array.index(row);
displays[field](index)
})
}
}
}

Expand Down
26 changes: 19 additions & 7 deletions src/array/equal/mod.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,9 @@
use std::unimplemented;

use crate::{
datatypes::{DataType, IntervalUnit},
types::{days_ms, NativeType},
};

use super::{
primitive::PrimitiveArray, Array, BinaryArray, BooleanArray, DictionaryArray, DictionaryKey,
FixedSizeBinaryArray, FixedSizeListArray, ListArray, NullArray, Offset, StructArray, Utf8Array,
};
use super::*;

mod binary;
mod boolean;
Expand All @@ -19,6 +14,7 @@ mod list;
mod null;
mod primitive;
mod struct_;
mod union;
mod utf8;

impl PartialEq for dyn Array {
Expand Down Expand Up @@ -147,6 +143,18 @@ impl<K: DictionaryKey> PartialEq<&dyn Array> for DictionaryArray<K> {
}
}

// Two union arrays compare through the logical `equal` function of this module.
impl PartialEq for UnionArray {
    fn eq(&self, rhs: &UnionArray) -> bool {
        equal(self, rhs)
    }
}

// Allows comparing a union array directly against a type-erased array.
impl PartialEq<&dyn Array> for UnionArray {
    fn eq(&self, other: &&dyn Array) -> bool {
        // Peel one level of reference so that `equal` receives a `&dyn Array`.
        let rhs: &dyn Array = *other;
        equal(self, rhs)
    }
}

/// Logically compares two [`Array`]s.
/// Two arrays are logically equal if and only if:
/// * their data types are equal
Expand Down Expand Up @@ -323,7 +331,11 @@ pub fn equal(lhs: &dyn Array, rhs: &dyn Array) -> bool {
let rhs = rhs.as_any().downcast_ref().unwrap();
fixed_size_list::equal(lhs, rhs)
}
DataType::Union(_) => unimplemented!(),
DataType::Union(_, _, _) => {
let lhs = lhs.as_any().downcast_ref().unwrap();
let rhs = rhs.as_any().downcast_ref().unwrap();
union::equal(lhs, rhs)
}
}
}

Expand Down
5 changes: 5 additions & 0 deletions src/array/equal/union.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
use crate::array::{Array, UnionArray};

/// Returns whether two [`UnionArray`]s are equal: same data type, same length
/// and pairwise-equal values as yielded by their iterators.
pub(super) fn equal(lhs: &UnionArray, rhs: &UnionArray) -> bool {
    if lhs.data_type() != rhs.data_type() {
        return false;
    }
    if lhs.len() != rhs.len() {
        return false;
    }
    // Only walk the values once the cheap checks have passed.
    lhs.iter().eq(rhs.iter())
}
2 changes: 1 addition & 1 deletion src/array/ffi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ pub fn buffers_children_dictionary(array: &dyn Array) -> BuffersChildren {
DataType::LargeList(_) => ffi_dyn!(array, ListArray::<i64>),
DataType::FixedSizeList(_, _) => ffi_dyn!(array, FixedSizeListArray),
DataType::Struct(_) => ffi_dyn!(array, StructArray),
DataType::Union(_) => unimplemented!(),
DataType::Union(_, _, _) => ffi_dyn!(array, UnionArray),
DataType::Dictionary(key_type, _) => match key_type.as_ref() {
DataType::Int8 => ffi_dict_dyn!(array, DictionaryArray::<i8>),
DataType::Int16 => ffi_dict_dyn!(array, DictionaryArray::<i16>),
Expand Down
2 changes: 1 addition & 1 deletion src/array/growable/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ pub fn make_growable<'a>(
))
}
DataType::FixedSizeList(_, _) => todo!(),
DataType::Union(_) => todo!(),
DataType::Union(_, _, _) => todo!(),
DataType::Dictionary(key, _) => match key.as_ref() {
DataType::UInt8 => dyn_dict_growable!(u8, arrays, use_validity, capacity),
DataType::UInt16 => dyn_dict_growable!(u16, arrays, use_validity, capacity),
Expand Down
15 changes: 11 additions & 4 deletions src/array/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,9 @@ pub trait Array: std::fmt::Debug + Send + Sync {
/// This is `O(1)`.
#[inline]
fn null_count(&self) -> usize {
if self.data_type() == &DataType::Null {
return self.len();
};
self.validity()
.as_ref()
.map(|x| x.null_count())
Expand Down Expand Up @@ -185,7 +188,7 @@ impl Display for dyn Array {
DataType::LargeList(_) => fmt_dyn!(self, ListArray::<i64>, f),
DataType::FixedSizeList(_, _) => fmt_dyn!(self, FixedSizeListArray, f),
DataType::Struct(_) => fmt_dyn!(self, StructArray, f),
DataType::Union(_) => unimplemented!(),
DataType::Union(_, _, _) => fmt_dyn!(self, UnionArray, f),
DataType::Dictionary(key_type, _) => match key_type.as_ref() {
DataType::Int8 => fmt_dyn!(self, DictionaryArray::<i8>, f),
DataType::Int16 => fmt_dyn!(self, DictionaryArray::<i16>, f),
Expand Down Expand Up @@ -239,7 +242,7 @@ pub fn new_empty_array(data_type: DataType) -> Box<dyn Array> {
DataType::LargeList(_) => Box::new(ListArray::<i64>::new_empty(data_type)),
DataType::FixedSizeList(_, _) => Box::new(FixedSizeListArray::new_empty(data_type)),
DataType::Struct(fields) => Box::new(StructArray::new_empty(&fields)),
DataType::Union(_) => unimplemented!(),
DataType::Union(_, _, _) => Box::new(UnionArray::new_empty(data_type)),
DataType::Dictionary(key_type, value_type) => match key_type.as_ref() {
DataType::Int8 => Box::new(DictionaryArray::<i8>::new_empty(*value_type)),
DataType::Int16 => Box::new(DictionaryArray::<i16>::new_empty(*value_type)),
Expand Down Expand Up @@ -293,7 +296,7 @@ pub fn new_null_array(data_type: DataType, length: usize) -> Box<dyn Array> {
DataType::LargeList(_) => Box::new(ListArray::<i64>::new_null(data_type, length)),
DataType::FixedSizeList(_, _) => Box::new(FixedSizeListArray::new_null(data_type, length)),
DataType::Struct(fields) => Box::new(StructArray::new_null(&fields, length)),
DataType::Union(_) => unimplemented!(),
DataType::Union(_, _, _) => Box::new(UnionArray::new_null(data_type, length)),
DataType::Dictionary(key_type, value_type) => match key_type.as_ref() {
DataType::Int8 => Box::new(DictionaryArray::<i8>::new_null(*value_type, length)),
DataType::Int16 => Box::new(DictionaryArray::<i16>::new_null(*value_type, length)),
Expand Down Expand Up @@ -354,7 +357,7 @@ pub fn clone(array: &dyn Array) -> Box<dyn Array> {
DataType::LargeList(_) => clone_dyn!(array, ListArray::<i64>),
DataType::FixedSizeList(_, _) => clone_dyn!(array, FixedSizeListArray),
DataType::Struct(_) => clone_dyn!(array, StructArray),
DataType::Union(_) => unimplemented!(),
DataType::Union(_, _, _) => clone_dyn!(array, UnionArray),
DataType::Dictionary(key_type, _) => match key_type.as_ref() {
DataType::Int8 => clone_dyn!(array, DictionaryArray::<i8>),
DataType::Int16 => clone_dyn!(array, DictionaryArray::<i16>),
Expand All @@ -380,6 +383,7 @@ mod null;
mod primitive;
mod specification;
mod struct_;
mod union;
mod utf8;

mod equal;
Expand All @@ -399,6 +403,7 @@ pub use null::NullArray;
pub use primitive::*;
pub use specification::Offset;
pub use struct_::StructArray;
pub use union::UnionArray;
pub use utf8::{MutableUtf8Array, Utf8Array, Utf8ValuesIter};

pub(crate) use self::ffi::buffers_children_dictionary;
Expand Down Expand Up @@ -498,6 +503,8 @@ mod tests {
DataType::Utf8,
DataType::Binary,
DataType::List(Box::new(Field::new("a", DataType::Binary, true))),
DataType::Union(vec![Field::new("a", DataType::Binary, true)], None, true),
DataType::Union(vec![Field::new("a", DataType::Binary, true)], None, false),
];
let a = datatypes.into_iter().all(|x| new_empty_array(x).len() == 0);
assert!(a);
Expand Down
58 changes: 58 additions & 0 deletions src/array/union/ffi.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
use std::sync::Arc;

use crate::{array::FromFfi, error::Result, ffi};

use super::super::{ffi::ToFfi, Array};
use super::UnionArray;

// NOTE(review): `ToFfi` is an unsafe trait; the invariants it requires are not
// visible from this file — confirm that the pointers returned below satisfy them.
unsafe impl ToFfi for UnionArray {
    /// Returns the exported buffers: an empty first slot, the type ids and,
    /// when the array has offsets, the offsets.
    fn buffers(&self) -> Vec<Option<std::ptr::NonNull<u8>>> {
        let mut buffers = vec![
            None,
            std::ptr::NonNull::new(self.types.as_ptr() as *mut u8),
        ];
        if let Some(offsets) = &self.offsets {
            buffers.push(std::ptr::NonNull::new(offsets.as_ptr() as *mut u8));
        }
        buffers
    }

    /// Returns the offset of this array.
    fn offset(&self) -> usize {
        self.offset
    }

    /// Returns the child arrays, one per field of the union.
    fn children(&self) -> Vec<Arc<dyn Array>> {
        self.fields.iter().cloned().collect()
    }
}

// NOTE(review): `FromFfi` is an unsafe trait; its exact contract is not visible
// from this file — confirm against the trait definition.
unsafe impl<A: ffi::ArrowArrayRef> FromFfi<A> for UnionArray {
    /// Builds a [`UnionArray`] from an array received through the FFI.
    ///
    /// The type ids are read from buffer `0` and, for non-sparse unions, the
    /// offsets from buffer `1`. Each child is imported through `ffi::try_from`.
    ///
    /// # Errors
    /// Propagates any error from reading the field, a buffer or a child.
    fn try_from_ffi(array: A) -> Result<Self> {
        let field = array.field()?;
        let data_type = field.data_type().clone();
        let fields = Self::get_fields(field.data_type());

        // NOTE(review): presumably sound because the producer guarantees that
        // these buffers exist with the expected element types — confirm.
        let mut types = unsafe { array.buffer::<i8>(0) }?;
        let mut offsets = if Self::is_sparse(&data_type) {
            None
        } else {
            Some(unsafe { array.buffer::<i32>(1) }?)
        };

        let length = array.array().len();
        let offset = array.array().offset();
        let fields = (0..fields.len())
            .map(|index| {
                let child = array.child(index)?;
                Ok(ffi::try_from(child)?.into())
            })
            .collect::<Result<Vec<Arc<dyn Array>>>>()?;

        if offset > 0 {
            // The type ids and the offsets are both indexed by slot, so the
            // array offset must be applied to both; slicing only the type ids
            // would pair each type id with the wrong offset.
            types = types.slice(offset, length);
            offsets = offsets.map(|offsets| offsets.slice(offset, length));
            // NOTE(review): for sparse unions the children are not sliced here —
            // confirm whether the parent offset must also be applied to them.
        }

        Ok(Self::from_data(data_type, types, fields, offsets))
    }
}
55 changes: 55 additions & 0 deletions src/array/union/iterator.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
use super::{Array, UnionArray};
use crate::{scalar::Scalar, trusted_len::TrustedLen};

/// Iterator over the slots of a [`UnionArray`].
#[derive(Debug, Clone)]
pub struct UnionIter<'a> {
    /// The array being iterated.
    array: &'a UnionArray,
    /// Index of the next slot to yield.
    current: usize,
}

impl<'a> UnionIter<'a> {
    /// Creates an iterator positioned at the first slot of `array`.
    pub fn new(array: &'a UnionArray) -> Self {
        UnionIter { array, current: 0 }
    }
}

impl<'a> Iterator for UnionIter<'a> {
    type Item = Box<dyn Scalar>;

    /// Yields the value of the next slot, or `None` once every slot was visited.
    fn next(&mut self) -> Option<Self::Item> {
        let index = self.current;
        if index == self.array.len() {
            return None;
        }
        // Advance before fetching the value, as the cursor must move regardless.
        self.current = index + 1;
        Some(self.array.value(index))
    }

    /// Reports the exact number of slots that remain to be yielded.
    fn size_hint(&self) -> (usize, Option<usize>) {
        let remaining = self.array.len() - self.current;
        (remaining, Some(remaining))
    }
}

impl<'a> IntoIterator for &'a UnionArray {
type Item = Box<dyn Scalar>;
type IntoIter = UnionIter<'a>;

#[inline]
fn into_iter(self) -> Self::IntoIter {
self.iter()
}
}

impl<'a> UnionArray {
/// constructs a new iterator
#[inline]
pub fn iter(&'a self) -> UnionIter<'a> {
UnionIter::new(self)
}
}

// `size_hint` returns the same value as lower and upper bound, so the
// remaining length is known exactly.
impl<'a> std::iter::ExactSizeIterator for UnionIter<'a> {}

// NOTE(review): `TrustedLen` is an unsafe marker trait; presumably it requires
// `size_hint` to be exact, which the implementation above provides — confirm
// against the trait's documented contract.
unsafe impl<'a> TrustedLen for UnionIter<'a> {}
Loading