Skip to content

Commit

Permalink
feat: add experimental zfp codec implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
LDeakin committed Oct 15, 2023
1 parent 17d54cb commit a48d92a
Show file tree
Hide file tree
Showing 15 changed files with 849 additions and 10 deletions.
8 changes: 4 additions & 4 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,13 @@ jobs:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- run: sudo apt update && sudo apt install -y libblosc-dev
- run: sudo apt update && sudo apt install -y libblosc-dev cmake clang-15
- uses: dtolnay/rust-toolchain@stable
- run: rustup component add rustfmt clippy
- uses: Swatinem/rust-cache@v2
- run: cargo build # zarrs-ffi tests need build
- run: cargo test
- run: cargo build --all-features
- run: cargo test --all-features
- run: cargo check # default features
- run: cargo check --no-default-features
- run: cargo fmt --all -- --check
- run: cargo clippy -- -D warnings

7 changes: 4 additions & 3 deletions BUILD.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,12 @@ cargo build --release
## Testing
```bash
# Must have no warnings/errors to pass CI
cargo build && \
cargo test && \
cargo doc && \
cargo build --all-features && \
cargo test --all-features && \
cargo doc --all-features && \
cargo fmt --all -- --check && \
cargo clippy -- -D warnings && \
cargo check && \
cargo check --no-default-features
```

Expand Down
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Added parallel encoding/decoding to tests
- Added `array_subset::ArrayStoreBytesError`, `store_bytes`, and `store_bytes_unchecked`
- Added `parallel_chunks` option to `Array`, enabled by default. Lets `store_array_subset` and `retrieve_array_subset` (and their variants) encode/decode chunks in parallel
- Added experimental `zfp` codec implementation behind `zfp` feature flag (disabled by default)

### Changed
- **Breaking**: `array::data_type::DataType` is now marked `#[non_exhaustive]`
Expand Down
5 changes: 5 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,17 @@ blosc = ["dep:blosc-sys"]
gzip = ["dep:flate2"]
sharding = []
crc32c = ["dep:crc32fast"]
zfp = ["dep:zfp-sys"]
zstd = ["dep:zstd"]
# Stores
http = ["dep:reqwest", "dep:url"]
zip = ["dep:zip"]
# Utilities
ndarray = ["dep:ndarray"] # Adds ndarray utility functions to Array

[package.metadata.docs.rs]
all-features = true

[lib]
crate-type = ["lib"]
bench = false
Expand All @@ -50,6 +54,7 @@ serde_json = { version = "1.0.71", features = ["preserve_order"] }
thiserror = "1.0.7"
url = { version = "2", optional = true }
walkdir = "2.3.2"
zfp-sys = {version = "0.1.4", features = ["static"], optional = true }
zip = { version = "0.6", optional = true }
zstd = { version = "0.12", optional = true }

Expand Down
9 changes: 8 additions & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
@@ -1,6 +1,13 @@
FROM rust:latest

RUN apt update
RUN apt install -y cmake clang-15

WORKDIR /usr/src/zarrs
COPY . .

RUN cargo build && cargo test && cargo doc
RUN cargo build --all-features && \
cargo test --all-features && \
cargo doc --all-features && \
cargo check && \
cargo check --no-default-features
3 changes: 3 additions & 0 deletions src/array/codec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,13 @@ pub use array_to_array::transpose::{
pub use array_to_bytes::sharding::{
ShardingCodec, ShardingCodecConfiguration, ShardingCodecConfigurationV1,
};
#[cfg(feature = "zfp")]
pub use array_to_bytes::zfp::{ZfpCodec, ZfpCodecConfiguration, ZfpCodecConfigurationV1};
pub use array_to_bytes::{
bytes::{BytesCodec, BytesCodecConfiguration, BytesCodecConfigurationV1},
codec_chain::CodecChain,
};
// pub use array_to_bytes::zfp::{ZfpCodec, ZfpCodecConfiguration, ZfpCodecConfigurationV1};

#[cfg(feature = "blosc")]
pub use bytes_to_bytes::blosc::{BloscCodec, BloscCodecConfiguration, BloscCodecConfigurationV1};
Expand Down
3 changes: 3 additions & 0 deletions src/array/codec/array_to_bytes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,6 @@ pub mod codec_chain;

#[cfg(feature = "sharding")]
pub mod sharding;

#[cfg(feature = "zfp")]
pub mod zfp;
203 changes: 203 additions & 0 deletions src/array/codec/array_to_bytes/zfp.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,203 @@
//! The zfp `array->bytes` codec.
//!
//! [Zfp](https://zfp.io/) is a compressed number format for 1D to 4D arrays of 32/64-bit floating point or integer data.
//!
//! This codec requires the `zfp` feature, which is disabled by default.
//!
//! See [`ZfpCodecConfigurationV1`] for example `JSON` metadata.
//!
mod zfp_bitstream;
mod zfp_codec;
mod zfp_configuration;
mod zfp_field;
mod zfp_partial_decoder;
mod zfp_stream;

use serde::{Deserialize, Serialize};

pub use zfp_codec::ZfpCodec;
pub use zfp_configuration::{
ZfpCodecConfiguration, ZfpCodecConfigurationV1, ZfpExpertConfiguration,
ZfpFixedAccuracyConfiguration, ZfpFixedPrecisionConfiguration, ZfpFixedRateConfiguration,
};

use zfp_sys::{
zfp_decompress, zfp_stream_rewind, zfp_stream_set_bit_stream, zfp_type,
zfp_type_zfp_type_double, zfp_type_zfp_type_float, zfp_type_zfp_type_int32,
zfp_type_zfp_type_int64,
};

use crate::array::{codec::CodecError, ArrayRepresentation, DataType};

use self::{zfp_bitstream::ZfpBitstream, zfp_field::ZfpField, zfp_stream::ZfpStream};

/// The zfp mode.
///
/// Selects how the zfp compression parameters are chosen. Passed by reference to `ZfpStream::new` when decoding.
// NOTE(review): the exact units of the numeric payloads below are not shown in this file; confirm against `ZfpStream` and the zfp documentation.
#[derive(Clone, Copy, Debug)]
pub enum ZfpMode {
/// Expert mode.
///
/// All four parameters are supplied explicitly, see [`ZfpExpertParams`].
Expert(ZfpExpertParams),
/// Fixed rate mode.
///
/// Carries the rate (presumably in compressed bits per value).
FixedRate(f64),
/// Fixed precision mode.
///
/// Carries the precision (see [`ZfpExpertParams::maxprec`], which specifies the precision in this mode).
FixedPrecision(u32),
/// Fixed accuracy mode.
///
/// Carries the accuracy (presumably an absolute error tolerance; see [`ZfpExpertParams::minexp`], which governs the absolute error in this mode).
FixedAccuracy(f64),
/// Reversible mode.
///
/// Takes no parameters.
Reversible,
}

/// Zfp expert parameters.
///
/// The four parameters carried by [`ZfpMode::Expert`]. The type is (de)serialisable with `serde`.
#[derive(Serialize, Deserialize, Clone, Copy, Debug, Eq, PartialEq)]
pub struct ZfpExpertParams {
/// The minimum number of compressed bits used to represent a block.
///
/// Usually this parameter equals one bit, unless each and every block is to be stored using a fixed number of bits to facilitate random access, in which case it should be set to the same value as `maxbits`.
pub minbits: u32,
/// The maximum number of bits used to represent a block.
///
/// This parameter sets a hard upper bound on compressed block size and governs the rate in fixed-rate mode. It may also be used as an upper storage limit to guard against buffer overruns in combination with the accuracy constraints given by `zfp_stream.maxprec` and `zfp_stream.minexp`.
/// `maxbits` must be large enough to allow the common block exponent and any control bits to be encoded. This implies `maxbits` ≥ 9 for single-precision data and `maxbits` ≥ 12 for double-precision data.
pub maxbits: u32,
/// The maximum number of bit planes encoded.
///
/// This parameter governs the number of most significant uncompressed bits encoded per transform coefficient.
/// It does not directly correspond to the number of uncompressed mantissa bits for the floating-point or integer values being compressed, but is closely related.
/// This is the parameter that specifies the precision in fixed-precision mode, and it provides a mechanism for controlling the relative error.
/// Note that this parameter selects how many bit planes to encode regardless of the magnitude of the common floating-point exponent within the block.
pub maxprec: u32,
/// The smallest absolute bit plane number encoded (applies to floating-point data only; this parameter is ignored for integer data).
///
/// The place value of each transform coefficient bit depends on the common floating-point exponent, $e$, that scales the integer coefficients. If the most significant coefficient bit has place value $2^e$, then the number of bit planes encoded is (one plus) the difference between $e$ and `zfp_stream.minexp`.
/// This parameter governs the absolute error in fixed-accuracy mode.
pub minexp: i32,
}

/// Map a zarr [`DataType`] to the corresponding zfp scalar type.
///
/// 32/64-bit floats map to the zfp float/double types. Signed and unsigned 32/64-bit integers
/// both map to the zfp integer type of the same width. Every other data type yields [`None`].
fn zarr_data_type_to_zfp_data_type(data_type: &DataType) -> Option<zfp_type> {
    let mapped = match data_type {
        DataType::Float32 => zfp_type_zfp_type_float,
        DataType::Float64 => zfp_type_zfp_type_double,
        DataType::Int32 | DataType::UInt32 => zfp_type_zfp_type_int32,
        DataType::Int64 | DataType::UInt64 => zfp_type_zfp_type_int64,
        // Anything else has no zfp equivalent.
        _ => return None,
    };
    Some(mapped)
}

/// Decompress zfp-encoded bytes into the raw bytes of a decoded array.
///
/// # Arguments
/// * `zfp_mode` - the zfp mode used to configure the zfp stream.
/// * `encoded_value` - the compressed bytes. Taken by value because the zfp bitstream is opened
///   over a mutable buffer.
/// * `decoded_representation` - supplies the data type, shape, and byte size of the decoded array.
///
/// # Errors
/// Returns a [`CodecError`] if
///  - the data type has no zfp equivalent,
///  - the decoded size or shape does not fit in `usize`,
///  - the zfp field, stream, or bitstream cannot be created, or
///  - `zfp_decompress` reports failure by returning zero.
fn zfp_decode(
    zfp_mode: &ZfpMode,
    mut encoded_value: Vec<u8>,
    decoded_representation: &ArrayRepresentation,
) -> Result<Vec<u8>, CodecError> {
    let Some(zfp_type) = zarr_data_type_to_zfp_data_type(decoded_representation.data_type()) else {
        return Err(CodecError::from(
            "data type is unsupported for zfp codec",
        ));
    };

    // Report sizes that do not fit the platform's address space as errors instead of panicking.
    let decoded_size = usize::try_from(decoded_representation.size())
        .map_err(|_| CodecError::from("decoded size does not fit in usize"))?;
    let shape = decoded_representation
        .shape()
        .iter()
        .map(|u| usize::try_from(*u))
        .collect::<Result<Vec<usize>, _>>()
        .map_err(|_| CodecError::from("array shape does not fit in usize"))?;

    // Output buffer that zfp decompresses into through `field`.
    let mut decoded_value = vec![0u8; decoded_size];
    let Some(field) = ZfpField::new(&mut decoded_value, zfp_type, &shape) else {
        return Err(CodecError::from("failed to create zfp field"));
    };
    let Some(zfp) = ZfpStream::new(zfp_mode, zfp_type) else {
        return Err(CodecError::from("failed to create zfp stream"));
    };

    let Some(stream) = ZfpBitstream::new(&mut encoded_value) else {
        return Err(CodecError::from("failed to create zfp bitstream"));
    };
    // SAFETY: `zfp` and `stream` are live wrappers holding non-null pointers, and
    // `encoded_value` (the bitstream's buffer) outlives both for the rest of this function.
    unsafe {
        zfp_stream_set_bit_stream(zfp.as_zfp_stream(), stream.as_bitstream());
        zfp_stream_rewind(zfp.as_zfp_stream());
    }

    // SAFETY: `zfp` has its bitstream attached above, and `field` describes `decoded_value`,
    // which is still alive and was sized from the same representation.
    let ret = unsafe { zfp_decompress(zfp.as_zfp_stream(), field.as_zfp_field()) };
    // A zero return value signals failure.
    if ret == 0 {
        Err(CodecError::from("zfp decompression failed"))
    } else {
        Ok(decoded_value)
    }
}

#[cfg(test)]
mod tests {
    use crate::{
        array::{
            codec::{ArrayCodecTraits, ArrayToBytesCodecTraits},
            ArrayRepresentation, DataType,
        },
        array_subset::ArraySubset,
    };

    use super::*;

    /// A valid zfp codec configuration: fixed precision mode with a precision of 12.
    const JSON_VALID: &str = r#"{
        "mode": "fixedprecision",
        "precision": 12
    }"#;

    /// Encoding then decoding a 3x3x3 `f32` chunk reproduces the original elements.
    // NOTE(review): fixed precision is not a lossless mode in general; the exact equality below
    // presumably holds because the inputs are small integers. Confirm if the inputs change.
    #[test]
    fn codec_zfp_round_trip1() {
        let array_representation =
            ArrayRepresentation::new(vec![3, 3, 3], DataType::Float32, 0.0f32.into()).unwrap();
        let elements: Vec<f32> = (0..27).map(|i| i as f32).collect();
        let bytes = safe_transmute::transmute_to_bytes(&elements).to_vec();

        let configuration: ZfpCodecConfiguration = serde_json::from_str(JSON_VALID).unwrap();
        let codec = ZfpCodec::new_with_configuration(&configuration);

        let encoded = codec.encode(bytes, &array_representation).unwrap();
        let decoded = codec.decode(encoded, &array_representation).unwrap();

        let decoded_elements = safe_transmute::transmute_many_permissive::<f32>(&decoded)
            .unwrap()
            .to_vec();
        assert_eq!(elements, decoded_elements);
    }

    /// Partially decoding two subsets of an encoded 3x3x3 `f32` chunk yields the expected elements.
    #[test]
    fn codec_zfp_partial_decode() {
        let array_representation =
            ArrayRepresentation::new(vec![3, 3, 3], DataType::Float32, 0.0f32.into()).unwrap();
        let elements: Vec<f32> = (0..27).map(|i| i as f32).collect();
        let bytes = safe_transmute::transmute_to_bytes(&elements).to_vec();

        let configuration: ZfpCodecConfiguration = serde_json::from_str(JSON_VALID).unwrap();
        let codec = ZfpCodec::new_with_configuration(&configuration);

        let encoded = codec.encode(bytes, &array_representation).unwrap();
        let decoded_regions = [
            ArraySubset::new_with_start_shape(vec![0, 0, 0], vec![1, 2, 3]).unwrap(),
            ArraySubset::new_with_start_shape(vec![0, 1, 2], vec![3, 2, 1]).unwrap(),
        ];

        let input_handle = Box::new(std::io::Cursor::new(encoded));
        let partial_decoder = codec.partial_decoder(input_handle);
        let decoded_partial_chunk = partial_decoder
            .partial_decode(&array_representation, &decoded_regions)
            .unwrap();

        // Concatenate the bytes of every decoded region, then reinterpret them as `f32`.
        let decoded_bytes: Vec<u8> = decoded_partial_chunk.into_iter().flatten().collect();
        let decoded_partial_chunk: Vec<f32> = decoded_bytes
            .chunks_exact(std::mem::size_of::<f32>())
            .map(|b| f32::from_ne_bytes(b.try_into().unwrap()))
            .collect();

        // Region 1 covers linear indices 0..=5; region 2 covers indices 9*i + 3*j + 2 for
        // i in 0..3 and j in 1..3.
        let answer: Vec<f32> = vec![
            0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 5.0, 8.0, 14.0, 17.0, 23.0, 26.0,
        ];
        assert_eq!(answer, decoded_partial_chunk);
    }
}
26 changes: 26 additions & 0 deletions src/array/codec/array_to_bytes/zfp/zfp_bitstream.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
use std::ptr::NonNull;

use zfp_sys::{bitstream, stream_close, stream_open};

/// A zfp bitstream.
///
/// Wraps the non-null `bitstream` pointer returned by `stream_open` and passes it to `stream_close` on drop.
/// The pointer is opened over a caller-provided buffer whose lifetime is not tracked by this type.
pub struct ZfpBitstream(NonNull<bitstream>);

impl Drop for ZfpBitstream {
    /// Close the wrapped bitstream.
    fn drop(&mut self) {
        let raw = self.0.as_ptr();
        // SAFETY: `raw` is the non-null pointer stored at construction, and `drop` runs at most
        // once, so the bitstream is closed exactly once.
        unsafe {
            stream_close(raw);
        }
    }
}

impl ZfpBitstream {
    /// Open a zfp bitstream over `buffer`.
    ///
    /// Returns [`None`] if `stream_open` returns a null pointer.
    ///
    /// The bitstream is given a raw pointer into `buffer`, which the borrow checker does not
    /// track. The caller must keep `buffer` alive, and must not reallocate it, for as long as the
    /// returned bitstream is in use.
    pub fn new(buffer: &mut [u8]) -> Option<Self> {
        let data = buffer.as_mut_ptr().cast::<std::ffi::c_void>();
        // SAFETY: `data` points to `buffer.len()` writable bytes that are valid for this call.
        let stream = unsafe { stream_open(data, buffer.len()) };
        NonNull::new(stream).map(Self)
    }

    /// Return the raw bitstream pointer, for passing to zfp functions.
    pub fn as_bitstream(&self) -> *mut bitstream {
        self.0.as_ptr()
    }
}
Loading

0 comments on commit a48d92a

Please sign in to comment.