Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Commit

Permalink
IPC mmap: use Arc<T> where T: AsRef<[u8]>
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed Aug 4, 2022
1 parent 8562dec commit 996506e
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 47 deletions.
69 changes: 35 additions & 34 deletions src/ffi/mmap.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use std::collections::VecDeque;
use std::sync::Arc;

use crate::array::{Array, DictionaryKey, FixedSizeListArray, ListArray, Offset, StructArray};
use crate::datatypes::DataType;
Expand Down Expand Up @@ -85,11 +86,11 @@ fn get_validity<'a>(
}

fn create_array<
T: Clone + AsRef<[u8]>,
T: AsRef<[u8]>,
I: Iterator<Item = Option<*const u8>>,
II: Iterator<Item = ArrowArray>,
>(
data: T,
data: Arc<T>,
num_rows: usize,
null_count: usize,
buffers: I,
Expand All @@ -111,7 +112,7 @@ fn create_array<

let dictionary_ptr = dictionary.map(|array| Box::into_raw(Box::new(array)));

let mut private_data = Box::new(PrivateData::<T> {
let mut private_data = Box::new(PrivateData::<Arc<T>> {
data,
buffers_ptr,
children_ptr,
Expand All @@ -127,7 +128,7 @@ fn create_array<
buffers: private_data.buffers_ptr.as_mut_ptr(),
children: private_data.children_ptr.as_mut_ptr(),
dictionary: private_data.dictionary_ptr.unwrap_or(std::ptr::null_mut()),
release: Some(release::<T>),
release: Some(release::<Arc<T>>),
private_data: Box::into_raw(private_data) as *mut ::std::os::raw::c_void,
}
}
Expand All @@ -152,8 +153,8 @@ unsafe extern "C" fn release<T>(array: *mut ArrowArray) {
array.release = None;
}

fn mmap_binary<O: Offset, T: Clone + AsRef<[u8]>>(
data: T,
fn mmap_binary<O: Offset, T: AsRef<[u8]>>(
data: Arc<T>,
node: &Node,
block_offset: usize,
buffers: &mut VecDeque<IpcBuffer>,
Expand All @@ -168,7 +169,7 @@ fn mmap_binary<O: Offset, T: Clone + AsRef<[u8]>>(
.try_into()
.map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?;

let data_ref = data.as_ref();
let data_ref = data.as_ref().as_ref();

let validity = get_validity(data_ref, block_offset, buffers, null_count)?.map(|x| x.as_ptr());

Expand All @@ -186,8 +187,8 @@ fn mmap_binary<O: Offset, T: Clone + AsRef<[u8]>>(
))
}

fn mmap_fixed_size_binary<T: Clone + AsRef<[u8]>>(
data: T,
fn mmap_fixed_size_binary<T: AsRef<[u8]>>(
data: Arc<T>,
node: &Node,
block_offset: usize,
buffers: &mut VecDeque<IpcBuffer>,
Expand All @@ -202,7 +203,7 @@ fn mmap_fixed_size_binary<T: Clone + AsRef<[u8]>>(
.try_into()
.map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?;

let data_ref = data.as_ref();
let data_ref = data.as_ref().as_ref();

let validity = get_validity(data_ref, block_offset, buffers, null_count)?.map(|x| x.as_ptr());

Expand All @@ -218,8 +219,8 @@ fn mmap_fixed_size_binary<T: Clone + AsRef<[u8]>>(
))
}

fn mmap_null<T: Clone + AsRef<[u8]>>(
data: T,
fn mmap_null<T: AsRef<[u8]>>(
data: Arc<T>,
node: &Node,
_block_offset: usize,
_buffers: &mut VecDeque<IpcBuffer>,
Expand All @@ -244,8 +245,8 @@ fn mmap_null<T: Clone + AsRef<[u8]>>(
))
}

fn mmap_boolean<T: Clone + AsRef<[u8]>>(
data: T,
fn mmap_boolean<T: AsRef<[u8]>>(
data: Arc<T>,
node: &Node,
block_offset: usize,
buffers: &mut VecDeque<IpcBuffer>,
Expand All @@ -260,7 +261,7 @@ fn mmap_boolean<T: Clone + AsRef<[u8]>>(
.try_into()
.map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?;

let data_ref = data.as_ref();
let data_ref = data.as_ref().as_ref();

let validity = get_validity(data_ref, block_offset, buffers, null_count)?.map(|x| x.as_ptr());

Expand All @@ -280,13 +281,13 @@ fn mmap_boolean<T: Clone + AsRef<[u8]>>(
))
}

fn mmap_primitive<P: NativeType, T: Clone + AsRef<[u8]>>(
data: T,
fn mmap_primitive<P: NativeType, T: AsRef<[u8]>>(
data: Arc<T>,
node: &Node,
block_offset: usize,
buffers: &mut VecDeque<IpcBuffer>,
) -> Result<ArrowArray, Error> {
let data_ref = data.as_ref();
let data_ref = data.as_ref().as_ref();

let num_rows: usize = node
.length()
Expand All @@ -313,8 +314,8 @@ fn mmap_primitive<P: NativeType, T: Clone + AsRef<[u8]>>(
}

#[allow(clippy::too_many_arguments)]
fn mmap_list<O: Offset, T: Clone + AsRef<[u8]>>(
data: T,
fn mmap_list<O: Offset, T: AsRef<[u8]>>(
data: Arc<T>,
node: &Node,
block_offset: usize,
data_type: &DataType,
Expand All @@ -335,7 +336,7 @@ fn mmap_list<O: Offset, T: Clone + AsRef<[u8]>>(
.try_into()
.map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?;

let data_ref = data.as_ref();
let data_ref = data.as_ref().as_ref();

let validity = get_validity(data_ref, block_offset, buffers, null_count)?.map(|x| x.as_ptr());

Expand Down Expand Up @@ -363,8 +364,8 @@ fn mmap_list<O: Offset, T: Clone + AsRef<[u8]>>(
}

#[allow(clippy::too_many_arguments)]
fn mmap_fixed_size_list<T: Clone + AsRef<[u8]>>(
data: T,
fn mmap_fixed_size_list<T: AsRef<[u8]>>(
data: Arc<T>,
node: &Node,
block_offset: usize,
data_type: &DataType,
Expand All @@ -387,7 +388,7 @@ fn mmap_fixed_size_list<T: Clone + AsRef<[u8]>>(
.try_into()
.map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?;

let data_ref = data.as_ref();
let data_ref = data.as_ref().as_ref();

let validity = get_validity(data_ref, block_offset, buffers, null_count)?.map(|x| x.as_ptr());

Expand All @@ -412,8 +413,8 @@ fn mmap_fixed_size_list<T: Clone + AsRef<[u8]>>(
}

#[allow(clippy::too_many_arguments)]
fn mmap_struct<T: Clone + AsRef<[u8]>>(
data: T,
fn mmap_struct<T: AsRef<[u8]>>(
data: Arc<T>,
node: &Node,
block_offset: usize,
data_type: &DataType,
Expand All @@ -434,7 +435,7 @@ fn mmap_struct<T: Clone + AsRef<[u8]>>(
.try_into()
.map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?;

let data_ref = data.as_ref();
let data_ref = data.as_ref().as_ref();

let validity = get_validity(data_ref, block_offset, buffers, null_count)?.map(|x| x.as_ptr());

Expand Down Expand Up @@ -466,8 +467,8 @@ fn mmap_struct<T: Clone + AsRef<[u8]>>(
}

#[allow(clippy::too_many_arguments)]
fn mmap_dict<K: DictionaryKey, T: Clone + AsRef<[u8]>>(
data: T,
fn mmap_dict<K: DictionaryKey, T: AsRef<[u8]>>(
data: Arc<T>,
node: &Node,
block_offset: usize,
_: &DataType,
Expand All @@ -486,7 +487,7 @@ fn mmap_dict<K: DictionaryKey, T: Clone + AsRef<[u8]>>(
.try_into()
.map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?;

let data_ref = data.as_ref();
let data_ref = data.as_ref().as_ref();

let dictionary = dictionaries
.get(&ipc_field.dictionary_id.unwrap())
Expand All @@ -507,8 +508,8 @@ fn mmap_dict<K: DictionaryKey, T: Clone + AsRef<[u8]>>(
))
}

fn get_array<T: Clone + AsRef<[u8]>>(
data: T,
fn get_array<T: AsRef<[u8]>>(
data: Arc<T>,
block_offset: usize,
data_type: &DataType,
ipc_field: &IpcField,
Expand Down Expand Up @@ -587,8 +588,8 @@ fn get_array<T: Clone + AsRef<[u8]>>(
}

/// Maps a memory region to an [`Array`].
pub(crate) unsafe fn mmap<T: Clone + AsRef<[u8]>>(
data: T,
pub(crate) unsafe fn mmap<T: AsRef<[u8]>>(
data: Arc<T>,
block_offset: usize,
data_type: DataType,
ipc_field: &IpcField,
Expand Down
27 changes: 14 additions & 13 deletions src/mmap/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
//! Memory maps regions defined on the IPC format into [`Array`].
use std::collections::VecDeque;
use std::sync::Arc;

use crate::array::Array;
use crate::chunk::Chunk;
Expand Down Expand Up @@ -74,10 +75,10 @@ fn get_buffers_nodes(
Ok((buffers, field_nodes))
}

unsafe fn _mmap_record<T: Clone + AsRef<[u8]>>(
unsafe fn _mmap_record<T: AsRef<[u8]>>(
fields: &[Field],
ipc_fields: &[IpcField],
data: T,
data: Arc<T>,
batch: RecordBatchRef,
offset: usize,
dictionaries: &Dictionaries,
Expand All @@ -104,14 +105,14 @@ unsafe fn _mmap_record<T: Clone + AsRef<[u8]>>(
.and_then(Chunk::try_new)
}

unsafe fn _mmap_unchecked<T: Clone + AsRef<[u8]>>(
unsafe fn _mmap_unchecked<T: AsRef<[u8]>>(
fields: &[Field],
ipc_fields: &[IpcField],
data: T,
data: Arc<T>,
block: Block,
dictionaries: &Dictionaries,
) -> Result<Chunk<Box<dyn Array>>, Error> {
let (message, offset) = read_message(data.as_ref(), block)?;
let (message, offset) = read_message(data.as_ref().as_ref(), block)?;
let batch = get_record_batch(message)?;
_mmap_record(
fields,
Expand All @@ -134,15 +135,15 @@ unsafe fn _mmap_unchecked<T: Clone + AsRef<[u8]>>(
/// The caller must ensure that `data` contains valid buffers, for example:
/// * Offsets in variable-sized containers must be in-bounds and increasing
/// * Utf8 data is valid
pub unsafe fn mmap_unchecked<T: Clone + AsRef<[u8]>>(
pub unsafe fn mmap_unchecked<T: AsRef<[u8]>>(
metadata: &FileMetadata,
dictionaries: &Dictionaries,
data: T,
data: Arc<T>,
chunk: usize,
) -> Result<Chunk<Box<dyn Array>>, Error> {
let block = metadata.blocks[chunk];

let (message, offset) = read_message(data.as_ref(), block)?;
let (message, offset) = read_message(data.as_ref().as_ref(), block)?;
let batch = get_record_batch(message)?;
_mmap_record(
&metadata.schema.fields,
Expand All @@ -154,13 +155,13 @@ pub unsafe fn mmap_unchecked<T: Clone + AsRef<[u8]>>(
)
}

unsafe fn mmap_dictionary<T: Clone + AsRef<[u8]>>(
unsafe fn mmap_dictionary<T: AsRef<[u8]>>(
metadata: &FileMetadata,
data: T,
data: Arc<T>,
block: Block,
dictionaries: &mut Dictionaries,
) -> Result<(), Error> {
let (message, offset) = read_message(data.as_ref(), block)?;
let (message, offset) = read_message(data.as_ref().as_ref(), block)?;
let batch = get_dictionary_batch(&message)?;

let id = batch
Expand Down Expand Up @@ -205,9 +206,9 @@ unsafe fn mmap_dictionary<T: Clone + AsRef<[u8]>>(
/// The caller must ensure that `data` contains valid buffers, for example:
/// * Offsets in variable-sized containers must be in-bounds and increasing
/// * Utf8 data is valid
pub unsafe fn mmap_dictionaries_unchecked<T: Clone + AsRef<[u8]>>(
pub unsafe fn mmap_dictionaries_unchecked<T: AsRef<[u8]>>(
metadata: &FileMetadata,
data: T,
data: Arc<T>,
) -> Result<Dictionaries, Error> {
let blocks = if let Some(blocks) = &metadata.dictionaries {
blocks
Expand Down

0 comments on commit 996506e

Please sign in to comment.