Skip to content

Commit

Permalink
fix: workaround for trait upcasting
Browse files Browse the repository at this point in the history
  • Loading branch information
LDeakin committed Jan 1, 2025
1 parent d56840d commit bac776f
Show file tree
Hide file tree
Showing 3 changed files with 79 additions and 24 deletions.
93 changes: 75 additions & 18 deletions zarrs/src/array/array_sync_sharded_readable_ext.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
use std::any::Any;
use std::{collections::HashMap, sync::Arc};

use rayon::iter::{IntoParallelIterator, ParallelIterator};
use rayon_iter_concurrent_limit::iter_concurrent_limit;
use unsafe_cell_slice::UnsafeCellSlice;
use zarrs_metadata::v3::array::codec::sharding::ShardingCodecConfiguration;
use zarrs_storage::byte_range::ByteRange;
use zarrs_storage::StorageHandle;

use super::array_bytes::{merge_chunks_vlen, update_bytes_flen};
use super::codec::array_to_bytes::sharding::ShardingPartialDecoder;
use super::codec::{CodecError, ShardingCodec};
use super::element::ElementOwned;
use super::{
codec::CodecOptions, concurrency::concurrency_chunks_and_codec, Array, ArrayError,
Expand All @@ -19,7 +20,29 @@ use crate::array::codec::StoragePartialDecoder;
use crate::storage::ReadableStorageTraits;
use crate::{array::codec::ArrayPartialDecoderTraits, array_subset::ArraySubset};

type PartialDecoderHashMap = HashMap<Vec<u64>, Arc<dyn ArrayPartialDecoderTraits>>;
// TODO: Remove with trait upcasting
#[derive(Clone)]
enum MaybeShardingPartialDecoder {
Sharding(Arc<ShardingPartialDecoder>),
Other(Arc<dyn ArrayPartialDecoderTraits>),
}

impl MaybeShardingPartialDecoder {
fn partial_decode(
&self,
array_subsets: &[ArraySubset],
options: &CodecOptions,
) -> Result<Vec<ArrayBytes<'_>>, CodecError> {
match self {
Self::Sharding(partial_decoder) => {
partial_decoder.partial_decode(array_subsets, options)
}
Self::Other(partial_decoder) => partial_decoder.partial_decode(array_subsets, options),

Check warning on line 40 in zarrs/src/array/array_sync_sharded_readable_ext.rs

View check run for this annotation

Codecov / codecov/patch

zarrs/src/array/array_sync_sharded_readable_ext.rs#L40

Added line #L40 was not covered by tests
}
}
}

type PartialDecoderHashMap = HashMap<Vec<u64>, MaybeShardingPartialDecoder>;

/// A cache used for methods in the [`ArrayShardedReadableExt`] trait.
pub struct ArrayShardedReadableExtCache {
Expand Down Expand Up @@ -86,7 +109,7 @@ impl ArrayShardedReadableExtCache {
&self,
array: &Array<TStorage>,
shard_indices: &[u64],
) -> Result<Arc<dyn ArrayPartialDecoderTraits>, ArrayError> {
) -> Result<MaybeShardingPartialDecoder, ArrayError> {
let mut cache = self.cache.lock().unwrap();
if let Some(partial_decoder) = cache.get(shard_indices) {
Ok(partial_decoder.clone())
Expand All @@ -100,21 +123,47 @@ impl ArrayShardedReadableExtCache {
storage_transformer,
array.chunk_key(shard_indices),
));

// --- Workaround for lack off trait upcasting ---
let chunk_representation = array.chunk_array_representation(shard_indices)?;
let partial_decoder = array
let sharding_codec_metadata = array
.codecs()
.array_to_bytes_codec()
.clone()
.partial_decoder(
.create_metadata()
.expect("valid sharding metadata");
let sharding_codec_configuration = sharding_codec_metadata
.to_configuration::<ShardingCodecConfiguration>()
.expect("valid sharding configuration");
let sharding_codec = Arc::new(
ShardingCodec::new_with_configuration(&sharding_codec_configuration).expect(
"supported sharding codec configuration, already instantiated in array",
),
);
let partial_decoder =
MaybeShardingPartialDecoder::Sharding(Arc::new(ShardingPartialDecoder::new(
input_handle,
&chunk_representation,
chunk_representation,
sharding_codec.chunk_shape.clone(),
sharding_codec.inner_codecs.clone(),
&sharding_codec.index_codecs,
sharding_codec.index_location,
&CodecOptions::default(),
)?;
)?));
// // TODO: Trait upcasting
// let partial_decoder = array
// .codecs()
// .array_to_bytes_codec()
// .clone()
// .partial_decoder(
// input_handle,
// &chunk_representation,
// &CodecOptions::default(),
// )?;
cache.insert(shard_indices.to_vec(), partial_decoder.clone());
Ok(partial_decoder)
} else {
let partial_decoder: Arc<dyn ArrayPartialDecoderTraits> =
array.partial_decoder(shard_indices)?;
let partial_decoder =
MaybeShardingPartialDecoder::Other(array.partial_decoder(shard_indices)?);

Check warning on line 166 in zarrs/src/array/array_sync_sharded_readable_ext.rs

View check run for this annotation

Codecov / codecov/patch

zarrs/src/array/array_sync_sharded_readable_ext.rs#L165-L166

Added lines #L165 - L166 were not covered by tests
cache.insert(shard_indices.to_vec(), partial_decoder.clone());
Ok(partial_decoder)
}
Expand Down Expand Up @@ -310,10 +359,14 @@ impl<TStorage: ?Sized + ReadableStorageTraits + 'static> ArrayShardedReadableExt
let (shard_indices, chunk_indices) =
inner_chunk_shard_index_and_chunk_index(self, cache, inner_chunk_indices)?;
let partial_decoder = cache.retrieve(self, &shard_indices)?;
let partial_decoder: Arc<dyn Any + Send + Sync> = partial_decoder.clone();
let partial_decoder = partial_decoder
.downcast::<ShardingPartialDecoder>()
.expect("array is exclusively sharded");
let MaybeShardingPartialDecoder::Sharding(partial_decoder) = partial_decoder else {
unreachable!("exlusively sharded")

Check warning on line 363 in zarrs/src/array/array_sync_sharded_readable_ext.rs

View check run for this annotation

Codecov / codecov/patch

zarrs/src/array/array_sync_sharded_readable_ext.rs#L363

Added line #L363 was not covered by tests
};
// TODO: trait upcasting
// let partial_decoder: Arc<dyn Any + Send + Sync> = partial_decoder.clone();
// let partial_decoder = partial_decoder
// .downcast::<ShardingPartialDecoder>()
// .expect("array is exclusively sharded");

Ok(partial_decoder.inner_chunk_byte_range(&chunk_indices)?)
} else {
Expand All @@ -332,10 +385,14 @@ impl<TStorage: ?Sized + ReadableStorageTraits + 'static> ArrayShardedReadableExt
let (shard_indices, chunk_indices) =
inner_chunk_shard_index_and_chunk_index(self, cache, inner_chunk_indices)?;
let partial_decoder = cache.retrieve(self, &shard_indices)?;
let partial_decoder: Arc<dyn Any + Send + Sync> = partial_decoder.clone();
let partial_decoder = partial_decoder
.downcast::<ShardingPartialDecoder>()
.expect("array is exclusively sharded");
let MaybeShardingPartialDecoder::Sharding(partial_decoder) = partial_decoder else {
unreachable!("exlusively sharded")

Check warning on line 389 in zarrs/src/array/array_sync_sharded_readable_ext.rs

View check run for this annotation

Codecov / codecov/patch

zarrs/src/array/array_sync_sharded_readable_ext.rs#L389

Added line #L389 was not covered by tests
};
// TODO: trait upcasting
// let partial_decoder: Arc<dyn Any + Send + Sync> = partial_decoder.clone();
// let partial_decoder = partial_decoder
// .downcast::<ShardingPartialDecoder>()
// .expect("array is exclusively sharded");

Ok(partial_decoder
.retrieve_inner_chunk_encoded(&chunk_indices)?
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,13 @@ use unsafe_cell_slice::UnsafeCellSlice;
#[derive(Clone, Debug)]
pub struct ShardingCodec {
/// An array of integers specifying the shape of the inner chunks in a shard along each dimension of the outer array.
chunk_shape: ChunkShape,
pub(crate) chunk_shape: ChunkShape,
/// The codecs used to encode and decode inner chunks.
inner_codecs: Arc<CodecChain>,
pub(crate) inner_codecs: Arc<CodecChain>,
/// The codecs used to encode and decode the shard index.
index_codecs: Arc<CodecChain>,
pub(crate) index_codecs: Arc<CodecChain>,
/// Specifies whether the shard index is located at the beginning or end of the file.
index_location: ShardingIndexLocation,
pub(crate) index_location: ShardingIndexLocation,
}

impl ShardingCodec {
Expand Down
2 changes: 0 additions & 2 deletions zarrs/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -184,8 +184,6 @@
//! Unless you explicitly state otherwise, any contribution intentionally submitted for inclusion in the work by you, as defined in the Apache-2.0 license, shall be dual licensed as above, without any additional terms or conditions.
#![cfg_attr(docsrs, feature(doc_auto_cfg))]

#![feature(trait_upcasting)] // FIXME: Remove

pub mod array;
pub mod array_subset;
pub mod config;
Expand Down

0 comments on commit bac776f

Please sign in to comment.