Skip to content

Commit

Permalink
feat: add fletcher32 codec (#123)
Browse files Browse the repository at this point in the history
* feat(metadata): add fletcher32 metadata support

* feat: add fletcher32 codec

* fletcher32 cleanup

* fix(metadata): re-export fletcher32 in v2
  • Loading branch information
LDeakin authored Jan 7, 2025
1 parent cb3e564 commit bf3e1db
Show file tree
Hide file tree
Showing 23 changed files with 563 additions and 32 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Add `Group::[set_]consolidated_metadata`
- Add `Node::consolidate_metadata`
- Consolidated metadata is not currently used to optimise node hierarchy requests
- Add experimental `fletcher32` checksum codec based on the numcodecs implementation
- Adds `fletcher32` feature flag

### Changed
- **Breaking**: Seal `Array` extension traits: `ArraySharded[Readable]Ext` and `ArrayChunkCacheExt`
Expand Down
1 change: 1 addition & 0 deletions zarrs/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ bitround = [] # Enable the experimental bitround codec
blosc = ["dep:blosc-sys"] # Enable the blosc codec
bz2 = ["dep:bzip2"] # Enable the experimental bz2 codec
crc32c = ["dep:crc32c"] # Enable the crc32c checksum codec
fletcher32 = [] # Enable the fletcher32 checksum codec
gdeflate = ["dep:gdeflate-sys"] # Enable the experimental gdeflate codec
gzip = ["dep:flate2"] # Enable the gzip codec
pcodec = ["dep:pco"] # Enable the experimental pcodec codec
Expand Down
2 changes: 2 additions & 0 deletions zarrs/doc/status/codecs_experimental.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ This is configurable with [`Config::experimental_codec_names_mut`](config::Confi
| | [vlen-utf8] | <https://codec.zarrs.dev/array_to_bytes/vlen_utf8> | &check; | &check; | |
| Bytes to Bytes | [bz2] | <https://codec.zarrs.dev/bytes_to_bytes/bz2> | &check; | &check; | bz2 |
| | [gdeflate] | <https://codec.zarrs.dev/bytes_to_bytes/gdeflate> | &check; | | gdeflate |
| | [fletcher32] | <https://codec.zarrs.dev/bytes_to_bytes/fletcher32> | &check; | &check; | fletcher32 |

[bitround]: (crate::array::codec::array_to_array::bitround)
[zfp]: crate::array::codec::array_to_bytes::zfp
Expand All @@ -23,3 +24,4 @@ This is configurable with [`Config::experimental_codec_names_mut`](config::Confi
[vlen-utf8]: crate::array::codec::array_to_bytes::vlen_utf8
[bz2]: crate::array::codec::bytes_to_bytes::bz2
[gdeflate]: crate::array::codec::bytes_to_bytes::gdeflate
[fletcher32]: crate::array::codec::bytes_to_bytes::fletcher32
5 changes: 5 additions & 0 deletions zarrs/src/array/codec/bytes_to_bytes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ pub mod blosc;
pub mod bz2;
#[cfg(feature = "crc32c")]
pub mod crc32c;
#[cfg(feature = "fletcher32")]
pub mod fletcher32;
#[cfg(feature = "gdeflate")]
pub mod gdeflate;
#[cfg(feature = "gzip")]
Expand All @@ -15,3 +17,6 @@ pub mod zstd;

#[cfg(test)]
pub mod test_unbounded;

#[cfg(any(feature = "crc32c", feature = "fletcher32"))]
mod strip_suffix_partial_decoder;
1 change: 0 additions & 1 deletion zarrs/src/array/codec/bytes_to_bytes/crc32c.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
//! See <https://zarr-specs.readthedocs.io/en/latest/v3/codecs/crc32c/v1.0.html>.
mod crc32c_codec;
mod crc32c_partial_decoder;

use std::sync::Arc;

Expand Down
23 changes: 13 additions & 10 deletions zarrs/src/array/codec/bytes_to_bytes/crc32c/crc32c_codec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::{borrow::Cow, sync::Arc};
use crate::{
array::{
codec::{
bytes_to_bytes::strip_suffix_partial_decoder::StripSuffixPartialDecoder,
BytesPartialDecoderTraits, BytesPartialEncoderDefault, BytesPartialEncoderTraits,
BytesToBytesCodecTraits, CodecError, CodecOptions, CodecTraits, RecommendedConcurrency,
},
Expand All @@ -14,10 +15,10 @@ use crate::{
#[cfg(feature = "async")]
use crate::array::codec::AsyncBytesPartialDecoderTraits;

use super::{
crc32c_partial_decoder, Crc32cCodecConfiguration, Crc32cCodecConfigurationV1, CHECKSUM_SIZE,
IDENTIFIER,
};
#[cfg(feature = "async")]
use crate::array::codec::bytes_to_bytes::strip_suffix_partial_decoder::AsyncStripSuffixPartialDecoder;

use super::{Crc32cCodecConfiguration, Crc32cCodecConfigurationV1, CHECKSUM_SIZE, IDENTIFIER};

/// A `crc32c` (CRC32C checksum) codec implementation.
#[derive(Clone, Debug, Default)]
Expand Down Expand Up @@ -106,8 +107,9 @@ impl BytesToBytesCodecTraits for Crc32cCodec {
_decoded_representation: &BytesRepresentation,
_options: &CodecOptions,
) -> Result<Arc<dyn BytesPartialDecoderTraits>, CodecError> {
Ok(Arc::new(crc32c_partial_decoder::Crc32cPartialDecoder::new(
Ok(Arc::new(StripSuffixPartialDecoder::new(
input_handle,
CHECKSUM_SIZE,
)))
}

Expand All @@ -133,9 +135,10 @@ impl BytesToBytesCodecTraits for Crc32cCodec {
_decoded_representation: &BytesRepresentation,
_options: &CodecOptions,
) -> Result<Arc<dyn AsyncBytesPartialDecoderTraits>, CodecError> {
Ok(Arc::new(
crc32c_partial_decoder::AsyncCrc32cPartialDecoder::new(input_handle),
))
Ok(Arc::new(AsyncStripSuffixPartialDecoder::new(
input_handle,
CHECKSUM_SIZE,
)))
}

fn compute_encoded_size(
Expand All @@ -144,10 +147,10 @@ impl BytesToBytesCodecTraits for Crc32cCodec {
) -> BytesRepresentation {
match decoded_representation {
BytesRepresentation::FixedSize(size) => {
BytesRepresentation::FixedSize(size + core::mem::size_of::<u32>() as u64)
BytesRepresentation::FixedSize(size + CHECKSUM_SIZE as u64)
}
BytesRepresentation::BoundedSize(size) => {
BytesRepresentation::BoundedSize(size + core::mem::size_of::<u32>() as u64)
BytesRepresentation::BoundedSize(size + CHECKSUM_SIZE as u64)
}
BytesRepresentation::UnboundedSize => BytesRepresentation::UnboundedSize,
}
Expand Down
189 changes: 189 additions & 0 deletions zarrs/src/array/codec/bytes_to_bytes/fletcher32.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
//! The `fletcher32` bytes to bytes codec.
//!
//! Appends a fletcher32 checksum of the input bytestream.
//!
//! This is based on the `numcodecs` implementation.
//! See <https://numcodecs.readthedocs.io/en/latest/checksum32.html#fletcher32>.
//!
//! <div class="warning">
//! This codec is experimental and may be incompatible with other Zarr V3 implementations.
//! </div>
//!
//! This codec requires the `fletcher32` feature, which is disabled by default.
//!
//! See [`Fletcher32CodecConfigurationV1`] for example `JSON` metadata.
mod fletcher32_codec;

use std::sync::Arc;

pub use crate::metadata::v3::array::codec::fletcher32::{
Fletcher32CodecConfiguration, Fletcher32CodecConfigurationV1,
};
pub use fletcher32_codec::Fletcher32Codec;

use crate::{
array::codec::{Codec, CodecPlugin},
metadata::v3::{array::codec::fletcher32, MetadataV3},
plugin::{PluginCreateError, PluginMetadataInvalidError},
};

pub use fletcher32::IDENTIFIER;

// Register the codec.
inventory::submit! {
CodecPlugin::new(IDENTIFIER, is_name_fletcher32, create_codec_fletcher32)
}

fn is_name_fletcher32(name: &str) -> bool {
name.eq(IDENTIFIER)
}

pub(crate) fn create_codec_fletcher32(metadata: &MetadataV3) -> Result<Codec, PluginCreateError> {
let configuration = metadata
.to_configuration()
.map_err(|_| PluginMetadataInvalidError::new(IDENTIFIER, "codec", metadata.clone()))?;
let codec = Arc::new(Fletcher32Codec::new_with_configuration(&configuration));
Ok(Codec::BytesToBytes(codec))
}

const CHECKSUM_SIZE: usize = core::mem::size_of::<u32>();

#[cfg(test)]
mod tests {
use std::{borrow::Cow, sync::Arc};

use crate::{
array::{
codec::{BytesToBytesCodecTraits, CodecOptions, CodecTraits},
BytesRepresentation,
},
byte_range::ByteRange,
};

use super::*;

const JSON1: &str = r#"{}"#;

#[test]
fn codec_fletcher32_configuration_none() {
let codec_configuration: Fletcher32CodecConfiguration =
serde_json::from_str(r#"{}"#).unwrap();
let codec = Fletcher32Codec::new_with_configuration(&codec_configuration);
let metadata = codec.create_metadata().unwrap();
assert_eq!(
serde_json::to_string(&metadata).unwrap(),
r#"{"name":"fletcher32"}"#
);
}

#[test]
fn codec_fletcher32() {
let elements: Vec<u8> = (0..6).collect();
let bytes = elements;
let bytes_representation = BytesRepresentation::FixedSize(bytes.len() as u64);

let codec_configuration: Fletcher32CodecConfiguration =
serde_json::from_str(JSON1).unwrap();
let codec = Fletcher32Codec::new_with_configuration(&codec_configuration);

let encoded = codec
.encode(Cow::Borrowed(&bytes), &CodecOptions::default())
.unwrap();
let decoded = codec
.decode(
encoded.clone(),
&bytes_representation,
&CodecOptions::default(),
)
.unwrap();
assert_eq!(bytes, decoded.to_vec());

// Check that the checksum is correct
let checksum: &[u8; 4] = &encoded
[encoded.len() - core::mem::size_of::<u32>()..encoded.len()]
.try_into()
.unwrap();
println!("checksum {checksum:?}");
assert_eq!(checksum, &[9, 6, 14, 8]); // TODO: CHECK
}

#[test]
fn codec_fletcher32_partial_decode() {
let elements: Vec<u8> = (0..32).collect();
let bytes = elements;
let bytes_representation = BytesRepresentation::FixedSize(bytes.len() as u64);

let codec_configuration: Fletcher32CodecConfiguration =
serde_json::from_str(JSON1).unwrap();
let codec = Arc::new(Fletcher32Codec::new_with_configuration(
&codec_configuration,
));

let encoded = codec
.encode(Cow::Owned(bytes), &CodecOptions::default())
.unwrap();
let decoded_regions = [ByteRange::FromStart(3, Some(2))];
let input_handle = Arc::new(std::io::Cursor::new(encoded));
let partial_decoder = codec
.partial_decoder(
input_handle,
&bytes_representation,
&CodecOptions::default(),
)
.unwrap();
let decoded_partial_chunk = partial_decoder
.partial_decode(&decoded_regions, &CodecOptions::default())
.unwrap()
.unwrap();
let answer: &[Vec<u8>] = &[vec![3, 4]];
assert_eq!(
answer,
decoded_partial_chunk
.into_iter()
.map(|v| v.to_vec())
.collect::<Vec<_>>()
);
}

#[cfg(feature = "async")]
#[tokio::test]
async fn codec_fletcher32_async_partial_decode() {
let elements: Vec<u8> = (0..32).collect();
let bytes = elements;
let bytes_representation = BytesRepresentation::FixedSize(bytes.len() as u64);

let codec_configuration: Fletcher32CodecConfiguration =
serde_json::from_str(JSON1).unwrap();
let codec = Arc::new(Fletcher32Codec::new_with_configuration(
&codec_configuration,
));

let encoded = codec
.encode(Cow::Owned(bytes), &CodecOptions::default())
.unwrap();
let decoded_regions = [ByteRange::FromStart(3, Some(2))];
let input_handle = Arc::new(std::io::Cursor::new(encoded));
let partial_decoder = codec
.async_partial_decoder(
input_handle,
&bytes_representation,
&CodecOptions::default(),
)
.await
.unwrap();
let decoded_partial_chunk = partial_decoder
.partial_decode(&decoded_regions, &CodecOptions::default())
.await
.unwrap()
.unwrap();
let answer: &[Vec<u8>] = &[vec![3, 4]];
assert_eq!(
answer,
decoded_partial_chunk
.into_iter()
.map(|v| v.to_vec())
.collect::<Vec<_>>()
);
}
}
Loading

0 comments on commit bf3e1db

Please sign in to comment.