Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Added support for Extension #350

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ venv/bin/python parquet_integration/write_parquet.py
* `MutableArray` API to work in-memory in-place.
* faster IPC reader (different design that avoids an extra copy of all data)
* IPC supports 2.0 (compression)
* Extension type supported
* All implemented arrow types pass FFI integration tests against pyarrow / C++
* All implemented arrow types pass IPC integration tests against other implementations

Expand All @@ -83,7 +84,7 @@ venv/bin/python parquet_integration/write_parquet.py
## Features in the original not available in this crate

* Parquet read and write of struct and nested lists.
* Map types
* Map type

## Features in this crate not in pyarrow

Expand Down
51 changes: 51 additions & 0 deletions examples/extension.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
use std::io::{Cursor, Seek, Write};
use std::sync::Arc;

use arrow2::array::*;
use arrow2::datatypes::*;
use arrow2::error::Result;
use arrow2::io::ipc::read;
use arrow2::io::ipc::write;
use arrow2::record_batch::RecordBatch;

/// Writes `array` into `writer` as an Arrow IPC file holding one record batch
/// with a single column.
///
/// The column is named `"a"`, is declared non-nullable and takes its data type
/// from `array`.
///
/// # Errors
/// Returns the error of whichever step fails: creating the file writer,
/// building the record batch, writing the batch, or finishing the file.
fn write_ipc<W: Write + Seek>(writer: &mut W, array: impl Array + 'static) -> Result<()> {
    // the schema is derived from the array so that its (extension) type is what gets declared
    let schema = Schema::new(vec![Field::new("a", array.data_type().clone(), false)]);

    let mut writer = write::FileWriter::try_new(writer, &schema)?;

    let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(array)])?;

    writer.write(&batch)?;
    // Finish explicitly so that a failure while finalizing the file is returned
    // to the caller instead of being left to whatever happens when the writer is dropped.
    writer.finish()
}

/// Reads the first record batch of an Arrow IPC file held in memory.
///
/// # Errors
/// Returns an error if the file metadata cannot be read, or if reading the
/// first batch fails.
///
/// # Panics
/// Panics if the file reader yields no batch at all.
fn read_ipc(reader: &[u8]) -> Result<RecordBatch> {
    let mut cursor = Cursor::new(reader);
    let file_metadata = read::read_file_metadata(&mut cursor)?;
    let mut batches = read::FileReader::new(&mut cursor, file_metadata, None);
    let first_batch = batches.next();
    first_batch.unwrap()
}

/// Declares an extension type, round-trips an array of it through an in-memory
/// IPC file and checks that the extension data type comes back unchanged.
fn main() -> Result<()> {
    // the extension type is named "date16" and wraps `UInt16`
    let extension_type =
        DataType::Extension("date16".to_string(), Box::new(DataType::UInt16), None);

    // the values go into a regular array, which is then wrapped in the extension array
    let values = UInt16Array::from_slice([1, 2]);
    let extension_array = ExtensionArray::from_data(extension_type.clone(), Arc::new(values));

    // from here on, the extension array is used like any other array:
    // write it to an in-memory IPC file ...
    let mut buffer = Cursor::new(vec![]);
    write_ipc(&mut buffer, extension_array)?;

    // ... and read it back
    let bytes = buffer.into_inner();
    let batch = read_ipc(&bytes)?;

    // the data type of the column that was read must still be the extension type
    let column = &batch.columns()[0];
    assert_eq!(column.data_type(), &extension_type);

    // see https://arrow.apache.org/docs/format/Columnar.html#extension-types
    // for consuming by other consumers.
    Ok(())
}
1 change: 1 addition & 0 deletions guide/src/SUMMARY.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
- [Compute](./compute.md)
- [Metadata](./metadata.md)
- [Foreign interfaces](./ffi.md)
- [Extension](./extension.md)
- [IO](./io/README.md)
- [Read CSV](./io/csv_reader.md)
- [Write CSV](./io/csv_write.md)
Expand Down
9 changes: 9 additions & 0 deletions guide/src/extension.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
# Extension types

This crate supports Arrow's ["extension type"](https://arrow.apache.org/docs/format/Columnar.html#extension-types),
to declare, use, and share custom logical types. The following example shows how
to declare one:

```rust
{{#include ../../examples/extension.rs}}
```
4 changes: 4 additions & 0 deletions src/array/display.rs
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,10 @@ pub fn get_value_display<'a>(array: &'a dyn Array) -> Box<dyn Fn(usize) -> Strin
displays[field](index)
})
}
Extension(_, _, _) => {
let array = array.as_any().downcast_ref::<ExtensionArray>().unwrap();
get_value_display(array.inner())
}
}
}

Expand Down
7 changes: 7 additions & 0 deletions src/array/equal/extension.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
use crate::array::ExtensionArray;

use super::equal as main_equal;

/// Returns whether two extension arrays are equal, by comparing the arrays
/// they wrap with the generic array comparison.
///
/// NOTE(review): only the inner arrays are compared here; the extension data
/// types themselves are not inspected by this function. Presumably the caller
/// has already established that both data types match; confirm.
pub(super) fn equal(lhs: &ExtensionArray, rhs: &ExtensionArray) -> bool {
    let (left_inner, right_inner) = (lhs.inner(), rhs.inner());
    main_equal(left_inner, right_inner)
}
18 changes: 18 additions & 0 deletions src/array/equal/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use super::*;
mod binary;
mod boolean;
mod dictionary;
mod extension;
mod fixed_size_binary;
mod fixed_size_list;
mod list;
Expand Down Expand Up @@ -131,6 +132,18 @@ impl PartialEq<&dyn Array> for StructArray {
}
}

impl PartialEq<ExtensionArray> for ExtensionArray {
fn eq(&self, other: &Self) -> bool {
equal(self, other)
}
}

/// Equality between an extension array and any array behind a trait object,
/// delegated to the generic [`equal`].
impl PartialEq<&dyn Array> for ExtensionArray {
    fn eq(&self, other: &&dyn Array) -> bool {
        // `other` is a reference to a trait object reference; peel one layer off
        let rhs: &dyn Array = *other;
        equal(self, rhs)
    }
}

impl<K: DictionaryKey> PartialEq<DictionaryArray<K>> for DictionaryArray<K> {
fn eq(&self, other: &Self) -> bool {
equal(self, other)
Expand Down Expand Up @@ -300,5 +313,10 @@ pub fn equal(lhs: &dyn Array, rhs: &dyn Array) -> bool {
let rhs = rhs.as_any().downcast_ref().unwrap();
union::equal(lhs, rhs)
}
DataType::Extension(_, _, _) => {
let lhs = lhs.as_any().downcast_ref().unwrap();
let rhs = rhs.as_any().downcast_ref().unwrap();
extension::equal(lhs, rhs)
}
}
}
27 changes: 27 additions & 0 deletions src/array/extension/ffi.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
use crate::{
array::{ffi as array_ffi, FromFfi, ToFfi},
datatypes::DataType,
error::Result,
ffi,
};

use super::ExtensionArray;

// Both methods forward to the wrapped array: the extension array reports
// exactly the buffers and the offset that `array_ffi` computes for its inner array.
unsafe impl ToFfi for ExtensionArray {
    /// Returns the buffers that `array_ffi::buffers` reports for the inner array.
    fn buffers(&self) -> Vec<Option<std::ptr::NonNull<u8>>> {
        array_ffi::buffers(&*self.inner)
    }

    /// Returns the offset that `array_ffi::offset` reports for the inner array.
    #[inline]
    fn offset(&self) -> usize {
        array_ffi::offset(&*self.inner)
    }
}

// Builds an `ExtensionArray` from an array received over the FFI.
//
// NOTE(review): this implementation is incomplete. The inner array is converted
// with `ffi::try_from` (an error from that conversion is returned), but the
// extension data type is still a `todo!()`, so every call whose inner
// conversion succeeds panics and the final `Ok(..)` is never reached.
// The intended source of the data type is the field's metadata (see the inline
// todo); how to obtain it from `A` is not visible here. Needs to be resolved
// before this can be used.
unsafe impl<A: ffi::ArrowArrayRef> FromFfi<A> for ExtensionArray {
fn try_from_ffi(array: A) -> Result<Self> {
// convert the FFI array into the inner array of the extension array
let inner = ffi::try_from(array)?.into();
// unconditional panic: the extension data type is not derived yet
let data_type: DataType = todo!(); // todo: DataType from fields' metadata
Ok(Self::from_data(data_type, inner))
}
}
70 changes: 70 additions & 0 deletions src/array/extension/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
use std::sync::Arc;

use crate::{
array::{new_empty_array, new_null_array, Array},
bitmap::Bitmap,
datatypes::DataType,
};

mod ffi;

/// An array whose data type is a [`DataType::Extension`] and whose values are
/// held by another, wrapped array.
///
/// Length and validity are those of the wrapped array; only the reported
/// data type differs.
#[derive(Debug, Clone)]
pub struct ExtensionArray {
// the data type reported by this array; the constructors panic unless it is
// a `DataType::Extension`
data_type: DataType,
// the wrapped array holding the values; `from_data` asserts that its data type
// equals the inner data type of `data_type`
inner: Arc<dyn Array>,
}

impl ExtensionArray {
    /// Creates a new [`ExtensionArray`] of type `data_type` wrapping `inner`.
    ///
    /// # Panics
    /// Panics if `data_type` is not a [`DataType::Extension`], or if the inner
    /// data type it declares differs from the data type of `inner`.
    pub fn from_data(data_type: DataType, inner: Arc<dyn Array>) -> Self {
        match &data_type {
            DataType::Extension(_, storage_type, _) => {
                assert_eq!(storage_type.as_ref(), inner.data_type())
            }
            _ => panic!("Extension array requires DataType::Extension"),
        }
        Self { data_type, inner }
    }

    /// Creates a new [`ExtensionArray`] of type `data_type` wrapping an empty
    /// array of the inner data type.
    ///
    /// # Panics
    /// Panics if `data_type` is not a [`DataType::Extension`].
    pub fn new_empty(data_type: DataType) -> Self {
        let inner: Arc<dyn Array> = match &data_type {
            DataType::Extension(_, storage_type, _) => {
                new_empty_array(storage_type.as_ref().clone()).into()
            }
            _ => panic!("Extension array requires DataType::Extension"),
        };
        Self::from_data(data_type, inner)
    }

    /// Creates a new [`ExtensionArray`] of type `data_type` wrapping a null
    /// array of the inner data type with `length` slots.
    ///
    /// # Panics
    /// Panics if `data_type` is not a [`DataType::Extension`].
    pub fn new_null(data_type: DataType, length: usize) -> Self {
        let inner: Arc<dyn Array> = match &data_type {
            DataType::Extension(_, storage_type, _) => {
                new_null_array(storage_type.as_ref().clone(), length).into()
            }
            _ => panic!("Extension array requires DataType::Extension"),
        };
        Self::from_data(data_type, inner)
    }

    /// Returns the wrapped array.
    pub fn inner(&self) -> &dyn Array {
        &*self.inner
    }
}

impl Array for ExtensionArray {
fn as_any(&self) -> &dyn std::any::Any {
self
}

fn len(&self) -> usize {
self.inner.len()
}

fn data_type(&self) -> &DataType {
&self.data_type
}

fn validity(&self) -> &Option<Bitmap> {
self.inner.validity()
}

fn slice(&self, offset: usize, length: usize) -> Box<dyn Array> {
Array::slice(self.inner.as_ref(), offset, length)
}
}
Loading