Merge branch 'jorgecarleitao:main' into main
Dexter Duckworth authored Mar 2, 2022
2 parents 6b834c5 + eb4bc5d commit b55447e
Showing 139 changed files with 4,868 additions and 3,746 deletions.
.gitignore: 1 addition, 0 deletions
@@ -3,6 +3,7 @@ target-tarpaulin
 venv
 lcov.info
 Cargo.lock
+example.arrow
 fixtures
 settings.json
 dev/
Cargo.toml: 8 additions, 7 deletions
@@ -30,6 +30,7 @@ simdutf8 = "0.1.3"
 
 # for csv io
 csv = { version = "^1.1", optional = true }
+csv-core = { version = "0.1", optional = true }
 
 # for csv async io
 csv-async = { version = "^1.1", optional = true }
@@ -76,7 +77,7 @@ avro-schema = { version = "0.2", optional = true }
 # compression of avro
 libflate = { version = "1.1.1", optional = true }
 snap = { version = "1", optional = true }
-crc = { version = "1", optional = true }
+crc = { version = "2", optional = true }
 # async avro
 async-stream = { version = "0.3.2", optional = true }
 
@@ -129,7 +130,7 @@ io_csv = ["io_csv_read", "io_csv_write"]
 io_csv_async = ["io_csv_read_async"]
 io_csv_read = ["csv", "lexical-core"]
 io_csv_read_async = ["csv-async", "lexical-core", "futures"]
-io_csv_write = ["csv", "streaming-iterator", "lexical-core"]
+io_csv_write = ["csv", "csv-core", "streaming-iterator", "lexical-core"]
 io_json = ["serde", "serde_json", "streaming-iterator", "fallible-streaming-iterator", "indexmap", "lexical-core"]
 io_ipc = ["arrow-format"]
 io_ipc_write_async = ["io_ipc", "futures"]
@@ -181,8 +182,7 @@ compute_substring = []
 compute_take = []
 compute_temporal = []
 compute_window = ["compute_concatenate"]
-compute_lower = []
-compute_upper = []
+compute_utf8 = []
 compute = [
     "compute_aggregate",
     "compute_arithmetics",
@@ -207,12 +207,12 @@ compute = [
     "compute_substring",
     "compute_take",
     "compute_temporal",
-    "compute_window",
-    "compute_lower",
-    "compute_upper"
+    "compute_utf8",
+    "compute_window"
 ]
 benchmarks = ["rand"]
 simd = ["packed_simd"]
+serde_types = ["serde", "serde_derive"]
 
 [package.metadata.cargo-all-features]
 allowlist = ["compute", "compute_sort", "compute_hash", "compute_nullif"]
@@ -249,6 +249,7 @@ harness = false
 name = "comparison_kernels"
 harness = false
 
+
 [[bench]]
 name = "read_parquet"
 harness = false
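Note the consolidation above: the separate `compute_lower` and `compute_upper` feature gates are merged into a single `compute_utf8` gate, and `io_csv_write` now also pulls in `csv-core`. As a rough sketch of what the consolidated feature enables downstream, assuming the case-conversion kernels are exposed under `arrow2::compute::utf8` with `Result<Box<dyn Array>>` signatures (this diff does not show the kernel code itself):

```rust
use arrow2::array::Utf8Array;
// assumption: `lower`/`upper` live in `compute::utf8` after this change;
// the crate's `compute_utf8` feature must be enabled in Cargo.toml.
use arrow2::compute::utf8::{lower, upper};
use arrow2::error::Result;

fn main() -> Result<()> {
    // a nullable utf8 array; nulls are preserved by both kernels
    let array = Utf8Array::<i32>::from(&[Some("Hello"), None, Some("Arrow")]);

    let lowered = lower(&array)?;
    let uppered = upper(&array)?;

    println!("{:?}", lowered);
    println!("{:?}", uppered);
    Ok(())
}
```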
README.md: 2 additions, 1 deletion
@@ -20,7 +20,8 @@ documentation of each of its APIs.
 * Most feature-complete implementation of Apache Arrow after the reference implementation (C++)
   * Float 16 unsupported (not a Rust native type)
   * Decimal 256 unsupported (not a Rust native type)
-* FFI supported for all Arrow types
+* C data interface supported for all Arrow types (read and write)
+* C stream interface supported for all Arrow types (read and write)
 * Full interoperability with Rust's `Vec`
 * MutableArray API to work with bitmaps and arrays in-place
 * Full support for timestamps with timezones, including arithmetics that take
arrow-pyarrow-integration-testing/README.md: 1 addition, 1 deletion
@@ -31,7 +31,7 @@ Note that this crate uses two languages and an external ABI:
 
 Pyarrow exposes a C ABI to convert arrow arrays from and to its C implementation, see [here](https://arrow.apache.org/docs/format/CDataInterface.html).
 
-This package uses the equivalent struct in Rust (`arrow::array::ArrowArray`), and verifies that
+This package uses the equivalent struct in Rust (`arrow::array::InternalArrowArray`), and verifies that
 we can use pyarrow's interface to move pointers from and to Rust.
 
 ## Relevant literature
arrow-pyarrow-integration-testing/pyproject.toml: 1 addition, 1 deletion
@@ -16,5 +16,5 @@
 # under the License.
 
 [build-system]
-requires = ["maturin"]
+requires = ["maturin>=0.12,<0.13"]
 build-backend = "maturin"
arrow-pyarrow-integration-testing/src/c_stream.rs: 63 additions, 0 deletions (new file)
@@ -0,0 +1,63 @@
//! This library demonstrates a minimal usage of Rust's C data interface to pass
//! arrays from and to Python.
use pyo3::ffi::Py_uintptr_t;
use pyo3::prelude::*;

use arrow2::array::{Int32Array, StructArray};
use arrow2::datatypes::DataType;
use arrow2::ffi;

use super::*;

pub fn to_rust_iterator(ob: PyObject, py: Python) -> PyResult<Vec<PyObject>> {
    let stream = Box::new(ffi::ArrowArrayStream::empty());

    let stream_ptr = &*stream as *const ffi::ArrowArrayStream;

    // make the conversion through PyArrow's private API
    // this changes the pointer's memory and is thus unsafe. In particular, `_export_to_c` can go out of bounds
    ob.call_method1(py, "_export_to_c", (stream_ptr as Py_uintptr_t,))?;

    let mut iter =
        unsafe { ffi::ArrowArrayStreamReader::try_new(stream).map_err(PyO3ArrowError::from) }?;

    let mut arrays = vec![];
    while let Some(array) = unsafe { iter.next() } {
        let py_array = to_py_array(array.unwrap().into(), py)?;
        arrays.push(py_array)
    }
    Ok(arrays)
}

pub fn from_rust_iterator(py: Python) -> PyResult<PyObject> {
    // initialize an array
    let array = Int32Array::from(&[Some(2), None, Some(1), None]);
    let array = StructArray::from_data(
        DataType::Struct(vec![Field::new("a", array.data_type().clone(), true)]),
        vec![Arc::new(array)],
        None,
    );
    // and a field with its datatype
    let field = Field::new("a", array.data_type().clone(), true);

    // Arc it, since it will be shared with an external program
    let array: Arc<dyn Array> = Arc::new(array.clone());

    // create an iterator of arrays
    let arrays = vec![array.clone(), array.clone(), array];
    let iter = Box::new(arrays.clone().into_iter().map(Ok)) as _;

    // create an [`ArrowArrayStream`] based on this iterator and field
    let mut stream = Box::new(ffi::ArrowArrayStream::empty());
    unsafe { ffi::export_iterator(iter, field, &mut *stream) };

    // call pyarrow's interface to read this stream
    let pa = py.import("pyarrow.ipc")?;
    let py_stream = pa.getattr("RecordBatchReader")?.call_method1(
        "_import_from_c",
        ((&*stream as *const ffi::ArrowArrayStream) as Py_uintptr_t,),
    )?;
    Box::leak(stream); // this is not ideal => the struct should be allocated by pyarrow

    Ok(py_stream.to_object(py))
}
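The two functions above exercise the stream across the Python boundary. For orientation, here is a minimal sketch of the same export/import round trip entirely within Rust, using only the `ffi` calls that appear in this file; the exact item type yielded by the reader is not shown in the diff and is assumed to be printable via `Debug`:

```rust
use std::sync::Arc;

use arrow2::array::{Array, Int32Array};
use arrow2::datatypes::{DataType, Field};
use arrow2::error::Result;
use arrow2::ffi;

fn main() -> Result<()> {
    // an array and the field describing it, mirroring `from_rust_iterator`
    let array: Arc<dyn Array> = Arc::new(Int32Array::from(&[Some(2), None, Some(1)]));
    let field = Field::new("a", DataType::Int32, true);

    // producer: export an iterator of arrays into an `ArrowArrayStream`
    let iter = Box::new(vec![array.clone(), array].into_iter().map(Ok)) as _;
    let mut stream = Box::new(ffi::ArrowArrayStream::empty());
    unsafe { ffi::export_iterator(iter, field, &mut *stream) };

    // consumer: read the same stream back through the C stream interface
    let mut reader = unsafe { ffi::ArrowArrayStreamReader::try_new(stream)? };
    while let Some(array) = unsafe { reader.next() } {
        println!("{:?}", array?);
    }
    Ok(())
}
```

Consuming a stream you exported yourself is pointless in practice, but it shows the shape of the API without needing a second runtime on the other side of the ABI.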
arrow-pyarrow-integration-testing/src/lib.rs: 24 additions, 10 deletions
@@ -1,5 +1,6 @@
 //! This library demonstrates a minimal usage of Rust's C data interface to pass
 //! arrays from and to Python.
+mod c_stream;
 
 use std::error;
 use std::fmt;
@@ -51,11 +52,11 @@ impl From<PyO3ArrowError> for PyErr {
 
 fn to_rust_array(ob: PyObject, py: Python) -> PyResult<Arc<dyn Array>> {
     // prepare a pointer to receive the Array struct
-    let array = Box::new(ffi::Ffi_ArrowArray::empty());
-    let schema = Box::new(ffi::Ffi_ArrowSchema::empty());
+    let array = Box::new(ffi::ArrowArray::empty());
+    let schema = Box::new(ffi::ArrowSchema::empty());
 
-    let array_ptr = &*array as *const ffi::Ffi_ArrowArray;
-    let schema_ptr = &*schema as *const ffi::Ffi_ArrowSchema;
+    let array_ptr = &*array as *const ffi::ArrowArray;
+    let schema_ptr = &*schema as *const ffi::ArrowSchema;
 
     // make the conversion through PyArrow's private API
     // this changes the pointer's memory and is thus unsafe. In particular, `_export_to_c` can go out of bounds
@@ -66,14 +67,15 @@ fn to_rust_array(ob: PyObject, py: Python) -> PyResult<Arc<dyn Array>> {
     )?;
 
     let field = unsafe { ffi::import_field_from_c(schema.as_ref()).map_err(PyO3ArrowError::from)? };
-    let array = unsafe { ffi::import_array_from_c(array, &field).map_err(PyO3ArrowError::from)? };
+    let array =
+        unsafe { ffi::import_array_from_c(array, field.data_type).map_err(PyO3ArrowError::from)? };
 
     Ok(array.into())
 }
 
 fn to_py_array(array: Arc<dyn Array>, py: Python) -> PyResult<PyObject> {
-    let array_ptr = Box::new(ffi::Ffi_ArrowArray::empty());
-    let schema_ptr = Box::new(ffi::Ffi_ArrowSchema::empty());
+    let array_ptr = Box::new(ffi::ArrowArray::empty());
+    let schema_ptr = Box::new(ffi::ArrowSchema::empty());
 
     let array_ptr = Box::into_raw(array_ptr);
     let schema_ptr = Box::into_raw(schema_ptr);
@@ -100,9 +102,9 @@ fn to_py_array(array: Arc<dyn Array>, py: Python) -> PyResult<PyObject> {
 
 fn to_rust_field(ob: PyObject, py: Python) -> PyResult<Field> {
     // prepare a pointer to receive the Array struct
-    let schema = Box::new(ffi::Ffi_ArrowSchema::empty());
+    let schema = Box::new(ffi::ArrowSchema::empty());
 
-    let schema_ptr = &*schema as *const ffi::Ffi_ArrowSchema;
+    let schema_ptr = &*schema as *const ffi::ArrowSchema;
 
     // make the conversion through PyArrow's private API
     // this changes the pointer's memory and is thus unsafe. In particular, `_export_to_c` can go out of bounds
@@ -114,7 +116,7 @@ fn to_rust_field(ob: PyObject, py: Python) -> PyResult<Field> {
 }
 
 fn to_py_field(field: &Field, py: Python) -> PyResult<PyObject> {
-    let schema_ptr = Box::new(ffi::Ffi_ArrowSchema::empty());
+    let schema_ptr = Box::new(ffi::ArrowSchema::empty());
     let schema_ptr = Box::into_raw(schema_ptr);
 
     unsafe {
@@ -152,9 +154,21 @@ fn round_trip_field(array: PyObject, py: Python) -> PyResult<PyObject> {
     to_py_field(&field, py)
 }
 
+#[pyfunction]
+pub fn to_rust_iterator(ob: PyObject, py: Python) -> PyResult<Vec<PyObject>> {
+    c_stream::to_rust_iterator(ob, py)
+}
+
+#[pyfunction]
+pub fn from_rust_iterator(py: Python) -> PyResult<PyObject> {
+    c_stream::from_rust_iterator(py)
+}
+
 #[pymodule]
 fn arrow_pyarrow_integration_testing(_py: Python, m: &PyModule) -> PyResult<()> {
     m.add_function(wrap_pyfunction!(round_trip_array, m)?)?;
     m.add_function(wrap_pyfunction!(round_trip_field, m)?)?;
+    m.add_function(wrap_pyfunction!(to_rust_iterator, m)?)?;
+    m.add_function(wrap_pyfunction!(from_rust_iterator, m)?)?;
     Ok(())
 }
arrow-pyarrow-integration-testing/tests/test_c_stream.py: 29 additions, 0 deletions (new file)
@@ -0,0 +1,29 @@
import unittest

import pyarrow.ipc

import arrow_pyarrow_integration_testing


class TestCase(unittest.TestCase):
    def test_rust_reads(self):
        schema = pyarrow.schema([pyarrow.field("aa", pyarrow.int32())])
        a = pyarrow.array([1, None, 2], type=pyarrow.int32())

        batch = pyarrow.record_batch([a], schema)
        reader = pyarrow.ipc.RecordBatchStreamReader.from_batches(schema, [batch])

        arrays = arrow_pyarrow_integration_testing.to_rust_iterator(reader)

        array = arrays[0].field(0)
        assert array == a

    def test_pyarrow_reads(self):
        stream = arrow_pyarrow_integration_testing.from_rust_iterator()

        arrays = [a for a in stream]

        expected = pyarrow.RecordBatch.from_arrays([pyarrow.array([2, None, 1, None], pyarrow.int32())], names=["a"])
        expected = [expected, expected, expected]

        self.assertEqual(arrays, expected)
benches/avro_read.rs: 4 additions, 5 deletions
@@ -24,12 +24,11 @@ fn schema() -> AvroSchema {
 fn write(size: usize, has_codec: bool) -> Result<Vec<u8>> {
     let avro = schema();
     // a writer needs a schema and something to write to
-    let mut writer: Writer<Vec<u8>>;
-    if has_codec {
-        writer = Writer::with_codec(&avro, Vec::new(), Codec::Deflate);
+    let mut writer = if has_codec {
+        Writer::with_codec(&avro, Vec::new(), Codec::Deflate)
     } else {
-        writer = Writer::new(&avro, Vec::new());
-    }
+        Writer::new(&avro, Vec::new())
+    };
 
     (0..size).for_each(|_| {
         let mut record = Record::new(writer.schema()).unwrap();
benches/filter_kernels.rs: 9 additions, 0 deletions
@@ -94,12 +94,21 @@ fn add_benchmark(c: &mut Criterion) {
     });
 
     let data_array = create_primitive_array::<f32>(size, 0.5);
+    let data_array_nonull = create_primitive_array::<f32>(size, 0.0);
     c.bench_function("filter f32", |b| {
         b.iter(|| bench_filter(&data_array, &filter_array))
     });
     c.bench_function("filter f32 high selectivity", |b| {
         b.iter(|| bench_filter(&data_array, &dense_filter_array))
     });
+
+    c.bench_function("filter f32 nonull", |b| {
+        b.iter(|| bench_filter(&data_array_nonull, &filter_array))
+    });
+    c.bench_function("filter f32 nonull high selectivity", |b| {
+        b.iter(|| bench_filter(&data_array_nonull, &dense_filter_array))
+    });
+
     c.bench_function("filter context f32", |b| {
         b.iter(|| bench_built_filter(&filter, &data_array))
     });
benches/write_csv.rs: 4 additions, 4 deletions
@@ -11,13 +11,13 @@ use arrow2::util::bench_util::*;
 type ChunkArc = Chunk<Arc<dyn Array>>;
 
 fn write_batch(columns: &ChunkArc) -> Result<()> {
-    let writer = &mut write::WriterBuilder::new().from_writer(vec![]);
+    let mut writer = vec![];
 
     assert_eq!(columns.arrays().len(), 1);
-    write::write_header(writer, &["a"])?;
-
     let options = write::SerializeOptions::default();
-    write::write_chunk(writer, columns, &options)
+    write::write_header(&mut writer, &["a"], &options)?;
+
+    write::write_chunk(&mut writer, columns, &options)
 }
 
 fn make_chunk(array: impl Array + 'static) -> Chunk<Arc<dyn Array>> {
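As the rewritten bench shows, the CSV writer no longer wraps the sink in a `WriterBuilder`: `write_header` and `write_chunk` now write into any `std::io::Write`, and both take the `SerializeOptions`. A self-contained sketch of the new call sequence, using the same calls as the bench (the example array and the column name "a" are illustrative):

```rust
use std::sync::Arc;

use arrow2::array::{Array, Int32Array};
use arrow2::chunk::Chunk;
use arrow2::error::Result;
use arrow2::io::csv::write;

fn main() -> Result<()> {
    let array = Int32Array::from(&[Some(1), None, Some(3)]);
    let columns = Chunk::new(vec![Arc::new(array) as Arc<dyn Array>]);

    // any `std::io::Write` is a valid sink; a `Vec<u8>` keeps it in memory
    let mut writer = vec![];

    let options = write::SerializeOptions::default();
    write::write_header(&mut writer, &["a"], &options)?;
    write::write_chunk(&mut writer, &columns, &options)?;

    // nulls serialize as empty fields by default
    println!("{}", String::from_utf8(writer).unwrap());
    Ok(())
}
```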
benches/write_json.rs: 9 additions, 20 deletions
@@ -1,55 +1,44 @@
-use std::sync::Arc;
-
 use criterion::{criterion_group, criterion_main, Criterion};
 
 use arrow2::array::*;
-use arrow2::chunk::Chunk;
-use arrow2::error::Result;
+use arrow2::error::ArrowError;
 use arrow2::io::json::write;
 use arrow2::util::bench_util::*;
 
-fn write_batch(columns: &Chunk<Arc<dyn Array>>) -> Result<()> {
+fn write_array(array: Box<dyn Array>) -> Result<(), ArrowError> {
     let mut writer = vec![];
-    let format = write::JsonArray::default();
 
-    let batches = vec![Ok(columns.clone())].into_iter();
+    let arrays = vec![Ok(array)].into_iter();
 
-    // Advancing this iterator serializes the next batch to its internal buffer (i.e. CPU-bounded)
-    let blocks = write::Serializer::new(batches, vec!["c1".to_string()], vec![], format);
+    // Advancing this iterator serializes the next array to its internal buffer (i.e. CPU-bounded)
+    let blocks = write::Serializer::new(arrays, vec![]);
 
     // the operation of writing is IO-bounded.
-    write::write(&mut writer, format, blocks)?;
+    write::write(&mut writer, blocks)?;
 
     Ok(())
 }
 
-fn make_chunk(array: impl Array + 'static) -> Chunk<Arc<dyn Array>> {
-    Chunk::new(vec![Arc::new(array) as Arc<dyn Array>])
-}
-
 fn add_benchmark(c: &mut Criterion) {
     (10..=18).step_by(2).for_each(|log2_size| {
         let size = 2usize.pow(log2_size);
 
         let array = create_primitive_array::<i32>(size, 0.1);
-        let columns = make_chunk(array);
 
         c.bench_function(&format!("json write i32 2^{}", log2_size), |b| {
-            b.iter(|| write_batch(&columns))
+            b.iter(|| write_array(Box::new(array.clone())))
         });
 
         let array = create_string_array::<i32>(size, 100, 0.1, 42);
-        let columns = make_chunk(array);
 
         c.bench_function(&format!("json write utf8 2^{}", log2_size), |b| {
-            b.iter(|| write_batch(&columns))
+            b.iter(|| write_array(Box::new(array.clone())))
         });
 
         let array = create_primitive_array::<f64>(size, 0.1);
-        let columns = make_chunk(array);
 
         c.bench_function(&format!("json write f64 2^{}", log2_size), |b| {
-            b.iter(|| write_batch(&columns))
+            b.iter(|| write_array(Box::new(array.clone())))
         });
     });
 }
(Diff truncated: the remaining 127 changed files are not shown.)