This repository has been archived by the owner on Feb 18, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 224
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
9f6be52
commit b8cb9c2
Showing
8 changed files
with
283 additions
and
13 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,154 @@ | ||
use std::collections::VecDeque; | ||
|
||
use crate::array::{Array, BooleanArray, FromFfi}; | ||
use crate::datatypes::DataType; | ||
use crate::error::Error; | ||
|
||
use crate::io::ipc::read::OutOfSpecKind; | ||
use crate::io::ipc::read::{IpcBuffer, Node}; | ||
|
||
use super::{ArrowArray, InternalArrowArray}; | ||
|
||
#[allow(dead_code)] | ||
struct PrivateData<T> { | ||
// the owner of the pointers' regions | ||
data: T, | ||
buffers_ptr: Box<[*const std::os::raw::c_void]>, | ||
//children_ptr: Box<[*mut ArrowArray]>, | ||
dictionary_ptr: Option<*mut ArrowArray>, | ||
} | ||
|
||
fn get_buffer(buffers: &mut VecDeque<IpcBuffer>) -> Result<(usize, usize), Error> { | ||
let buffer = buffers | ||
.pop_front() | ||
.ok_or_else(|| Error::from(OutOfSpecKind::ExpectedBuffer))?; | ||
|
||
let offset: usize = buffer | ||
.offset() | ||
.try_into() | ||
.map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?; | ||
|
||
let length: usize = buffer | ||
.length() | ||
.try_into() | ||
.map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?; | ||
|
||
Ok((offset, length)) | ||
} | ||
|
||
// callback used to drop [ArrowArray] when it is exported | ||
unsafe extern "C" fn release<T>(array: *mut ArrowArray) { | ||
if array.is_null() { | ||
return; | ||
} | ||
let array = &mut *array; | ||
|
||
// take ownership of `private_data`, therefore dropping it | ||
let private = Box::from_raw(array.private_data as *mut PrivateData<T>); | ||
/*for child in private.children_ptr.iter() { | ||
let _ = Box::from_raw(*child); | ||
}*/ | ||
|
||
if let Some(ptr) = private.dictionary_ptr { | ||
let _ = Box::from_raw(ptr); | ||
} | ||
|
||
array.release = None; | ||
} | ||
|
||
fn mmap_boolean<T: Clone + AsRef<[u8]>>( | ||
data: T, | ||
node: &Node, | ||
block_offset: usize, | ||
buffers: &mut VecDeque<IpcBuffer>, | ||
) -> Result<ArrowArray, Error> { | ||
let num_rows: usize = node | ||
.length() | ||
.try_into() | ||
.map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?; | ||
|
||
let null_count: usize = node | ||
.null_count() | ||
.try_into() | ||
.map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?; | ||
|
||
let data_ref = data.as_ref(); | ||
|
||
let validity = get_buffer(buffers)?; | ||
let (offset, length) = validity; | ||
|
||
let validity = if null_count > 0 { | ||
// verify that they are in-bounds and get its pointer | ||
Some(data_ref[block_offset + offset..block_offset + offset + length].as_ptr()) | ||
} else { | ||
None | ||
}; | ||
|
||
let values = get_buffer(buffers)?; | ||
let (offset, length) = values; | ||
|
||
// verify that they are in-bounds and get its pointer | ||
let values = data_ref[block_offset + offset..block_offset + offset + length].as_ptr(); | ||
|
||
// NOTE: this is valid for Boolean, but for others (e.g. Utf8), we need to validate other invariants | ||
// or mark this as unsafe | ||
|
||
let buffers_ptr = [validity, Some(values)] | ||
.iter() | ||
.map(|maybe_buffer| match maybe_buffer { | ||
Some(b) => *b as *const std::os::raw::c_void, | ||
None => std::ptr::null(), | ||
}) | ||
.collect::<Box<[_]>>(); | ||
let n_buffers = buffers.len() as i64; | ||
|
||
let mut private_data = Box::new(PrivateData::<T> { | ||
data: data.clone(), | ||
buffers_ptr, | ||
dictionary_ptr: None, | ||
}); | ||
|
||
Ok(ArrowArray { | ||
length: num_rows as i64, | ||
null_count: null_count as i64, | ||
offset: 0, | ||
n_buffers, | ||
n_children: 0, | ||
buffers: private_data.buffers_ptr.as_mut_ptr(), | ||
children: std::ptr::null_mut(), | ||
dictionary: private_data.dictionary_ptr.unwrap_or(std::ptr::null_mut()), | ||
release: Some(release::<T>), | ||
private_data: Box::into_raw(private_data) as *mut ::std::os::raw::c_void, | ||
}) | ||
} | ||
|
||
fn boolean<T: Clone + AsRef<[u8]>>( | ||
data: T, | ||
node: &Node, | ||
block_offset: usize, | ||
buffers: &mut VecDeque<IpcBuffer>, | ||
data_type: DataType, | ||
) -> Result<BooleanArray, Error> { | ||
let array = mmap_boolean(data, node, block_offset, buffers)?; | ||
let array = InternalArrowArray::new(array, data_type); | ||
// this is safe because we just (correctly) constructed `ArrowArray` | ||
unsafe { BooleanArray::try_from_ffi(array) } | ||
} | ||
|
||
/// Maps a memory region to an [`Array`]. | ||
pub fn mmap<T: Clone + AsRef<[u8]>>( | ||
data: T, | ||
block_offset: usize, | ||
data_type: DataType, | ||
field_nodes: &mut VecDeque<Node>, | ||
buffers: &mut VecDeque<IpcBuffer>, | ||
) -> Result<Box<dyn Array>, Error> { | ||
use crate::datatypes::PhysicalType::*; | ||
let node = field_nodes | ||
.pop_front() | ||
.ok_or_else(|| Error::from(OutOfSpecKind::ExpectedBuffer))?; | ||
match data_type.to_physical_type() { | ||
Boolean => boolean(data, &node, block_offset, buffers, data_type).map(|x| x.boxed()), | ||
_ => todo!(), | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,72 @@ | ||
//! Memory maps regions defined on the IPC format into [`Array`]. | ||
use std::collections::VecDeque; | ||
|
||
use crate::array::Array; | ||
use crate::error::Error; | ||
use crate::ffi::mmap; | ||
|
||
use super::read::read_file_metadata; | ||
use super::read::reader::get_serialized_batch; | ||
use super::read::OutOfSpecKind; | ||
use super::CONTINUATION_MARKER; | ||
|
||
use arrow_format::ipc::planus::ReadAsRoot; | ||
|
||
/// something | ||
pub fn map_chunk<T: Clone + AsRef<[u8]>>(data: T, index: usize) -> Result<Box<dyn Array>, Error> { | ||
let mut bytes = data.as_ref(); | ||
let metadata = read_file_metadata(&mut std::io::Cursor::new(bytes))?; | ||
|
||
let block = metadata.blocks[index]; | ||
|
||
let offset: usize = block | ||
.offset | ||
.try_into() | ||
.map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?; | ||
|
||
let meta_data_length: usize = block | ||
.meta_data_length | ||
.try_into() | ||
.map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?; | ||
|
||
bytes = &bytes[offset..]; | ||
let mut message_length = bytes[..4].try_into().unwrap(); | ||
bytes = &bytes[4..]; | ||
|
||
if message_length == CONTINUATION_MARKER { | ||
// continuation marker encountered, read message next | ||
message_length = bytes[..4].try_into().unwrap(); | ||
bytes = &bytes[4..]; | ||
}; | ||
|
||
let message_length: usize = i32::from_le_bytes(message_length) | ||
.try_into() | ||
.map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?; | ||
|
||
let message = arrow_format::ipc::MessageRef::read_as_root(&bytes[..message_length]) | ||
.map_err(|err| Error::from(OutOfSpecKind::InvalidFlatbufferMessage(err)))?; | ||
|
||
let batch = get_serialized_batch(&message)?; | ||
|
||
let buffers = batch | ||
.buffers() | ||
.map_err(|err| Error::from(OutOfSpecKind::InvalidFlatbufferBuffers(err)))? | ||
.ok_or_else(|| Error::from(OutOfSpecKind::MissingMessageBuffers))?; | ||
let mut buffers = buffers.iter().collect::<VecDeque<_>>(); | ||
|
||
let field_nodes = batch | ||
.nodes() | ||
.map_err(|err| Error::from(OutOfSpecKind::InvalidFlatbufferNodes(err)))? | ||
.ok_or_else(|| Error::from(OutOfSpecKind::MissingMessageNodes))?; | ||
let mut field_nodes = field_nodes.iter().collect::<VecDeque<_>>(); | ||
|
||
let data_type = metadata.schema.fields[0].data_type.clone(); | ||
|
||
mmap( | ||
data.clone(), | ||
offset + meta_data_length, | ||
data_type, | ||
&mut field_nodes, | ||
&mut buffers, | ||
) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -77,6 +77,7 @@ mod compression; | |
mod endianess; | ||
|
||
pub mod append; | ||
pub mod mmap; | ||
pub mod read; | ||
pub mod write; | ||
|
||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,40 @@ | ||
use arrow2::error::Result; | ||
use arrow2::io::ipc::mmap::map_chunk; | ||
|
||
use super::super::common::read_gzip_json; | ||
|
||
#[derive(Clone)] | ||
struct Mmap(pub std::sync::Arc<Vec<u8>>); | ||
|
||
impl AsRef<[u8]> for Mmap { | ||
#[inline] | ||
fn as_ref(&self) -> &[u8] { | ||
self.0.as_ref() | ||
} | ||
} | ||
|
||
fn test_file(version: &str, file_name: &str) -> Result<()> { | ||
let testdata = crate::test_util::arrow_test_data(); | ||
|
||
let arrow_file = format!( | ||
"{}/arrow-ipc-stream/integration/{}/{}.arrow_file", | ||
testdata, version, file_name | ||
); | ||
|
||
let data = std::fs::read(arrow_file).unwrap(); | ||
|
||
let data = Mmap(std::sync::Arc::new(data)); | ||
|
||
// read expected JSON output | ||
let (_schema, _, batches) = read_gzip_json(version, file_name)?; | ||
|
||
let array = map_chunk(data, 0)?; | ||
|
||
assert_eq!(batches[0].arrays()[0], array); | ||
Ok(()) | ||
} | ||
|
||
#[test] | ||
fn read_generated_100_primitive() -> Result<()> { | ||
test_file("1.0.0-littleendian", "generated_primitive") | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,2 +1,3 @@ | ||
mod file; | ||
mod mmap; | ||
mod stream; |