From 69b3da95c597e0668870cdd141a0eefb75e7b4da Mon Sep 17 00:00:00 2001 From: Lachlan Deakin Date: Sat, 14 Oct 2023 10:32:28 +1100 Subject: [PATCH] feat!: array chunk parallelism --- CHANGELOG.md | 1 + benches/array_uncompressed.rs | 8 +- src/array.rs | 73 +++++++++++++++---- src/array/array_builder.rs | 11 +++ src/array/codec/array_to_bytes/sharding.rs | 29 -------- .../array_to_bytes/sharding/sharding_codec.rs | 14 ++-- .../sharding/sharding_partial_decoder.rs | 12 +-- src/array/unsafe_cell_slice.rs | 22 ++++++ 8 files changed, 112 insertions(+), 58 deletions(-) create mode 100644 src/array/unsafe_cell_slice.rs diff --git a/CHANGELOG.md b/CHANGELOG.md index c1a97965..5a368964 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,6 +14,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Added `array_subset::iter_linearised_indices_unchecked` - Added parallel encoding/decoding to tests - Added `array_subset::ArrayStoreBytesError`, `store_bytes`, and `store_bytes_unchecked` + - Added `parallel_chunks` option to `Array`, enabled by default. Lets `store_array_subset` and `retrieve_array_subset` (and their variants) encode/decode chunks in parallel ### Changed - **Breaking**: `array::data_type::DataType` is now marked `#[non_exhaustive]` diff --git a/benches/array_uncompressed.rs b/benches/array_uncompressed.rs index 55f3ba15..140f6cd4 100644 --- a/benches/array_uncompressed.rs +++ b/benches/array_uncompressed.rs @@ -132,5 +132,11 @@ fn array_read_all_sharded(c: &mut Criterion) { group.finish(); } -criterion_group!(benches, array_write_all, array_read_all, array_write_all_sharded, array_read_all_sharded); +criterion_group!( + benches, + array_write_all, + array_read_all, + array_write_all_sharded, + array_read_all_sharded +); criterion_main!(benches); diff --git a/src/array.rs b/src/array.rs index 80d5e7b7..b510cab4 100644 --- a/src/array.rs +++ b/src/array.rs @@ -38,6 +38,7 @@ pub mod data_type; mod dimension_name; mod fill_value; mod fill_value_metadata; +mod unsafe_cell_slice; use std::{collections::HashMap, sync::Arc}; @@ -57,6 +58,7 @@ pub use self::{ }; use parking_lot::Mutex; +use rayon::prelude::{ParallelBridge, ParallelIterator}; use safe_transmute::TriviallyTransmutable; use crate::{ @@ -73,6 +75,7 @@ use self::{ array_errors::TransmuteError, chunk_grid::InvalidChunkGridIndicesError, codec::{ArrayCodecTraits, ArrayToBytesCodecTraits, StoragePartialDecoder}, + unsafe_cell_slice::UnsafeCellSlice, }; /// An ND index to an element in an array. @@ -164,6 +167,8 @@ pub struct Array { additional_fields: AdditionalFields, /// If true, codecs run with multithreading (where supported) parallel_codecs: bool, + /// If true, chunks are encoded and stored in parallel + parallel_chunks: bool, /// Chunk locks. chunk_locks: Mutex, Arc>>>, } @@ -238,6 +243,7 @@ impl Array { storage_transformers, dimension_names: metadata.dimension_names, parallel_codecs: true, + parallel_chunks: true, chunk_locks: Mutex::default(), }) } @@ -319,19 +325,32 @@ impl Array { &self.additional_fields } - /// Returns true if codecs should use multiple threads for encoding and decoding where supported. + /// Returns true if codecs can use multiple threads for encoding and decoding (where supported). #[must_use] pub fn parallel_codecs(&self) -> bool { self.parallel_codecs } - /// Set whether or not to use multithreaded codec encoding/decoding. + /// Enable or disable multithreaded codec encoding/decoding. Enabled by default. /// - /// It may be advantageous to turn this off if parallelisation is external (e.g. parallel chunk decoding). + /// It may be advantageous to turn this off if parallelisation is external to avoid thrashing. pub fn set_parallel_codecs(&mut self, parallel_codecs: bool) { self.parallel_codecs = parallel_codecs; } + /// Returns true if chunks are encoded/decoded in parallel by the [`store_array_subset`](Self::store_array_subset), [`retrieve_array_subset`](Self::retrieve_array_subset), and their variants. + #[must_use] + pub fn parallel_chunks(&self) -> bool { + self.parallel_chunks + } + + /// Enable or disable multithreaded chunk encoding/decoding. Enabled by default. + /// + /// It may be advantageous to disable parallel codecs if parallel chunks is enabled. + pub fn set_parallel_chunks(&mut self, parallel_chunks: bool) { + self.parallel_chunks = parallel_chunks; + } + /// Create [`ArrayMetadata`]. #[must_use] pub fn metadata(&self) -> ArrayMetadata { @@ -556,16 +575,8 @@ impl Array { )); } - // Allocate the output data let element_size = self.fill_value().size() as u64; - let size_output = usize::try_from(array_subset.num_elements() * element_size).unwrap(); - let mut output: Vec = vec![0; size_output]; - - // Find the chunks intersecting this array subset - let chunks = self.chunks_in_array_subset(array_subset)?; - - // Read those chunks - for chunk_indices in chunks.iter_indices() { + let decode_chunk = |chunk_indices: Vec, output: &mut [u8]| -> Result<(), ArrayError> { // Get the subset of the array corresponding to the chunk let chunk_subset_in_array = unsafe { self.chunk_grid().subset_unchecked(&chunk_indices) }; @@ -593,6 +604,26 @@ impl Array { .copy_from_slice(&decoded_bytes[decoded_offset..decoded_offset + length]); decoded_offset += length; } + Ok(()) + }; + + // Find the chunks intersecting this array subset + let chunks = self.chunks_in_array_subset(array_subset)?; + + // Decode chunks and copy to output + let size_output = usize::try_from(array_subset.num_elements() * element_size).unwrap(); + let mut output: Vec = vec![0; size_output]; + if self.parallel_chunks { + let output = UnsafeCellSlice::new(output.as_mut_slice()); + chunks + .iter_indices() + .par_bridge() + .map(|chunk_indices| decode_chunk(chunk_indices, unsafe { output.get() })) + .collect::, ArrayError>>()?; + } else { + for chunk_indices in chunks.iter_indices() { + decode_chunk(chunk_indices, &mut output)?; + } } Ok(output) @@ -930,6 +961,7 @@ impl Array Array| -> Result<(), ArrayError> { let chunk_subset_in_array = unsafe { self.chunk_grid().subset_unchecked(&chunk_indices) }; @@ -985,6 +1017,19 @@ impl Array, _>>()?; + } else { + for chunk_indices in chunks.iter_indices() { + store_chunk(chunk_indices)?; + } } Ok(()) } @@ -996,7 +1041,7 @@ impl Array( &self, diff --git a/src/array/array_builder.rs b/src/array/array_builder.rs index 574cc040..569c3651 100644 --- a/src/array/array_builder.rs +++ b/src/array/array_builder.rs @@ -69,6 +69,7 @@ pub struct ArrayBuilder { dimension_names: Option>, additional_fields: AdditionalFields, parallel_codecs: bool, + parallel_chunks: bool, } impl ArrayBuilder { @@ -97,6 +98,7 @@ impl ArrayBuilder { dimension_names: None, additional_fields: AdditionalFields::default(), parallel_codecs: true, + parallel_chunks: true, } } @@ -203,6 +205,14 @@ impl ArrayBuilder { self } + /// Set whether or not to use multithreaded chunk encoding and decoding. + /// + /// If parallel chunks is not set, it defaults to true. + pub fn parallel_chunks(&mut self, parallel_chunks: bool) -> &mut Self { + self.parallel_chunks = parallel_chunks; + self + } + /// Build into an [`Array`]. /// /// # Errors @@ -248,6 +258,7 @@ impl ArrayBuilder { dimension_names: self.dimension_names.clone(), additional_fields: self.additional_fields.clone(), parallel_codecs: self.parallel_codecs, + parallel_chunks: self.parallel_chunks, chunk_locks: parking_lot::Mutex::default(), }) } diff --git a/src/array/codec/array_to_bytes/sharding.rs b/src/array/codec/array_to_bytes/sharding.rs index 6a483825..4c437046 100644 --- a/src/array/codec/array_to_bytes/sharding.rs +++ b/src/array/codec/array_to_bytes/sharding.rs @@ -81,35 +81,6 @@ fn decode_shard_index( .collect()) } -/// An unsafe cell slice. -/// -/// This is used internally by the sharding codec for parallel decoding and partial decoding. -/// It is used to write to subsets of a slice from multiple threads without locking. -/// This enables inner chunks to be decoded and written to an output array in parallel. -#[derive(Copy, Clone)] -struct UnsafeCellSlice<'a, T>(&'a [std::cell::UnsafeCell]); - -unsafe impl<'a, T: Send + Sync> Send for UnsafeCellSlice<'a, T> {} -unsafe impl<'a, T: Send + Sync> Sync for UnsafeCellSlice<'a, T> {} - -impl<'a, T: Copy> UnsafeCellSlice<'a, T> { - pub fn new(slice: &'a mut [T]) -> Self { - let ptr = slice as *mut [T] as *const [std::cell::UnsafeCell]; - Self(unsafe { &*ptr }) - } - - /// Copies all elements from `src` into `self`, using a memcpy. - /// - /// # Safety - /// - /// Undefined behaviour if two threads write to the same region without sync. - pub unsafe fn copy_from_slice(&self, offset: usize, src: &[T]) { - let ptr = self.0[offset].get(); - let slice = std::slice::from_raw_parts_mut(ptr, src.len()); - slice.copy_from_slice(src); - } -} - #[cfg(test)] mod tests { use crate::{array::codec::ArrayCodecTraits, array_subset::ArraySubset}; diff --git a/src/array/codec/array_to_bytes/sharding/sharding_codec.rs b/src/array/codec/array_to_bytes/sharding/sharding_codec.rs index a1ba4af6..7f5b5423 100644 --- a/src/array/codec/array_to_bytes/sharding/sharding_codec.rs +++ b/src/array/codec/array_to_bytes/sharding/sharding_codec.rs @@ -4,7 +4,7 @@ use crate::{ ArrayCodecTraits, ArrayPartialDecoderTraits, ArrayToBytesCodecTraits, BytesPartialDecoderTraits, Codec, CodecChain, CodecError, CodecPlugin, CodecTraits, }, - ArrayRepresentation, BytesRepresentation, + ArrayRepresentation, BytesRepresentation, UnsafeCellSlice, }, array_subset::ArraySubset, metadata::Metadata, @@ -14,7 +14,7 @@ use crate::{ use super::{ calculate_chunks_per_shard, compute_index_encoded_size, decode_shard_index, sharding_index_decoded_representation, sharding_partial_decoder, ShardingCodecConfiguration, - ShardingCodecConfigurationV1, UnsafeCellSlice, + ShardingCodecConfigurationV1, }; use rayon::prelude::*; @@ -426,6 +426,8 @@ impl ShardingCodec { .enumerate() .par_bridge() .map(|(chunk_index, (_chunk_indices, chunk_subset))| { + let shard_slice = unsafe { shard_slice.get() }; + // Read the offset/size let offset = shard_index[chunk_index * 2]; let size = shard_index[chunk_index * 2 + 1]; @@ -445,12 +447,8 @@ impl ShardingCodec { } { let shard_offset = usize::try_from(index * element_size).unwrap(); let length = usize::try_from(num_elements * element_size).unwrap(); - unsafe { - shard_slice.copy_from_slice( - shard_offset, - &decoded_chunk[data_idx..data_idx + length], - ); - } + shard_slice[shard_offset..shard_offset + length] + .copy_from_slice(&decoded_chunk[data_idx..data_idx + length]); data_idx += length; } } diff --git a/src/array/codec/array_to_bytes/sharding/sharding_partial_decoder.rs b/src/array/codec/array_to_bytes/sharding/sharding_partial_decoder.rs index 3321e848..e66ea17a 100644 --- a/src/array/codec/array_to_bytes/sharding/sharding_partial_decoder.rs +++ b/src/array/codec/array_to_bytes/sharding/sharding_partial_decoder.rs @@ -8,14 +8,14 @@ use crate::{ ArrayPartialDecoderTraits, ArraySubset, ArrayToBytesCodecTraits, ByteIntervalPartialDecoder, BytesPartialDecoderTraits, CodecChain, CodecError, }, - ravel_indices, ArrayRepresentation, ArrayShape, BytesRepresentation, + ravel_indices, ArrayRepresentation, ArrayShape, BytesRepresentation, UnsafeCellSlice, }, byte_range::ByteRange, }; use super::{ calculate_chunks_per_shard, compute_index_encoded_size, decode_shard_index, - sharding_index_decoded_representation, UnsafeCellSlice, + sharding_index_decoded_representation, }; /// The partial decoder for the sharding codec. @@ -258,6 +258,8 @@ impl ArrayPartialDecoderTraits for ShardingPartialDecoder<'_> { unsafe { array_subset.iter_chunks_unchecked(chunk_representation.shape()) } .par_bridge() .map(|(chunk_indices, chunk_subset)| { + let out_array_subset_slice = unsafe { out_array_subset_slice.get() }; + let shard_index_idx: usize = usize::try_from(ravel_indices(&chunk_indices, &chunks_per_shard) * 2) .unwrap(); @@ -293,12 +295,10 @@ impl ArrayPartialDecoderTraits for ShardingPartialDecoder<'_> { let output_offset = usize::try_from(array_subset_element_index * element_size).unwrap(); let length = usize::try_from(num_elements * element_size).unwrap(); - unsafe { - out_array_subset_slice.copy_from_slice( - output_offset, + out_array_subset_slice[output_offset..output_offset + length] + .copy_from_slice( &decoded_bytes[decoded_offset..decoded_offset + length], ); - } decoded_offset += length; } Ok::<_, CodecError>(()) diff --git a/src/array/unsafe_cell_slice.rs b/src/array/unsafe_cell_slice.rs new file mode 100644 index 00000000..951e266b --- /dev/null +++ b/src/array/unsafe_cell_slice.rs @@ -0,0 +1,22 @@ +/// An unsafe cell slice. +/// +/// This is used internally for parallel chunk decoding. +/// It is used to write to subsets of a slice from multiple threads without locking. +#[derive(Copy, Clone)] +pub struct UnsafeCellSlice<'a, T>(&'a [std::cell::UnsafeCell]); + +unsafe impl<'a, T: Send + Sync> Send for UnsafeCellSlice<'a, T> {} +unsafe impl<'a, T: Send + Sync> Sync for UnsafeCellSlice<'a, T> {} + +impl<'a, T: Copy> UnsafeCellSlice<'a, T> { + pub fn new(slice: &'a mut [T]) -> Self { + let ptr = slice as *mut [T] as *const [std::cell::UnsafeCell]; + Self(unsafe { &*ptr }) + } + + #[allow(clippy::mut_from_ref)] + pub unsafe fn get(&self) -> &mut [T] { + let ptr = self.0[0].get(); + std::slice::from_raw_parts_mut(ptr, self.0.len()) + } +}