Skip to content

Commit

Permalink
feat: add ArrayShardedReadableExt::retrieve_encoded_inner_chunk (#110)
Browse files Browse the repository at this point in the history
* fix: cleanup unnecessary lifetime constraints in partial decoders

* refactor: make `{Array,Bytes}PartialDecoderCache` private

* fix: remove BytesPartialDecoderCache lifetime constraint

* feat: make `Any` a supertrait of partial encoder/decoder traits

* feat: add `ArrayShardedReadableExt::retrieve_encoded_inner_chunk`

* feat: add `ArrayShardedReadableExt::inner_chunk_byte_range`

* feat: Add `ArrayError::UnsupportedMethod`

* feat: add `ArrayShardedExt::is_exclusively_sharded`

* feat: Add `ArrayShardedReadableExtCache::array_is_exclusively_sharded`

* nightly! fix: inner_chunk_byte_range and retrieve_encoded_inner_chunk

* fix: workaround for trait upcasting
  • Loading branch information
LDeakin authored Jan 1, 2025
1 parent 287bb15 commit 6a8baed
Show file tree
Hide file tree
Showing 7 changed files with 297 additions and 28 deletions.
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,17 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Added
- Add `ArrayShardedReadableExt::retrieve_encoded_inner_chunk`
- Add `ArrayShardedReadableExt::inner_chunk_byte_range`
- Add `ArrayShardedExt::is_exclusively_sharded`
- Add `ArrayShardedReadableExtCache::array_is_exclusively_sharded`

### Changed
- **Breaking**: Seal `Array` extension traits: `ArraySharded[Readable]Ext` and `ArrayChunkCacheExt`
- **Breaking**: Make `{Array,Bytes}PartialDecoderCache` private
- **Breaking**: Make `Any` a supertrait of partial encoder/decoder traits
- **Breaking**: Add `ArrayError::UnsupportedMethod`

### Fixed
- Cleanup unnecessary lifetime constraints in partial decoders
Expand Down
3 changes: 3 additions & 0 deletions zarrs/src/array/array_errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,4 +109,7 @@ pub enum ArrayError {
/// - a string with invalid utf-8 encoding.
#[error("Invalid element value")]
InvalidElementValue,
/// Unsupported method.
#[error("unsupported array method: {_0}")]
UnsupportedMethod(String),
}
9 changes: 9 additions & 0 deletions zarrs/src/array/array_sharded_ext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ pub trait ArrayShardedExt: private::Sealed {
/// Returns true if the array to bytes codec of the array is `sharding_indexed`.
fn is_sharded(&self) -> bool;

/// Returns true if the array-to-bytes codec of the array is `sharding_indexed` and the array has no array-to-array or bytes-to-bytes codecs.
fn is_exclusively_sharded(&self) -> bool;

/// Return the inner chunk shape as defined in the `sharding_indexed` codec metadata.
///
/// Returns [`None`] for an unsharded array.
Expand Down Expand Up @@ -40,6 +43,12 @@ impl<TStorage: ?Sized> ArrayShardedExt for Array<TStorage> {
== super::codec::array_to_bytes::sharding::IDENTIFIER
}

fn is_exclusively_sharded(&self) -> bool {
    // "Exclusively" sharded: the `sharding_indexed` array-to-bytes codec is
    // present and it is the *only* codec (no array-to-array or
    // bytes-to-bytes codecs wrap it).
    if !self.is_sharded() {
        return false;
    }
    let codecs = &self.codecs;
    codecs.array_to_array_codecs().is_empty() && codecs.bytes_to_bytes_codecs().is_empty()
}

fn inner_chunk_shape(&self) -> Option<ChunkShape> {
let codec_metadata = self
.codecs
Expand Down
256 changes: 232 additions & 24 deletions zarrs/src/array/array_sync_sharded_readable_ext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,22 +3,51 @@ use std::{collections::HashMap, sync::Arc};
use rayon::iter::{IntoParallelIterator, ParallelIterator};
use rayon_iter_concurrent_limit::iter_concurrent_limit;
use unsafe_cell_slice::UnsafeCellSlice;
use zarrs_metadata::v3::array::codec::sharding::ShardingCodecConfiguration;
use zarrs_storage::byte_range::ByteRange;
use zarrs_storage::StorageHandle;

use super::array_bytes::{merge_chunks_vlen, update_bytes_flen};
use super::codec::array_to_bytes::sharding::ShardingPartialDecoder;
use super::codec::{CodecError, ShardingCodec};
use super::element::ElementOwned;
use super::{
codec::CodecOptions, concurrency::concurrency_chunks_and_codec, Array, ArrayError,
ArrayShardedExt, ChunkGrid,
};
use super::{ArrayBytes, ArraySize, DataTypeSize};
use crate::array::codec::StoragePartialDecoder;
use crate::storage::ReadableStorageTraits;
use crate::{array::codec::ArrayPartialDecoderTraits, array_subset::ArraySubset};

type PartialDecoderHashMap = HashMap<Vec<u64>, Arc<dyn ArrayPartialDecoderTraits>>;
// TODO: Remove with trait upcasting
// Workaround for the lack of stable `dyn` trait upcasting: callers that need
// sharding-specific methods (e.g. inner chunk byte ranges) must keep hold of
// the concrete `Arc<ShardingPartialDecoder>`, since an
// `Arc<dyn ArrayPartialDecoderTraits>` cannot be downcast back to it.
#[derive(Clone)]
enum MaybeShardingPartialDecoder {
    /// The concrete sharding partial decoder (exclusively sharded arrays).
    Sharding(Arc<ShardingPartialDecoder>),
    /// Any other partial decoder, behind the trait object.
    Other(Arc<dyn ArrayPartialDecoderTraits>),
}

impl MaybeShardingPartialDecoder {
    /// Delegate a partial decode of `array_subsets` to the wrapped decoder.
    ///
    /// Both variants forward to their decoder's `partial_decode`; this shim
    /// exists only because the concrete sharding decoder is stored separately
    /// (see the `MaybeShardingPartialDecoder` TODO).
    fn partial_decode(
        &self,
        array_subsets: &[ArraySubset],
        options: &CodecOptions,
    ) -> Result<Vec<ArrayBytes<'_>>, CodecError> {
        match self {
            Self::Sharding(decoder) => decoder.partial_decode(array_subsets, options),
            Self::Other(decoder) => decoder.partial_decode(array_subsets, options),
        }
    }
}

/// Maps shard indices to their cached partial decoders.
type PartialDecoderHashMap = HashMap<Vec<u64>, MaybeShardingPartialDecoder>;

/// A cache used for methods in the [`ArrayShardedReadableExt`] trait.
/// A cache used for methods in the [`ArrayShardedReadableExt`] trait.
pub struct ArrayShardedReadableExtCache {
    // Cached result of `ArrayShardedExt::is_sharded` at construction time.
    array_is_sharded: bool,
    // Cached result of `ArrayShardedExt::is_exclusively_sharded` at construction time.
    array_is_exclusively_sharded: bool,
    // The array's inner chunk grid, captured at construction.
    inner_chunk_grid: ChunkGrid,
    // Per-shard partial decoders, keyed by shard indices. Shared and
    // mutex-guarded so the cache can be used from multiple threads.
    cache: Arc<std::sync::Mutex<PartialDecoderHashMap>>,
}
Expand All @@ -30,6 +59,7 @@ impl ArrayShardedReadableExtCache {
let inner_chunk_grid = array.inner_chunk_grid();
Self {
array_is_sharded: array.is_sharded(),
array_is_exclusively_sharded: array.is_exclusively_sharded(),
inner_chunk_grid,
cache: Arc::new(std::sync::Mutex::new(HashMap::default())),
}
Expand All @@ -43,6 +73,14 @@ impl ArrayShardedReadableExtCache {
self.array_is_sharded
}

/// Returns true if the array is exclusively sharded (no array-to-array or bytes-to-bytes codecs).
///
/// This is cheaper than calling [`ArrayShardedExt::is_exclusively_sharded`] repeatedly.
///
/// The value is captured when the cache is constructed.
#[must_use]
pub fn array_is_exclusively_sharded(&self) -> bool {
    self.array_is_exclusively_sharded
}

/// Returns the inner chunk grid captured at cache construction.
fn inner_chunk_grid(&self) -> &ChunkGrid {
    &self.inner_chunk_grid
}
Expand Down Expand Up @@ -71,13 +109,61 @@ impl ArrayShardedReadableExtCache {
&self,
array: &Array<TStorage>,
shard_indices: &[u64],
) -> Result<Arc<dyn ArrayPartialDecoderTraits>, ArrayError> {
) -> Result<MaybeShardingPartialDecoder, ArrayError> {
let mut cache = self.cache.lock().unwrap();
if let Some(partial_decoder) = cache.get(shard_indices) {
Ok(partial_decoder.clone())
} else if self.array_is_exclusively_sharded() {
// Create the sharding partial decoder directly, without a codec chain
let storage_handle = Arc::new(StorageHandle::new(array.storage.clone()));
let storage_transformer = array
.storage_transformers()
.create_readable_transformer(storage_handle)?;
let input_handle = Arc::new(StoragePartialDecoder::new(
storage_transformer,
array.chunk_key(shard_indices),
));

// --- Workaround for lack of trait upcasting ---
let chunk_representation = array.chunk_array_representation(shard_indices)?;
let sharding_codec_metadata = array
.codecs()
.array_to_bytes_codec()
.create_metadata()
.expect("valid sharding metadata");
let sharding_codec_configuration = sharding_codec_metadata
.to_configuration::<ShardingCodecConfiguration>()
.expect("valid sharding configuration");
let sharding_codec = Arc::new(
ShardingCodec::new_with_configuration(&sharding_codec_configuration).expect(
"supported sharding codec configuration, already instantiated in array",
),
);
let partial_decoder =
MaybeShardingPartialDecoder::Sharding(Arc::new(ShardingPartialDecoder::new(
input_handle,
chunk_representation,
sharding_codec.chunk_shape.clone(),
sharding_codec.inner_codecs.clone(),
&sharding_codec.index_codecs,
sharding_codec.index_location,
&CodecOptions::default(),
)?));
// // TODO: Trait upcasting
// let partial_decoder = array
// .codecs()
// .array_to_bytes_codec()
// .clone()
// .partial_decoder(
// input_handle,
// &chunk_representation,
// &CodecOptions::default(),
// )?;
cache.insert(shard_indices.to_vec(), partial_decoder.clone());
Ok(partial_decoder)
} else {
let partial_decoder: Arc<dyn ArrayPartialDecoderTraits> =
array.partial_decoder(shard_indices)?;
let partial_decoder =
MaybeShardingPartialDecoder::Other(array.partial_decoder(shard_indices)?);
cache.insert(shard_indices.to_vec(), partial_decoder.clone());
Ok(partial_decoder)
}
Expand All @@ -91,6 +177,28 @@ impl ArrayShardedReadableExtCache {
pub trait ArrayShardedReadableExt<TStorage: ?Sized + ReadableStorageTraits + 'static>:
private::Sealed
{
/// Retrieve the byte range of an encoded inner chunk.
///
/// # Errors
/// Returns an [`ArrayError`] on failure, such as if decoding the shard index fails.
fn inner_chunk_byte_range(
&self,
cache: &ArrayShardedReadableExtCache,
inner_chunk_indices: &[u64],
) -> Result<Option<ByteRange>, ArrayError>;

/// Retrieve the encoded bytes of an inner chunk.
///
/// See [`Array::retrieve_encoded_chunk`].
#[allow(clippy::missing_errors_doc)]
fn retrieve_encoded_inner_chunk(
&self,
cache: &ArrayShardedReadableExtCache,
inner_chunk_indices: &[u64],
) -> Result<Option<Vec<u8>>, ArrayError>;

// TODO: retrieve_encoded_inner_chunks

/// Read and decode the inner chunk at `chunk_indices` into its bytes.
///
/// See [`Array::retrieve_chunk_opt`].
Expand Down Expand Up @@ -194,36 +302,118 @@ pub trait ArrayShardedReadableExt<TStorage: ?Sized + ReadableStorageTraits + 'st
) -> Result<ndarray::ArrayD<T>, ArrayError>;
}

/// Locate the shard containing an inner chunk.
///
/// Returns the shard's indices and the inner chunk's subset relative to the
/// shard's origin. Errors with `InvalidChunkGridIndicesError` if the inner
/// chunk indices do not resolve to exactly one shard.
fn inner_chunk_shard_index_and_subset<TStorage: ?Sized + ReadableStorageTraits + 'static>(
    array: &Array<TStorage>,
    cache: &ArrayShardedReadableExtCache,
    inner_chunk_indices: &[u64],
) -> Result<(Vec<u64>, ArraySubset), ArrayError> {
    // TODO: Can this logic be simplified?
    let invalid_indices = || ArrayError::InvalidChunkGridIndicesError(inner_chunk_indices.to_vec());

    // The region of the array covered by the inner chunk.
    let inner_subset = cache
        .inner_chunk_grid()
        .subset(inner_chunk_indices, array.shape())?
        .ok_or_else(invalid_indices)?;

    // An inner chunk lies within exactly one shard.
    let shards = array
        .chunks_in_array_subset(&inner_subset)?
        .ok_or_else(invalid_indices)?;
    if shards.num_elements() != 1 {
        // This should not happen, but it is checked just in case.
        return Err(invalid_indices());
    }

    let shard_indices = shards.start();
    let shard_origin = array.chunk_origin(shard_indices)?;
    let shard_subset = inner_subset.relative_to(&shard_origin)?;
    Ok((shard_indices.to_vec(), shard_subset))
}

/// Locate the shard containing an inner chunk and the inner chunk's indices
/// within that shard.
fn inner_chunk_shard_index_and_chunk_index<TStorage: ?Sized + ReadableStorageTraits + 'static>(
    array: &Array<TStorage>,
    cache: &ArrayShardedReadableExtCache,
    inner_chunk_indices: &[u64],
) -> Result<(Vec<u64>, Vec<u64>), ArrayError> {
    // TODO: Simplify this?
    let (shard_indices, shard_subset) =
        inner_chunk_shard_index_and_subset(array, cache, inner_chunk_indices)?;
    let inner_chunk_shape = array
        .effective_inner_chunk_shape()
        .expect("array is sharded");

    // Convert the shard-relative element origin into inner chunk indices.
    let mut chunk_indices = Vec::with_capacity(shard_subset.start().len());
    for (origin, extent) in shard_subset.start().iter().zip(inner_chunk_shape.as_slice()) {
        chunk_indices.push(origin / extent.get());
    }
    Ok((shard_indices, chunk_indices))
}

impl<TStorage: ?Sized + ReadableStorageTraits + 'static> ArrayShardedReadableExt<TStorage>
for Array<TStorage>
{
fn inner_chunk_byte_range(
    &self,
    cache: &ArrayShardedReadableExtCache,
    inner_chunk_indices: &[u64],
) -> Result<Option<ByteRange>, ArrayError> {
    // Only supported when sharding is the sole codec; otherwise the byte
    // range of an inner chunk within the store is not well-defined.
    if !cache.array_is_exclusively_sharded() {
        return Err(ArrayError::UnsupportedMethod(
            "the array is not exclusively sharded".to_string(),
        ));
    }

    let (shard_indices, chunk_indices) =
        inner_chunk_shard_index_and_chunk_index(self, cache, inner_chunk_indices)?;
    // TODO: replace `MaybeShardingPartialDecoder` with a downcast once
    // `dyn` trait upcasting is usable here.
    // `retrieve` always produces the `Sharding` variant for an exclusively
    // sharded array.
    let MaybeShardingPartialDecoder::Sharding(partial_decoder) =
        cache.retrieve(self, &shard_indices)?
    else {
        // Typo fix: was "exlusively".
        unreachable!("exclusively sharded")
    };
    Ok(partial_decoder.inner_chunk_byte_range(&chunk_indices)?)
}

fn retrieve_encoded_inner_chunk(
    &self,
    cache: &ArrayShardedReadableExtCache,
    inner_chunk_indices: &[u64],
) -> Result<Option<Vec<u8>>, ArrayError> {
    // Only supported when sharding is the sole codec; otherwise inner chunks
    // do not exist as discrete encoded byte sequences in the store.
    if !cache.array_is_exclusively_sharded() {
        return Err(ArrayError::UnsupportedMethod(
            "the array is not exclusively sharded".to_string(),
        ));
    }

    let (shard_indices, chunk_indices) =
        inner_chunk_shard_index_and_chunk_index(self, cache, inner_chunk_indices)?;
    // TODO: replace `MaybeShardingPartialDecoder` with a downcast once
    // `dyn` trait upcasting is usable here.
    // `retrieve` always produces the `Sharding` variant for an exclusively
    // sharded array.
    let MaybeShardingPartialDecoder::Sharding(partial_decoder) =
        cache.retrieve(self, &shard_indices)?
    else {
        // Typo fix: was "exlusively".
        unreachable!("exclusively sharded")
    };
    Ok(partial_decoder
        .retrieve_inner_chunk_encoded(&chunk_indices)?
        .map(Vec::from))
}

fn retrieve_inner_chunk_opt(
&self,
cache: &ArrayShardedReadableExtCache,
inner_chunk_indices: &[u64],
options: &CodecOptions,
) -> Result<ArrayBytes<'_>, ArrayError> {
if cache.array_is_sharded() {
let array_subset = cache
.inner_chunk_grid()
.subset(inner_chunk_indices, self.shape())?
.ok_or_else(|| {
ArrayError::InvalidChunkGridIndicesError(inner_chunk_indices.to_vec())
})?;
let shards = self.chunks_in_array_subset(&array_subset)?.ok_or_else(|| {
ArrayError::InvalidChunkGridIndicesError(inner_chunk_indices.to_vec())
})?;
if shards.num_elements() != 1 {
// This should not happen, but it is checked just in case.
return Err(ArrayError::InvalidChunkGridIndicesError(
inner_chunk_indices.to_vec(),
));
}
let shard_indices = shards.start();
let shard_origin = self.chunk_origin(shard_indices)?;
let shard_subset = array_subset.relative_to(&shard_origin)?;

let partial_decoder = cache.retrieve(self, shard_indices)?;
let (shard_indices, shard_subset) =
inner_chunk_shard_index_and_subset(self, cache, inner_chunk_indices)?;
let partial_decoder = cache.retrieve(self, &shard_indices)?;
let bytes = partial_decoder
.partial_decode(&[shard_subset], options)?
.remove(0)
Expand Down Expand Up @@ -614,6 +804,21 @@ mod tests {
assert_eq!(compare, test);
assert_eq!(cache.len(), 4);
}

let encoded_inner_chunk = array
.retrieve_encoded_inner_chunk(&cache, &[0, 0])?
.unwrap();
assert_eq!(
array
.inner_chunk_byte_range(&cache, &[0, 0])?
.unwrap()
.length(u64::MAX),
encoded_inner_chunk.len() as u64
);
// assert_eq!(
// u16::from_array_bytes(array.data_type(), encoded_inner_chunk.into())?,
// array.retrieve_chunk_elements::<u16>(&[0, 0])?
// );
} else {
assert_eq!(array.inner_chunk_shape(), None);
assert_eq!(
Expand Down Expand Up @@ -642,6 +847,9 @@ mod tests {
)?;
assert_eq!(compare, test);
assert!(cache.is_empty());

assert!(array.retrieve_encoded_inner_chunk(&cache, &[0, 0]).is_err());
assert!(array.inner_chunk_byte_range(&cache, &[0, 0]).is_err());
}

Ok(())
Expand Down
1 change: 1 addition & 0 deletions zarrs/src/array/codec/array_to_bytes/sharding.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ pub use crate::metadata::v3::array::codec::sharding::{

pub use sharding_codec::ShardingCodec;
pub use sharding_codec_builder::ShardingCodecBuilder;
pub(crate) use sharding_partial_decoder::ShardingPartialDecoder;

use crate::{
array::{
Expand Down
Loading

0 comments on commit 6a8baed

Please sign in to comment.