Make Zarr3 default and deprecations #1242

Status: Open. Wants to merge 75 commits into base `master`; changes shown from 72 commits.

Commits
ee27d04
make Zarr3 default DataFormat
normanrz Jan 22, 2025
18b94d9
compress=True
normanrz Jan 22, 2025
52a1479
remove deprecations
normanrz Jan 24, 2025
8a6caff
wip resize
normanrz Jan 25, 2025
de60749
test fixes
normanrz Jan 27, 2025
a5899b7
down to 16
normanrz Jan 27, 2025
d6cd701
changelog
normanrz Jan 28, 2025
074e661
test fixes
normanrz Jan 28, 2025
a10fd54
fix /test_dataset_add_remote_mag_and_layer.py
normanrz Jan 29, 2025
15ce077
stuff
normanrz Jan 29, 2025
b0eaaf6
ci
normanrz Jan 29, 2025
2ca6b46
ci
normanrz Jan 29, 2025
7d5fbf4
Merge branch 'master' into zarr3-default
normanrz Jan 29, 2025
8fec17e
error on fork
normanrz Jan 29, 2025
e5f5a2f
ci
normanrz Jan 29, 2025
7d1e739
ci
normanrz Jan 29, 2025
112d0b4
less alignment checks
normanrz Jan 29, 2025
1d1b396
allow_unaligned
normanrz Jan 29, 2025
1486e2d
Update ci.yml
normanrz Jan 29, 2025
4a03f1f
ci testing
normanrz Jan 30, 2025
7d79eaa
Merge branch 'zarr3-default' of github.com:scalableminds/webknossos-l…
normanrz Jan 30, 2025
f0ae78a
merge
normanrz Jan 30, 2025
2831987
ci
normanrz Jan 30, 2025
4692499
sequential tests
normanrz Jan 30, 2025
87ecac5
ci
normanrz Jan 30, 2025
5bee5c2
ci
normanrz Jan 30, 2025
5f3f698
ci
normanrz Jan 31, 2025
9c5c9ba
ci
normanrz Jan 31, 2025
9c1eb1b
parameterize python for kubernetes dockerfile
normanrz Jan 31, 2025
6c3a40f
test
normanrz Jan 31, 2025
c82e2eb
Merge remote-tracking branch 'origin/master' into zarr3-default
normanrz Jan 31, 2025
c18274b
change defaults
normanrz Jan 31, 2025
9389dc4
mirrored test images
normanrz Feb 3, 2025
266bf2e
mp logging
normanrz Feb 3, 2025
7b874a8
mp debugging
normanrz Feb 3, 2025
7fbb764
debug
normanrz Feb 3, 2025
6ae372c
debug
normanrz Feb 3, 2025
97c4aa2
debug
normanrz Feb 3, 2025
a495df2
debugging
normanrz Feb 3, 2025
d9e37a0
pyproject.toml
normanrz Feb 3, 2025
edf1351
py3.12
normanrz Feb 3, 2025
d2374eb
debugging
normanrz Feb 3, 2025
1c23297
wip
normanrz Feb 3, 2025
b6d877f
merge
normanrz Feb 3, 2025
d42888a
all python versions
normanrz Feb 3, 2025
7c5f5a3
revert debug changes in cluster_tools
normanrz Feb 3, 2025
a4598a5
fixes
normanrz Feb 3, 2025
e8e7c2b
larger ci runner
normanrz Feb 3, 2025
7a3e159
default ci runner
normanrz Feb 3, 2025
6bc2185
rm pytest-timeout
normanrz Feb 5, 2025
c60e64e
merge
normanrz Feb 5, 2025
8d57971
test
normanrz Feb 5, 2025
93c23a7
Revert "rm pytest-timeout"
normanrz Feb 5, 2025
9d97a18
Revert "test"
normanrz Feb 5, 2025
a154570
ci
normanrz Feb 5, 2025
25e3cf3
ci
normanrz Feb 5, 2025
5e48469
ci
normanrz Feb 5, 2025
07f56a7
ci
normanrz Feb 5, 2025
591cbfc
ci
normanrz Feb 5, 2025
83d038f
ci
normanrz Feb 5, 2025
cc04294
properly implement SequentialExecutor
normanrz Feb 5, 2025
ae20b23
ci
normanrz Feb 5, 2025
2006f18
changelog
normanrz Feb 5, 2025
94b9e93
allow_unaligned wip
normanrz Feb 5, 2025
8c7f0f9
ci
normanrz Feb 6, 2025
9a28f2b
wip
normanrz Feb 6, 2025
4c6faa7
fix tests
normanrz Feb 6, 2025
eee265f
fix test
normanrz Feb 6, 2025
efe2c6e
examples
normanrz Feb 6, 2025
688e4c8
longer sleep in slurm test
normanrz Feb 6, 2025
7396ae1
format
normanrz Feb 6, 2025
9e41146
longer sleep in slurm test
normanrz Feb 6, 2025
686c5b4
Apply suggestions from code review
normanrz Feb 8, 2025
cf89fd9
add methods
normanrz Feb 8, 2025
a0aedd8
more robust patch
normanrz Feb 8, 2025
10 changes: 7 additions & 3 deletions .github/workflows/ci.yml
@@ -157,7 +157,7 @@ jobs:
strategy:
max-parallel: 4
matrix:
-        python-version: ["3.13", "3.12", "3.11", "3.10", "3.9"]
+        python-version: ["3.12", "3.13", "3.11", "3.10", "3.9"]
group: [1, 2, 3]
fail-fast: false
defaults:
@@ -193,11 +193,15 @@ jobs:
if: ${{ matrix.group == 1 && matrix.python-version == '3.11' }}
run: ./typecheck.sh

- name: Patch Python standard library to assert that fork is not allowed
run: |
sed -i '66i \ \ \ \ \ \ \ \ raise Exception("fork is not allowed.")' /home/runner/.local/share/uv/python/cpython-${{ matrix.python-version }}.*-linux-x86_64-gnu/lib/python${{ matrix.python-version }}/multiprocessing/popen_fork.py

- name: Python tests
timeout-minutes: 30
env:
WK_TOKEN: ${{ secrets.WK_TOKEN }}
-      run: ./test.sh -vv -p no:faulthandler --splits 3 --group ${{ matrix.group }} --splitting-algorithm least_duration
+        PYTHON_VERSION: ${{ matrix.python-version }}
+      run: ./test.sh --splits 3 --group ${{ matrix.group }} --splitting-algorithm least_duration

- name: Check if git is dirty
run: |
@@ -1,6 +1,7 @@
import functools
import logging
import multiprocessing
import os
import sys
import threading
import traceback
@@ -33,7 +34,11 @@ def __init__(self, name: str, wrapped_handler: logging.Handler) -> None:
self.setFormatter(self.wrapped_handler.formatter)
self.filters = self.wrapped_handler.filters

self._manager = multiprocessing.Manager()
# Make sure to use a multiprocessing context with
# explicit start method to avoid unwanted forks
self._manager = multiprocessing.get_context(
os.environ.get("MULTIPROCESSING_DEFAULT_START_METHOD", "spawn")
).Manager()
self.queue = self._manager.Queue(-1)
self._is_closed = False
# Use thread to asynchronously receive messages from the queue
@@ -86,14 +91,15 @@ def decrement_usage(self) -> None:
root_logger.addHandler(self.wrapped_handler)

self._is_closed = True
-        self._queue_thread.join()
+        self._queue_thread.join(30)
self._manager.shutdown()
self.wrapped_handler.close()
super().close()

def close(self) -> None:
if not self._is_closed:
self._is_closed = True
-        self._queue_thread.join()
+        self._queue_thread.join(30)
self._manager.shutdown()
self.wrapped_handler.close()
super().close()
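The change above swaps a bare `multiprocessing.Manager()` for one created from an explicit context, so the manager's server process honors the configured start method instead of the platform default (fork on Linux). A minimal sketch of the pattern — the `MULTIPROCESSING_DEFAULT_START_METHOD` variable name is taken from the diff; the surrounding code is illustrative:

```python
import multiprocessing
import os


def make_manager():
    # Resolve the start method the way the handler does: honor an env
    # override, fall back to "spawn" to avoid unwanted forks.
    start_method = os.environ.get("MULTIPROCESSING_DEFAULT_START_METHOD", "spawn")
    ctx = multiprocessing.get_context(start_method)
    return ctx.Manager()


if __name__ == "__main__":
    manager = make_manager()
    queue = manager.Queue(-1)  # same unbounded queue the handler uses
    queue.put("log record")
    print(queue.get())
    manager.shutdown()
```

With "spawn", the manager's server process is started fresh rather than forked, which sidesteps the deadlocks forking can cause in threaded programs.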
11 changes: 2 additions & 9 deletions cluster_tools/cluster_tools/executors/multiprocessing_.py
@@ -185,9 +185,7 @@ def _execute_and_persist_function(
def map_to_futures(
self,
fn: Callable[[_S], _T],
-        args: Iterable[
-            _S
-        ],  # TODO change: allow more than one arg per call #noqa: FIX002 Line contains TODO
+        args: Iterable[_S],
output_pickle_path_getter: Optional[Callable[[_S], os.PathLike]] = None,
) -> List[Future[_T]]:
if output_pickle_path_getter is not None:
@@ -217,11 +215,6 @@ def forward_log(self, fut: Future[_T]) -> _T:
return fut.result()

def shutdown(self, wait: bool = True, *, cancel_futures: bool = False) -> None:
-        if cancel_futures:
-            # cancel_futures was added in Python 3.9, ignoring it as 3.8 is supported:
-            logging.warning(
-                "The provided cancel_futures argument is ignored by MultiprocessingExecutor."
-            )
-        super().shutdown(wait=wait)
+        super().shutdown(wait=wait, cancel_futures=cancel_futures)
if self._mp_logging_handler_pool is not None:
self._mp_logging_handler_pool.close()
79 changes: 60 additions & 19 deletions cluster_tools/cluster_tools/executors/sequential.py
@@ -1,41 +1,30 @@
from concurrent.futures import Future
from multiprocessing.context import BaseContext
import warnings
from collections.abc import Iterable, Iterator
from concurrent.futures import Executor, Future, as_completed
from os import PathLike
from pathlib import Path
from typing import Any, Callable, Optional, Tuple, TypeVar, cast
from typing import Any, Callable, Optional, TypeVar, cast

from typing_extensions import ParamSpec

from cluster_tools._utils.warning import enrich_future_with_uncaught_warning
from cluster_tools.executors.multiprocessing_ import CFutDict, MultiprocessingExecutor

_T = TypeVar("_T")
_S = TypeVar("_S")
_P = ParamSpec("_P")


-# Strictly speaking, this executor doesn't need to inherit from MultiprocessingExecutor
-# but could inherit from futures.Executor instead. However, this would require to duplicate
-# quite a few methods to adhere to the executor protocol (as_completed, map_to_futures, map, forward_log, shutdown).
-class SequentialExecutor(MultiprocessingExecutor):
+class SequentialExecutor(Executor):
"""
The same as MultiprocessingExecutor, but synchronous and uses only one core.
"""

def __init__(
self,
*,
start_method: Optional[str] = None,
mp_context: Optional[BaseContext] = None,
initializer: Optional[Callable] = None,
initargs: Tuple = (),
**__kwargs: Any,
) -> None:
-        super().__init__(
-            max_workers=1,
-            start_method=start_method,
-            mp_context=mp_context,
-            initializer=initializer,
-            initargs=initargs,
-        )
+        pass

def submit( # type: ignore[override]
self,
@@ -61,3 +50,55 @@ def submit(
fut.set_result(result)
enrich_future_with_uncaught_warning(fut)
return fut

@classmethod
def as_completed(cls, futures: list[Future[_T]]) -> Iterator[Future[_T]]:
return as_completed(futures)

def map_to_futures(
self,
fn: Callable[[_S], _T],
args: Iterable[_S],
output_pickle_path_getter: Optional[Callable[[_S], PathLike]] = None,
) -> list[Future[_T]]:
if output_pickle_path_getter is not None:
futs = [
self.submit( # type: ignore[call-arg]
fn,
arg,
__cfut_options={
"output_pickle_path": output_pickle_path_getter(arg)
},
)
for arg in args
]
else:
futs = [self.submit(fn, arg) for arg in args]

return futs

def map( # type: ignore[override]
self,
fn: Callable[[_S], _T],
iterables: Iterable[_S],
timeout: Optional[float] = None,
chunksize: Optional[int] = None,
) -> Iterator[_T]:
if timeout is not None:
warnings.warn(
"timeout is not implemented for SequentialExecutor.map",
category=UserWarning,
)
if chunksize is not None:
warnings.warn(
"chunksize is not implemented for SequentialExecutor.map",
category=UserWarning,
)
for item in iterables:
yield fn(item)

def forward_log(self, fut: Future[_T]) -> _T:
return fut.result()

def shutdown(self, wait: bool = True, *, cancel_futures: bool = False) -> None:
pass
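As the diff shows, the new `SequentialExecutor` no longer inherits from `MultiprocessingExecutor`; it implements the `concurrent.futures.Executor` protocol directly and runs every submitted call inline. A stripped-down sketch of that idea — the class name and the reduced single-iterable `map` signature here are illustrative, not the PR's exact code:

```python
import warnings
from concurrent.futures import Executor, Future
from typing import Any, Callable, Iterable, Iterator, Optional, TypeVar

_T = TypeVar("_T")
_S = TypeVar("_S")


class MiniSequentialExecutor(Executor):
    """Runs submitted work synchronously, in submission order, on one core."""

    def submit(self, fn: Callable[..., _T], *args: Any, **kwargs: Any) -> Future[_T]:
        fut: Future[_T] = Future()
        try:
            fut.set_result(fn(*args, **kwargs))
        except Exception as exc:
            # Store the error on the future instead of raising at submit
            # time, matching regular executor semantics.
            fut.set_exception(exc)
        return fut

    def map(  # type: ignore[override]
        self,
        fn: Callable[[_S], _T],
        iterables: Iterable[_S],
        timeout: Optional[float] = None,
        chunksize: Optional[int] = None,
    ) -> Iterator[_T]:
        if timeout is not None or chunksize is not None:
            warnings.warn("timeout/chunksize are ignored here", UserWarning)
        for item in iterables:
            yield fn(item)

    def shutdown(self, wait: bool = True, *, cancel_futures: bool = False) -> None:
        pass  # nothing pooled, nothing to tear down


if __name__ == "__main__":
    with MiniSequentialExecutor() as ex:
        print(list(ex.map(lambda x: x * 2, [1, 2, 3])))  # [2, 4, 6]
```

Because results are computed inside `submit`, every future is already resolved when `submit` returns — which is why the PR's test notes that `as_completed` would yield futures in reverse order for synchronous executors.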
6 changes: 6 additions & 0 deletions cluster_tools/tests/test_all.py
@@ -9,6 +9,9 @@

import pytest

import cluster_tools.executors
import cluster_tools.schedulers

if TYPE_CHECKING:
from distributed import LocalCluster

@@ -253,6 +256,9 @@ def test_unordered_sleep(exc: cluster_tools.Executor) -> None:

with exc:
durations = [5, 0]
# Slurm can be a bit slow to start up, so we need to increase the sleep time
if isinstance(exc, cluster_tools.SlurmExecutor):
durations = [20, 0]
futures = [exc.submit(sleep, n) for n in durations]
# For synchronous executors, the futures should be completed after submit returns.
# .as_completed() would return them in reverse order in that case.
93 changes: 93 additions & 0 deletions webknossos/Changelog.md
@@ -13,6 +13,99 @@ For upgrade instructions, please check the respective _Breaking Changes_ section
[Commits](https://github.com/scalableminds/webknossos-libs/compare/v0.16.8...HEAD)

### Breaking Changes
- Changed writing behavior. There is a new argument `allow_resize` for `MagView.write`, which defaults to `False`. If set to `True`, the bounding box of underlying `Layer` will be resized to fit the to-be-written data. That largely mirrors the previous behavior. However, it is not safe for concurrent operations, so it is disabled by default. It is recommended to set the `Layer.bounding_box` to the desired size before writing. Additionally, by default, writes need to be aligned with the underlying shard grid to guard against concurrency issues and avoid performance footguns. There is a new argument `allow_unaligned`, which defaults to `False`. If set to `True`, the check for shard alignment is skipped.
- Removed deprecated functions, properties and arguments:
- Functions:
- `open_annotation`, use `Annotation.load()` instead
- `Dataset.get_color_layer`, use `Dataset.get_color_layers()` instead
- `Dataset.get_segmentation_layer`, use `Dataset.get_segmentation_layers()` instead
- `Dataset.create`, use `Dataset.__init__` instead
- `Dataset.get_or_create`, use `Dataset.__init__` with `exist_ok=True` instead
- `Layer.get_best_mag`, use `Layer.get_finest_mag` instead
- `View.read_bbox`, use `read` with `relative_bounding_box` or `absolute_bounding_box` instead
- `View.__enter__` and `View.__exit__`, context managers are not needed anymore
- `open_nml`, use `Skeleton.load()` instead
- `Group.add_graph`, use `Group.add_tree` instead
- `Group.get_max_graph_id`, use `Group.get_max_tree_id` instead
- `Group.flattened_graphs`, use `Group.flattened_trees` instead
- `Group.get_graph_by_id`, use `Group.get_tree_by_id` instead
- `Skeleton.from_path`, use `Skeleton.load()` instead
- `Skeleton.write`, use `Skeleton.save()` instead
- Properties:
- `Annotation.username`, use `Annotation.owner_name` instead
- `Annotation.scale`, use `Annotation.voxel_size` instead
- `Annotation.user_id`, use `Annotation.owner_id` instead
- `ArrayInfo.shard_size`, use `ArrayInfo.shard_shape` instead
- `Dataset.scale`, use `Dataset.voxel_size` instead
- `MagView.global_offset`, always `(0, 0, 0, ...)`
- `MagView.size`, use `mag_view.bounding_box.in_mag(mag_view.mag).bottomright`
- `MagViewProperties.resolution`, use `MagViewProperties.mag` instead
- `LayerProperties.resolutions`, use `LayerProperties.mags` instead
- `View.header`, use `View.info` instead
- `View.global_offset`, use `view.bounding_box.in_mag(view.mag).topleft` instead
- `View.size`, use `view.bounding_box.in_mag(view.mag).size` instead
- `Group.graphs`, use `Group.trees`
- `Skeleton.scale`, use `Skeleton.voxel_size` instead
- Arguments:
- `annotation_type` in `Annotation.download`, not needed anymore
- `annotation_type` in `Annotation.open_as_remote_dataset`, not needed anymore
- `size` in `BufferedSliceReader.__init__`, use `relative_bounding_box` or `absolute_bounding_box` instead
- `offset` in `BufferedSliceReader.__init__`, use `relative_bounding_box` or `absolute_bounding_box` instead
- `offset` in `BufferedSliceWriter.__init__`, use `relative_bounding_box` or `absolute_bounding_box` instead
- `json_update_allowed` in `BufferedSliceWriter.__init__`, not supported anymore
- `offset` in `BufferedSliceWriter.reset_offset`, use `relative_offset` or `absolute_offset` instead
- `scale` in `Dataset.__init__`, use `voxel_size` or `voxel_size_with_unit` instead
- `dtype` in `Dataset.add_layer`, use `dtype_per_channel` instead
- `dtype` in `Dataset.get_or_add_layer`, use `dtype_per_channel` instead
- `chunk_size` in `Dataset.add_layer_from_images`, use `chunk_shape` instead
- `chunk_size` in `Dataset.copy_dataset`, use `chunk_shape` instead
- `block_len` in `Dataset.copy_dataset`, use `chunk_shape` instead
- `file_len` in `Dataset.copy_dataset`, use `chunks_per_shard` instead
- `args` in `Dataset.copy_dataset`, use `executor` instead
- `chunk_size` in `Layer.add_mag`, use `chunk_shape` instead
- `block_len` in `Layer.add_mag`, use `chunk_shape` instead
- `file_len` in `Layer.add_mag`, use `chunks_per_shard` instead
- `chunk_size` in `Layer.get_or_add_mag`, use `chunk_shape` instead
- `block_len` in `Layer.get_or_add_mag`, use `chunk_shape` instead
- `file_len` in `Layer.get_or_add_mag`, use `chunks_per_shard` instead
- `args` in `Layer.downsample`, use `executor` instead
- `args` in `Layer.downsample_mag`, use `executor` instead
- `args` in `Layer.redownsample`, use `executor` instead
- `args` in `Layer.downsample_mag_list`, use `executor` instead
- `buffer_edge_len` in `Layer.upsample`, use `buffer_shape` instead
- `args` in `Layer.upsample`, use `executor` instead
- `min_mag` in `Layer.upsample`, use `finest_mag` instead
- `offset` in `MagView.write`, use `relative_offset`, `absolute_offset`, `relative_bounding_box`, or `absolute_bounding_box` instead
- `args` in `MagView.compress`, use `executor` instead
- `offset` in `View.write`, use `relative_offset`, `absolute_offset`, `relative_bounding_box`, or `absolute_bounding_box` instead
- `json_update_allowed` in `View.write`, not supported anymore
- `offset` in `View.read`, use `relative_offset`, `absolute_offset`, `relative_bounding_box`, or `absolute_bounding_box` instead
- `offset` in `View.get_view`, use `relative_offset`, `absolute_offset`, `relative_bounding_box`, or `absolute_bounding_box` instead
- `offset` in `View.get_buffered_slice_writer`, use `relative_offset`, `absolute_offset`, `relative_bounding_box`, or `absolute_bounding_box` instead
- `offset` in `View.get_buffered_slice_reader`, use `relative_bounding_box`, or `absolute_bounding_box` instead
- `size` in `View.get_buffered_slice_reader`, use `relative_bounding_box`, or `absolute_bounding_box` instead
- `chunk_size` in `View.for_each_chunk`, use `chunk_shape` instead
- `source_chunk_size` in `View.for_zipped_chunks`, use `source_chunk_shape` instead
- `target_chunk_size` in `View.for_zipped_chunks`, use `target_chunk_shape` instead
- `args` in `View.content_is_equal`, use `executor` instead
- Classes:
- `Graph`, use `Tree` instead
- Changed defaults:
- `exist_ok` in `Dataset.__init__` is now `False`
- `compress` in `Dataset.from_images` is now `True`
- `compress` in `Dataset.add_layer_from_images` is now `True`
- `DEFAULT_DATA_FORMAT` is now `Zarr3`
- `compress` in `Layer.add_mag` is now `True`
- `compress` in `Layer.upsample` is now `True`
- `buffer_size` in `View.get_buffered_slice_reader` is now computed from the shard shape
- `buffer_size` in `View.get_buffered_slice_writer` is now computed from the shard shape
- Moved from positional argument to keyword-only argument:
- `json_update_allowed` in `MagView.write`
- Added arguments:
- `allow_resize` in `MagView.write` with default `False`
- `allow_unaligned` in `MagView.write` with default `False`
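For downstream code, the write-path change above typically amounts to one of two migrations (a sketch in diff form; the surrounding variable names are illustrative):

```diff
-mag_view.write(data, absolute_offset=(10, 20, 30))
+# either size the layer's bounding box up front (safe for concurrent writes),
+# or opt back in to the old behavior explicitly:
+mag_view.write(
+    data,
+    absolute_offset=(10, 20, 30),
+    allow_resize=True,     # old auto-grow behavior, not concurrency-safe
+    allow_unaligned=True,  # skip the shard-alignment check
+)
```

Setting `Layer.bounding_box` once before writing, and keeping writes shard-aligned, avoids both flags and stays safe for parallel writers.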


### Added

2 changes: 1 addition & 1 deletion webknossos/examples/apply_merger_mode.py
@@ -59,7 +59,7 @@ def main() -> None:
dtype_per_layer=in_layer.dtype_per_layer,
largest_segment_id=in_layer.largest_segment_id,
)
-    out_mag1 = out_layer.add_mag("1", compress=True)
+    out_mag1 = out_layer.add_mag("1")
out_layer.bounding_box = in_layer.bounding_box

###################
7 changes: 6 additions & 1 deletion webknossos/examples/convert_4d_tiff.py
@@ -43,7 +43,12 @@ def main() -> None:
# data.shape -> (1, 2, 5, 100, 400) # first value is the channel dimension

# Write some data to a given position
-    mag_view.write(data, absolute_bounding_box=read_bbox.offset((2, 0, 0, 0)))
+    mag_view.write(
+        data,
+        absolute_bounding_box=read_bbox.offset((2, 0, 0, 0)),
+        allow_resize=True,
+        allow_unaligned=True,
+    )


if __name__ == "__main__":
8 changes: 7 additions & 1 deletion webknossos/examples/dataset_usage.py
@@ -22,7 +22,11 @@ def main() -> None:

dataset = wk.Dataset("testoutput/my_new_dataset", voxel_size=(1, 1, 1))
layer = dataset.add_layer(
-        layer_name="color", category="color", dtype_per_channel="uint8", num_channels=3
+        layer_name="color",
+        category="color",
+        dtype_per_channel="uint8",
+        num_channels=3,
+        bounding_box=wk.BoundingBox((10, 20, 30), (512, 512, 32)),
)
mag1 = layer.add_mag("1")
mag2 = layer.add_mag("2")
@@ -37,11 +41,13 @@
absolute_offset=(10, 20, 30),
# assuming the layer has 3 channels:
data=(np.random.rand(3, 512, 512, 32) * 255).astype(np.uint8),
allow_unaligned=True,
)

mag2.write(
absolute_offset=(10, 20, 30),
data=(np.random.rand(3, 256, 256, 16) * 255).astype(np.uint8),
allow_unaligned=True,
)

##########################
16 changes: 2 additions & 14 deletions webknossos/examples/upload_image_data.py
@@ -24,25 +24,13 @@ def main() -> None:

# The example microscopy data has two channels
# Channel 0 contains cell membranes, channel 1 contains nuclei.
-    layer_membranes = ds.add_layer(
-        "cell_membranes",
-        COLOR_CATEGORY,
-        dtype_per_layer=img.dtype,
-    )
-
-    layer_membranes.add_mag(1, compress=True).write(img[0, :])
+    layer_membranes = ds.write_layer("cell_membranes", COLOR_CATEGORY, data=img[0, :])

layer_membranes.default_view_configuration = LayerViewConfiguration(
color=(17, 212, 17), intensity_range=(0, 16000)
)

-    layer_nuclei = ds.add_layer(
-        "nuclei",
-        COLOR_CATEGORY,
-        dtype_per_layer=img.dtype,
-    )
-
-    layer_nuclei.add_mag(1, compress=True).write(img[1, :])
+    layer_nuclei = ds.write_layer("nuclei", COLOR_CATEGORY, data=img[1, :])

layer_nuclei.default_view_configuration = LayerViewConfiguration(
color=(212, 17, 17), intensity_range=(3000, 30000)