Skip to content

Commit

Permalink
sharded_array_write_read example changes
Browse files Browse the repository at this point in the history
  • Loading branch information
LDeakin committed Oct 17, 2023
1 parent 82bfdb9 commit 69d3432
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 11 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- **Breaking**: `ArrayToArrayCodecTraits::compute_encoded_size` and `ArrayToBytesCodecTraits::compute_encoded_size` can now return a `CodecError`
- `ArrayBuilder::build()` and `GroupBuilder::build` now accept unsized storage
- **Breaking**: `StoragePartialDecoder` now takes a `ReadableStorage` input
- `sharded_array_write_read` example now prints storage operations and demonstrates retrieving inner chunks directly from a partial decoder

### Fixed
- Bytes codec handling of complex and raw bits data types
Expand Down
70 changes: 59 additions & 11 deletions examples/sharded_array_write_read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,10 @@ fn sharded_array_write_read() -> Result<(), Box<dyn std::error::Error>> {
},
array_subset::ArraySubset,
node::Node,
storage::store,
storage::{
storage_transformer::{StorageTransformerExtension, UsageLogStorageTransformer},
store,
},
};

use rayon::prelude::{IntoParallelIterator, ParallelIterator};
Expand All @@ -18,6 +21,16 @@ fn sharded_array_write_read() -> Result<(), Box<dyn std::error::Error>> {
// let store = Arc::new(store::FilesystemStore::new(path.path())?);
// let store = Arc::new(store::FilesystemStore::new("tests/data/sharded_array_write_read.zarr")?);
let store = Arc::new(store::MemoryStore::default());
let log_writer = Arc::new(std::sync::Mutex::new(
// std::io::BufWriter::new(
std::io::stdout(),
// )
));
let usage_log = UsageLogStorageTransformer::new(log_writer, || {
chrono::Utc::now().format("[%T%.3f] ").to_string()
});
let store_readable_listable = usage_log.create_readable_listable_transformer(store.clone());
let store = usage_log.create_readable_writable_transformer(store.clone());

// Create a group and write its metadata to the store
let group_path = "/group";
Expand All @@ -33,16 +46,17 @@ fn sharded_array_write_read() -> Result<(), Box<dyn std::error::Error>> {

// Create an array
let array_path = "/group/array";
let mut sharding_codec_builder = ShardingCodecBuilder::new(vec![4, 4]);
let shard_shape = vec![4, 8];
let inner_chunk_shape = vec![4, 4];
let mut sharding_codec_builder = ShardingCodecBuilder::new(inner_chunk_shape.clone());
sharding_codec_builder.bytes_to_bytes_codecs(vec![
#[cfg(feature = "gzip")]
Box::new(codec::GzipCodec::new(5)?),
]);

let array = zarrs::array::ArrayBuilder::new(
vec![8, 8], // array shape
DataType::UInt16,
vec![4, 8].into(), // shard shape,
shard_shape.into(),
FillValue::from(0u16),
)
.array_to_bytes_codec(Box::new(sharding_codec_builder.build()))
Expand All @@ -53,6 +67,12 @@ fn sharded_array_write_read() -> Result<(), Box<dyn std::error::Error>> {
// Write array metadata to store
array.store_metadata()?;

// The array metadata is
println!(
"The array metadata is:\n{}\n",
serde_json::to_string_pretty(&array.metadata()).unwrap()
);

// Write some shards (in parallel)
(0..2)
.into_par_iter()
Expand All @@ -72,12 +92,6 @@ fn sharded_array_write_read() -> Result<(), Box<dyn std::error::Error>> {
})
.collect::<Vec<_>>();

// The array metadata is
println!(
"The array metadata is:\n{}\n",
serde_json::to_string_pretty(&array.metadata()).unwrap()
);

// Read the whole array
let subset_all = ArraySubset::new_with_start_shape(vec![0, 0], array.shape().to_vec())?; // the whole 8x8 array
let data_all = array.retrieve_array_subset_ndarray::<u16>(&subset_all)?;
Expand All @@ -98,8 +112,42 @@ fn sharded_array_write_read() -> Result<(), Box<dyn std::error::Error>> {
let data_4x2 = array.retrieve_array_subset_ndarray::<u16>(&subset_4x2)?;
println!("The middle 4x2 subset is:\n{:?}\n", data_4x2);

// Decode inner chunks
// In some cases, it might be preferable to decode inner chunks in a shard directly.
// If using the partial decoder, then the shard index will only be read once from the store.
let partial_decoder = array.partial_decoder(&[0, 0]);
let chunk_representation = array.chunk_array_representation(&[0, 0], array.shape())?;
let inner_chunks_to_decode = vec![
ArraySubset::new_with_start_shape(vec![0, 0], inner_chunk_shape.clone())?,
ArraySubset::new_with_start_shape(vec![0, 4], inner_chunk_shape.clone())?,
];
let decoded_inner_chunks =
partial_decoder.par_partial_decode(&chunk_representation, &inner_chunks_to_decode)?;
let decoded_inner_chunks = decoded_inner_chunks
.into_iter()
.map(|bytes| {
let elements = safe_transmute::transmute_many_permissive::<u16>(&bytes)
.unwrap()
.to_vec();
ndarray::ArrayD::<u16>::from_shape_vec(
inner_chunk_shape
.iter()
.map(|u| *u as usize)
.collect::<Vec<_>>(),
elements,
)
.unwrap()
})
.collect::<Vec<_>>();
println!("Decoded inner chunks:");
for (inner_chunk_subset, decoded_inner_chunk) in
std::iter::zip(inner_chunks_to_decode, decoded_inner_chunks)
{
println!("{inner_chunk_subset:?}\n{decoded_inner_chunk:?}\n");
}

// Show the hierarchy
let node = Node::new_with_store(&*store, "/").unwrap();
let node = Node::new_with_store(&*store_readable_listable, "/").unwrap();
let tree = node.hierarchy_tree();
println!("The zarr hierarchy tree is:\n{}", tree);

Expand Down

0 comments on commit 69d3432

Please sign in to comment.