Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Improved API of mmap #1205

Merged
merged 1 commit into from
Aug 4, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions examples/ipc_file_mmap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use arrow2::mmap::{mmap_dictionaries_unchecked, mmap_unchecked};
// Arrow2 requires a struct that implements `Clone + AsRef<[u8]>`, which
// usually `Arc<Mmap>` supports. Here we mock it
#[derive(Clone)]
struct Mmap(Arc<Vec<u8>>);
struct Mmap(Vec<u8>);

impl AsRef<[u8]> for Mmap {
#[inline]
Expand All @@ -19,7 +19,7 @@ impl AsRef<[u8]> for Mmap {

fn main() -> Result<()> {
// given a mmap
let mmap = Mmap(Arc::new(vec![]));
let mmap = Arc::new(Mmap(vec![]));

// read the metadata
let metadata = read::read_file_metadata(&mut std::io::Cursor::new(mmap.as_ref()))?;
Expand Down
69 changes: 35 additions & 34 deletions src/ffi/mmap.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use std::collections::VecDeque;
use std::sync::Arc;

use crate::array::{Array, DictionaryKey, FixedSizeListArray, ListArray, Offset, StructArray};
use crate::datatypes::DataType;
Expand Down Expand Up @@ -85,11 +86,11 @@ fn get_validity<'a>(
}

fn create_array<
T: Clone + AsRef<[u8]>,
T: AsRef<[u8]>,
I: Iterator<Item = Option<*const u8>>,
II: Iterator<Item = ArrowArray>,
>(
data: T,
data: Arc<T>,
num_rows: usize,
null_count: usize,
buffers: I,
Expand All @@ -111,7 +112,7 @@ fn create_array<

let dictionary_ptr = dictionary.map(|array| Box::into_raw(Box::new(array)));

let mut private_data = Box::new(PrivateData::<T> {
let mut private_data = Box::new(PrivateData::<Arc<T>> {
data,
buffers_ptr,
children_ptr,
Expand All @@ -127,7 +128,7 @@ fn create_array<
buffers: private_data.buffers_ptr.as_mut_ptr(),
children: private_data.children_ptr.as_mut_ptr(),
dictionary: private_data.dictionary_ptr.unwrap_or(std::ptr::null_mut()),
release: Some(release::<T>),
release: Some(release::<Arc<T>>),
private_data: Box::into_raw(private_data) as *mut ::std::os::raw::c_void,
}
}
Expand All @@ -152,8 +153,8 @@ unsafe extern "C" fn release<T>(array: *mut ArrowArray) {
array.release = None;
}

fn mmap_binary<O: Offset, T: Clone + AsRef<[u8]>>(
data: T,
fn mmap_binary<O: Offset, T: AsRef<[u8]>>(
data: Arc<T>,
node: &Node,
block_offset: usize,
buffers: &mut VecDeque<IpcBuffer>,
Expand All @@ -168,7 +169,7 @@ fn mmap_binary<O: Offset, T: Clone + AsRef<[u8]>>(
.try_into()
.map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?;

let data_ref = data.as_ref();
let data_ref = data.as_ref().as_ref();

let validity = get_validity(data_ref, block_offset, buffers, null_count)?.map(|x| x.as_ptr());

Expand All @@ -186,8 +187,8 @@ fn mmap_binary<O: Offset, T: Clone + AsRef<[u8]>>(
))
}

fn mmap_fixed_size_binary<T: Clone + AsRef<[u8]>>(
data: T,
fn mmap_fixed_size_binary<T: AsRef<[u8]>>(
data: Arc<T>,
node: &Node,
block_offset: usize,
buffers: &mut VecDeque<IpcBuffer>,
Expand All @@ -202,7 +203,7 @@ fn mmap_fixed_size_binary<T: Clone + AsRef<[u8]>>(
.try_into()
.map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?;

let data_ref = data.as_ref();
let data_ref = data.as_ref().as_ref();

let validity = get_validity(data_ref, block_offset, buffers, null_count)?.map(|x| x.as_ptr());

Expand All @@ -218,8 +219,8 @@ fn mmap_fixed_size_binary<T: Clone + AsRef<[u8]>>(
))
}

fn mmap_null<T: Clone + AsRef<[u8]>>(
data: T,
fn mmap_null<T: AsRef<[u8]>>(
data: Arc<T>,
node: &Node,
_block_offset: usize,
_buffers: &mut VecDeque<IpcBuffer>,
Expand All @@ -244,8 +245,8 @@ fn mmap_null<T: Clone + AsRef<[u8]>>(
))
}

fn mmap_boolean<T: Clone + AsRef<[u8]>>(
data: T,
fn mmap_boolean<T: AsRef<[u8]>>(
data: Arc<T>,
node: &Node,
block_offset: usize,
buffers: &mut VecDeque<IpcBuffer>,
Expand All @@ -260,7 +261,7 @@ fn mmap_boolean<T: Clone + AsRef<[u8]>>(
.try_into()
.map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?;

let data_ref = data.as_ref();
let data_ref = data.as_ref().as_ref();

let validity = get_validity(data_ref, block_offset, buffers, null_count)?.map(|x| x.as_ptr());

Expand All @@ -280,13 +281,13 @@ fn mmap_boolean<T: Clone + AsRef<[u8]>>(
))
}

fn mmap_primitive<P: NativeType, T: Clone + AsRef<[u8]>>(
data: T,
fn mmap_primitive<P: NativeType, T: AsRef<[u8]>>(
data: Arc<T>,
node: &Node,
block_offset: usize,
buffers: &mut VecDeque<IpcBuffer>,
) -> Result<ArrowArray, Error> {
let data_ref = data.as_ref();
let data_ref = data.as_ref().as_ref();

let num_rows: usize = node
.length()
Expand All @@ -313,8 +314,8 @@ fn mmap_primitive<P: NativeType, T: Clone + AsRef<[u8]>>(
}

#[allow(clippy::too_many_arguments)]
fn mmap_list<O: Offset, T: Clone + AsRef<[u8]>>(
data: T,
fn mmap_list<O: Offset, T: AsRef<[u8]>>(
data: Arc<T>,
node: &Node,
block_offset: usize,
data_type: &DataType,
Expand All @@ -335,7 +336,7 @@ fn mmap_list<O: Offset, T: Clone + AsRef<[u8]>>(
.try_into()
.map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?;

let data_ref = data.as_ref();
let data_ref = data.as_ref().as_ref();

let validity = get_validity(data_ref, block_offset, buffers, null_count)?.map(|x| x.as_ptr());

Expand Down Expand Up @@ -363,8 +364,8 @@ fn mmap_list<O: Offset, T: Clone + AsRef<[u8]>>(
}

#[allow(clippy::too_many_arguments)]
fn mmap_fixed_size_list<T: Clone + AsRef<[u8]>>(
data: T,
fn mmap_fixed_size_list<T: AsRef<[u8]>>(
data: Arc<T>,
node: &Node,
block_offset: usize,
data_type: &DataType,
Expand All @@ -387,7 +388,7 @@ fn mmap_fixed_size_list<T: Clone + AsRef<[u8]>>(
.try_into()
.map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?;

let data_ref = data.as_ref();
let data_ref = data.as_ref().as_ref();

let validity = get_validity(data_ref, block_offset, buffers, null_count)?.map(|x| x.as_ptr());

Expand All @@ -412,8 +413,8 @@ fn mmap_fixed_size_list<T: Clone + AsRef<[u8]>>(
}

#[allow(clippy::too_many_arguments)]
fn mmap_struct<T: Clone + AsRef<[u8]>>(
data: T,
fn mmap_struct<T: AsRef<[u8]>>(
data: Arc<T>,
node: &Node,
block_offset: usize,
data_type: &DataType,
Expand All @@ -434,7 +435,7 @@ fn mmap_struct<T: Clone + AsRef<[u8]>>(
.try_into()
.map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?;

let data_ref = data.as_ref();
let data_ref = data.as_ref().as_ref();

let validity = get_validity(data_ref, block_offset, buffers, null_count)?.map(|x| x.as_ptr());

Expand Down Expand Up @@ -466,8 +467,8 @@ fn mmap_struct<T: Clone + AsRef<[u8]>>(
}

#[allow(clippy::too_many_arguments)]
fn mmap_dict<K: DictionaryKey, T: Clone + AsRef<[u8]>>(
data: T,
fn mmap_dict<K: DictionaryKey, T: AsRef<[u8]>>(
data: Arc<T>,
node: &Node,
block_offset: usize,
_: &DataType,
Expand All @@ -486,7 +487,7 @@ fn mmap_dict<K: DictionaryKey, T: Clone + AsRef<[u8]>>(
.try_into()
.map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?;

let data_ref = data.as_ref();
let data_ref = data.as_ref().as_ref();

let dictionary = dictionaries
.get(&ipc_field.dictionary_id.unwrap())
Expand All @@ -507,8 +508,8 @@ fn mmap_dict<K: DictionaryKey, T: Clone + AsRef<[u8]>>(
))
}

fn get_array<T: Clone + AsRef<[u8]>>(
data: T,
fn get_array<T: AsRef<[u8]>>(
data: Arc<T>,
block_offset: usize,
data_type: &DataType,
ipc_field: &IpcField,
Expand Down Expand Up @@ -587,8 +588,8 @@ fn get_array<T: Clone + AsRef<[u8]>>(
}

/// Maps a memory region to an [`Array`].
pub(crate) unsafe fn mmap<T: Clone + AsRef<[u8]>>(
data: T,
pub(crate) unsafe fn mmap<T: AsRef<[u8]>>(
data: Arc<T>,
block_offset: usize,
data_type: DataType,
ipc_field: &IpcField,
Expand Down
4 changes: 2 additions & 2 deletions src/io/ipc/read/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,13 @@ pub struct FileMetadata {
/// The blocks in the file
///
/// A block indicates the regions in the file to read to get data
pub(crate) blocks: Vec<arrow_format::ipc::Block>,
pub blocks: Vec<arrow_format::ipc::Block>,

/// Dictionaries associated to each dict_id
pub(crate) dictionaries: Option<Vec<arrow_format::ipc::Block>>,

/// The total size of the file in bytes
pub(crate) size: u64,
pub size: u64,
}

fn read_dictionary_message<R: Read + Seek>(
Expand Down
27 changes: 14 additions & 13 deletions src/mmap/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
//! Memory maps regions defined on the IPC format into [`Array`].
use std::collections::VecDeque;
use std::sync::Arc;

use crate::array::Array;
use crate::chunk::Chunk;
Expand Down Expand Up @@ -74,10 +75,10 @@ fn get_buffers_nodes(
Ok((buffers, field_nodes))
}

unsafe fn _mmap_record<T: Clone + AsRef<[u8]>>(
unsafe fn _mmap_record<T: AsRef<[u8]>>(
fields: &[Field],
ipc_fields: &[IpcField],
data: T,
data: Arc<T>,
batch: RecordBatchRef,
offset: usize,
dictionaries: &Dictionaries,
Expand All @@ -104,14 +105,14 @@ unsafe fn _mmap_record<T: Clone + AsRef<[u8]>>(
.and_then(Chunk::try_new)
}

unsafe fn _mmap_unchecked<T: Clone + AsRef<[u8]>>(
unsafe fn _mmap_unchecked<T: AsRef<[u8]>>(
fields: &[Field],
ipc_fields: &[IpcField],
data: T,
data: Arc<T>,
block: Block,
dictionaries: &Dictionaries,
) -> Result<Chunk<Box<dyn Array>>, Error> {
let (message, offset) = read_message(data.as_ref(), block)?;
let (message, offset) = read_message(data.as_ref().as_ref(), block)?;
let batch = get_record_batch(message)?;
_mmap_record(
fields,
Expand All @@ -134,15 +135,15 @@ unsafe fn _mmap_unchecked<T: Clone + AsRef<[u8]>>(
/// The caller must ensure that `data` contains a valid buffers, for example:
/// * Offsets in variable-sized containers must be in-bounds and increasing
/// * Utf8 data is valid
pub unsafe fn mmap_unchecked<T: Clone + AsRef<[u8]>>(
pub unsafe fn mmap_unchecked<T: AsRef<[u8]>>(
metadata: &FileMetadata,
dictionaries: &Dictionaries,
data: T,
data: Arc<T>,
chunk: usize,
) -> Result<Chunk<Box<dyn Array>>, Error> {
let block = metadata.blocks[chunk];

let (message, offset) = read_message(data.as_ref(), block)?;
let (message, offset) = read_message(data.as_ref().as_ref(), block)?;
let batch = get_record_batch(message)?;
_mmap_record(
&metadata.schema.fields,
Expand All @@ -154,13 +155,13 @@ pub unsafe fn mmap_unchecked<T: Clone + AsRef<[u8]>>(
)
}

unsafe fn mmap_dictionary<T: Clone + AsRef<[u8]>>(
unsafe fn mmap_dictionary<T: AsRef<[u8]>>(
metadata: &FileMetadata,
data: T,
data: Arc<T>,
block: Block,
dictionaries: &mut Dictionaries,
) -> Result<(), Error> {
let (message, offset) = read_message(data.as_ref(), block)?;
let (message, offset) = read_message(data.as_ref().as_ref(), block)?;
let batch = get_dictionary_batch(&message)?;

let id = batch
Expand Down Expand Up @@ -205,9 +206,9 @@ unsafe fn mmap_dictionary<T: Clone + AsRef<[u8]>>(
/// The caller must ensure that `data` contains a valid buffers, for example:
/// * Offsets in variable-sized containers must be in-bounds and increasing
/// * Utf8 data is valid
pub unsafe fn mmap_dictionaries_unchecked<T: Clone + AsRef<[u8]>>(
pub unsafe fn mmap_dictionaries_unchecked<T: AsRef<[u8]>>(
metadata: &FileMetadata,
data: T,
data: Arc<T>,
) -> Result<Dictionaries, Error> {
let blocks = if let Some(blocks) = &metadata.dictionaries {
blocks
Expand Down
5 changes: 3 additions & 2 deletions tests/it/io/ipc/mmap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,17 @@ use arrow2::chunk::Chunk;
use arrow2::datatypes::{DataType, Field, Schema};
use arrow2::error::Result;
use arrow2::io::ipc::read::read_file_metadata;
use std::sync::Arc;

use super::write::file::write;

fn round_trip(array: Box<dyn Array>) -> Result<()> {
let schema = Schema::from(vec![Field::new("a", array.data_type().clone(), true)]);
let columns = Chunk::try_new(vec![array.clone()])?;

let data = write(&[columns], &schema, None, None)?;
let data = Arc::new(write(&[columns], &schema, None, None)?);

let metadata = read_file_metadata(&mut std::io::Cursor::new(&data))?;
let metadata = read_file_metadata(&mut std::io::Cursor::new(data.as_ref()))?;

let dictionaries =
unsafe { arrow2::mmap::mmap_dictionaries_unchecked(&metadata, data.clone())? };
Expand Down