
Error reading sharded array past the first inner chunk #2018

Open
darsnack opened this issue Jul 8, 2024 · 3 comments · May be fixed by #2022
Labels
bug Potential issues with the zarr-python library

Comments


darsnack commented Jul 8, 2024

Zarr version

v3.0.0a0

Numcodecs version

v0.12.1

Python Version

3.12

Operating System

Linux

Installation

Using Poetry

Description

I have an array stored using Zarr v3 in a sharded format where the inner chunk size is 1. Reading past the first chunk results in the error shown below in the MWE. If the inner chunk size is k > 1, then no errors occur for indices 0 through k - 1, but the same error occurs when accessing index k onwards.

Steps to reproduce

First, create a sharded store:

In [1]: import zarr

In [2]: import numpy as np

In [3]: store = zarr.store.LocalStore("./test.zarr", mode="w")

In [4]: from zarr.codecs import ShardingCodec

In [5]: arr = zarr.create(store=store,
   ...:                   shape=(1000, 1000),
   ...:                   chunk_shape=(20, 1000),
   ...:                   zarr_format=3,
   ...:                   codecs=[ShardingCodec(chunk_shape=(1, 1000))])

In [6]: arr[:] = np.ones((1000, 1000))

Now, attempt to open the store and read a single chunk at a time:

In [1]: import zarr

In [2]: store = zarr.store.LocalStore("./test.zarr", mode="r")

In [3]: arr = zarr.open_array(store=store, zarr_format=3)

In [4]: arr[0].shape
Out[4]: (1000,)

In [5]: arr[1].shape
---------------------------------------------------------------------------
ValueError                                Traceback (most recent call last)
Cell In[5], line 1
----> 1 arr[1].shape

File ~/micromamba/envs/learning-ddm/lib/python3.12/site-packages/zarr/array.py:711, in Array.__getitem__(self, selection)
    709     return self.get_orthogonal_selection(pure_selection, fields=fields)
    710 else:
--> 711     return self.get_basic_selection(cast(BasicSelection, pure_selection), fields=fields)

File ~/micromamba/envs/learning-ddm/lib/python3.12/site-packages/zarr/array.py:733, in Array.get_basic_selection(self, selection, out, prototype, fields)
    731     raise NotImplementedError
    732 else:
--> 733     return sync(
    734         self._async_array._get_selection(
    735             BasicIndexer(selection, self.shape, self.metadata.chunk_grid),
    736             out=out,
    737             fields=fields,
    738             prototype=prototype,
    739         )
    740     )

File ~/micromamba/envs/learning-ddm/lib/python3.12/site-packages/zarr/sync.py:92, in sync(coro, loop, timeout)
     89 return_result = next(iter(finished)).result()
     91 if isinstance(return_result, BaseException):
---> 92     raise return_result
     93 else:
     94     return return_result

File ~/micromamba/envs/learning-ddm/lib/python3.12/site-packages/zarr/sync.py:51, in _runner(coro)
     46 """
     47 Await a coroutine and return the result of running it. If awaiting the coroutine raises an
     48 exception, the exception will be returned.
     49 """
     50 try:
---> 51     return await coro
     52 except Exception as ex:
     53     return ex

File ~/micromamba/envs/learning-ddm/lib/python3.12/site-packages/zarr/array.py:447, in AsyncArray._get_selection(self, indexer, prototype, out, fields)
    439     out_buffer = prototype.nd_buffer.create(
    440         shape=indexer.shape,
    441         dtype=out_dtype,
    442         order=self.order,
    443         fill_value=self.metadata.fill_value,
    444     )
    445 if product(indexer.shape) > 0:
    446     # reading chunks and decoding them
--> 447     await self.metadata.codec_pipeline.read(
    448         [
    449             (
    450                 self.store_path / self.metadata.encode_chunk_key(chunk_coords),
    451                 self.metadata.get_chunk_spec(chunk_coords, self.order, prototype=prototype),
    452                 chunk_selection,
    453                 out_selection,
    454             )
    455             for chunk_coords, chunk_selection, out_selection in indexer
    456         ],
    457         out_buffer,
    458         drop_axes=indexer.drop_axes,
    459     )
    460 return out_buffer.as_ndarray_like()

File ~/micromamba/envs/learning-ddm/lib/python3.12/site-packages/zarr/codecs/pipeline.py:489, in BatchedCodecPipeline.read(self, batch_info, out, drop_axes)
    483 async def read(
    484     self,
    485     batch_info: Iterable[tuple[ByteGetter, ArraySpec, SelectorTuple, SelectorTuple]],
    486     out: NDBuffer,
    487     drop_axes: tuple[int, ...] = (),
    488 ) -> None:
--> 489     await concurrent_map(
    490         [
    491             (single_batch_info, out, drop_axes)
    492             for single_batch_info in batched(batch_info, self.batch_size)
    493         ],
    494         self.read_batch,
    495         config.get("async.concurrency"),
    496     )

File ~/micromamba/envs/learning-ddm/lib/python3.12/site-packages/zarr/common.py:53, in concurrent_map(items, func, limit)
     49 async def concurrent_map(
     50     items: list[T], func: Callable[..., Awaitable[V]], limit: int | None = None
     51 ) -> list[V]:
     52     if limit is None:
---> 53         return await asyncio.gather(*[func(*item) for item in items])
     55     else:
     56         sem = asyncio.Semaphore(limit)

File ~/micromamba/envs/learning-ddm/lib/python3.12/site-packages/zarr/codecs/pipeline.py:298, in BatchedCodecPipeline.read_batch(self, batch_info, out, drop_axes)
    291 async def read_batch(
    292     self,
    293     batch_info: Iterable[tuple[ByteGetter, ArraySpec, SelectorTuple, SelectorTuple]],
    294     out: NDBuffer,
    295     drop_axes: tuple[int, ...] = (),
    296 ) -> None:
    297     if self.supports_partial_decode:
--> 298         chunk_array_batch = await self.decode_partial_batch(
    299             [
    300                 (byte_getter, chunk_selection, chunk_spec)
    301                 for byte_getter, chunk_spec, chunk_selection, _ in batch_info
    302             ]
    303         )
    304         for chunk_array, (_, chunk_spec, _, out_selection) in zip(
    305             chunk_array_batch, batch_info, strict=False
    306         ):
    307             if chunk_array is not None:

File ~/micromamba/envs/learning-ddm/lib/python3.12/site-packages/zarr/codecs/pipeline.py:254, in BatchedCodecPipeline.decode_partial_batch(self, batch_info)
    252 assert self.supports_partial_decode
    253 assert isinstance(self.array_bytes_codec, ArrayBytesCodecPartialDecodeMixin)
--> 254 return await self.array_bytes_codec.decode_partial(batch_info)

File ~/micromamba/envs/learning-ddm/lib/python3.12/site-packages/zarr/abc/codec.py:182, in ArrayBytesCodecPartialDecodeMixin.decode_partial(self, batch_info)
    162 async def decode_partial(
    163     self,
    164     batch_info: Iterable[tuple[ByteGetter, SelectorTuple, ArraySpec]],
    165 ) -> Iterable[NDBuffer | None]:
    166     """Partially decodes a batch of chunks.
    167     This method determines parts of a chunk from the slice selection,
    168     fetches these parts from the store (via ByteGetter) and decodes them.
   (...)
    180     Iterable[NDBuffer | None]
    181     """
--> 182     return await concurrent_map(
    183         [
    184             (byte_getter, selection, chunk_spec)
    185             for byte_getter, selection, chunk_spec in batch_info
    186         ],
    187         self._decode_partial_single,
    188         config.get("async.concurrency"),
    189     )

File ~/micromamba/envs/learning-ddm/lib/python3.12/site-packages/zarr/common.py:53, in concurrent_map(items, func, limit)
     49 async def concurrent_map(
     50     items: list[T], func: Callable[..., Awaitable[V]], limit: int | None = None
     51 ) -> list[V]:
     52     if limit is None:
---> 53         return await asyncio.gather(*[func(*item) for item in items])
     55     else:
     56         sem = asyncio.Semaphore(limit)

File ~/micromamba/envs/learning-ddm/lib/python3.12/site-packages/zarr/codecs/sharding.py:477, in ShardingCodec._decode_partial_single(self, byte_getter, selection, shard_spec)
    474                 shard_dict[chunk_coords] = chunk_bytes
    476 # decoding chunks and writing them into the output buffer
--> 477 await self.codecs.read(
    478     [
    479         (
    480             _ShardingByteGetter(shard_dict, chunk_coords),
    481             chunk_spec,
    482             chunk_selection,
    483             out_selection,
    484         )
    485         for chunk_coords, chunk_selection, out_selection in indexer
    486     ],
    487     out,
    488 )
    489 return out

File ~/micromamba/envs/learning-ddm/lib/python3.12/site-packages/zarr/codecs/pipeline.py:489, in BatchedCodecPipeline.read(self, batch_info, out, drop_axes)
    483 async def read(
    484     self,
    485     batch_info: Iterable[tuple[ByteGetter, ArraySpec, SelectorTuple, SelectorTuple]],
    486     out: NDBuffer,
    487     drop_axes: tuple[int, ...] = (),
    488 ) -> None:
--> 489     await concurrent_map(
    490         [
    491             (single_batch_info, out, drop_axes)
    492             for single_batch_info in batched(batch_info, self.batch_size)
    493         ],
    494         self.read_batch,
    495         config.get("async.concurrency"),
    496     )

File ~/micromamba/envs/learning-ddm/lib/python3.12/site-packages/zarr/common.py:53, in concurrent_map(items, func, limit)
     49 async def concurrent_map(
     50     items: list[T], func: Callable[..., Awaitable[V]], limit: int | None = None
     51 ) -> list[V]:
     52     if limit is None:
---> 53         return await asyncio.gather(*[func(*item) for item in items])
     55     else:
     56         sem = asyncio.Semaphore(limit)

File ~/micromamba/envs/learning-ddm/lib/python3.12/site-packages/zarr/codecs/pipeline.py:320, in BatchedCodecPipeline.read_batch(self, batch_info, out, drop_axes)
    311 else:
    312     chunk_bytes_batch = await concurrent_map(
    313         [
    314             (byte_getter, array_spec.prototype)
   (...)
    318         config.get("async.concurrency"),
    319     )
--> 320     chunk_array_batch = await self.decode_batch(
    321         [
    322             (chunk_bytes, chunk_spec)
    323             for chunk_bytes, (_, chunk_spec, _, _) in zip(
    324                 chunk_bytes_batch, batch_info, strict=False
    325             )
    326         ],
    327     )
    328     for chunk_array, (_, chunk_spec, chunk_selection, out_selection) in zip(
    329         chunk_array_batch, batch_info, strict=False
    330     ):
    331         if chunk_array is not None:

File ~/micromamba/envs/learning-ddm/lib/python3.12/site-packages/zarr/codecs/pipeline.py:237, in BatchedCodecPipeline.decode_batch(self, chunk_bytes_and_specs)
    232     chunk_bytes_batch = await bb_codec.decode(
    233         zip(chunk_bytes_batch, chunk_spec_batch, strict=False)
    234     )
    236 ab_codec, chunk_spec_batch = ab_codec_with_spec
--> 237 chunk_array_batch = await ab_codec.decode(
    238     zip(chunk_bytes_batch, chunk_spec_batch, strict=False)
    239 )
    241 for aa_codec, chunk_spec_batch in aa_codecs_with_spec[::-1]:
    242     chunk_array_batch = await aa_codec.decode(
    243         zip(chunk_array_batch, chunk_spec_batch, strict=False)
    244     )

File ~/micromamba/envs/learning-ddm/lib/python3.12/site-packages/zarr/abc/codec.py:107, in _Codec.decode(self, chunks_and_specs)
     91 async def decode(
     92     self,
     93     chunks_and_specs: Iterable[tuple[CodecOutput | None, ArraySpec]],
     94 ) -> Iterable[CodecInput | None]:
     95     """Decodes a batch of chunks.
     96     Chunks can be None in which case they are ignored by the codec.
     97
   (...)
    105     Iterable[CodecInput | None]
    106     """
--> 107     return await batching_helper(self._decode_single, chunks_and_specs)

File ~/micromamba/envs/learning-ddm/lib/python3.12/site-packages/zarr/abc/codec.py:392, in batching_helper(func, batch_info)
    388 async def batching_helper(
    389     func: Callable[[CodecInput, ArraySpec], Awaitable[CodecOutput | None]],
    390     batch_info: Iterable[tuple[CodecInput | None, ArraySpec]],
    391 ) -> list[CodecOutput | None]:
--> 392     return await concurrent_map(
    393         [(chunk_array, chunk_spec) for chunk_array, chunk_spec in batch_info],
    394         noop_for_none(func),
    395         config.get("async.concurrency"),
    396     )

File ~/micromamba/envs/learning-ddm/lib/python3.12/site-packages/zarr/common.py:53, in concurrent_map(items, func, limit)
     49 async def concurrent_map(
     50     items: list[T], func: Callable[..., Awaitable[V]], limit: int | None = None
     51 ) -> list[V]:
     52     if limit is None:
---> 53         return await asyncio.gather(*[func(*item) for item in items])
     55     else:
     56         sem = asyncio.Semaphore(limit)

File ~/micromamba/envs/learning-ddm/lib/python3.12/site-packages/zarr/abc/codec.py:405, in noop_for_none.<locals>.wrap(chunk, chunk_spec)
    403 if chunk is None:
    404     return None
--> 405 return await func(chunk, chunk_spec)

File ~/micromamba/envs/learning-ddm/lib/python3.12/site-packages/zarr/codecs/bytes.py:89, in BytesCodec._decode_single(self, chunk_bytes, chunk_spec)
     87 # ensure correct chunk shape
     88 if chunk_array.shape != chunk_spec.shape:
---> 89     chunk_array = chunk_array.reshape(
     90         chunk_spec.shape,
     91     )
     92 return chunk_array

File ~/micromamba/envs/learning-ddm/lib/python3.12/site-packages/zarr/buffer.py:375, in NDBuffer.reshape(self, newshape)
    374 def reshape(self, newshape: ChunkCoords | Literal[-1]) -> Self:
--> 375     return self.__class__(self._data.reshape(newshape))

ValueError: cannot reshape array of size 2000 into shape (1,1000)

If we try to access arr[2], the error instead reports an array of size 3000. It seems that arr[i] reads chunks 0 through i (inclusive) instead of the single chunk i.

Additional output

No response

@darsnack darsnack added the bug Potential issues with the zarr-python library label Jul 8, 2024
@darsnack darsnack changed the title Error reading sharded array with inner chunk size of 1 Error reading sharded array past the first inner chunk Jul 8, 2024

darsnack commented Jul 9, 2024

This error persists on the v3 branch as well.

d-v-b (Contributor) commented Jul 9, 2024

Thanks for the bug report @darsnack! I'm planning on making some improvements to the sharding test suite this week, so I will hopefully get some time to replicate (and maybe fix) this bug.

darsnack (Author) commented
After some debugging, I think I've narrowed down the cause of the issue. Here's a summary:

  1. When building up the shard dict mapping, we call shard_index.get_chunk_slice here, which returns the start and end index (in bytes) of the inner chunk.
  2. A few lines down, we actually read the bytes from storage.
  3. Eventually, this hits _get, since each shard lives in a local store on disk.
  4. This logic interprets the byte_range (i.e. the chunk slice from Step 1) as a start index plus a total length to read. This is the bug, since Step 1 produced a start and end index instead.

From this, we get the behavior described in the bug report.

  • Reading the first chunk works fine, since the start index is 0 and the end index is the size of a chunk
  • Reading the second chunk fails, since the end index is double the chunk size
  • Reading the third chunk also fails, since the end index is triple the chunk size
  • The effect of doubling, tripling, etc. is reflected in the final error since we try to reshape a larger and larger array into the requested final size
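The arithmetic behind the bullets above can be sketched as follows. This is a hypothetical illustration: `chunk_nbytes`, `chunk_slice`, and `misread_length` are made-up names standing in for the actual zarr-python internals.

```python
# Each inner chunk of shape (1, 1000) occupies a fixed number of bytes,
# so chunk i spans the byte range [i * chunk_nbytes, (i + 1) * chunk_nbytes).
chunk_nbytes = 8000  # e.g. 1000 float64 values per inner chunk

def chunk_slice(i: int) -> tuple[int, int]:
    # (start, end) byte offsets of inner chunk i, as get_chunk_slice returns them
    return i * chunk_nbytes, (i + 1) * chunk_nbytes

def misread_length(i: int) -> int:
    # _get treats the second element as a total length to read, not an end offset
    start, end = chunk_slice(i)
    return end

# Chunk 0 works by coincidence: end == chunk_nbytes, the correct length.
assert misread_length(0) == chunk_nbytes
# Chunk 1 reads twice as many bytes, chunk 2 three times as many -- matching
# the "cannot reshape array of size 2000 into shape (1,1000)" error above.
assert misread_length(1) == 2 * chunk_nbytes
assert misread_length(2) == 3 * chunk_nbytes
```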

To fix this, I could either modify get_chunk_slice from Step 1, or make the even more minimal change of computing the total length from the output of get_chunk_slice. I am not sure what the downstream effects of the former would be, since I am not familiar with this codebase. I'm happy to put up a PR with the bug fix, but I'll need some guidance on which fix the maintainers prefer.
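The second, more minimal option could look like the sketch below: keep get_chunk_slice returning (start, end) and convert to (start, length) at the call site before handing it to the store. The function name here is hypothetical, not the actual zarr-python API.

```python
def to_start_and_length(byte_slice: tuple[int, int]) -> tuple[int, int]:
    # Convert a (start, end) byte slice into the (start, length) form
    # that the store's _get interprets.
    start, end = byte_slice
    return start, end - start

# Chunk 1 of an 8000-byte inner chunk: start at byte 8000, read 8000 bytes
# (rather than 16000, as the current misinterpretation does).
assert to_start_and_length((8000, 16000)) == (8000, 8000)
```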

@darsnack darsnack linked a pull request Jul 10, 2024 that will close this issue