Skip to content

Commit

Permalink
Optimise bytes codec
Browse files Browse the repository at this point in the history
  • Loading branch information
LDeakin committed Oct 15, 2023
1 parent 94b73f5 commit c48e2d8
Show file tree
Hide file tree
Showing 5 changed files with 28 additions and 75 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- **Breaking**: Array subset methods dependent on an `array_shape` now use the `IncompatibleArrayShapeError` error type instead of `IncompatibleDimensionalityError`
- **Breaking**: Various array subset iterators now have validated `new` and unvalidated `new_unchecked` constructors
- Blosc codec: disable parallel encoding/decoding for small inputs/outputs
- Bytes codec: optimise implementation

### Fixed
- Bytes codec handling of complex and raw bits data types
Expand Down
12 changes: 2 additions & 10 deletions benches/codecs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,18 +30,10 @@ fn codec_bytes(c: &mut Criterion) {

let data = vec![0u8; size3.try_into().unwrap()];
group.throughput(Throughput::Bytes(size3));
group.bench_function(BenchmarkId::new("encode", size3), |b| {
// encode and decode have the same implementation
group.bench_function(BenchmarkId::new("encode_decode", size3), |b| {
b.iter(|| codec.encode(data.clone(), &rep).unwrap());
});
group.bench_function(BenchmarkId::new("decode", size3), |b| {
b.iter(|| codec.decode(data.clone(), &rep).unwrap());
});
group.bench_function(BenchmarkId::new("par_encode", size3), |b| {
b.iter(|| codec.par_encode(data.clone(), &rep).unwrap());
});
group.bench_function(BenchmarkId::new("par_decode", size3), |b| {
b.iter(|| codec.par_decode(data.clone(), &rep).unwrap());
});
}
}

Expand Down
50 changes: 20 additions & 30 deletions src/array/codec/array_to_bytes/bytes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ pub use bytes_configuration::{BytesCodecConfiguration, BytesCodecConfigurationV1
pub use bytes_codec::BytesCodec;

use derive_more::Display;
use rayon::{prelude::ParallelIterator, slice::ParallelSliceMut};

use crate::array::DataType;

Expand Down Expand Up @@ -63,38 +62,29 @@ const NATIVE_ENDIAN: Endianness = Endianness::Big;
#[cfg(target_endian = "little")]
const NATIVE_ENDIAN: Endianness = Endianness::Little;

/// The input length needed to invoke rayon for parallel endianness conversion, if `parallel` is true in `reverse_endianness`.
const MIN_PARALLEL_LENGTH: usize = 4_000_000;

fn reverse_endianness(v: &mut [u8], data_type: &DataType, parallel: bool) {
fn reverse_endianness(v: &mut [u8], data_type: &DataType) {
match data_type {
DataType::Bool | DataType::Int8 | DataType::UInt8 | DataType::RawBits(_) => {}
DataType::Int16
| DataType::Int32
| DataType::Int64
| DataType::UInt16
| DataType::UInt32
| DataType::UInt64
| DataType::Float16
| DataType::Float32
| DataType::Float64
| DataType::BFloat16 => {
if parallel && v.len() >= MIN_PARALLEL_LENGTH {
v.par_chunks_exact_mut(data_type.size())
.for_each(<[u8]>::reverse);
} else {
v.chunks_exact_mut(data_type.size())
.for_each(<[u8]>::reverse);
}
DataType::Int16 | DataType::UInt16 | DataType::Float16 | DataType::BFloat16 => {
let swap = |chunk: &mut [u8]| {
let bytes = u16::from_ne_bytes(chunk.try_into().unwrap());
chunk.copy_from_slice(bytes.swap_bytes().to_ne_bytes().as_slice());
};
v.chunks_exact_mut(2).for_each(swap);
}
DataType::Complex64 | DataType::Complex128 => {
if parallel && v.len() >= MIN_PARALLEL_LENGTH {
v.par_chunks_exact_mut(data_type.size() / 2)
.for_each(<[u8]>::reverse);
} else {
v.chunks_exact_mut(data_type.size() / 2)
.for_each(<[u8]>::reverse);
}
DataType::Int32 | DataType::UInt32 | DataType::Float32 | DataType::Complex64 => {
let swap = |chunk: &mut [u8]| {
let bytes = u32::from_ne_bytes(chunk.try_into().unwrap());
chunk.copy_from_slice(bytes.swap_bytes().to_ne_bytes().as_slice());
};
v.chunks_exact_mut(4).for_each(swap);
}
DataType::Int64 | DataType::UInt64 | DataType::Float64 | DataType::Complex128 => {
let swap = |chunk: &mut [u8]| {
let bytes = u64::from_ne_bytes(chunk.try_into().unwrap());
chunk.copy_from_slice(bytes.swap_bytes().to_ne_bytes().as_slice());
};
v.chunks_exact_mut(8).for_each(swap);
}
}
}
Expand Down
23 changes: 3 additions & 20 deletions src/array/codec/array_to_bytes/bytes/bytes_codec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,6 @@ impl BytesCodec {
&self,
mut value: Vec<u8>,
decoded_representation: &ArrayRepresentation,
parallel: bool,
) -> Result<Vec<u8>, CodecError> {
if value.len() as u64 != decoded_representation.size() {
return Err(CodecError::UnexpectedChunkDecodedSize(
Expand All @@ -94,7 +93,7 @@ impl BytesCodec {

if let Some(endian) = &self.endian {
if !endian.is_native() {
reverse_endianness(&mut value, decoded_representation.data_type(), parallel);
reverse_endianness(&mut value, decoded_representation.data_type());
}
}
Ok(value)
Expand Down Expand Up @@ -124,31 +123,15 @@ impl ArrayCodecTraits for BytesCodec {
decoded_value: Vec<u8>,
decoded_representation: &ArrayRepresentation,
) -> Result<Vec<u8>, CodecError> {
self.do_encode_or_decode(decoded_value, decoded_representation, false)
}

fn par_encode(
&self,
decoded_value: Vec<u8>,
decoded_representation: &ArrayRepresentation,
) -> Result<Vec<u8>, CodecError> {
self.do_encode_or_decode(decoded_value, decoded_representation, true)
self.do_encode_or_decode(decoded_value, decoded_representation)
}

fn decode(
&self,
encoded_value: Vec<u8>,
decoded_representation: &ArrayRepresentation,
) -> Result<Vec<u8>, CodecError> {
self.do_encode_or_decode(encoded_value, decoded_representation, false)
}

fn par_decode(
&self,
encoded_value: Vec<u8>,
decoded_representation: &ArrayRepresentation,
) -> Result<Vec<u8>, CodecError> {
self.do_encode_or_decode(encoded_value, decoded_representation, true)
self.do_encode_or_decode(encoded_value, decoded_representation)
}
}

Expand Down
17 changes: 2 additions & 15 deletions src/array/codec/array_to_bytes/bytes/bytes_partial_decoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ impl<'a> BytesPartialDecoder<'a> {
&self,
decoded_representation: &ArrayRepresentation,
decoded_regions: &[ArraySubset],
parallel: bool,
) -> Result<Vec<Vec<u8>>, CodecError> {
let mut bytes = Vec::with_capacity(decoded_regions.len());
for array_subset in decoded_regions {
Expand All @@ -55,11 +54,7 @@ impl<'a> BytesPartialDecoder<'a> {
let mut bytes_subset = decoded.concat();
if let Some(endian) = &self.endian {
if !endian.is_native() {
reverse_endianness(
&mut bytes_subset,
decoded_representation.data_type(),
parallel,
);
reverse_endianness(&mut bytes_subset, decoded_representation.data_type());
}
}
bytes_subset
Expand All @@ -82,14 +77,6 @@ impl ArrayPartialDecoderTraits for BytesPartialDecoder<'_> {
decoded_representation: &ArrayRepresentation,
decoded_regions: &[ArraySubset],
) -> Result<Vec<Vec<u8>>, CodecError> {
self.do_partial_decode(decoded_representation, decoded_regions, false)
}

fn par_partial_decode(
&self,
decoded_representation: &ArrayRepresentation,
decoded_regions: &[ArraySubset],
) -> Result<Vec<Vec<u8>>, CodecError> {
self.do_partial_decode(decoded_representation, decoded_regions, true)
self.do_partial_decode(decoded_representation, decoded_regions)
}
}

0 comments on commit c48e2d8

Please sign in to comment.