diff --git a/dascore/clients/dirspool.py b/dascore/clients/dirspool.py index 5290be6d..1e5c6cdc 100644 --- a/dascore/clients/dirspool.py +++ b/dascore/clients/dirspool.py @@ -13,9 +13,10 @@ from typing_extensions import Self import dascore as dc -from dascore.core.spool import DataFrameSpool +from dascore.core.spool import DataFrameSpool, BaseSpool from dascore.io.indexer import AbstractIndexer, DirectoryIndexer from dascore.utils.pd import adjust_segments +from dascore.utils.docs import compose_docstring class DirectorySpool(DataFrameSpool): @@ -91,24 +92,16 @@ def spool_path(self): """Return the path in which the spool contents are found.""" return self.indexer.path + @compose_docstring(doc=BaseSpool.get_contents.__doc__) def get_contents(self) -> pd.DataFrame: """ - Return a dataframe of the contents of the data files. - - Parameters - ---------- - time - If not None, a tuple of start/end time where either can be None - indicating an open interval. + {doc} """ return self._df + @compose_docstring(doc=BaseSpool.update.__doc__) def update(self) -> Self: - """ - Updates the contents of the spool and returns a spool. - - Resets any previous selection. - """ + """{doc}""" out = self.__class__( base_path=self.indexer.update(), preferred_format=self._preferred_format, diff --git a/dascore/clients/filespool.py b/dascore/clients/filespool.py index 9f68ad55..599ae541 100644 --- a/dascore/clients/filespool.py +++ b/dascore/clients/filespool.py @@ -9,8 +9,9 @@ import dascore as dc from dascore.constants import SpoolType -from dascore.core.spool import DataFrameSpool +from dascore.core.spool import DataFrameSpool, BaseSpool from dascore.io.core import FiberIO +from dascore.utils.docs import compose_docstring class FileSpool(DataFrameSpool): @@ -65,11 +66,12 @@ def _load_patch(self, kwargs) -> Self: """Given a row from the managed dataframe, return a patch.""" return dc.read(**kwargs)[0] + @compose_docstring(doc=BaseSpool.update.__doc__) def update(self: SpoolType) -> Self: """ - Update the spool. + {doc} - If the file format supports indexing (e.g. DASDAE) this will + Note: If the file format supports indexing (e.g. DASDAE) this will trigger an indexing of the file. 
""" formater = FiberIO.manager.get_fiberio(self._file_format, self._file_version) diff --git a/dascore/constants.py b/dascore/constants.py index 82f57fba..66e28fc0 100644 --- a/dascore/constants.py +++ b/dascore/constants.py @@ -2,7 +2,7 @@ from __future__ import annotations from pathlib import Path -from typing import TypeVar +from typing import TypeVar, Protocol, runtime_checkable import numpy as np import pandas as pd @@ -13,6 +13,15 @@ SpoolType = TypeVar("SpoolType", bound="dascore.BaseSpool") + +@runtime_checkable +class ExecutorType(Protocol): + """Protocol for Executors that DASCore can use.""" + + def map(self, func, iterables, **kwargs): + """Map function for applying concurrency of some flavor.""" + + # Bump this to force re-downloading of all data file DATA_VERSION = "0.0.0" diff --git a/dascore/core/spool.py b/dascore/core/spool.py index 1762745b..a215deac 100644 --- a/dascore/core/spool.py +++ b/dascore/core/spool.py @@ -5,6 +5,9 @@ from collections.abc import Mapping, Sequence from functools import singledispatch from pathlib import Path +from collections.abc import Generator +from typing import TypeVar +from collections.abc import Callable import numpy as np import pandas as pd @@ -13,13 +16,13 @@ import dascore as dc import dascore.io -from dascore.constants import PatchType, numeric_types, timeable_types -from dascore.exceptions import InvalidSpoolError +from dascore.constants import PatchType, numeric_types, timeable_types, ExecutorType +from dascore.exceptions import InvalidSpoolError, ParameterError from dascore.utils.chunk import ChunkManager from dascore.utils.display import get_dascore_text, get_nice_text from dascore.utils.docs import compose_docstring from dascore.utils.mapping import FrozenDict -from dascore.utils.misc import CacheDescriptor +from dascore.utils.misc import CacheDescriptor, _spool_map from dascore.utils.patch import _force_patch_merge, patches_to_df from dascore.utils.pd import ( _convert_min_max_in_kwargs, @@ -30,6 +33,8 @@ split_df_query, ) +T = TypeVar("T") + class BaseSpool(abc.ABC): """Spool Abstract Base Class (ABC) for defining Spool interface.""" @@ -44,9 +49,46 @@ def __getitem__(self, item: int) -> PatchType: def __iter__(self) -> PatchType: """Iterate through the Patches in the spool.""" - def update(self) -> Self: - """Updates the contents of the spool and returns a spool.""" - return self + @abc.abstractmethod + def __len__(self) -> int: + """Return len of spool.""" + + def __rich__(self): + """Rich rep. 
of spool.""" + text = get_dascore_text() + Text(" ") + text += Text(self.__class__.__name__, style=self._rich_style) + text += Text(" 🧵 ") + patch_len = len(self) + text += Text(f"({patch_len:d}") + text += Text(" Patches)") if patch_len != 1 else Text(" Patch)") + return text + + def __str__(self): + return str(self.__rich__()) + + __repr__ = __str__ + + def __eq__(self, other) -> bool: + """Simple equality checks on spools.""" + + def _vals_equal(dict1, dict2): + if set(dict1) != set(dict2): + return False + for key in set(dict1): + val1, val2 = dict1[key], dict2[key] + if isinstance(val1, dict): + if not _vals_equal(val1, val2): + return False + elif hasattr(val1, "equals"): + if not val1.equals(val2): + return False + elif val1 != val2: + return False + return True + + my_dict = self.__dict__ + other_dict = getattr(other, "__dict__", {}) + return _vals_equal(my_dict, other_dict) @abc.abstractmethod def chunk( @@ -64,7 +106,7 @@ def chunk( ---------- overlap The amount of overlap between each segment, starting with the end of - first patch. Negative values can be used to induce gaps. + first patch. Negative values can be used to create gaps. keep_partial If True, keep the segments which are smaller than chunk size. This often occurs because of data gaps or at end of chunks. @@ -77,71 +119,173 @@ def chunk( kwargs kwargs are used to specify the dimension along which to chunk, eg: `time=10` chunks along the time axis in 10 second increments. + + Examples + -------- + >>> import dascore as dc + >>> from dascore.units import s + >>> + >>> spool = dc.get_example_spool("random_das") + >>> # get spools with time duration of 10 seconds + >>> time_chunked = spool.chunk(time=10, overlap=1) + >>> # merge along time axis + >>> time_merged = spool.chunk(time=...) """ @abc.abstractmethod def select(self, **kwargs) -> Self: - """Select only part of the data.""" + """ + Sub-select parts of the spool. + + Can be used to specify dimension ranges, or unix-style matches + on string attributes. + + Parameters + ---------- + **kwargs + Specifies query. Can be of the form {dim_name=(start, stop)} + or {attr_name=query}. + + Examples + -------- + >>> import dascore as dc + >>> spool = dc.get_example_spool("diverse_das") + >>> # subselect data in a particular time range + >>> time = ('2020-01-03', '2020-01-03T00:00:10') + >>> time_spool = spool.select(time=time) + >>> # subselect based on matching tag parameter + >>> tag_spool = spool.select(tag='some*') + """ @abc.abstractmethod def get_contents(self) -> pd.DataFrame: - """Get a dataframe of the patches that will be returned by the spool.""" + """ + Get a dataframe of the spool contents. + + Examples + -------- + >>> import dascore as dc + >>> spool = dc.get_example_spool("random_das") + >>> df = spool.get_contents() + """ + + # --- optional methods - @abc.abstractmethod def sort(self, attribute) -> Self: """ Sort the Spool based on a specific attribute. Parameters - --------------- - atribute + ---------- + attribute The attribute or coordinate used for sorting. If a coordinate name is used, the sorting will be based on the minimum value. - """ - raise NotImplementedError( - f"spool of type {self.__class__} has no sort implementation" - ) - - @abc.abstractmethod - def __len__(self) -> int: - """Return len of spool.""" - def __eq__(self, other) -> bool: - """Simple equality checks on spools.""" + Examples + -------- + >>> import dascore as dc + >>> spool = dc.get_example_spool() + >>> # sort spool based on values in time coordinate. 
+ >>> spool_time_sorted = spool.sort("time") + >>> # sort spool based on values in tag + >>> spool_tag_sorted = spool.sort("tag") + """ + msg = f"spool of type {self.__class__} has no sort implementation" + raise NotImplementedError(msg) - def _vals_equal(dict1, dict2): - if set(dict1) != set(dict2): - return False - for key in set(dict1): - val1, val2 = dict1[key], dict2[key] - if isinstance(val1, dict): - if not _vals_equal(val1, val2): - return False - elif hasattr(val1, "equals"): - if not val1.equals(val2): - return False - elif val1 != val2: - return False - return True + def split( + self, + size: int | None = None, + count: int | None = None, + ) -> Generator[Self, None, None]: + """ + Yield sub-patches based on specified parameters. - my_dict = self.__dict__ - other_dict = getattr(other, "__dict__", {}) - return _vals_equal(my_dict, other_dict) + Parameters + ---------- + size + The number of patches desired in each output spool. The last + spool may have fewer patches. + count + The number of spools to include. If count is greater than + the length of the spool then the output will be smaller than + count, with one patch per spool. + + Examples + -------- + >>> import dascore as dc + >>> spool = dc.get_example_spool("diverse_das") + >>> # split spool into list of spools each with 3 patches. + >>> split = spool.split(size=3) + >>> # split spool into 3 evenly sized (if possible) spools + >>> split = spool.split(count=3) + """ + msg = f"spool of type {self.__class__} has no split implementation" + raise NotImplementedError(msg) - def __rich__(self): - """Rich rep. of spool.""" - text = get_dascore_text() + Text(" ") - text += Text(self.__class__.__name__, style=self._rich_style) - text += Text(" 🧵 ") - patch_len = len(self) - text += Text(f"({patch_len:d}") - text += Text(" Patches)") if patch_len != 1 else Text(" Patch)") - return text + def update(self) -> Self: + """ + Updates the contents of the spool and returns a spool. + """ + return self - def __str__(self): - return str(self.__rich__()) + def map( + self, + func: Callable[[dc.Patch, ...], T], + *, + client: ExecutorType | None = None, + size: int | None = None, + progress: bool = True, + **kwargs, + ) -> list[T]: + """ + Map a function of all the contents of the spool. - __repr__ = __str__ + Parameters + ---------- + func + A callable which takes a patch as its first argument. + client + A client, or executor, which has a `map` method. + size + The number of patches in each spool mapped to a client. + If not set, defaults to the number of processors on the host. + Does nothing unless client is defined. + progress + If True, display a progress bar. + **kwargs + kwargs passed to func. + + Notes + ----- + When a client is specified, the spool is split then passed to the + client's map method. This is to avoid serializing loaded patches. + See [`Spool.split`](`dascore.core.spool.BaseSpool.split`) for more + details about the `spool_count` and `spool_size` parameters. + + Examples + -------- + import numpy as np + import dascore as dc + + spool = dc.get_example_spool("random_das") + + # Calculate the std for each channel in 5 second chunks + results = ( + spool.chunk(time=5) + .map(lambda x: np.std(x.data, axis=0)) + ) + # stack back into array. 
dims are (distance, time chunk) + out = np.stack(results, axis=-1) + """ + return _spool_map( + self, + func, + client=client, + size=size, + progress=progress, + **kwargs, + ) class DataFrameSpool(BaseSpool): @@ -156,7 +300,7 @@ class DataFrameSpool(BaseSpool): # kwargs for filtering contents _select_kwargs: Mapping | None = FrozenDict() # attributes which effect merge groups for internal patches - _group_columns = ("network", "station", "dims", "data_type", "history", "tag") + _group_columns = ("network", "station", "dims", "data_type", "tag", "history") _drop_columns = ("patch",) def _get_df(self): @@ -306,8 +450,9 @@ def new_from_df(self, df, source_df=None, instruction_df=None, select_kwargs=Non new._select_kwargs.update(select_kwargs or {}) return new + @compose_docstring(doc=BaseSpool.select.__doc__) def select(self, **kwargs) -> Self: - """Sub-select certain dimensions for Spool.""" + """{doc}""" _, _, extra_kwargs = split_df_query(kwargs, self._df, ignore_bad_kwargs=True) filtered_df = adjust_segments(self._df, ignore_bad_kwargs=True, **kwargs) inst = adjust_segments( @@ -356,6 +501,22 @@ def sort(self, attribute) -> Self: # create new spool from new dataframes return self.new_from_df(df=sorted_df, instruction_df=new_instruction_df) + @compose_docstring(doc=BaseSpool.split.__doc__) + def split( + self, + size: int | None = None, + count: int | None = None, + ) -> Generator[Self, None, None]: + """{doc}""" + if not ((count is not None) ^ (size is not None)): + msg = "Spool.split requires either spool_count or spool_size." + raise ParameterError(msg) + start = 0 + step = int(np.ceil(len(self) / count if count else size)) + while start < len(self): + yield self[start : start + step] + start += step + @compose_docstring(doc=BaseSpool.get_contents.__doc__) def get_contents(self) -> pd.DataFrame: """{doc}.""" @@ -385,8 +546,6 @@ def __rich__(self): def _load_patch(self, kwargs) -> Self: """Load the patch into memory.""" - # final_kwargs = dict(kwargs) - # final_kwargs.update(self._select_kwargs) return kwargs["patch"] @@ -406,7 +565,7 @@ def spool(obj: Path | str | BaseSpool | Sequence[PatchType], **kwargs) -> BaseSp @spool.register(str) @spool.register(Path) -def spool_from_str(path, **kwargs): +def _spool_from_str(path, **kwargs): """Get a spool from a path.""" path = Path(path) # A directory was passed, create Directory Spool @@ -435,19 +594,19 @@ def spool_from_str(path, **kwargs): @spool.register(BaseSpool) -def spool_from_spool(spool, **kwargs): +def _spool_from_spool(spool, **kwargs): """Return a spool from a spool.""" return spool @spool.register(list) @spool.register(tuple) -def spool_from_patch_list(patch_list, **kwargs): +def _spool_from_patch_list(patch_list, **kwargs): """Return a spool from a sequence of patches.""" return MemorySpool(patch_list) @spool.register(dc.Patch) -def spool_from_patch(patch): +def _spool_from_patch(patch): """Get a spool from a single patch.""" return MemorySpool([patch]) diff --git a/dascore/proc/basic.py b/dascore/proc/basic.py index 513daeb6..938fa635 100644 --- a/dascore/proc/basic.py +++ b/dascore/proc/basic.py @@ -248,7 +248,7 @@ def angle(patch: PatchType) -> PatchType: return patch.new(data=np.angle(patch.data)) -@patch_function() +@patch_function(history=None) def transpose(self: PatchType, *dims: str) -> PatchType: """ Transpose the data array to any dimension order desired. 
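The pieces introduced above (`BaseSpool.map`, `DataFrameSpool.split`, and the `ExecutorType` protocol) are meant to work together: a spool can be processed serially or handed to any executor-like client. A minimal sketch of the intended usage follows; it is not part of the diff and assumes the `random_das` example spool plus the API added in this change.

```python
# Sketch of the new Spool.map workflow (assumes the changes in this diff).
from concurrent.futures import ProcessPoolExecutor

import numpy as np
import dascore as dc


def channel_std(patch):
    """Return the standard deviation of the patch data along its first axis."""
    return np.std(patch.data, axis=0)


if __name__ == "__main__":
    spool = dc.get_example_spool("random_das").chunk(time=5)

    # Serial: each patch is loaded and processed in the current process.
    serial_results = spool.map(channel_std)

    # Parallel: any object with a `map` method (an ExecutorType) can serve as
    # a client; the spool is split first so loaded patches are not serialized.
    with ProcessPoolExecutor() as client:
        parallel_results = spool.map(channel_std, client=client)

    # Stack per-chunk results; per the map docstring this gives one column
    # per time chunk.
    out = np.stack(serial_results, axis=-1)
```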
diff --git a/dascore/utils/chunk.py b/dascore/utils/chunk.py index b179a1fb..80be0337 100644 --- a/dascore/utils/chunk.py +++ b/dascore/utils/chunk.py @@ -218,7 +218,7 @@ def _create_df(self, df, name, start_stop, gnum): out = pd.DataFrame(start_stop, columns=list(cols)) out[f"{name}_step"] = get_middle_value(df[f"{name}_step"].values) merger = df.drop(columns=out.columns) - for col in merger: + for col in set(merger.columns): vals = merger[col].unique() if len(vals) > 1: msg = ( diff --git a/dascore/utils/misc.py b/dascore/utils/misc.py index 4b7a526c..441fdd69 100644 --- a/dascore/utils/misc.py +++ b/dascore/utils/misc.py @@ -21,10 +21,11 @@ import dascore as dc from dascore.exceptions import MissingOptionalDependency +from dascore.utils.progress import track class _Sentinel: - """Dump little sentinel for key checks.""" + """Sentinel for key checks.""" def register_func(list_or_dict: list | dict, key=None): @@ -549,3 +550,48 @@ def wrapper(self, *args, **kwargs): return out return wrapper + + +class _MapFuncWrapper: + """A class for unwrapping spools to base applies.""" + + def __init__(self, func, kwargs, progress=True): + self._func = func + self._kwargs = kwargs + self._progress = progress + + def __call__(self, spool): + desc = f"Applying {self._func.__name__} to spool" + iterable = track(spool, desc) if self._progress else spool + return [self._func(x, **self._kwargs) for x in iterable] + + +def _spool_map(spool, func, size=None, client=None, progress=True, **kwargs): + """ + Map a func over a spool. + + Parameters + ---------- + spool + The spool object ot apply func to + size + The number of patches for each spool (ie chunksize) + client + An object with a map method for applying concurrency. + progress + If True, display a progress bar. + **kwargs + Keywords passed to func. + """ + # no client; simple for loop. + desc = f"Applying {func.__name__} to spool" + if client is None: + iterable = track(spool, desc) if progress else spool + return [func(patch, **kwargs) for patch in iterable] + # Now things get interesting. We need to split the spool here + # so that patches don't get serialized. + if size is None: + size = len(spool) / os.cpu_count() + spools = spool.split(size=size) + new_func = _MapFuncWrapper(func, kwargs, progress=progress) + return [x for y in client.map(new_func, spools) for x in y] diff --git a/docs/tutorial/spool.qmd b/docs/tutorial/spool.qmd index 37cb7aee..c09d3676 100644 --- a/docs/tutorial/spool.qmd +++ b/docs/tutorial/spool.qmd @@ -97,7 +97,7 @@ new_spool = spool[1:] # get_contents -Returns a dataframe listing contents. This method may not be supported on all spools, especially those interfacing with remote resources. +The [`get_contents`](`dascore.core.spool.BaseSpool.get_contents`) method returns a dataframe listing the spool contents. This method may not be supported on all spools, especially those interfacing with large remote resources. ```{python} #| output: false @@ -118,8 +118,7 @@ display(contents.drop(columns=['patch'])) # select -Selects a subset of a spool and returns a new spool. `get_contents` will now -reflect a subset of the original data requested by the select operation. +The [select](`dascore.core.spool.BaseSpool.select`) method selects a subset of a spool and returns a new spool. [`get_contents`](`dascore.core.spool.BaseSpool.get_contents`) will now reflect a subset of the original data requested by the select operation. 
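In addition to the time-based example below, any dimension present in the patches can be queried with a range. A minimal sketch using a distance range (the exact values are illustrative and assume the example spool's distance coverage):

```{python}
# Trim each patch to a distance range (values are illustrative).
import dascore as dc

spool = dc.get_example_spool()
distance_spool = spool.select(distance=(10, 100))
```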
```{python} import dascore as dc @@ -129,8 +128,7 @@ spool = dc.get_example_spool() subspool = spool.select(time=('2020-01-03T00:00:09', None)) ``` -In addition to trimming the data along a specified dimension (as shown above), -select can be used to filter patches that meet a specified criteria. +In addition to trimming the data along a specified dimension (as shown above), select can be used to filter patches that meet a specified criteria. ```{python} @@ -147,7 +145,7 @@ subspool = spool.select(tag='some*') # chunk -Chunk controls how data are grouped together in patches. It can be used to merge contiguous patches together, specify the size of patches for processing, specify overlap with previous segments, etc. +The [`chunk`](`dascore.core.spool.BaseSpool.chunk`) method controls how data are grouped together in patches within the spool. It can be used to merge contiguous patches together, specify the size of patches for processing, specify overlap with previous patches, etc. ```{python} import dascore as dc @@ -160,3 +158,27 @@ subspool = spool.chunk(time=3, overlap=1, keep_partial=True) # merge all contiguous segments along time dimension merged_spool = spool.chunk(time=None) ``` + +# map + +The [`map`](`dascore.core.spool.BaseSpool.map`) method applies a function to all patches in the spool. It provides an efficient way to process large datasets, especially when combined with clients (aka executors). + +For example, imagine we want to calculate the maximum value for each channel (distance) for 4 second increments with 1 second overlap. + +```{python} +import dascore as dc +spool = dc.get_example_spool() + +# define function for mapping to each patch +def get_dist_max(patch): + """Function which will be mapped to each patch in spool.""" + return patch.aggregate("time", "max") + +# chunk and apply function +map_out = spool.chunk(time=5, overlap=1).map(get_dist_max) + +# combine output back into a single patch +agg_patch = dc.spool(map_out).chunk(time=None)[0] + +print(agg_patch) +``` diff --git a/tests/test_core/test_spool.py b/tests/test_core/test_spool.py index cd33a824..54e21d4c 100644 --- a/tests/test_core/test_spool.py +++ b/tests/test_core/test_spool.py @@ -2,17 +2,38 @@ from __future__ import annotations import copy +from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor +import numpy as np import pandas as pd import pytest import dascore as dc from dascore.clients.filespool import FileSpool from dascore.core.spool import BaseSpool, MemorySpool -from dascore.exceptions import InvalidSpoolError +from dascore.exceptions import InvalidSpoolError, ParameterError from dascore.utils.time import to_datetime64, to_timedelta64 +def _gigo(garbage): + """Dummy func which can be serialized.""" + return garbage + + +class _SerialClient: + """Serial client for testing mapping logic.""" + + def map(self, func, iterable_thing, **kwargs): + for thing in iterable_thing: + yield func(thing, **kwargs) + + +@pytest.fixture(scope="session") +def random_spool_len_10(): + """Return a spool of length 10.""" + return dc.examples.get_example_spool(length=10) + + class TestSpoolBasics: """Tests for the basics of the spool.""" @@ -52,7 +73,7 @@ def test_eq_self(self, random_spool): assert random_spool == random_spool def test_unequal_attr(self, random_spool): - """Simulate some attribute which isnt equal.""" + """Simulate some attribute which isn't equal.""" new1 = copy.deepcopy(random_spool) new1.__dict__["bad_attr"] = 1 new2 = copy.deepcopy(random_spool) @@ -270,6 +291,114 @@ def 
test_sorting_attr_distance(self, diverse_spool): assert df["distance_min"].is_monotonic_increasing +class TestSplit: + """Tests splitting spools into smaller spools.""" + + @pytest.fixture(scope="class") + def split_10(self, random_spool_len_10): + """Split the spools using spool size.""" + spools = tuple(random_spool_len_10.split(size=3)) + return spools + + def test_both_parameters_raises(self, random_spool): + """Ensure split raises when both spool_size and spool_count are defined.""" + msg = "requires either spool_count or spool_size" + with pytest.raises(ParameterError, match=msg): + list(random_spool.split(size=1, count=2)) + + def test_spool_size(self, split_10): + """Ensure spool size can be split.""" + # because there are 10 patches in the spool its len should be 4 + assert len(split_10) == 4 + for i in range(3): + assert len(split_10[i]) == 3 + assert len(split_10[-1]) == 1 + + def test_yielded_spools_indexable(self, split_10): + """Ensure we can pull the first patch from each spool.""" + for spool in split_10: + patch = spool[0] + assert isinstance(patch, dc.Patch) + + def test_spool_count(self, random_spool): + """Ensure we can split based on desired size of spool.""" + split = list(random_spool.split(size=2)) + assert len(split) == 2 + assert len(split[0]) == 2 + assert len(split[1]) == 1 + + def test_base_split_raises(self, random_spool): + """Ensure BaseSpool split raises NoteImplementedError.""" + msg = "has no split implementation" + with pytest.raises(NotImplementedError, match=msg): + BaseSpool.split( + random_spool, + ) + + +class TestMap: + """Test for mapping spool contents onto functions.""" + + @pytest.fixture(scope="class") + def thread_client(self): + return ThreadPoolExecutor() + + @pytest.fixture(scope="class") + def proc_client(self): + return ProcessPoolExecutor() + + def test_simple(self, random_spool): + """Simplest case for mapping a function on all patches.""" + out = list(random_spool.map(lambda x: x)) + assert len(out) == len(random_spool) + assert dc.spool(out) == random_spool + + def test_non_patch_return(self, random_spool): + """Ensure outputs don't have to be patches.""" + out = list(random_spool.map(lambda x: np.max(x.data))) + for val in out: + assert isinstance(val, np.float_) + + def test_dummy_client(self, random_spool): + """Ensure a client arguments works.""" + out = list(random_spool.map(lambda x: x, client=_SerialClient())) + assert len(out) == len(random_spool) + assert dc.spool(out) == random_spool + + def test_thread_client(self, random_spool, thread_client): + """Ensure a thread client works""" + out = list(random_spool.map(lambda x: x, client=thread_client)) + assert len(out) == len(random_spool) + assert dc.spool(out) == random_spool + + def test_process_client(self, random_spool, proc_client): + """Ensure process pool also works.""" + out = list(random_spool.map(_gigo, client=proc_client)) + assert len(out) == len(random_spool) + assert dc.spool(out) == random_spool + + def test_map_docstring(self, random_spool): + """Ensure the docstring examples work.""" + results_list = list( + random_spool.chunk(time=5).map(lambda x: np.std(x.data, axis=0)) + ) + out = np.stack(results_list, axis=-1) + assert out.size + + def test_map_docs(self, random_spool): + """Test the doc code for map.""" + + def get_dist_max(patch): + """Function which will be mapped to each patch in spool.""" + return patch.aggregate("time", "max") + + out = list(random_spool.chunk(time=5, overlap=1).map(get_dist_max)) + new_spool = dc.spool(out) + merged = 
new_spool.chunk(time=None) + assert merged + assert isinstance(merged[0], dc.Patch) + + class TestGetSpool: """Test getting spool from various sources."""
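The `ExecutorType` protocol added in `dascore/constants.py` is decorated with `runtime_checkable`, so any object exposing a `map` method, including the standard executors and the `_SerialClient` helper used in the tests above, satisfies an `isinstance` check. A minimal sketch (not part of the diff):

```python
# Sketch: structural check against the new ExecutorType protocol.
from concurrent.futures import ThreadPoolExecutor

from dascore.constants import ExecutorType


class SerialClient:
    """Minimal client mirroring the _SerialClient test helper."""

    def map(self, func, iterable, **kwargs):
        for item in iterable:
            yield func(item, **kwargs)


assert isinstance(ThreadPoolExecutor(), ExecutorType)
assert isinstance(SerialClient(), ExecutorType)
```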