diff --git a/src/ffi/mmap.rs b/src/ffi/mmap.rs index d3f2944765c..fb1da4fcd23 100644 --- a/src/ffi/mmap.rs +++ b/src/ffi/mmap.rs @@ -1,11 +1,12 @@ use std::collections::VecDeque; -use crate::array::{Array, BooleanArray, FromFfi, Offset, Utf8Array}; +use crate::array::{Array, BinaryArray, BooleanArray, FromFfi, Offset, PrimitiveArray, Utf8Array}; use crate::datatypes::DataType; use crate::error::Error; use crate::io::ipc::read::OutOfSpecKind; use crate::io::ipc::read::{IpcBuffer, Node}; +use crate::types::NativeType; use super::{ArrowArray, InternalArrowArray}; @@ -18,7 +19,7 @@ struct PrivateData { dictionary_ptr: Option<*mut ArrowArray>, } -fn get_buffer(buffers: &mut VecDeque) -> Result<(usize, usize), Error> { +fn get_buffer_bounds(buffers: &mut VecDeque) -> Result<(usize, usize), Error> { let buffer = buffers .pop_front() .ok_or_else(|| Error::from(OutOfSpecKind::ExpectedBuffer))?; @@ -36,6 +37,52 @@ fn get_buffer(buffers: &mut VecDeque) -> Result<(usize, usize), Error Ok((offset, length)) } +fn get_buffer<'a, T: NativeType>( + data: &'a [u8], + block_offset: usize, + buffers: &mut VecDeque, + num_rows: usize, +) -> Result<&'a [u8], Error> { + let (offset, length) = get_buffer_bounds(buffers)?; + + // verify that they are in-bounds + let values = data + .get(block_offset + offset..block_offset + offset + length) + .ok_or_else(|| Error::OutOfSpec("buffer out of bounds".to_string()))?; + + // validate alignment + let v: &[T] = bytemuck::try_cast_slice(values) + .map_err(|_| Error::OutOfSpec("buffer not aligned for mmap".to_string()))?; + + if v.len() < num_rows { + return Err(Error::OutOfSpec( + "buffer's length is too small in mmap".to_string(), + )); + } + + Ok(values) +} + +fn get_validity<'a>( + data: &'a [u8], + block_offset: usize, + buffers: &mut VecDeque, + null_count: usize, +) -> Result, Error> { + let validity = get_buffer_bounds(buffers)?; + let (offset, length) = validity; + + Ok(if null_count > 0 { + // verify that they are in-bounds and get its pointer + Some( + data.get(block_offset + offset..block_offset + offset + length) + .ok_or_else(|| Error::OutOfSpec("buffer out of bounds".to_string()))?, + ) + } else { + None + }) +} + fn create_array, I: Iterator>>( data: T, num_rows: usize, @@ -91,7 +138,7 @@ unsafe extern "C" fn release(array: *mut ArrowArray) { array.release = None; } -fn mmap_utf8>( +fn mmap_binary>( data: T, node: &Node, block_offset: usize, @@ -109,32 +156,10 @@ fn mmap_utf8>( let data_ref = data.as_ref(); - let validity = get_buffer(buffers)?; - let (offset, length) = validity; - - let validity = if null_count > 0 { - // verify that they are in-bounds and get its pointer - Some(data_ref[block_offset + offset..block_offset + offset + length].as_ptr()) - } else { - None - }; + let validity = get_validity(data_ref, block_offset, buffers, null_count)?.map(|x| x.as_ptr()); - let offsets = get_buffer(buffers)?; - let (offset, length) = offsets; - - // verify that they are in-bounds and get its pointer - let offsets = &data_ref[block_offset + offset..block_offset + offset + length]; - - // validate alignment - let _: &[O] = bytemuck::cast_slice(offsets); - - let offsets = data_ref[block_offset + offset..block_offset + offset + length].as_ptr(); - - let values = get_buffer(buffers)?; - let (offset, length) = values; - - // verify that they are in-bounds and get its pointer - let values = data_ref[block_offset + offset..block_offset + offset + length].as_ptr(); + let offsets = get_buffer::(data_ref, block_offset, buffers, num_rows + 1)?.as_ptr(); + let values = get_buffer::(data_ref, block_offset, buffers, 0)?.as_ptr(); // NOTE: offsets and values invariants are _not_ validated Ok(create_array( @@ -163,17 +188,9 @@ fn mmap_boolean>( let data_ref = data.as_ref(); - let validity = get_buffer(buffers)?; - let (offset, length) = validity; - - let validity = if null_count > 0 { - // verify that they are in-bounds and get its pointer - Some(data_ref[block_offset + offset..block_offset + offset + length].as_ptr()) - } else { - None - }; + let validity = get_validity(data_ref, block_offset, buffers, null_count)?.map(|x| x.as_ptr()); - let values = get_buffer(buffers)?; + let values = get_buffer_bounds(buffers)?; let (offset, length) = values; // verify that they are in-bounds and get its pointer @@ -200,6 +217,49 @@ fn boolean>( unsafe { BooleanArray::try_from_ffi(array) } } +fn mmap_primitive>( + data: T, + node: &Node, + block_offset: usize, + buffers: &mut VecDeque, +) -> Result { + let data_ref = data.as_ref(); + + let num_rows: usize = node + .length() + .try_into() + .map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?; + + let null_count: usize = node + .null_count() + .try_into() + .map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?; + + let validity = get_validity(data_ref, block_offset, buffers, null_count)?.map(|x| x.as_ptr()); + + let values = get_buffer::

(data_ref, block_offset, buffers, num_rows)?.as_ptr(); + + Ok(create_array( + data, + num_rows, + null_count, + [validity, Some(values)].into_iter(), + )) +} + +fn primitive>( + data: T, + node: &Node, + block_offset: usize, + buffers: &mut VecDeque, + data_type: DataType, +) -> Result, Error> { + let array = mmap_primitive::(data, node, block_offset, buffers)?; + let array = InternalArrowArray::new(array, data_type); + // this is safe because we just (correctly) constructed `ArrowArray` + unsafe { PrimitiveArray::try_from_ffi(array) } +} + unsafe fn utf8>( data: T, node: &Node, @@ -207,12 +267,48 @@ unsafe fn utf8>( buffers: &mut VecDeque, data_type: DataType, ) -> Result, Error> { - let array = mmap_utf8::(data, node, block_offset, buffers)?; + let array = mmap_binary::(data, node, block_offset, buffers)?; let array = InternalArrowArray::new(array, data_type); // this is unsafe because `mmap_utf8` does not validate invariants unsafe { Utf8Array::::try_from_ffi(array) } } +unsafe fn binary>( + data: T, + node: &Node, + block_offset: usize, + buffers: &mut VecDeque, + data_type: DataType, +) -> Result, Error> { + let array = mmap_binary::(data, node, block_offset, buffers)?; + let array = InternalArrowArray::new(array, data_type); + // this is unsafe because `mmap_utf8` does not validate invariants + unsafe { BinaryArray::::try_from_ffi(array) } +} + +fn mmap_list>( + data: T, + node: &Node, + block_offset: usize, + buffers: &mut VecDeque, +) -> Result { + todo!() +} + +unsafe fn list>( + data: T, + node: &Node, + block_offset: usize, + field_nodes: &mut VecDeque, + buffers: &mut VecDeque, + data_type: DataType, +) -> Result, Error> { + let array = mmap_list::(data, node, block_offset, buffers)?; + let array = InternalArrowArray::new(array, data_type); + // this is unsafe because `mmap_utf8` does not validate invariants + unsafe { BinaryArray::::try_from_ffi(array) } +} + /// Maps a memory region to an [`Array`]. pub(crate) unsafe fn mmap>( data: T, @@ -227,10 +323,21 @@ pub(crate) unsafe fn mmap>( .ok_or_else(|| Error::from(OutOfSpecKind::ExpectedBuffer))?; match data_type.to_physical_type() { Boolean => boolean(data, &node, block_offset, buffers, data_type).map(|x| x.boxed()), + Primitive(p) => with_match_primitive_type!(p, |$T| { + primitive::<$T, _>(data, &node, block_offset, buffers, data_type).map(|x| x.boxed()) + }), Utf8 => utf8::(data, &node, block_offset, buffers, data_type).map(|x| x.boxed()), LargeUtf8 => { utf8::(data, &node, block_offset, buffers, data_type).map(|x| x.boxed()) } + Binary => { + binary::(data, &node, block_offset, buffers, data_type).map(|x| x.boxed()) + } + LargeBinary => { + binary::(data, &node, block_offset, buffers, data_type).map(|x| x.boxed()) + } + List => list::(data, &node, block_offset, field_nodes, buffers, data_type) + .map(|x| x.boxed()), _ => todo!(), } }