Skip to content

Commit

Permalink
feat!: array chunk parallelism
Browse files Browse the repository at this point in the history
  • Loading branch information
LDeakin committed Oct 14, 2023
1 parent cfe005b commit 69b3da9
Show file tree
Hide file tree
Showing 8 changed files with 112 additions and 58 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Added `array_subset::iter_linearised_indices_unchecked`
- Added parallel encoding/decoding to tests
- Added `array_subset::ArrayStoreBytesError`, `store_bytes`, and `store_bytes_unchecked`
- Added `parallel_chunks` option to `Array`, enabled by default. Lets `store_array_subset` and `retrieve_array_subset` (and their variants) encode/decode chunks in parallel

### Changed
- **Breaking**: `array::data_type::DataType` is now marked `#[non_exhaustive]`
Expand Down
8 changes: 7 additions & 1 deletion benches/array_uncompressed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,5 +132,11 @@ fn array_read_all_sharded(c: &mut Criterion) {
group.finish();
}

criterion_group!(benches, array_write_all, array_read_all, array_write_all_sharded, array_read_all_sharded);
criterion_group!(
benches,
array_write_all,
array_read_all,
array_write_all_sharded,
array_read_all_sharded
);
criterion_main!(benches);
73 changes: 59 additions & 14 deletions src/array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ pub mod data_type;
mod dimension_name;
mod fill_value;
mod fill_value_metadata;
mod unsafe_cell_slice;

use std::{collections::HashMap, sync::Arc};

Expand All @@ -57,6 +58,7 @@ pub use self::{
};

use parking_lot::Mutex;
use rayon::prelude::{ParallelBridge, ParallelIterator};
use safe_transmute::TriviallyTransmutable;

use crate::{
Expand All @@ -73,6 +75,7 @@ use self::{
array_errors::TransmuteError,
chunk_grid::InvalidChunkGridIndicesError,
codec::{ArrayCodecTraits, ArrayToBytesCodecTraits, StoragePartialDecoder},
unsafe_cell_slice::UnsafeCellSlice,
};

/// An ND index to an element in an array.
Expand Down Expand Up @@ -164,6 +167,8 @@ pub struct Array<TStorage: ?Sized> {
additional_fields: AdditionalFields,
/// If true, codecs run with multithreading (where supported)
parallel_codecs: bool,
/// If true, chunks are encoded/decoded in parallel by `store_array_subset`, `retrieve_array_subset`, and their variants
parallel_chunks: bool,
/// Chunk locks.
chunk_locks: Mutex<HashMap<Vec<u64>, Arc<Mutex<()>>>>,
}
Expand Down Expand Up @@ -238,6 +243,7 @@ impl<TStorage: ?Sized> Array<TStorage> {
storage_transformers,
dimension_names: metadata.dimension_names,
parallel_codecs: true,
parallel_chunks: true,
chunk_locks: Mutex::default(),
})
}
Expand Down Expand Up @@ -319,19 +325,32 @@ impl<TStorage: ?Sized> Array<TStorage> {
&self.additional_fields
}

/// Returns true if codecs should use multiple threads for encoding and decoding where supported.
/// Returns true if codecs can use multiple threads for encoding and decoding (where supported).
#[must_use]
pub fn parallel_codecs(&self) -> bool {
self.parallel_codecs
}

/// Set whether or not to use multithreaded codec encoding/decoding.
/// Enable or disable multithreaded codec encoding/decoding. Enabled by default.
///
/// It may be advantageous to turn this off if parallelisation is external (e.g. parallel chunk decoding).
/// It may be advantageous to turn this off if parallelisation is external to avoid thrashing.
pub fn set_parallel_codecs(&mut self, parallel_codecs: bool) {
self.parallel_codecs = parallel_codecs;
}

/// Returns true if chunks are encoded/decoded in parallel by [`store_array_subset`](Self::store_array_subset), [`retrieve_array_subset`](Self::retrieve_array_subset), and their variants.
///
/// Parallel chunk processing is enabled by default; it can be toggled with [`set_parallel_chunks`](Self::set_parallel_chunks).
#[must_use]
pub fn parallel_chunks(&self) -> bool {
self.parallel_chunks
}

/// Enable or disable multithreaded chunk encoding/decoding. Enabled by default.
///
/// This only updates the flag reported by [`parallel_chunks`](Self::parallel_chunks); it takes effect on subsequent store/retrieve calls that consult it.
///
/// It may be advantageous to disable parallel codecs (see [`set_parallel_codecs`](Self::set_parallel_codecs)) if parallel chunks is enabled.
pub fn set_parallel_chunks(&mut self, parallel_chunks: bool) {
self.parallel_chunks = parallel_chunks;
}

/// Create [`ArrayMetadata`].
#[must_use]
pub fn metadata(&self) -> ArrayMetadata {
Expand Down Expand Up @@ -556,16 +575,8 @@ impl<TStorage: ?Sized + ReadableStorageTraits> Array<TStorage> {
));
}

// Allocate the output data
let element_size = self.fill_value().size() as u64;
let size_output = usize::try_from(array_subset.num_elements() * element_size).unwrap();
let mut output: Vec<u8> = vec![0; size_output];

// Find the chunks intersecting this array subset
let chunks = self.chunks_in_array_subset(array_subset)?;

// Read those chunks
for chunk_indices in chunks.iter_indices() {
let decode_chunk = |chunk_indices: Vec<u64>, output: &mut [u8]| -> Result<(), ArrayError> {
// Get the subset of the array corresponding to the chunk
let chunk_subset_in_array =
unsafe { self.chunk_grid().subset_unchecked(&chunk_indices) };
Expand Down Expand Up @@ -593,6 +604,26 @@ impl<TStorage: ?Sized + ReadableStorageTraits> Array<TStorage> {
.copy_from_slice(&decoded_bytes[decoded_offset..decoded_offset + length]);
decoded_offset += length;
}
Ok(())
};

// Find the chunks intersecting this array subset
let chunks = self.chunks_in_array_subset(array_subset)?;

// Decode chunks and copy to output
let size_output = usize::try_from(array_subset.num_elements() * element_size).unwrap();
let mut output: Vec<u8> = vec![0; size_output];
if self.parallel_chunks {
let output = UnsafeCellSlice::new(output.as_mut_slice());
chunks
.iter_indices()
.par_bridge()
.map(|chunk_indices| decode_chunk(chunk_indices, unsafe { output.get() }))
.collect::<Result<Vec<_>, ArrayError>>()?;
} else {
for chunk_indices in chunks.iter_indices() {
decode_chunk(chunk_indices, &mut output)?;
}
}

Ok(output)
Expand Down Expand Up @@ -930,6 +961,7 @@ impl<TStorage: ?Sized + ReadableStorageTraits + WritableStorageTraits> Array<TSt
///
/// Returns an [`ArrayError`] if
/// - `array_subset` is invalid or out of bounds of the array,
/// - the length of `subset_bytes` does not match the expected length governed by the shape of the array subset and the data type size,
/// - there is a codec encoding error, or
/// - an underlying store error.
pub fn store_array_subset(
Expand All @@ -956,7 +988,7 @@ impl<TStorage: ?Sized + ReadableStorageTraits + WritableStorageTraits> Array<TSt
let chunks = self.chunks_in_array_subset(array_subset)?;

let element_size = self.data_type().size();
for chunk_indices in chunks.iter_indices() {
let store_chunk = |chunk_indices: Vec<u64>| -> Result<(), ArrayError> {
let chunk_subset_in_array =
unsafe { self.chunk_grid().subset_unchecked(&chunk_indices) };

Expand Down Expand Up @@ -985,6 +1017,19 @@ impl<TStorage: ?Sized + ReadableStorageTraits + WritableStorageTraits> Array<TSt
&chunk_subset_bytes,
)?;
}
Ok(())
};

if self.parallel_chunks {
chunks
.iter_indices()
.par_bridge()
.map(store_chunk)
.collect::<Result<Vec<_>, _>>()?;
} else {
for chunk_indices in chunks.iter_indices() {
store_chunk(chunk_indices)?;
}
}
Ok(())
}
Expand All @@ -996,7 +1041,7 @@ impl<TStorage: ?Sized + ReadableStorageTraits + WritableStorageTraits> Array<TSt
/// # Errors
///
/// Returns an [`ArrayError`] if
/// - the size of `T` does not match the data type size, or
/// - the size of `T` does not match the data type size, or
/// - a [`store_array_subset`](Array::store_array_subset) error condition is met.
pub fn store_array_subset_elements<T: TriviallyTransmutable>(
&self,
Expand Down
11 changes: 11 additions & 0 deletions src/array/array_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ pub struct ArrayBuilder {
dimension_names: Option<Vec<DimensionName>>,
additional_fields: AdditionalFields,
parallel_codecs: bool,
parallel_chunks: bool,
}

impl ArrayBuilder {
Expand Down Expand Up @@ -97,6 +98,7 @@ impl ArrayBuilder {
dimension_names: None,
additional_fields: AdditionalFields::default(),
parallel_codecs: true,
parallel_chunks: true,
}
}

Expand Down Expand Up @@ -203,6 +205,14 @@ impl ArrayBuilder {
self
}

/// Set whether or not to use multithreaded chunk encoding and decoding.
///
/// If parallel chunks is not set, it defaults to true.
///
/// Returns `&mut Self` so that calls can be chained with other builder methods.
pub fn parallel_chunks(&mut self, parallel_chunks: bool) -> &mut Self {
self.parallel_chunks = parallel_chunks;
self
}

/// Build into an [`Array`].
///
/// # Errors
Expand Down Expand Up @@ -248,6 +258,7 @@ impl ArrayBuilder {
dimension_names: self.dimension_names.clone(),
additional_fields: self.additional_fields.clone(),
parallel_codecs: self.parallel_codecs,
parallel_chunks: self.parallel_chunks,
chunk_locks: parking_lot::Mutex::default(),
})
}
Expand Down
29 changes: 0 additions & 29 deletions src/array/codec/array_to_bytes/sharding.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,35 +81,6 @@ fn decode_shard_index(
.collect())
}

/// An unsafe cell slice.
///
/// This is used internally by the sharding codec for parallel decoding and partial decoding.
/// It is used to write to subsets of a slice from multiple threads without locking.
/// This enables inner chunks to be decoded and written to an output array in parallel.
///
/// The wrapper performs no synchronisation of its own: callers are responsible for
/// ensuring that concurrent writes target disjoint regions.
#[derive(Copy, Clone)]
struct UnsafeCellSlice<'a, T>(&'a [std::cell::UnsafeCell<T>]);

// SAFETY: the wrapper only hands out access through `unsafe` methods whose contract
// requires callers to avoid unsynchronised access to the same elements.
unsafe impl<'a, T: Send + Sync> Send for UnsafeCellSlice<'a, T> {}
unsafe impl<'a, T: Send + Sync> Sync for UnsafeCellSlice<'a, T> {}

impl<'a, T: Copy> UnsafeCellSlice<'a, T> {
    /// Wrap an exclusively borrowed slice so that it can be shared between threads.
    pub fn new(slice: &'a mut [T]) -> Self {
        let ptr = slice as *mut [T] as *const [std::cell::UnsafeCell<T>];
        // SAFETY: `UnsafeCell<T>` has the same in-memory representation as `T`, and the
        // exclusive borrow of `slice` is held for `'a`, so no other reference can observe it.
        Self(unsafe { &*ptr })
    }

    /// Copies all elements from `src` into `self` starting at `offset`, using a memcpy.
    ///
    /// # Panics
    ///
    /// Panics if `offset + src.len()` overflows or exceeds the length of the wrapped slice.
    /// An empty `src` is accepted for any `offset` up to and including the slice length.
    ///
    /// # Safety
    ///
    /// Undefined behaviour if two threads write to the same region without sync.
    pub unsafe fn copy_from_slice(&self, offset: usize, src: &[T]) {
        let end = offset
            .checked_add(src.len())
            .expect("offset + src.len() overflows usize");
        // Reject out-of-range writes up front rather than writing past the allocation.
        assert!(
            end <= self.0.len(),
            "range {}..{} out of bounds for slice of length {}",
            offset,
            end,
            self.0.len()
        );
        // SAFETY: `offset..end` was checked to lie within the wrapped slice, so the
        // pointer arithmetic stays in bounds and the destination covers `src.len()` valid
        // elements. The caller guarantees no other thread accesses this region.
        unsafe {
            let ptr = std::cell::UnsafeCell::raw_get(self.0.as_ptr().add(offset));
            std::slice::from_raw_parts_mut(ptr, src.len()).copy_from_slice(src);
        }
    }
}

#[cfg(test)]
mod tests {
use crate::{array::codec::ArrayCodecTraits, array_subset::ArraySubset};
Expand Down
14 changes: 6 additions & 8 deletions src/array/codec/array_to_bytes/sharding/sharding_codec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use crate::{
ArrayCodecTraits, ArrayPartialDecoderTraits, ArrayToBytesCodecTraits,
BytesPartialDecoderTraits, Codec, CodecChain, CodecError, CodecPlugin, CodecTraits,
},
ArrayRepresentation, BytesRepresentation,
ArrayRepresentation, BytesRepresentation, UnsafeCellSlice,
},
array_subset::ArraySubset,
metadata::Metadata,
Expand All @@ -14,7 +14,7 @@ use crate::{
use super::{
calculate_chunks_per_shard, compute_index_encoded_size, decode_shard_index,
sharding_index_decoded_representation, sharding_partial_decoder, ShardingCodecConfiguration,
ShardingCodecConfigurationV1, UnsafeCellSlice,
ShardingCodecConfigurationV1,
};

use rayon::prelude::*;
Expand Down Expand Up @@ -426,6 +426,8 @@ impl ShardingCodec {
.enumerate()
.par_bridge()
.map(|(chunk_index, (_chunk_indices, chunk_subset))| {
let shard_slice = unsafe { shard_slice.get() };

// Read the offset/size
let offset = shard_index[chunk_index * 2];
let size = shard_index[chunk_index * 2 + 1];
Expand All @@ -445,12 +447,8 @@ impl ShardingCodec {
} {
let shard_offset = usize::try_from(index * element_size).unwrap();
let length = usize::try_from(num_elements * element_size).unwrap();
unsafe {
shard_slice.copy_from_slice(
shard_offset,
&decoded_chunk[data_idx..data_idx + length],
);
}
shard_slice[shard_offset..shard_offset + length]
.copy_from_slice(&decoded_chunk[data_idx..data_idx + length]);
data_idx += length;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,14 @@ use crate::{
ArrayPartialDecoderTraits, ArraySubset, ArrayToBytesCodecTraits,
ByteIntervalPartialDecoder, BytesPartialDecoderTraits, CodecChain, CodecError,
},
ravel_indices, ArrayRepresentation, ArrayShape, BytesRepresentation,
ravel_indices, ArrayRepresentation, ArrayShape, BytesRepresentation, UnsafeCellSlice,
},
byte_range::ByteRange,
};

use super::{
calculate_chunks_per_shard, compute_index_encoded_size, decode_shard_index,
sharding_index_decoded_representation, UnsafeCellSlice,
sharding_index_decoded_representation,
};

/// The partial decoder for the sharding codec.
Expand Down Expand Up @@ -258,6 +258,8 @@ impl ArrayPartialDecoderTraits for ShardingPartialDecoder<'_> {
unsafe { array_subset.iter_chunks_unchecked(chunk_representation.shape()) }
.par_bridge()
.map(|(chunk_indices, chunk_subset)| {
let out_array_subset_slice = unsafe { out_array_subset_slice.get() };

let shard_index_idx: usize =
usize::try_from(ravel_indices(&chunk_indices, &chunks_per_shard) * 2)
.unwrap();
Expand Down Expand Up @@ -293,12 +295,10 @@ impl ArrayPartialDecoderTraits for ShardingPartialDecoder<'_> {
let output_offset =
usize::try_from(array_subset_element_index * element_size).unwrap();
let length = usize::try_from(num_elements * element_size).unwrap();
unsafe {
out_array_subset_slice.copy_from_slice(
output_offset,
out_array_subset_slice[output_offset..output_offset + length]
.copy_from_slice(
&decoded_bytes[decoded_offset..decoded_offset + length],
);
}
decoded_offset += length;
}
Ok::<_, CodecError>(())
Expand Down
22 changes: 22 additions & 0 deletions src/array/unsafe_cell_slice.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/// An unsafe cell slice.
///
/// This is used internally for parallel chunk decoding.
/// It is used to write to subsets of a slice from multiple threads without locking.
///
/// The wrapper performs no synchronisation of its own: soundness relies on every user of
/// [`UnsafeCellSlice::get`] only touching elements that no other thread is accessing.
#[derive(Copy, Clone)]
pub struct UnsafeCellSlice<'a, T>(&'a [std::cell::UnsafeCell<T>]);

// SAFETY: the only way to reach the underlying elements is through the `unsafe` method
// `get`, whose contract makes the caller responsible for avoiding data races.
unsafe impl<'a, T: Send + Sync> Send for UnsafeCellSlice<'a, T> {}
unsafe impl<'a, T: Send + Sync> Sync for UnsafeCellSlice<'a, T> {}

impl<'a, T: Copy> UnsafeCellSlice<'a, T> {
    /// Wrap an exclusively borrowed slice so that it can be shared between threads.
    pub fn new(slice: &'a mut [T]) -> Self {
        let ptr = slice as *mut [T] as *const [std::cell::UnsafeCell<T>];
        // SAFETY: `UnsafeCell<T>` has the same in-memory representation as `T`, and the
        // exclusive borrow of `slice` is held for `'a`, so no other reference can observe it.
        Self(unsafe { &*ptr })
    }

    /// Returns a mutable view over the whole wrapped slice.
    ///
    /// An empty wrapped slice yields an empty slice rather than panicking.
    ///
    /// # Safety
    ///
    /// Several mutable views of the same memory may exist at once. The caller must ensure
    /// that no element is written through one view while it is read or written through
    /// another without synchronisation (i.e. each thread works on a disjoint region).
    #[allow(clippy::mut_from_ref)] // handing out `&mut` from `&self` is the purpose of this type
    pub unsafe fn get(&self) -> &mut [T] {
        // Derive the pointer from the slice itself instead of indexing element 0, which
        // would panic when the slice is empty.
        let ptr = std::cell::UnsafeCell::raw_get(self.0.as_ptr());
        // SAFETY: `ptr` points to `self.0.len()` contiguous, initialised `T` values that
        // live for `'a`; for an empty slice it is a well-aligned dangling pointer, which is
        // valid for a zero-length slice. Exclusive use is guaranteed by the caller.
        unsafe { std::slice::from_raw_parts_mut(ptr, self.0.len()) }
    }
}

0 comments on commit 69b3da9

Please sign in to comment.