Skip to content

Commit

Permalink
feat: add ShardingCodecBuilder
Browse files Browse the repository at this point in the history
  • Loading branch information
LDeakin committed Oct 17, 2023
1 parent 19ec126 commit aeb0f99
Show file tree
Hide file tree
Showing 7 changed files with 170 additions and 43 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Added `parallel_chunks` option to `Array`, enabled by default. Lets `store_array_subset` and `retrieve_array_subset` (and their variants) encode/decode chunks in parallel
- Added experimental `zfp` codec implementation behind `zfp` feature flag (disabled by default)
- Added experimental `bitround` codec implementation behind `bitround` feature flag (disabled by default)
- Added `ShardingCodecBuilder`

### Changed
- **Breaking**: `array::data_type::DataType` is now marked `#[non_exhaustive]`
Expand Down
22 changes: 3 additions & 19 deletions benches/array_uncompressed.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion, Throughput};
use zarrs::array::{
codec::{self, ShardingCodec},
codec::{self, array_to_bytes::sharding::ShardingCodecBuilder, ShardingCodec},
CodecChain,
};

Expand Down Expand Up @@ -37,15 +37,7 @@ fn array_write_all_sharded(c: &mut Criterion) {
group.bench_with_input(BenchmarkId::from_parameter(size), size, |b, &size| {
b.iter(|| {
let store = zarrs::storage::store::MemoryStore::new();
let inner_codecs =
CodecChain::new(vec![], Box::new(codec::BytesCodec::default()), vec![]);
let index_codecs =
CodecChain::new(vec![], Box::new(codec::BytesCodec::default()), vec![]);
let sharding_codec = Box::new(ShardingCodec::new(
vec![32; 3].into(),
inner_codecs,
index_codecs,
));
let sharding_codec = Box::new(ShardingCodecBuilder::new(vec![32; 3]).build());
let array = zarrs::array::ArrayBuilder::new(
vec![size; 3],
zarrs::array::DataType::UInt8,
Expand Down Expand Up @@ -101,15 +93,7 @@ fn array_read_all_sharded(c: &mut Criterion) {
group.bench_with_input(BenchmarkId::from_parameter(size), size, |b, &size| {
// Write the data
let store = zarrs::storage::store::MemoryStore::new();
let inner_codecs =
CodecChain::new(vec![], Box::new(codec::BytesCodec::default()), vec![]);
let index_codecs =
CodecChain::new(vec![], Box::new(codec::BytesCodec::default()), vec![]);
let sharding_codec = Box::new(ShardingCodec::new(
vec![32; 3].into(),
inner_codecs,
index_codecs,
));
let sharding_codec = Box::new(ShardingCodecBuilder::new(vec![32; 3]).build());
let array = zarrs::array::ArrayBuilder::new(
vec![size; 3],
zarrs::array::DataType::UInt8,
Expand Down
32 changes: 9 additions & 23 deletions examples/sharded_array_write_read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@
fn sharded_array_write_read() -> Result<(), Box<dyn std::error::Error>> {
use zarrs::{
array::{
codec::{self, ShardingCodec},
CodecChain, DataType, FillValue,
codec::{self, array_to_bytes::sharding::ShardingCodecBuilder},
DataType, FillValue,
},
array_subset::ArraySubset,
node::Node,
Expand Down Expand Up @@ -33,33 +33,19 @@ fn sharded_array_write_read() -> Result<(), Box<dyn std::error::Error>> {

// Create an array
let array_path = "/group/array";
let inner_codecs = CodecChain::new(
vec![],
Box::new(codec::BytesCodec::little()),
vec![
#[cfg(feature = "gzip")]
Box::new(codec::GzipCodec::new(5)?),
],
);
let index_codecs = CodecChain::new(
vec![],
Box::new(codec::BytesCodec::little()),
vec![
#[cfg(feature = "crc32c")]
Box::new(codec::Crc32cCodec::new()),
],
);
let mut sharding_codec_builder = ShardingCodecBuilder::new(vec![4, 4]);
sharding_codec_builder.bytes_to_bytes_codecs(vec![
#[cfg(feature = "gzip")]
Box::new(codec::GzipCodec::new(5)?),
]);

let array = zarrs::array::ArrayBuilder::new(
vec![8, 8], // array shape
DataType::UInt16,
vec![4, 8].into(), // shard shape,
FillValue::from(0u16),
)
.array_to_bytes_codec(Box::new(ShardingCodec::new(
vec![4, 4], // inner chunk shape
inner_codecs,
index_codecs,
)))
.array_to_bytes_codec(Box::new(sharding_codec_builder.build()))
.dimension_names(vec!["y".into(), "x".into()])
.storage_transformers(vec![])
.build(store.clone(), array_path)?;
Expand Down
1 change: 1 addition & 0 deletions src/array/array_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ use super::{
/// # Ok(())
/// # }
/// ```
#[derive(Debug)]
pub struct ArrayBuilder {
shape: ArrayShape,
data_type: DataType,
Expand Down
10 changes: 10 additions & 0 deletions src/array/codec/array_to_bytes/sharding.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,24 @@
//! The sharding `array->bytes` codec.
//!
//! Sharding logically splits chunks (shards) into sub-chunks (inner chunks) that can be individually compressed and accessed.
//! This allows multiple chunks to be colocated within one storage object, bundling them in shards.
//!
//! See <https://zarr-specs.readthedocs.io/en/latest/v3/codecs/sharding-indexed/v1.0.html>.
//!
//! This codec requires the `sharding` feature, which is enabled by default.
//!
//! See [`ShardingCodecConfigurationV1`] for example `JSON` metadata.
//! The [`ShardingCodecBuilder`] can help with creating a [`ShardingCodec`].
mod sharding_codec;
mod sharding_codec_builder;
mod sharding_configuration;
mod sharding_partial_decoder;

pub use sharding_configuration::{ShardingCodecConfiguration, ShardingCodecConfigurationV1};

pub use sharding_codec::ShardingCodec;
pub use sharding_codec_builder::ShardingCodecBuilder;
use thiserror::Error;

use crate::array::{
Expand Down
112 changes: 112 additions & 0 deletions src/array/codec/array_to_bytes/sharding/sharding_codec_builder.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
use codec::CodecChain;

use crate::array::codec::{
self, ArrayToArrayCodecTraits, ArrayToBytesCodecTraits, BytesToBytesCodecTraits,
};

use super::ShardingCodec;

/// A [`ShardingCodec`] builder.
///
/// By default, both the inner chunks and the index are encoded with the `bytes` codec with native endian encoding.
/// The index is additionally encoded with the `crc32c checksum` codec (only when the `crc32c` feature is enabled).
///
/// Use the methods in the sharding codec builder to change the configuration away from these defaults, and then build the sharding codec with [`build`](ShardingCodecBuilder::build).
/// Building borrows the builder and clones its contents, so the same builder can be reused.
#[derive(Debug)]
pub struct ShardingCodecBuilder {
/// Shape of the inner chunks, passed through unchanged to [`ShardingCodec::new`].
inner_chunk_shape: Vec<u64>,
/// The `array->bytes` codec of the index codec chain.
index_array_to_bytes_codec: Box<dyn ArrayToBytesCodecTraits>,
/// The `bytes->bytes` codecs of the index codec chain.
index_bytes_to_bytes_codecs: Vec<Box<dyn BytesToBytesCodecTraits>>,
/// The `array->array` codecs of the inner chunk codec chain.
array_to_array_codecs: Vec<Box<dyn ArrayToArrayCodecTraits>>,
/// The `array->bytes` codec of the inner chunk codec chain.
array_to_bytes_codec: Box<dyn ArrayToBytesCodecTraits>,
/// The `bytes->bytes` codecs of the inner chunk codec chain.
bytes_to_bytes_codecs: Vec<Box<dyn BytesToBytesCodecTraits>>,
}

impl ShardingCodecBuilder {
    /// Create a sharding codec builder for inner chunks of shape `inner_chunk_shape`.
    ///
    /// The builder starts with these defaults:
    ///  - inner chunks: no `array->array` codecs, the default `bytes` codec, and no `bytes->bytes` codecs,
    ///  - index: the default `bytes` codec, followed by the `crc32c` checksum codec when the `crc32c` feature is enabled.
    #[must_use]
    pub fn new(inner_chunk_shape: Vec<u64>) -> Self {
        // Index codec chain defaults.
        let index_array_to_bytes_codec: Box<dyn ArrayToBytesCodecTraits> =
            Box::<codec::BytesCodec>::default();
        let index_bytes_to_bytes_codecs: Vec<Box<dyn BytesToBytesCodecTraits>> = vec![
            #[cfg(feature = "crc32c")]
            Box::new(codec::Crc32cCodec::new()),
        ];

        // Inner chunk codec chain defaults.
        let array_to_array_codecs: Vec<Box<dyn ArrayToArrayCodecTraits>> = Vec::new();
        let array_to_bytes_codec: Box<dyn ArrayToBytesCodecTraits> =
            Box::<codec::BytesCodec>::default();
        let bytes_to_bytes_codecs: Vec<Box<dyn BytesToBytesCodecTraits>> = Vec::new();

        Self {
            inner_chunk_shape,
            index_array_to_bytes_codec,
            index_bytes_to_bytes_codecs,
            array_to_array_codecs,
            array_to_bytes_codec,
            bytes_to_bytes_codecs,
        }
    }

    /// Replace the `array->bytes` codec used to encode the shard index.
    ///
    /// The default set by [`new`](ShardingCodecBuilder::new) is the `bytes` codec with native endian encoding.
    pub fn index_array_to_bytes_codec(
        &mut self,
        index_array_to_bytes_codec: Box<dyn ArrayToBytesCodecTraits>,
    ) -> &mut Self {
        self.index_array_to_bytes_codec = index_array_to_bytes_codec;
        self
    }

    /// Replace the `bytes->bytes` codecs applied to the encoded shard index.
    ///
    /// The default set by [`new`](ShardingCodecBuilder::new) is the `crc32c checksum` codec (if supported).
    /// The supplied list replaces the current one; it is not appended to it.
    pub fn index_bytes_to_bytes_codecs(
        &mut self,
        index_bytes_to_bytes_codecs: Vec<Box<dyn BytesToBytesCodecTraits>>,
    ) -> &mut Self {
        self.index_bytes_to_bytes_codecs = index_bytes_to_bytes_codecs;
        self
    }

    /// Replace the `array->array` codecs applied to the inner chunks.
    ///
    /// The default set by [`new`](ShardingCodecBuilder::new) is an empty list.
    /// The supplied list replaces the current one; it is not appended to it.
    pub fn array_to_array_codecs(
        &mut self,
        array_to_array_codecs: Vec<Box<dyn ArrayToArrayCodecTraits>>,
    ) -> &mut Self {
        self.array_to_array_codecs = array_to_array_codecs;
        self
    }

    /// Replace the `array->bytes` codec used to encode the inner chunks.
    ///
    /// The default set by [`new`](ShardingCodecBuilder::new) is the `bytes` codec with native endian encoding.
    pub fn array_to_bytes_codec(
        &mut self,
        array_to_bytes_codec: Box<dyn ArrayToBytesCodecTraits>,
    ) -> &mut Self {
        self.array_to_bytes_codec = array_to_bytes_codec;
        self
    }

    /// Replace the `bytes->bytes` codecs applied to the encoded inner chunks.
    ///
    /// The default set by [`new`](ShardingCodecBuilder::new) is an empty list.
    /// The supplied list replaces the current one; it is not appended to it.
    pub fn bytes_to_bytes_codecs(
        &mut self,
        bytes_to_bytes_codecs: Vec<Box<dyn BytesToBytesCodecTraits>>,
    ) -> &mut Self {
        self.bytes_to_bytes_codecs = bytes_to_bytes_codecs;
        self
    }

    /// Construct a [`ShardingCodec`] from the current state of the builder.
    ///
    /// The builder is only borrowed: the inner chunk shape and every codec are cloned,
    /// so the builder remains usable after this call.
    /// The index codec chain never contains `array->array` codecs.
    #[must_use]
    pub fn build(&self) -> ShardingCodec {
        let Self {
            inner_chunk_shape,
            index_array_to_bytes_codec,
            index_bytes_to_bytes_codecs,
            array_to_array_codecs,
            array_to_bytes_codec,
            bytes_to_bytes_codecs,
        } = self;

        // Codec chain applied to each inner chunk.
        let chunk_chain = CodecChain::new(
            array_to_array_codecs.clone(),
            array_to_bytes_codec.clone(),
            bytes_to_bytes_codecs.clone(),
        );

        // Codec chain applied to the shard index.
        let index_chain = CodecChain::new(
            vec![],
            index_array_to_bytes_codec.clone(),
            index_bytes_to_bytes_codecs.clone(),
        );

        ShardingCodec::new(inner_chunk_shape.clone(), chunk_chain, index_chain)
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,39 @@ pub enum ShardingCodecConfiguration {
/// Sharding codec configuration parameters.
///
/// See <https://zarr-specs.readthedocs.io/en/latest/v3/codecs/sharding-indexed/v1.0.html#configuration-parameters>.
///
/// ### Example sharding codec configuration
/// ```rust
/// # let JSON = r#"
/// {
/// "chunk_shape": [32, 32, 32],
/// "codecs": [
/// {
/// "name": "endian",
/// "configuration": {
/// "endian": "little"
/// }
/// },
/// {
/// "name": "gzip",
/// "configuration": {
/// "level": 1
/// }
/// }
/// ],
/// "index_codecs": [
/// {
/// "name": "endian",
/// "configuration": {
/// "endian": "little"
/// }
/// },
/// { "name": "crc32c" }
/// ]
/// }
/// # "#;
/// # let configuration: zarrs::array::codec::ShardingCodecConfigurationV1 = serde_json::from_str(JSON).unwrap();
/// ```
#[derive(Serialize, Deserialize, Clone, Eq, PartialEq, Debug, Display)]
#[serde(deny_unknown_fields)]
#[display(fmt = "{}", "serde_json::to_string(self).unwrap_or_default()")]
Expand All @@ -22,7 +55,7 @@ pub struct ShardingCodecConfigurationV1 {
pub chunk_shape: Vec<u64>,
/// A list of codecs to be used for encoding and decoding inner chunks.
pub codecs: Vec<Metadata>,
/// A list of codecs to be used for encoding and decoding shard index.
/// A list of codecs to be used for encoding and decoding the shard index.
pub index_codecs: Vec<Metadata>,
}

Expand Down

0 comments on commit aeb0f99

Please sign in to comment.