diff --git a/src/ffi/mmap.rs b/src/ffi/mmap.rs index 32be7d763eb..98143610da9 100644 --- a/src/ffi/mmap.rs +++ b/src/ffi/mmap.rs @@ -1,17 +1,12 @@ -use std::collections::VecDeque; +//! Functionality to mmap in-memory data regions. use std::sync::Arc; -use crate::array::{Array, DictionaryKey, FixedSizeListArray, ListArray, StructArray}; -use crate::datatypes::DataType; -use crate::error::Error; -use crate::offset::Offset; +use crate::{ + array::{FromFfi, PrimitiveArray}, + types::NativeType, +}; -use crate::io::ipc::read::{Dictionaries, OutOfSpecKind}; -use crate::io::ipc::read::{IpcBuffer, Node}; -use crate::io::ipc::IpcField; -use crate::types::NativeType; - -use super::{export_array_to_c, try_from, ArrowArray, InternalArrowArray}; +use super::{ArrowArray, InternalArrowArray}; #[allow(dead_code)] struct PrivateData { @@ -22,71 +17,7 @@ struct PrivateData { dictionary_ptr: Option<*mut ArrowArray>, } -fn get_buffer_bounds(buffers: &mut VecDeque) -> Result<(usize, usize), Error> { - let buffer = buffers - .pop_front() - .ok_or_else(|| Error::from(OutOfSpecKind::ExpectedBuffer))?; - - let offset: usize = buffer - .offset() - .try_into() - .map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?; - - let length: usize = buffer - .length() - .try_into() - .map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?; - - Ok((offset, length)) -} - -fn get_buffer<'a, T: NativeType>( - data: &'a [u8], - block_offset: usize, - buffers: &mut VecDeque, - num_rows: usize, -) -> Result<&'a [u8], Error> { - let (offset, length) = get_buffer_bounds(buffers)?; - - // verify that they are in-bounds - let values = data - .get(block_offset + offset..block_offset + offset + length) - .ok_or_else(|| Error::OutOfSpec("buffer out of bounds".to_string()))?; - - // validate alignment - let v: &[T] = bytemuck::try_cast_slice(values) - .map_err(|_| Error::OutOfSpec("buffer not aligned for mmap".to_string()))?; - - if v.len() < num_rows { - return Err(Error::OutOfSpec( - "buffer's length is too small in mmap".to_string(), - )); - } - - Ok(values) -} - -fn get_validity<'a>( - data: &'a [u8], - block_offset: usize, - buffers: &mut VecDeque, - null_count: usize, -) -> Result, Error> { - let validity = get_buffer_bounds(buffers)?; - let (offset, length) = validity; - - Ok(if null_count > 0 { - // verify that they are in-bounds and get its pointer - Some( - data.get(block_offset + offset..block_offset + offset + length) - .ok_or_else(|| Error::OutOfSpec("buffer out of bounds".to_string()))?, - ) - } else { - None - }) -} - -fn create_array< +pub(crate) unsafe fn create_array< T: AsRef<[u8]>, I: Iterator>, II: Iterator, @@ -134,7 +65,7 @@ fn create_array< } } -/// callback used to drop [`ArrowArray`] when it is exported +/// callback used to drop [`ArrowArray`] when it is exported specified for [`PrivateData`]. 
unsafe extern "C" fn release(array: *mut ArrowArray) { if array.is_null() { return; @@ -154,460 +85,37 @@ unsafe extern "C" fn release(array: *mut ArrowArray) { array.release = None; } -fn mmap_binary>( - data: Arc, - node: &Node, - block_offset: usize, - buffers: &mut VecDeque, -) -> Result { - let num_rows: usize = node - .length() - .try_into() - .map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?; - - let null_count: usize = node - .null_count() - .try_into() - .map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?; - - let data_ref = data.as_ref().as_ref(); - - let validity = get_validity(data_ref, block_offset, buffers, null_count)?.map(|x| x.as_ptr()); - - let offsets = get_buffer::(data_ref, block_offset, buffers, num_rows + 1)?.as_ptr(); - let values = get_buffer::(data_ref, block_offset, buffers, 0)?.as_ptr(); - - // NOTE: offsets and values invariants are _not_ validated - Ok(create_array( +/// Creates a (non-null) [`PrimitiveArray`] from a slice of values. +/// This does not have memcopy and is the fastest way to create a [`PrimitiveArray`]. +/// +/// This can be useful if you want to apply arrow kernels on slices without incurring +/// a memcopy cost. +/// +/// # Safety +/// +/// Using this function is not unsafe, but the returned PrimitiveArray's lifetime is bound to the lifetime +/// of the slice. The returned [`PrimitiveArray`] _must not_ outlive the passed slice. +pub unsafe fn slice(slice: &[T]) -> PrimitiveArray { + let num_rows = slice.len(); + let null_count = 0; + let validity = None; + + let data: &[u8] = bytemuck::cast_slice(slice); + let ptr = data.as_ptr() as *const u8; + let data = Arc::new(data); + + // safety: the underlying assumption of this function: the array will not be used + // beyond the + let array = create_array( data, num_rows, null_count, - [validity, Some(offsets), Some(values)].into_iter(), + [validity, Some(ptr)].into_iter(), [].into_iter(), None, - )) -} - -fn mmap_fixed_size_binary>( - data: Arc, - node: &Node, - block_offset: usize, - buffers: &mut VecDeque, -) -> Result { - let num_rows: usize = node - .length() - .try_into() - .map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?; - - let null_count: usize = node - .null_count() - .try_into() - .map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?; - - let data_ref = data.as_ref().as_ref(); - - let validity = get_validity(data_ref, block_offset, buffers, null_count)?.map(|x| x.as_ptr()); - - let values = get_buffer::(data_ref, block_offset, buffers, num_rows + 1)?.as_ptr(); - - Ok(create_array( - data, - num_rows, - null_count, - [validity, Some(values)].into_iter(), - [].into_iter(), - None, - )) -} - -fn mmap_null>( - data: Arc, - node: &Node, - _block_offset: usize, - _buffers: &mut VecDeque, -) -> Result { - let num_rows: usize = node - .length() - .try_into() - .map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?; - - let null_count: usize = node - .null_count() - .try_into() - .map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?; - - Ok(create_array( - data, - num_rows, - null_count, - [].into_iter(), - [].into_iter(), - None, - )) -} - -fn mmap_boolean>( - data: Arc, - node: &Node, - block_offset: usize, - buffers: &mut VecDeque, -) -> Result { - let num_rows: usize = node - .length() - .try_into() - .map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?; - - let null_count: usize = node - .null_count() - .try_into() - .map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?; - - let data_ref = 
data.as_ref().as_ref(); - - let validity = get_validity(data_ref, block_offset, buffers, null_count)?.map(|x| x.as_ptr()); - - let values = get_buffer_bounds(buffers)?; - let (offset, length) = values; - - // verify that they are in-bounds and get its pointer - let values = data_ref[block_offset + offset..block_offset + offset + length].as_ptr(); - - Ok(create_array( - data, - num_rows, - null_count, - [validity, Some(values)].into_iter(), - [].into_iter(), - None, - )) -} - -fn mmap_primitive>( - data: Arc, - node: &Node, - block_offset: usize, - buffers: &mut VecDeque, -) -> Result { - let data_ref = data.as_ref().as_ref(); - - let num_rows: usize = node - .length() - .try_into() - .map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?; - - let null_count: usize = node - .null_count() - .try_into() - .map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?; - - let validity = get_validity(data_ref, block_offset, buffers, null_count)?.map(|x| x.as_ptr()); - - let values = get_buffer::
<P>
(data_ref, block_offset, buffers, num_rows)?.as_ptr(); + ); + let array = InternalArrowArray::new(array, T::PRIMITIVE.into()); - Ok(create_array( - data, - num_rows, - null_count, - [validity, Some(values)].into_iter(), - [].into_iter(), - None, - )) -} - -#[allow(clippy::too_many_arguments)] -fn mmap_list>( - data: Arc, - node: &Node, - block_offset: usize, - data_type: &DataType, - ipc_field: &IpcField, - dictionaries: &Dictionaries, - field_nodes: &mut VecDeque, - buffers: &mut VecDeque, -) -> Result { - let child = ListArray::::try_get_child(data_type)?.data_type(); - - let num_rows: usize = node - .length() - .try_into() - .map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?; - - let null_count: usize = node - .null_count() - .try_into() - .map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?; - - let data_ref = data.as_ref().as_ref(); - - let validity = get_validity(data_ref, block_offset, buffers, null_count)?.map(|x| x.as_ptr()); - - let offsets = get_buffer::(data_ref, block_offset, buffers, num_rows + 1)?.as_ptr(); - - let values = get_array( - data.clone(), - block_offset, - child, - &ipc_field.fields[0], - dictionaries, - field_nodes, - buffers, - )?; - - // NOTE: offsets and values invariants are _not_ validated - Ok(create_array( - data, - num_rows, - null_count, - [validity, Some(offsets)].into_iter(), - [values].into_iter(), - None, - )) -} - -#[allow(clippy::too_many_arguments)] -fn mmap_fixed_size_list>( - data: Arc, - node: &Node, - block_offset: usize, - data_type: &DataType, - ipc_field: &IpcField, - dictionaries: &Dictionaries, - field_nodes: &mut VecDeque, - buffers: &mut VecDeque, -) -> Result { - let child = FixedSizeListArray::try_child_and_size(data_type)? - .0 - .data_type(); - - let num_rows: usize = node - .length() - .try_into() - .map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?; - - let null_count: usize = node - .null_count() - .try_into() - .map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?; - - let data_ref = data.as_ref().as_ref(); - - let validity = get_validity(data_ref, block_offset, buffers, null_count)?.map(|x| x.as_ptr()); - - let values = get_array( - data.clone(), - block_offset, - child, - &ipc_field.fields[0], - dictionaries, - field_nodes, - buffers, - )?; - - Ok(create_array( - data, - num_rows, - null_count, - [validity].into_iter(), - [values].into_iter(), - None, - )) -} - -#[allow(clippy::too_many_arguments)] -fn mmap_struct>( - data: Arc, - node: &Node, - block_offset: usize, - data_type: &DataType, - ipc_field: &IpcField, - dictionaries: &Dictionaries, - field_nodes: &mut VecDeque, - buffers: &mut VecDeque, -) -> Result { - let children = StructArray::try_get_fields(data_type)?; - - let num_rows: usize = node - .length() - .try_into() - .map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?; - - let null_count: usize = node - .null_count() - .try_into() - .map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?; - - let data_ref = data.as_ref().as_ref(); - - let validity = get_validity(data_ref, block_offset, buffers, null_count)?.map(|x| x.as_ptr()); - - let values = children - .iter() - .map(|f| &f.data_type) - .zip(ipc_field.fields.iter()) - .map(|(child, ipc)| { - get_array( - data.clone(), - block_offset, - child, - ipc, - dictionaries, - field_nodes, - buffers, - ) - }) - .collect::, Error>>()?; - - Ok(create_array( - data, - num_rows, - null_count, - [validity].into_iter(), - values.into_iter(), - None, - )) -} - -#[allow(clippy::too_many_arguments)] -fn mmap_dict>( 
- data: Arc, - node: &Node, - block_offset: usize, - _: &DataType, - ipc_field: &IpcField, - dictionaries: &Dictionaries, - _: &mut VecDeque, - buffers: &mut VecDeque, -) -> Result { - let num_rows: usize = node - .length() - .try_into() - .map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?; - - let null_count: usize = node - .null_count() - .try_into() - .map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?; - - let data_ref = data.as_ref().as_ref(); - - let dictionary = dictionaries - .get(&ipc_field.dictionary_id.unwrap()) - .ok_or_else(|| Error::oos("Missing dictionary"))? - .clone(); - - let validity = get_validity(data_ref, block_offset, buffers, null_count)?.map(|x| x.as_ptr()); - - let values = get_buffer::(data_ref, block_offset, buffers, num_rows)?.as_ptr(); - - Ok(create_array( - data, - num_rows, - null_count, - [validity, Some(values)].into_iter(), - [].into_iter(), - Some(export_array_to_c(dictionary)), - )) -} - -fn get_array>( - data: Arc, - block_offset: usize, - data_type: &DataType, - ipc_field: &IpcField, - dictionaries: &Dictionaries, - field_nodes: &mut VecDeque, - buffers: &mut VecDeque, -) -> Result { - use crate::datatypes::PhysicalType::*; - let node = field_nodes - .pop_front() - .ok_or_else(|| Error::from(OutOfSpecKind::ExpectedBuffer))?; - - match data_type.to_physical_type() { - Null => mmap_null(data, &node, block_offset, buffers), - Boolean => mmap_boolean(data, &node, block_offset, buffers), - Primitive(p) => with_match_primitive_type!(p, |$T| { - mmap_primitive::<$T, _>(data, &node, block_offset, buffers) - }), - Utf8 | Binary => mmap_binary::(data, &node, block_offset, buffers), - FixedSizeBinary => mmap_fixed_size_binary(data, &node, block_offset, buffers), - LargeBinary | LargeUtf8 => mmap_binary::(data, &node, block_offset, buffers), - List => mmap_list::( - data, - &node, - block_offset, - data_type, - ipc_field, - dictionaries, - field_nodes, - buffers, - ), - LargeList => mmap_list::( - data, - &node, - block_offset, - data_type, - ipc_field, - dictionaries, - field_nodes, - buffers, - ), - FixedSizeList => mmap_fixed_size_list( - data, - &node, - block_offset, - data_type, - ipc_field, - dictionaries, - field_nodes, - buffers, - ), - Struct => mmap_struct( - data, - &node, - block_offset, - data_type, - ipc_field, - dictionaries, - field_nodes, - buffers, - ), - Dictionary(key_type) => match_integer_type!(key_type, |$T| { - mmap_dict::<$T, _>( - data, - &node, - block_offset, - data_type, - ipc_field, - dictionaries, - field_nodes, - buffers, - ) - }), - _ => todo!(), - } -} - -/// Maps a memory region to an [`Array`]. -pub(crate) unsafe fn mmap>( - data: Arc, - block_offset: usize, - data_type: DataType, - ipc_field: &IpcField, - dictionaries: &Dictionaries, - field_nodes: &mut VecDeque, - buffers: &mut VecDeque, -) -> Result, Error> { - let array = get_array( - data, - block_offset, - &data_type, - ipc_field, - dictionaries, - field_nodes, - buffers, - )?; - // The unsafety comes from the fact that `array` is not necessarily valid - - // the IPC file may be corrupted (e.g. 
invalid offsets or non-utf8 data) - unsafe { try_from(InternalArrowArray::new(array, data_type)) } + // safety: we just created a valid array + unsafe { PrimitiveArray::::try_from_ffi(array) }.unwrap() } diff --git a/src/ffi/mod.rs b/src/ffi/mod.rs index b416d1c4648..62e66530efc 100644 --- a/src/ffi/mod.rs +++ b/src/ffi/mod.rs @@ -3,9 +3,7 @@ mod array; mod bridge; mod generated; -#[cfg(feature = "io_ipc")] -#[cfg_attr(docsrs, doc(cfg(feature = "io_ipc")))] -pub(crate) mod mmap; +pub mod mmap; mod schema; mod stream; diff --git a/src/mmap/array.rs b/src/mmap/array.rs new file mode 100644 index 00000000000..15ba3ac8823 --- /dev/null +++ b/src/mmap/array.rs @@ -0,0 +1,555 @@ +use std::collections::VecDeque; +use std::sync::Arc; + +use crate::array::{Array, DictionaryKey, FixedSizeListArray, ListArray, StructArray}; +use crate::datatypes::DataType; +use crate::error::Error; +use crate::offset::Offset; + +use crate::io::ipc::read::{Dictionaries, OutOfSpecKind}; +use crate::io::ipc::read::{IpcBuffer, Node}; +use crate::io::ipc::IpcField; +use crate::types::NativeType; + +use crate::ffi::mmap::create_array; +use crate::ffi::{export_array_to_c, try_from, ArrowArray, InternalArrowArray}; + +fn get_buffer_bounds(buffers: &mut VecDeque) -> Result<(usize, usize), Error> { + let buffer = buffers + .pop_front() + .ok_or_else(|| Error::from(OutOfSpecKind::ExpectedBuffer))?; + + let offset: usize = buffer + .offset() + .try_into() + .map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?; + + let length: usize = buffer + .length() + .try_into() + .map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?; + + Ok((offset, length)) +} + +fn get_buffer<'a, T: NativeType>( + data: &'a [u8], + block_offset: usize, + buffers: &mut VecDeque, + num_rows: usize, +) -> Result<&'a [u8], Error> { + let (offset, length) = get_buffer_bounds(buffers)?; + + // verify that they are in-bounds + let values = data + .get(block_offset + offset..block_offset + offset + length) + .ok_or_else(|| Error::OutOfSpec("buffer out of bounds".to_string()))?; + + // validate alignment + let v: &[T] = bytemuck::try_cast_slice(values) + .map_err(|_| Error::OutOfSpec("buffer not aligned for mmap".to_string()))?; + + if v.len() < num_rows { + return Err(Error::OutOfSpec( + "buffer's length is too small in mmap".to_string(), + )); + } + + Ok(values) +} + +fn get_validity<'a>( + data: &'a [u8], + block_offset: usize, + buffers: &mut VecDeque, + null_count: usize, +) -> Result, Error> { + let validity = get_buffer_bounds(buffers)?; + let (offset, length) = validity; + + Ok(if null_count > 0 { + // verify that they are in-bounds and get its pointer + Some( + data.get(block_offset + offset..block_offset + offset + length) + .ok_or_else(|| Error::OutOfSpec("buffer out of bounds".to_string()))?, + ) + } else { + None + }) +} + +fn mmap_binary>( + data: Arc, + node: &Node, + block_offset: usize, + buffers: &mut VecDeque, +) -> Result { + let num_rows: usize = node + .length() + .try_into() + .map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?; + + let null_count: usize = node + .null_count() + .try_into() + .map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?; + + let data_ref = data.as_ref().as_ref(); + + let validity = get_validity(data_ref, block_offset, buffers, null_count)?.map(|x| x.as_ptr()); + + let offsets = get_buffer::(data_ref, block_offset, buffers, num_rows + 1)?.as_ptr(); + let values = get_buffer::(data_ref, block_offset, buffers, 0)?.as_ptr(); + + // NOTE: offsets and values invariants are _not_ 
validated + Ok(unsafe { + create_array( + data, + num_rows, + null_count, + [validity, Some(offsets), Some(values)].into_iter(), + [].into_iter(), + None, + ) + }) +} + +fn mmap_fixed_size_binary>( + data: Arc, + node: &Node, + block_offset: usize, + buffers: &mut VecDeque, +) -> Result { + let num_rows: usize = node + .length() + .try_into() + .map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?; + + let null_count: usize = node + .null_count() + .try_into() + .map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?; + + let data_ref = data.as_ref().as_ref(); + + let validity = get_validity(data_ref, block_offset, buffers, null_count)?.map(|x| x.as_ptr()); + + let values = get_buffer::(data_ref, block_offset, buffers, num_rows + 1)?.as_ptr(); + + Ok(unsafe { + create_array( + data, + num_rows, + null_count, + [validity, Some(values)].into_iter(), + [].into_iter(), + None, + ) + }) +} + +fn mmap_null>( + data: Arc, + node: &Node, + _block_offset: usize, + _buffers: &mut VecDeque, +) -> Result { + let num_rows: usize = node + .length() + .try_into() + .map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?; + + let null_count: usize = node + .null_count() + .try_into() + .map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?; + + Ok(unsafe { + create_array( + data, + num_rows, + null_count, + [].into_iter(), + [].into_iter(), + None, + ) + }) +} + +fn mmap_boolean>( + data: Arc, + node: &Node, + block_offset: usize, + buffers: &mut VecDeque, +) -> Result { + let num_rows: usize = node + .length() + .try_into() + .map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?; + + let null_count: usize = node + .null_count() + .try_into() + .map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?; + + let data_ref = data.as_ref().as_ref(); + + let validity = get_validity(data_ref, block_offset, buffers, null_count)?.map(|x| x.as_ptr()); + + let values = get_buffer_bounds(buffers)?; + let (offset, length) = values; + + // verify that they are in-bounds and get its pointer + let values = data_ref[block_offset + offset..block_offset + offset + length].as_ptr(); + + Ok(unsafe { + create_array( + data, + num_rows, + null_count, + [validity, Some(values)].into_iter(), + [].into_iter(), + None, + ) + }) +} + +fn mmap_primitive>( + data: Arc, + node: &Node, + block_offset: usize, + buffers: &mut VecDeque, +) -> Result { + let data_ref = data.as_ref().as_ref(); + + let num_rows: usize = node + .length() + .try_into() + .map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?; + + let null_count: usize = node + .null_count() + .try_into() + .map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?; + + let validity = get_validity(data_ref, block_offset, buffers, null_count)?.map(|x| x.as_ptr()); + + let values = get_buffer::
<P>
(data_ref, block_offset, buffers, num_rows)?.as_ptr(); + + Ok(unsafe { + create_array( + data, + num_rows, + null_count, + [validity, Some(values)].into_iter(), + [].into_iter(), + None, + ) + }) +} + +#[allow(clippy::too_many_arguments)] +fn mmap_list>( + data: Arc, + node: &Node, + block_offset: usize, + data_type: &DataType, + ipc_field: &IpcField, + dictionaries: &Dictionaries, + field_nodes: &mut VecDeque, + buffers: &mut VecDeque, +) -> Result { + let child = ListArray::::try_get_child(data_type)?.data_type(); + + let num_rows: usize = node + .length() + .try_into() + .map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?; + + let null_count: usize = node + .null_count() + .try_into() + .map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?; + + let data_ref = data.as_ref().as_ref(); + + let validity = get_validity(data_ref, block_offset, buffers, null_count)?.map(|x| x.as_ptr()); + + let offsets = get_buffer::(data_ref, block_offset, buffers, num_rows + 1)?.as_ptr(); + + let values = get_array( + data.clone(), + block_offset, + child, + &ipc_field.fields[0], + dictionaries, + field_nodes, + buffers, + )?; + + // NOTE: offsets and values invariants are _not_ validated + Ok(unsafe { + create_array( + data, + num_rows, + null_count, + [validity, Some(offsets)].into_iter(), + [values].into_iter(), + None, + ) + }) +} + +#[allow(clippy::too_many_arguments)] +fn mmap_fixed_size_list>( + data: Arc, + node: &Node, + block_offset: usize, + data_type: &DataType, + ipc_field: &IpcField, + dictionaries: &Dictionaries, + field_nodes: &mut VecDeque, + buffers: &mut VecDeque, +) -> Result { + let child = FixedSizeListArray::try_child_and_size(data_type)? + .0 + .data_type(); + + let num_rows: usize = node + .length() + .try_into() + .map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?; + + let null_count: usize = node + .null_count() + .try_into() + .map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?; + + let data_ref = data.as_ref().as_ref(); + + let validity = get_validity(data_ref, block_offset, buffers, null_count)?.map(|x| x.as_ptr()); + + let values = get_array( + data.clone(), + block_offset, + child, + &ipc_field.fields[0], + dictionaries, + field_nodes, + buffers, + )?; + + Ok(unsafe { + create_array( + data, + num_rows, + null_count, + [validity].into_iter(), + [values].into_iter(), + None, + ) + }) +} + +#[allow(clippy::too_many_arguments)] +fn mmap_struct>( + data: Arc, + node: &Node, + block_offset: usize, + data_type: &DataType, + ipc_field: &IpcField, + dictionaries: &Dictionaries, + field_nodes: &mut VecDeque, + buffers: &mut VecDeque, +) -> Result { + let children = StructArray::try_get_fields(data_type)?; + + let num_rows: usize = node + .length() + .try_into() + .map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?; + + let null_count: usize = node + .null_count() + .try_into() + .map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?; + + let data_ref = data.as_ref().as_ref(); + + let validity = get_validity(data_ref, block_offset, buffers, null_count)?.map(|x| x.as_ptr()); + + let values = children + .iter() + .map(|f| &f.data_type) + .zip(ipc_field.fields.iter()) + .map(|(child, ipc)| { + get_array( + data.clone(), + block_offset, + child, + ipc, + dictionaries, + field_nodes, + buffers, + ) + }) + .collect::, Error>>()?; + + Ok(unsafe { + create_array( + data, + num_rows, + null_count, + [validity].into_iter(), + values.into_iter(), + None, + ) + }) +} + +#[allow(clippy::too_many_arguments)] +fn mmap_dict>( + data: 
Arc, + node: &Node, + block_offset: usize, + _: &DataType, + ipc_field: &IpcField, + dictionaries: &Dictionaries, + _: &mut VecDeque, + buffers: &mut VecDeque, +) -> Result { + let num_rows: usize = node + .length() + .try_into() + .map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?; + + let null_count: usize = node + .null_count() + .try_into() + .map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?; + + let data_ref = data.as_ref().as_ref(); + + let dictionary = dictionaries + .get(&ipc_field.dictionary_id.unwrap()) + .ok_or_else(|| Error::oos("Missing dictionary"))? + .clone(); + + let validity = get_validity(data_ref, block_offset, buffers, null_count)?.map(|x| x.as_ptr()); + + let values = get_buffer::(data_ref, block_offset, buffers, num_rows)?.as_ptr(); + + Ok(unsafe { + create_array( + data, + num_rows, + null_count, + [validity, Some(values)].into_iter(), + [].into_iter(), + Some(export_array_to_c(dictionary)), + ) + }) +} + +fn get_array>( + data: Arc, + block_offset: usize, + data_type: &DataType, + ipc_field: &IpcField, + dictionaries: &Dictionaries, + field_nodes: &mut VecDeque, + buffers: &mut VecDeque, +) -> Result { + use crate::datatypes::PhysicalType::*; + let node = field_nodes + .pop_front() + .ok_or_else(|| Error::from(OutOfSpecKind::ExpectedBuffer))?; + + match data_type.to_physical_type() { + Null => mmap_null(data, &node, block_offset, buffers), + Boolean => mmap_boolean(data, &node, block_offset, buffers), + Primitive(p) => with_match_primitive_type!(p, |$T| { + mmap_primitive::<$T, _>(data, &node, block_offset, buffers) + }), + Utf8 | Binary => mmap_binary::(data, &node, block_offset, buffers), + FixedSizeBinary => mmap_fixed_size_binary(data, &node, block_offset, buffers), + LargeBinary | LargeUtf8 => mmap_binary::(data, &node, block_offset, buffers), + List => mmap_list::( + data, + &node, + block_offset, + data_type, + ipc_field, + dictionaries, + field_nodes, + buffers, + ), + LargeList => mmap_list::( + data, + &node, + block_offset, + data_type, + ipc_field, + dictionaries, + field_nodes, + buffers, + ), + FixedSizeList => mmap_fixed_size_list( + data, + &node, + block_offset, + data_type, + ipc_field, + dictionaries, + field_nodes, + buffers, + ), + Struct => mmap_struct( + data, + &node, + block_offset, + data_type, + ipc_field, + dictionaries, + field_nodes, + buffers, + ), + Dictionary(key_type) => match_integer_type!(key_type, |$T| { + mmap_dict::<$T, _>( + data, + &node, + block_offset, + data_type, + ipc_field, + dictionaries, + field_nodes, + buffers, + ) + }), + _ => todo!(), + } +} + +/// Maps a memory region to an [`Array`]. +pub(crate) unsafe fn mmap>( + data: Arc, + block_offset: usize, + data_type: DataType, + ipc_field: &IpcField, + dictionaries: &Dictionaries, + field_nodes: &mut VecDeque, + buffers: &mut VecDeque, +) -> Result, Error> { + let array = get_array( + data, + block_offset, + &data_type, + ipc_field, + dictionaries, + field_nodes, + buffers, + )?; + // The unsafety comes from the fact that `array` is not necessarily valid - + // the IPC file may be corrupted (e.g. 
invalid offsets or non-utf8 data)
+    unsafe { try_from(InternalArrowArray::new(array, data_type)) }
+}
diff --git a/src/mmap/mod.rs b/src/mmap/mod.rs
index d8bd90c3a2a..5d560c93663 100644
--- a/src/mmap/mod.rs
+++ b/src/mmap/mod.rs
@@ -2,11 +2,12 @@
 use std::collections::VecDeque;
 use std::sync::Arc;
 
+mod array;
+
 use crate::array::Array;
 use crate::chunk::Chunk;
 use crate::datatypes::{DataType, Field};
 use crate::error::Error;
-use crate::ffi::mmap;
 use crate::io::ipc::read::file::{get_dictionary_batch, get_record_batch};
 use crate::io::ipc::read::{first_dict_field, Dictionaries, FileMetadata};
 
@@ -91,7 +92,7 @@ unsafe fn _mmap_record<T: AsRef<[u8]>>(
         .cloned()
         .zip(ipc_fields)
         .map(|(data_type, ipc_field)| {
-            mmap::mmap(
+            array::mmap(
                 data.clone(),
                 offset,
                 data_type,
diff --git a/tests/it/ffi/mod.rs b/tests/it/ffi/mod.rs
index af49381f138..0ae97a86627 100644
--- a/tests/it/ffi/mod.rs
+++ b/tests/it/ffi/mod.rs
@@ -1,2 +1,10 @@
 mod data;
 mod stream;
+
+#[test]
+fn mmap_slice() {
+    let slice = &[1, 2, 3];
+    let array = unsafe { arrow2::ffi::mmap::slice(slice) };
+    assert_eq!(array.values().as_ref(), &[1, 2, 3]);
+    // note: when `slice` is dropped, `array` must be dropped as well, since by construction of `slice` they share their lifetimes.
+}
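
Usage note (not part of the diff): a minimal sketch of how the new `arrow2::ffi::mmap::slice` entry point can be used from downstream code. The variable names, the `main` wrapper, and the manual sum are illustrative assumptions; only `arrow2::ffi::mmap::slice` and `values()` appear in the diff itself. The returned array is a non-nullable, zero-copy view over the slice, which is why the function is `unsafe` even though constructing the view performs no invalid memory access: the caller must guarantee the array is dropped before the slice's memory is.

```rust
use arrow2::array::PrimitiveArray;

fn main() {
    // Plain Rust data; no Arrow allocation or copy happens below.
    let values: Vec<i64> = (0..1_000).collect();

    // Zero-copy view over `values`. `unsafe` because nothing ties the array's
    // lifetime to the slice: the caller must ensure `array` does not outlive it.
    let array: PrimitiveArray<i64> = unsafe { arrow2::ffi::mmap::slice(&values) };

    assert_eq!(array.len(), values.len());
    assert_eq!(array.values().as_ref(), values.as_slice());

    // Any read-only operation (e.g. a compute kernel) may run on `array` here;
    // summing the values by hand stands in for such a kernel.
    let total: i64 = array.values().iter().copied().sum();
    assert_eq!(total, (0..1_000i64).sum::<i64>());

    // Drop the view before `values` goes out of scope, upholding the contract.
    drop(array);
}
```

This is the pattern the doc comment on `slice` alludes to: arrow kernels can operate on borrowed memory without first copying it into an Arrow-owned buffer, at the cost of a manually enforced lifetime contract.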