From 6a8baed73f7102e259da5491725b7e643b54dd94 Mon Sep 17 00:00:00 2001
From: Lachlan Deakin
Date: Wed, 1 Jan 2025 11:47:22 +1100
Subject: [PATCH] feat: add `ArrayShardedReadableExt::retrieve_encoded_inner_chunk` (#110)

* fix: cleanup unnecessary lifetime constraints in partial decoders

* refactor: make `{Array,Bytes}PartialDecoderCache` private

* fix: remove BytesPartialDecoderCache lifetime constraint

* feat: make `Any` a supertrait of partial encoder/decoder traits

* feat: add `ArrayShardedReadableExt::retrieve_encoded_inner_chunk`

* feat: add `ArrayShardedReadableExt::inner_chunk_byte_range`

* feat: Add `ArrayError::UnsupportedMethod`

* feat: add `ArrayShardedExt::is_exclusively_sharded`

* feat: Add `ArrayShardedReadableExtCache::array_is_exclusively_sharded`

* nightly! fix: inner_chunk_byte_range and retrieve_encoded_inner_chunk

* fix: workaround for trait upcasting
---
 CHANGELOG.md                                  |   7 +
 zarrs/src/array/array_errors.rs               |   3 +
 zarrs/src/array/array_sharded_ext.rs          |   9 +
 .../array/array_sync_sharded_readable_ext.rs  | 256 ++++++++++++++++--
 .../array/codec/array_to_bytes/sharding.rs    |   1 +
 .../array_to_bytes/sharding/sharding_codec.rs |   8 +-
 .../sharding/sharding_partial_decoder.rs      |  41 +++
 7 files changed, 297 insertions(+), 28 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 5b0edbf4..536f5b81 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -7,10 +7,17 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 
 ## [Unreleased]
 
+### Added
+- Add `ArrayShardedReadableExt::retrieve_encoded_inner_chunk`
+- Add `ArrayShardedReadableExt::inner_chunk_byte_range`
+- Add `ArrayShardedExt::is_exclusively_sharded`
+- Add `ArrayShardedReadableExtCache::array_is_exclusively_sharded`
+
 ### Changed
 - **Breaking**: Seal `Array` extension traits: `ArraySharded[Readable]Ext` and `ArrayChunkCacheExt`
 - **Breaking**: Make `{Array,Bytes}PartialDecoderCache` private
 - **Breaking**: Make `Any` a supertrait of partial encoder/decoder traits
+- **Breaking**: Add `ArrayError::UnsupportedMethod`
 
 ### Fixed
 - Cleanup unnecessary lifetime constraints in partial decoders
diff --git a/zarrs/src/array/array_errors.rs b/zarrs/src/array/array_errors.rs
index ff628d6d..527d0a58 100644
--- a/zarrs/src/array/array_errors.rs
+++ b/zarrs/src/array/array_errors.rs
@@ -109,4 +109,7 @@ pub enum ArrayError {
     /// - a string with invalid utf-8 encoding.
     #[error("Invalid element value")]
     InvalidElementValue,
+    /// Unsupported method.
+    #[error("unsupported array method: {_0}")]
+    UnsupportedMethod(String),
 }
diff --git a/zarrs/src/array/array_sharded_ext.rs b/zarrs/src/array/array_sharded_ext.rs
index eda6fc45..0421a4ad 100644
--- a/zarrs/src/array/array_sharded_ext.rs
+++ b/zarrs/src/array/array_sharded_ext.rs
@@ -5,6 +5,9 @@ pub trait ArrayShardedExt: private::Sealed {
     /// Returns true if the array to bytes codec of the array is `sharding_indexed`.
     fn is_sharded(&self) -> bool;
 
+    /// Returns true if the array-to-bytes codec of the array is `sharding_indexed` and the array has no array-to-array or bytes-to-bytes codecs.
+    fn is_exclusively_sharded(&self) -> bool;
+
     /// Return the inner chunk shape as defined in the `sharding_indexed` codec metadata.
     ///
     /// Returns [`None`] for an unsharded array.
@@ -40,6 +43,12 @@ impl<TStorage: ?Sized> ArrayShardedExt for Array<TStorage> {
         == super::codec::array_to_bytes::sharding::IDENTIFIER
     }
 
+    fn is_exclusively_sharded(&self) -> bool {
+        self.is_sharded()
+            && self.codecs.array_to_array_codecs().is_empty()
+            && self.codecs.bytes_to_bytes_codecs().is_empty()
+    }
+
     fn inner_chunk_shape(&self) -> Option<ChunkShape> {
         let codec_metadata = self
             .codecs
diff --git a/zarrs/src/array/array_sync_sharded_readable_ext.rs b/zarrs/src/array/array_sync_sharded_readable_ext.rs
index e7f4a72e..461df909 100644
--- a/zarrs/src/array/array_sync_sharded_readable_ext.rs
+++ b/zarrs/src/array/array_sync_sharded_readable_ext.rs
@@ -3,22 +3,51 @@ use std::{collections::HashMap, sync::Arc};
 use rayon::iter::{IntoParallelIterator, ParallelIterator};
 use rayon_iter_concurrent_limit::iter_concurrent_limit;
 use unsafe_cell_slice::UnsafeCellSlice;
+use zarrs_metadata::v3::array::codec::sharding::ShardingCodecConfiguration;
+use zarrs_storage::byte_range::ByteRange;
+use zarrs_storage::StorageHandle;
 
 use super::array_bytes::{merge_chunks_vlen, update_bytes_flen};
+use super::codec::array_to_bytes::sharding::ShardingPartialDecoder;
+use super::codec::{CodecError, ShardingCodec};
 use super::element::ElementOwned;
 use super::{
     codec::CodecOptions, concurrency::concurrency_chunks_and_codec, Array, ArrayError,
     ArrayShardedExt, ChunkGrid,
 };
 use super::{ArrayBytes, ArraySize, DataTypeSize};
+use crate::array::codec::StoragePartialDecoder;
 use crate::storage::ReadableStorageTraits;
 use crate::{array::codec::ArrayPartialDecoderTraits, array_subset::ArraySubset};
 
-type PartialDecoderHashMap = HashMap<Vec<u64>, Arc<dyn ArrayPartialDecoderTraits>>;
+// TODO: Remove with trait upcasting
+#[derive(Clone)]
+enum MaybeShardingPartialDecoder {
+    Sharding(Arc<ShardingPartialDecoder>),
+    Other(Arc<dyn ArrayPartialDecoderTraits>),
+}
+
+impl MaybeShardingPartialDecoder {
+    fn partial_decode(
+        &self,
+        array_subsets: &[ArraySubset],
+        options: &CodecOptions,
+    ) -> Result<Vec<ArrayBytes<'_>>, CodecError> {
+        match self {
+            Self::Sharding(partial_decoder) => {
+                partial_decoder.partial_decode(array_subsets, options)
+            }
+            Self::Other(partial_decoder) => partial_decoder.partial_decode(array_subsets, options),
+        }
+    }
+}
+
+type PartialDecoderHashMap = HashMap<Vec<u64>, MaybeShardingPartialDecoder>;
 
 /// A cache used for methods in the [`ArrayShardedReadableExt`] trait.
 pub struct ArrayShardedReadableExtCache {
     array_is_sharded: bool,
+    array_is_exclusively_sharded: bool,
     inner_chunk_grid: ChunkGrid,
     cache: Arc<std::sync::Mutex<PartialDecoderHashMap>>,
 }
@@ -30,6 +59,7 @@ impl ArrayShardedReadableExtCache {
         let inner_chunk_grid = array.inner_chunk_grid();
         Self {
             array_is_sharded: array.is_sharded(),
+            array_is_exclusively_sharded: array.is_exclusively_sharded(),
             inner_chunk_grid,
             cache: Arc::new(std::sync::Mutex::new(HashMap::default())),
         }
@@ -43,6 +73,14 @@ impl ArrayShardedReadableExtCache {
         self.array_is_sharded
     }
 
+    /// Returns true if the array is exclusively sharded (no array-to-array or bytes-to-bytes codecs).
+    ///
+    /// This is cheaper than calling [`ArrayShardedExt::is_exclusively_sharded`] repeatedly.
+    #[must_use]
+    pub fn array_is_exclusively_sharded(&self) -> bool {
+        self.array_is_exclusively_sharded
+    }
+
     fn inner_chunk_grid(&self) -> &ChunkGrid {
         &self.inner_chunk_grid
     }
@@ -71,13 +109,61 @@ impl ArrayShardedReadableExtCache {
         &self,
         array: &Array<TStorage>,
         shard_indices: &[u64],
-    ) -> Result<Arc<dyn ArrayPartialDecoderTraits>, ArrayError> {
+    ) -> Result<MaybeShardingPartialDecoder, ArrayError> {
         let mut cache = self.cache.lock().unwrap();
         if let Some(partial_decoder) = cache.get(shard_indices) {
             Ok(partial_decoder.clone())
+        } else if self.array_is_exclusively_sharded() {
+            // Create the sharding partial decoder directly, without a codec chain
+            let storage_handle = Arc::new(StorageHandle::new(array.storage.clone()));
+            let storage_transformer = array
+                .storage_transformers()
+                .create_readable_transformer(storage_handle)?;
+            let input_handle = Arc::new(StoragePartialDecoder::new(
+                storage_transformer,
+                array.chunk_key(shard_indices),
+            ));
+
+            // --- Workaround for lack of trait upcasting ---
+            let chunk_representation = array.chunk_array_representation(shard_indices)?;
+            let sharding_codec_metadata = array
+                .codecs()
+                .array_to_bytes_codec()
+                .create_metadata()
+                .expect("valid sharding metadata");
+            let sharding_codec_configuration = sharding_codec_metadata
+                .to_configuration::<ShardingCodecConfiguration>()
+                .expect("valid sharding configuration");
+            let sharding_codec = Arc::new(
+                ShardingCodec::new_with_configuration(&sharding_codec_configuration).expect(
+                    "supported sharding codec configuration, already instantiated in array",
+                ),
+            );
+            let partial_decoder =
+                MaybeShardingPartialDecoder::Sharding(Arc::new(ShardingPartialDecoder::new(
+                    input_handle,
+                    chunk_representation,
+                    sharding_codec.chunk_shape.clone(),
+                    sharding_codec.inner_codecs.clone(),
+                    &sharding_codec.index_codecs,
+                    sharding_codec.index_location,
+                    &CodecOptions::default(),
+                )?));
+            // // TODO: Trait upcasting
+            // let partial_decoder = array
+            //     .codecs()
+            //     .array_to_bytes_codec()
+            //     .clone()
+            //     .partial_decoder(
+            //         input_handle,
+            //         &chunk_representation,
+            //         &CodecOptions::default(),
+            //     )?;
+            cache.insert(shard_indices.to_vec(), partial_decoder.clone());
+            Ok(partial_decoder)
         } else {
-            let partial_decoder: Arc<dyn ArrayPartialDecoderTraits> =
-                array.partial_decoder(shard_indices)?;
+            let partial_decoder =
+                MaybeShardingPartialDecoder::Other(array.partial_decoder(shard_indices)?);
             cache.insert(shard_indices.to_vec(), partial_decoder.clone());
             Ok(partial_decoder)
         }
@@ -91,6 +177,28 @@ impl ArrayShardedReadableExtCache {
 pub trait ArrayShardedReadableExt<TStorage: ?Sized + ReadableStorageTraits + 'static>:
     private::Sealed
 {
+    /// Retrieve the byte range of an encoded inner chunk.
+    ///
+    /// # Errors
+    /// Returns an [`ArrayError`] on failure, such as if decoding the shard index fails.
+    fn inner_chunk_byte_range(
+        &self,
+        cache: &ArrayShardedReadableExtCache,
+        inner_chunk_indices: &[u64],
+    ) -> Result<Option<ByteRange>, ArrayError>;
+
+    /// Retrieve the encoded bytes of an inner chunk.
+    ///
+    /// See [`Array::retrieve_encoded_chunk`].
+    #[allow(clippy::missing_errors_doc)]
+    fn retrieve_encoded_inner_chunk(
+        &self,
+        cache: &ArrayShardedReadableExtCache,
+        inner_chunk_indices: &[u64],
+    ) -> Result<Option<Vec<u8>>, ArrayError>;
+
+    // TODO: retrieve_encoded_inner_chunks
+
     /// Read and decode the inner chunk at `chunk_indices` into its bytes.
     ///
     /// See [`Array::retrieve_chunk_opt`].
@@ -194,9 +302,108 @@ pub trait ArrayShardedReadableExt<TStorage: ?Sized + ReadableStorageTraits + 's
     ) -> Result<ArrayBytes<'_>, ArrayError>;
 }
 
+fn inner_chunk_shard_index_and_subset<TStorage: ?Sized + ReadableStorageTraits + 'static>(
+    array: &Array<TStorage>,
+    cache: &ArrayShardedReadableExtCache,
+    inner_chunk_indices: &[u64],
+) -> Result<(Vec<u64>, ArraySubset), ArrayError> {
+    // TODO: Can this logic be simplified?
+    let array_subset = cache
+        .inner_chunk_grid()
+        .subset(inner_chunk_indices, array.shape())?
+        .ok_or_else(|| ArrayError::InvalidChunkGridIndicesError(inner_chunk_indices.to_vec()))?;
+    let shards = array
+        .chunks_in_array_subset(&array_subset)?
+        .ok_or_else(|| ArrayError::InvalidChunkGridIndicesError(inner_chunk_indices.to_vec()))?;
+    if shards.num_elements() != 1 {
+        // This should not happen, but it is checked just in case.
+        return Err(ArrayError::InvalidChunkGridIndicesError(
+            inner_chunk_indices.to_vec(),
+        ));
+    }
+    let shard_indices = shards.start();
+    let shard_origin = array.chunk_origin(shard_indices)?;
+    let shard_subset = array_subset.relative_to(&shard_origin)?;
+    Ok((shard_indices.to_vec(), shard_subset))
+}
+
+fn inner_chunk_shard_index_and_chunk_index<TStorage: ?Sized + ReadableStorageTraits + 'static>(
+    array: &Array<TStorage>,
+    cache: &ArrayShardedReadableExtCache,
+    inner_chunk_indices: &[u64],
+) -> Result<(Vec<u64>, Vec<u64>), ArrayError> {
+    // TODO: Simplify this?
+    let (shard_indices, shard_subset) =
+        inner_chunk_shard_index_and_subset(array, cache, inner_chunk_indices)?;
+    let effective_inner_chunk_shape = array
+        .effective_inner_chunk_shape()
+        .expect("array is sharded");
+    let chunk_indices: Vec<u64> = shard_subset
+        .start()
+        .iter()
+        .zip(effective_inner_chunk_shape.as_slice())
+        .map(|(o, s)| o / s.get())
+        .collect();
+    Ok((shard_indices, chunk_indices))
+}
+
 impl<TStorage: ?Sized + ReadableStorageTraits + 'static> ArrayShardedReadableExt<TStorage>
     for Array<TStorage>
 {
+    fn inner_chunk_byte_range(
+        &self,
+        cache: &ArrayShardedReadableExtCache,
+        inner_chunk_indices: &[u64],
+    ) -> Result<Option<ByteRange>, ArrayError> {
+        if cache.array_is_exclusively_sharded() {
+            let (shard_indices, chunk_indices) =
+                inner_chunk_shard_index_and_chunk_index(self, cache, inner_chunk_indices)?;
+            let partial_decoder = cache.retrieve(self, &shard_indices)?;
+            let MaybeShardingPartialDecoder::Sharding(partial_decoder) = partial_decoder else {
+                unreachable!("exclusively sharded")
+            };
+            // TODO: trait upcasting
+            // let partial_decoder: Arc<dyn Any + Send + Sync> = partial_decoder.clone();
+            // let partial_decoder = partial_decoder
+            //     .downcast::<ShardingPartialDecoder>()
+            //     .expect("array is exclusively sharded");

+            Ok(partial_decoder.inner_chunk_byte_range(&chunk_indices)?)
+        } else {
+            Err(ArrayError::UnsupportedMethod(
+                "the array is not exclusively sharded".to_string(),
+            ))
+        }
+    }
+
+    fn retrieve_encoded_inner_chunk(
+        &self,
+        cache: &ArrayShardedReadableExtCache,
+        inner_chunk_indices: &[u64],
+    ) -> Result<Option<Vec<u8>>, ArrayError> {
+        if cache.array_is_exclusively_sharded() {
+            let (shard_indices, chunk_indices) =
+                inner_chunk_shard_index_and_chunk_index(self, cache, inner_chunk_indices)?;
+            let partial_decoder = cache.retrieve(self, &shard_indices)?;
+            let MaybeShardingPartialDecoder::Sharding(partial_decoder) = partial_decoder else {
+                unreachable!("exclusively sharded")
+            };
+            // TODO: trait upcasting
+            // let partial_decoder: Arc<dyn Any + Send + Sync> = partial_decoder.clone();
+            // let partial_decoder = partial_decoder
+            //     .downcast::<ShardingPartialDecoder>()
+            //     .expect("array is exclusively sharded");

+            Ok(partial_decoder
+                .retrieve_inner_chunk_encoded(&chunk_indices)?
+                .map(Vec::from))
+        } else {
+            Err(ArrayError::UnsupportedMethod(
+                "the array is not exclusively sharded".to_string(),
+            ))
+        }
+    }
+
     fn retrieve_inner_chunk_opt(
         &self,
         cache: &ArrayShardedReadableExtCache,
@@ -204,26 +411,9 @@ impl<TStorage: ?Sized + ReadableStorageTraits + 'static> ArrayShardedReadableEx
         options: &CodecOptions,
     ) -> Result<ArrayBytes<'_>, ArrayError> {
         if cache.array_is_sharded() {
-            let array_subset = cache
-                .inner_chunk_grid()
-                .subset(inner_chunk_indices, self.shape())?
-                .ok_or_else(|| {
-                    ArrayError::InvalidChunkGridIndicesError(inner_chunk_indices.to_vec())
-                })?;
-            let shards = self.chunks_in_array_subset(&array_subset)?.ok_or_else(|| {
-                ArrayError::InvalidChunkGridIndicesError(inner_chunk_indices.to_vec())
-            })?;
-            if shards.num_elements() != 1 {
-                // This should not happen, but it is checked just in case.
-                return Err(ArrayError::InvalidChunkGridIndicesError(
-                    inner_chunk_indices.to_vec(),
-                ));
-            }
-            let shard_indices = shards.start();
-            let shard_origin = self.chunk_origin(shard_indices)?;
-            let shard_subset = array_subset.relative_to(&shard_origin)?;
-
-            let partial_decoder = cache.retrieve(self, shard_indices)?;
+            let (shard_indices, shard_subset) =
+                inner_chunk_shard_index_and_subset(self, cache, inner_chunk_indices)?;
+            let partial_decoder = cache.retrieve(self, &shard_indices)?;
             let bytes = partial_decoder
                 .partial_decode(&[shard_subset], options)?
                 .remove(0)
@@ -614,6 +804,21 @@ mod tests {
                assert_eq!(compare, test);
                assert_eq!(cache.len(), 4);
            }
+
+            let encoded_inner_chunk = array
+                .retrieve_encoded_inner_chunk(&cache, &[0, 0])?
+                .unwrap();
+            assert_eq!(
+                array
+                    .inner_chunk_byte_range(&cache, &[0, 0])?
+                    .unwrap()
+                    .length(u64::MAX),
+                encoded_inner_chunk.len() as u64
+            );
+            // assert_eq!(
+            //     u16::from_array_bytes(array.data_type(), encoded_inner_chunk.into())?,
+            //     array.retrieve_chunk_elements::<u16>(&[0, 0])?
+            // );
         } else {
             assert_eq!(array.inner_chunk_shape(), None);
             assert_eq!(
@@ -642,6 +847,9 @@ mod tests {
             )?;
             assert_eq!(compare, test);
             assert!(cache.is_empty());
+
+            assert!(array.retrieve_encoded_inner_chunk(&cache, &[0, 0]).is_err());
+            assert!(array.inner_chunk_byte_range(&cache, &[0, 0]).is_err());
         }
 
         Ok(())
diff --git a/zarrs/src/array/codec/array_to_bytes/sharding.rs b/zarrs/src/array/codec/array_to_bytes/sharding.rs
index 58d2d417..b0a7e216 100644
--- a/zarrs/src/array/codec/array_to_bytes/sharding.rs
+++ b/zarrs/src/array/codec/array_to_bytes/sharding.rs
@@ -23,6 +23,7 @@ pub use crate::metadata::v3::array::codec::sharding::{
 
 pub use sharding_codec::ShardingCodec;
 pub use sharding_codec_builder::ShardingCodecBuilder;
+pub(crate) use sharding_partial_decoder::ShardingPartialDecoder;
 
 use crate::{
     array::{
diff --git a/zarrs/src/array/codec/array_to_bytes/sharding/sharding_codec.rs b/zarrs/src/array/codec/array_to_bytes/sharding/sharding_codec.rs
index 3411ca40..f6ec782f 100644
--- a/zarrs/src/array/codec/array_to_bytes/sharding/sharding_codec.rs
+++ b/zarrs/src/array/codec/array_to_bytes/sharding/sharding_codec.rs
@@ -38,13 +38,13 @@ use unsafe_cell_slice::UnsafeCellSlice;
 #[derive(Clone, Debug)]
 pub struct ShardingCodec {
     /// An array of integers specifying the shape of the inner chunks in a shard along each dimension of the outer array.
-    chunk_shape: ChunkShape,
+    pub(crate) chunk_shape: ChunkShape,
     /// The codecs used to encode and decode inner chunks.
-    inner_codecs: Arc<CodecChain>,
+    pub(crate) inner_codecs: Arc<CodecChain>,
     /// The codecs used to encode and decode the shard index.
-    index_codecs: Arc<CodecChain>,
+    pub(crate) index_codecs: Arc<CodecChain>,
     /// Specifies whether the shard index is located at the beginning or end of the file.
-    index_location: ShardingIndexLocation,
+    pub(crate) index_location: ShardingIndexLocation,
 }
 
 impl ShardingCodec {
diff --git a/zarrs/src/array/codec/array_to_bytes/sharding/sharding_partial_decoder.rs b/zarrs/src/array/codec/array_to_bytes/sharding/sharding_partial_decoder.rs
index cb08204d..bc90361e 100644
--- a/zarrs/src/array/codec/array_to_bytes/sharding/sharding_partial_decoder.rs
+++ b/zarrs/src/array/codec/array_to_bytes/sharding/sharding_partial_decoder.rs
@@ -2,6 +2,7 @@ use std::sync::Arc;
 
 use rayon::prelude::*;
 use unsafe_cell_slice::UnsafeCellSlice;
+use zarrs_storage::byte_range::ByteRange;
 
 use crate::array::{
     array_bytes::{merge_chunks_vlen, update_bytes_flen},
@@ -12,6 +13,7 @@ use crate::array::{
     },
     concurrency::{calc_concurrency_outer_inner, RecommendedConcurrency},
     ravel_indices, ArrayBytes, ArraySize, ChunkRepresentation, ChunkShape, DataType, DataTypeSize,
+    RawBytes,
 };
 
 #[cfg(feature = "async")]
@@ -58,6 +60,45 @@ impl ShardingPartialDecoder {
             shard_index,
         })
     }
+
+    /// Retrieve the byte range of an encoded inner chunk.
+    ///
+    /// The `chunk_indices` are relative to the start of the shard.
+    pub(crate) fn inner_chunk_byte_range(
+        &self,
+        chunk_indices: &[u64],
+    ) -> Result<Option<ByteRange>, CodecError> {
+        let shard_index = &self.shard_index;
+        if let Some(shard_index) = shard_index {
+            let chunks_per_shard =
+                calculate_chunks_per_shard(self.decoded_representation.shape(), &self.chunk_shape)?;
+            let chunks_per_shard = chunks_per_shard.to_array_shape();
+
+            let shard_index_idx: usize =
+                usize::try_from(ravel_indices(chunk_indices, &chunks_per_shard) * 2).unwrap();
+            let offset = shard_index[shard_index_idx];
+            let size = shard_index[shard_index_idx + 1];
+            Ok(Some(ByteRange::new(offset..offset + size)))
+        } else {
+            Ok(None)
+        }
+    }
+
+    /// Retrieve the encoded bytes of an inner chunk.
+    ///
+    /// The `chunk_indices` are relative to the start of the shard.
+    pub(crate) fn retrieve_inner_chunk_encoded(
+        &self,
+        chunk_indices: &[u64],
+    ) -> Result<Option<RawBytes<'_>>, CodecError> {
+        let byte_range = self.inner_chunk_byte_range(chunk_indices)?;
+        if let Some(byte_range) = byte_range {
+            self.input_handle
+                .partial_decode_concat(&[byte_range], &CodecOptions::default())
+        } else {
+            Ok(None)
+        }
+    }
 }
 
 impl ArrayPartialDecoderTraits for ShardingPartialDecoder {
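
Usage sketch (not part of the patch): a minimal example of calling the new `ArrayShardedReadableExt` methods on an already-opened sharded array. The `zarrs::array` and `zarrs::storage` import paths and the `inspect_inner_chunk` helper are assumptions for illustration; only the methods and the cache type come from the diff above.

use zarrs::array::{Array, ArrayError, ArrayShardedReadableExt, ArrayShardedReadableExtCache};
use zarrs::storage::ReadableStorageTraits;

/// Hypothetical helper: report where inner chunk [0, 0] lives inside its shard.
fn inspect_inner_chunk<TStorage: ?Sized + ReadableStorageTraits + 'static>(
    array: &Array<TStorage>,
) -> Result<(), ArrayError> {
    // The cache holds one partial decoder per shard visited.
    let cache = ArrayShardedReadableExtCache::new(array);

    // The new methods only apply when `sharding_indexed` is the sole array-to-bytes codec
    // and there are no array-to-array or bytes-to-bytes codecs; otherwise they return
    // `ArrayError::UnsupportedMethod`.
    if cache.array_is_exclusively_sharded() {
        // Byte range of the encoded inner chunk within its shard
        // (`None` if the shard index is unavailable).
        if let Some(byte_range) = array.inner_chunk_byte_range(&cache, &[0, 0])? {
            println!(
                "inner chunk [0, 0] occupies {} bytes in its shard",
                byte_range.length(u64::MAX)
            );
        }
        // Encoded (still compressed) bytes of the inner chunk, retrieved without decoding it.
        if let Some(encoded) = array.retrieve_encoded_inner_chunk(&cache, &[0, 0])? {
            println!("retrieved {} encoded bytes", encoded.len());
        }
    }
    Ok(())
}

Reusing one `ArrayShardedReadableExtCache` across calls avoids recreating the sharding partial decoder (and its decoded shard index) for every inner chunk read from the same shard.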