From fc9af8ce9ba365802cee22ffd0baf284a055a31d Mon Sep 17 00:00:00 2001 From: derrick chambers Date: Sat, 19 Aug 2023 18:22:43 -0600 Subject: [PATCH 1/7] start spool split --- dascore/core/spool.py | 125 +++++++++++++++++++++------------- tests/test_core/test_spool.py | 22 +++++- 2 files changed, 99 insertions(+), 48 deletions(-) diff --git a/dascore/core/spool.py b/dascore/core/spool.py index 1762745b..ae1de5eb 100644 --- a/dascore/core/spool.py +++ b/dascore/core/spool.py @@ -5,6 +5,7 @@ from collections.abc import Mapping, Sequence from functools import singledispatch from pathlib import Path +from collections.abc import Generator import numpy as np import pandas as pd @@ -14,7 +15,7 @@ import dascore as dc import dascore.io from dascore.constants import PatchType, numeric_types, timeable_types -from dascore.exceptions import InvalidSpoolError +from dascore.exceptions import InvalidSpoolError, ParameterError from dascore.utils.chunk import ChunkManager from dascore.utils.display import get_dascore_text, get_nice_text from dascore.utils.docs import compose_docstring @@ -44,9 +45,46 @@ def __getitem__(self, item: int) -> PatchType: def __iter__(self) -> PatchType: """Iterate through the Patches in the spool.""" - def update(self) -> Self: - """Updates the contents of the spool and returns a spool.""" - return self + @abc.abstractmethod + def __len__(self) -> int: + """Return len of spool.""" + + def __rich__(self): + """Rich rep. of spool.""" + text = get_dascore_text() + Text(" ") + text += Text(self.__class__.__name__, style=self._rich_style) + text += Text(" 🧵 ") + patch_len = len(self) + text += Text(f"({patch_len:d}") + text += Text(" Patches)") if patch_len != 1 else Text(" Patch)") + return text + + def __str__(self): + return str(self.__rich__()) + + __repr__ = __str__ + + def __eq__(self, other) -> bool: + """Simple equality checks on spools.""" + + def _vals_equal(dict1, dict2): + if set(dict1) != set(dict2): + return False + for key in set(dict1): + val1, val2 = dict1[key], dict2[key] + if isinstance(val1, dict): + if not _vals_equal(val1, val2): + return False + elif hasattr(val1, "equals"): + if not val1.equals(val2): + return False + elif val1 != val2: + return False + return True + + my_dict = self.__dict__ + other_dict = getattr(other, "__dict__", {}) + return _vals_equal(my_dict, other_dict) @abc.abstractmethod def chunk( @@ -87,61 +125,43 @@ def select(self, **kwargs) -> Self: def get_contents(self) -> pd.DataFrame: """Get a dataframe of the patches that will be returned by the spool.""" - @abc.abstractmethod + # --- optional methods + def sort(self, attribute) -> Self: """ Sort the Spool based on a specific attribute. Parameters --------------- - atribute + attribute The attribute or coordinate used for sorting. If a coordinate name is used, the sorting will be based on the minimum value. 
""" - raise NotImplementedError( - f"spool of type {self.__class__} has no sort implementation" - ) - - @abc.abstractmethod - def __len__(self) -> int: - """Return len of spool.""" - - def __eq__(self, other) -> bool: - """Simple equality checks on spools.""" - - def _vals_equal(dict1, dict2): - if set(dict1) != set(dict2): - return False - for key in set(dict1): - val1, val2 = dict1[key], dict2[key] - if isinstance(val1, dict): - if not _vals_equal(val1, val2): - return False - elif hasattr(val1, "equals"): - if not val1.equals(val2): - return False - elif val1 != val2: - return False - return True + msg = f"spool of type {self.__class__} has no sort implementation" + raise NotImplementedError(msg) - my_dict = self.__dict__ - other_dict = getattr(other, "__dict__", {}) - return _vals_equal(my_dict, other_dict) - - def __rich__(self): - """Rich rep. of spool.""" - text = get_dascore_text() + Text(" ") - text += Text(self.__class__.__name__, style=self._rich_style) - text += Text(" 🧵 ") - patch_len = len(self) - text += Text(f"({patch_len:d}") - text += Text(" Patches)") if patch_len != 1 else Text(" Patch)") - return text + def split( + self, + spool_size: int | None = None, + spool_count: int | None = None, + ) -> Generator[Self, None, None]: + """ + Yield sub-patches based on specified parameters. - def __str__(self): - return str(self.__rich__()) + Parameters + ---------- + spool_size + The number of patches desired in each output spool. The last + spool may have fewer patches. + spool_count + The number of spools + """ + msg = f"spool of type {self.__class__} has no split implementation" + raise NotImplementedError(msg) - __repr__ = __str__ + def update(self) -> Self: + """Updates the contents of the spool and returns a spool.""" + return self class DataFrameSpool(BaseSpool): @@ -356,6 +376,17 @@ def sort(self, attribute) -> Self: # create new spool from new dataframes return self.new_from_df(df=sorted_df, instruction_df=new_instruction_df) + @compose_docstring(doc=BaseSpool.split.__doc__) + def split( + self, + spool_size: int | None = None, + spool_count: int | None = None, + ) -> Generator[Self, None, None]: + """{doc}""" + if spool_count is not None and spool_count is not None: + msg = "spool_count and spool_size cannot both be defined." 
+            raise ParameterError(msg)
+
     @compose_docstring(doc=BaseSpool.get_contents.__doc__)
     def get_contents(self) -> pd.DataFrame:
         """{doc}."""
diff --git a/tests/test_core/test_spool.py b/tests/test_core/test_spool.py
index cd33a824..0352be11 100644
--- a/tests/test_core/test_spool.py
+++ b/tests/test_core/test_spool.py
@@ -9,10 +9,16 @@
 import dascore as dc
 from dascore.clients.filespool import FileSpool
 from dascore.core.spool import BaseSpool, MemorySpool
-from dascore.exceptions import InvalidSpoolError
+from dascore.exceptions import InvalidSpoolError, ParameterError
 from dascore.utils.time import to_datetime64, to_timedelta64
 
 
+@pytest.fixture(scope="session")
+def random_spool_len_10():
+    """Return a spool of length 10."""
+    return dc.examples.get_example_spool(length=10)
+
+
 class TestSpoolBasics:
     """Tests for the basics of the spool."""
 
@@ -270,6 +276,20 @@ def test_sorting_attr_distance(self, diverse_spool):
         assert df["distance_min"].is_monotonic_increasing
 
 
+class TestSplit:
+    """Tests splitting spools into smaller spools."""
+
+    def test_both_parameters_raises(self, random_spool):
+        """Ensure split raises when both spool_size and spool_count are defined."""
+        msg = "spool_count and spool_size cannot"
+        with pytest.raises(ParameterError, match=msg):
+            random_spool.split(spool_size=1, spool_count=2)
+
+    def test_spool_size(self, random_spool_len_10):
+        """Ensure spool size can be split."""
+        # spools = list(random_spool_len_10.split(spool_size=3))
+
+
 class TestGetSpool:
     """Test getting spool from various sources."""
 
From 21bef7df6c6010657da4e9d3d6ea400d1bfe61dc Mon Sep 17 00:00:00 2001
From: derrick chambers
Date: Mon, 21 Aug 2023 10:32:13 -0600
Subject: [PATCH 3/7] work on map

---
 dascore/constants.py          | 11 +++-
 dascore/core/spool.py         | 45 +++++++++++++---
 dascore/utils/misc.py         | 30 ++++++++++-
 tests/test_core/test_spool.py | 96 +++++++++++++++++++++++++++++++++--
 4 files changed, 169 insertions(+), 13 deletions(-)

diff --git a/dascore/constants.py b/dascore/constants.py
index 82f57fba..66e28fc0 100644
--- a/dascore/constants.py
+++ b/dascore/constants.py
@@ -2,7 +2,7 @@
 from __future__ import annotations
 
 from pathlib import Path
-from typing import TypeVar
+from typing import TypeVar, Protocol, runtime_checkable
 
 import numpy as np
 import pandas as pd
@@ -13,6 +13,15 @@
 
 SpoolType = TypeVar("SpoolType", bound="dascore.BaseSpool")
 
+
+@runtime_checkable
+class ExecutorType(Protocol):
+    """Protocol for Executors that DASCore can use."""
+
+    def map(self, func, iterables, **kwargs):
+        """Map function for applying concurrency of some flavor."""
+
+
 # Bump this to force re-downloading of all data file
 DATA_VERSION = "0.0.0"
 
diff --git a/dascore/core/spool.py b/dascore/core/spool.py
index ae1de5eb..1ef063e1 100644
---
a/dascore/core/spool.py +++ b/dascore/core/spool.py @@ -6,6 +6,8 @@ from functools import singledispatch from pathlib import Path from collections.abc import Generator +from typing import TypeVar +from collections.abc import Callable import numpy as np import pandas as pd @@ -14,13 +16,13 @@ import dascore as dc import dascore.io -from dascore.constants import PatchType, numeric_types, timeable_types +from dascore.constants import PatchType, numeric_types, timeable_types, ExecutorType from dascore.exceptions import InvalidSpoolError, ParameterError from dascore.utils.chunk import ChunkManager from dascore.utils.display import get_dascore_text, get_nice_text from dascore.utils.docs import compose_docstring from dascore.utils.mapping import FrozenDict -from dascore.utils.misc import CacheDescriptor +from dascore.utils.misc import CacheDescriptor, _spool_map from dascore.utils.patch import _force_patch_merge, patches_to_df from dascore.utils.pd import ( _convert_min_max_in_kwargs, @@ -32,6 +34,9 @@ ) +T = TypeVar("T") + + class BaseSpool(abc.ABC): """Spool Abstract Base Class (ABC) for defining Spool interface.""" @@ -154,7 +159,9 @@ def split( The number of patches desired in each output spool. The last spool may have fewer patches. spool_count - The number of spools + The number of spools to include. If spool_count is greater than + the length of the spool then the output will be smaller than + spool_count, with one patch per spool. """ msg = f"spool of type {self.__class__} has no split implementation" raise NotImplementedError(msg) @@ -163,6 +170,27 @@ def update(self) -> Self: """Updates the contents of the spool and returns a spool.""" return self + def map( + self, + func: Callable[[dc.Patch, ...], T], + *, + client: ExecutorType | None = None, + **kwargs, + ) -> Generator[T]: + """ + Map a function of all the contents of the spool. + + Parameters + ---------- + func + A callable which takes a patch as its first argument. + client + A client, or executor, which has a `map` method. + **kwargs + kwargs passed to func. + """ + yield from _spool_map(self, func, client=client, **kwargs) + class DataFrameSpool(BaseSpool): """An abstract class for spools whose contents are managed by a dataframe.""" @@ -383,9 +411,14 @@ def split( spool_count: int | None = None, ) -> Generator[Self, None, None]: """{doc}""" - if spool_count is not None and spool_count is not None: - msg = "spool_count and spool_size cannot both be defined." + if not ((spool_count is not None) ^ (spool_size is not None)): + msg = "Spool.split requires either spool_count or spool_size." 
raise ParameterError(msg) + start = 0 + step = int(np.ceil(len(self) / spool_count) if spool_count else spool_size) + while start < len(self): + yield self[start : start + step] + start += step @compose_docstring(doc=BaseSpool.get_contents.__doc__) def get_contents(self) -> pd.DataFrame: @@ -416,8 +449,6 @@ def __rich__(self): def _load_patch(self, kwargs) -> Self: """Load the patch into memory.""" - # final_kwargs = dict(kwargs) - # final_kwargs.update(self._select_kwargs) return kwargs["patch"] diff --git a/dascore/utils/misc.py b/dascore/utils/misc.py index 4b7a526c..bad6856c 100644 --- a/dascore/utils/misc.py +++ b/dascore/utils/misc.py @@ -24,7 +24,7 @@ class _Sentinel: - """Dump little sentinel for key checks.""" + """Sentinel for key checks.""" def register_func(list_or_dict: list | dict, key=None): @@ -549,3 +549,31 @@ def wrapper(self, *args, **kwargs): return out return wrapper + + +class _MapFuncWrapper: + """A class for unwrapping spools to base applies.""" + + def __init__(self, func, kwargs): + self._func = func + self._kwargs = kwargs + + def __call__(self, spool): + return [self._func(x, **self._kwargs) for x in spool] + + +def _spool_map(spool, func, client=None, **kwargs): + """ + Map a func over a spool. + """ + # no client; simple for loop. + if client is None: + for patch in spool: + yield func(patch, **kwargs) + return + # Now things get interesting. We need to split the spool here + # so that patches don't get serialized. + spools = spool.split(spool_count=os.cpu_count()) + new_func = _MapFuncWrapper(func, kwargs) + for out in client.map(new_func, spools): + yield from out diff --git a/tests/test_core/test_spool.py b/tests/test_core/test_spool.py index 0352be11..cffbcd4c 100644 --- a/tests/test_core/test_spool.py +++ b/tests/test_core/test_spool.py @@ -2,7 +2,9 @@ from __future__ import annotations import copy +from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor +import numpy as np import pandas as pd import pytest @@ -13,6 +15,19 @@ from dascore.utils.time import to_datetime64, to_timedelta64 +def _gigo(garbage): + """Dummy func which can be serialized.""" + return garbage + + +class _SerialClient: + """Serial client for testing mapping logic.""" + + def map(self, func, iterable_thing, **kwargs): + for thing in iterable_thing: + yield func(thing, **kwargs) + + @pytest.fixture(scope="session") def random_spool_len_10(): """Return a spool of length 10.""" @@ -279,15 +294,88 @@ def test_sorting_attr_distance(self, diverse_spool): class TestSplit: """Tests splitting spools into smaller spools.""" + @pytest.fixture(scope="class") + def split_10(self, random_spool_len_10): + """Split the spools using spool size.""" + spools = tuple(random_spool_len_10.split(spool_size=3)) + return spools + def test_both_parameters_raises(self, random_spool): """Ensure split raises when both spool_size and spool_count are defined.""" - msg = "spool_count and spool_size cannot" + msg = "requires either spool_count or spool_size" with pytest.raises(ParameterError, match=msg): - random_spool.split(spool_size=1, spool_count=2) + list(random_spool.split(spool_size=1, spool_count=2)) - def test_spool_size(self, random_spool_len_10): + def test_spool_size(self, split_10): """Ensure spool size can be split.""" - # spools = list(random_spool_len_10.split(spool_size=3)) + # because there are 10 patches in the spool its len should be 4 + assert len(split_10) == 4 + for i in range(3): + assert len(split_10[i]) == 3 + assert len(split_10[-1]) == 1 + + def 
test_yielded_spools_indexable(self, split_10): + """Ensure we can pull the first patch from each spool.""" + for spool in split_10: + patch = spool[0] + assert isinstance(patch, dc.Patch) + + def test_spool_count(self, random_spool): + """Ensure we can split based on desired size of spool.""" + split = list(random_spool.split(spool_size=2)) + assert len(split) == 2 + assert len(split[0]) == 2 + assert len(split[1]) == 1 + + def test_base_split_raises(self, random_spool): + """Ensure BaseSpool split raises NoteImplementedError.""" + msg = "has no split implementation" + with pytest.raises(NotImplementedError, match=msg): + BaseSpool.split( + random_spool, + ) + + +class TestMap: + """Test for mapping spool contents onto functions.""" + + @pytest.fixture(scope="class") + def thread_client(self): + return ThreadPoolExecutor() + + @pytest.fixture(scope="class") + def proc_client(self): + return ProcessPoolExecutor() + + def test_simple(self, random_spool): + """Simplest case for mapping a function on all patches.""" + out = list(random_spool.map(lambda x: x)) + assert len(out) == len(random_spool) + assert dc.spool(out) == random_spool + + def test_non_patch_return(self, random_spool): + """Ensure outputs don't have to be patches.""" + out = list(random_spool.map(lambda x: np.max(x.data))) + for val in out: + assert isinstance(val, np.float_) + + def test_dummy_client(self, random_spool): + """Ensure a client arguments works.""" + out = list(random_spool.map(lambda x: x, client=_SerialClient())) + assert len(out) == len(random_spool) + assert dc.spool(out) == random_spool + + def test_thread_client(self, random_spool, thread_client): + """Ensure a thread client works""" + out = list(random_spool.map(lambda x: x, client=thread_client)) + assert len(out) == len(random_spool) + assert dc.spool(out) == random_spool + + def test_process_client(self, random_spool, proc_client): + """Ensure process pool also works.""" + out = list(random_spool.map(_gigo, client=proc_client)) + assert len(out) == len(random_spool) + assert dc.spool(out) == random_spool class TestGetSpool: From 1341899964821c5db6d95831d8ecb44bf524dd9b Mon Sep 17 00:00:00 2001 From: derrick chambers Date: Mon, 21 Aug 2023 11:15:16 -0600 Subject: [PATCH 4/7] start examples --- dascore/core/spool.py | 24 +++++++++++++++++++++++- dascore/utils/misc.py | 6 ++++-- tests/test_core/test_spool.py | 2 +- 3 files changed, 28 insertions(+), 4 deletions(-) diff --git a/dascore/core/spool.py b/dascore/core/spool.py index 1ef063e1..4c702f72 100644 --- a/dascore/core/spool.py +++ b/dascore/core/spool.py @@ -175,6 +175,8 @@ def map( func: Callable[[dc.Patch, ...], T], *, client: ExecutorType | None = None, + spool_count: int | None = None, + spool_size: int | None = None, **kwargs, ) -> Generator[T]: """ @@ -186,10 +188,30 @@ def map( A callable which takes a patch as its first argument. client A client, or executor, which has a `map` method. + spool_count + The total number of spools which get mapped to a client. + Does nothing if client is None and can't be used with spool_size. + spool_size + The number of patches in each spool mapped to a client. + Does nothing if client is None and can't be used with spool_count. **kwargs kwargs passed to func. + + Notes + ----- + When a client is specified, the spool is split then passed to the + client's map method. This is to avoid serializing loaded patches. + See [`Spool.split`](`dacore.core.spool.BaseSpool.split`) for more + details about the `spool_count` and `spool_size` parameters. 
+ + Examples + -------- + >>> import dascore + """ - yield from _spool_map(self, func, client=client, **kwargs) + yield from _spool_map( + self, func, client=client, spool_count=None, spool_size=None, **kwargs + ) class DataFrameSpool(BaseSpool): diff --git a/dascore/utils/misc.py b/dascore/utils/misc.py index bad6856c..3bb9725f 100644 --- a/dascore/utils/misc.py +++ b/dascore/utils/misc.py @@ -562,7 +562,7 @@ def __call__(self, spool): return [self._func(x, **self._kwargs) for x in spool] -def _spool_map(spool, func, client=None, **kwargs): +def _spool_map(spool, func, spool_count=None, spool_size=None, client=None, **kwargs): """ Map a func over a spool. """ @@ -573,7 +573,9 @@ def _spool_map(spool, func, client=None, **kwargs): return # Now things get interesting. We need to split the spool here # so that patches don't get serialized. - spools = spool.split(spool_count=os.cpu_count()) + if spool_count is None and spool_size is None: + spool_count = os.cpu_count() + spools = spool.split(spool_count=spool_count, spool_size=spool_size) new_func = _MapFuncWrapper(func, kwargs) for out in client.map(new_func, spools): yield from out diff --git a/tests/test_core/test_spool.py b/tests/test_core/test_spool.py index cffbcd4c..43966760 100644 --- a/tests/test_core/test_spool.py +++ b/tests/test_core/test_spool.py @@ -73,7 +73,7 @@ def test_eq_self(self, random_spool): assert random_spool == random_spool def test_unequal_attr(self, random_spool): - """Simulate some attribute which isnt equal.""" + """Simulate some attribute which isn't equal.""" new1 = copy.deepcopy(random_spool) new1.__dict__["bad_attr"] = 1 new2 = copy.deepcopy(random_spool) From ef6c04d0a152cebf3d31520fcc13d086d29ade5b Mon Sep 17 00:00:00 2001 From: derrick chambers Date: Mon, 21 Aug 2023 18:09:04 -0600 Subject: [PATCH 5/7] update spool map --- dascore/core/spool.py | 25 ++++++++++++++++++++----- dascore/utils/chunk.py | 2 +- docs/tutorial/spool.qmd | 19 ++++++++++++++++++- tests/test_core/test_spool.py | 20 ++++++++++++++++++++ 4 files changed, 59 insertions(+), 7 deletions(-) diff --git a/dascore/core/spool.py b/dascore/core/spool.py index 4c702f72..0d24573d 100644 --- a/dascore/core/spool.py +++ b/dascore/core/spool.py @@ -201,16 +201,31 @@ def map( ----- When a client is specified, the spool is split then passed to the client's map method. This is to avoid serializing loaded patches. - See [`Spool.split`](`dacore.core.spool.BaseSpool.split`) for more + See [`Spool.split`](`dascore.core.spool.BaseSpool.split`) for more details about the `spool_count` and `spool_size` parameters. Examples -------- - >>> import dascore - + >>> import numpy as np + >>> import dascore as dc + >>> + >>> spool = dc.get_example_spool("random_das") + >>> + >>> # Calculate the std for each channel in 5 second chunks + >>> results_list = list( + ... spool.chunk(time=5) + ... .map(lambda x: np.std(x.data, axis=0)) + ... ) + >>> # stack back into array. 
dims are (distance, time chunk) + >>> out = np.stack(results_list, axis=-1) """ yield from _spool_map( - self, func, client=client, spool_count=None, spool_size=None, **kwargs + self, + func, + client=client, + spool_count=spool_count, + spool_size=spool_size, + **kwargs, ) @@ -226,7 +241,7 @@ class DataFrameSpool(BaseSpool): # kwargs for filtering contents _select_kwargs: Mapping | None = FrozenDict() # attributes which effect merge groups for internal patches - _group_columns = ("network", "station", "dims", "data_type", "history", "tag") + _group_columns = ("network", "station", "dims", "data_type", "tag") _drop_columns = ("patch",) def _get_df(self): diff --git a/dascore/utils/chunk.py b/dascore/utils/chunk.py index b179a1fb..239c6ff0 100644 --- a/dascore/utils/chunk.py +++ b/dascore/utils/chunk.py @@ -218,7 +218,7 @@ def _create_df(self, df, name, start_stop, gnum): out = pd.DataFrame(start_stop, columns=list(cols)) out[f"{name}_step"] = get_middle_value(df[f"{name}_step"].values) merger = df.drop(columns=out.columns) - for col in merger: + for col in set(merger.columns) & set(self._group_columns): vals = merger[col].unique() if len(vals) > 1: msg = ( diff --git a/docs/tutorial/spool.qmd b/docs/tutorial/spool.qmd index 37cb7aee..3d05cc3d 100644 --- a/docs/tutorial/spool.qmd +++ b/docs/tutorial/spool.qmd @@ -147,7 +147,7 @@ subspool = spool.select(tag='some*') # chunk -Chunk controls how data are grouped together in patches. It can be used to merge contiguous patches together, specify the size of patches for processing, specify overlap with previous segments, etc. +The [`chunk`](`dascore.core.spool.BaseSpool.chunk`) method controls how data are grouped together in patches within the spool. It can be used to merge contiguous patches together, specify the size of patches for processing, specify overlap with previous patches, etc. ```{python} import dascore as dc @@ -160,3 +160,20 @@ subspool = spool.chunk(time=3, overlap=1, keep_partial=True) # merge all contiguous segments along time dimension merged_spool = spool.chunk(time=None) ``` + +# map + +The [`map`](`dascore.core.spool.BaseSpool.map`) method applies a function to all patches in the spool. It provides an efficient way to process large datasets, especially when combined with clients (aka executors). + +For example, imagine we want to calculate the maximum value for each channel (distance) for 4 second increments with 1 second overlap. 
+ +```{.python} +import dascore as dc +spool = dc.get_example_spool() + +def get_dist_max(patch): + """Function which will be mapped to each patch in spool.""" + return patch.aggregate("time", "max") + +out = spool.chunk(time=5, overlap=1).map(get_dist_max) +``` diff --git a/tests/test_core/test_spool.py b/tests/test_core/test_spool.py index 43966760..274f4cd6 100644 --- a/tests/test_core/test_spool.py +++ b/tests/test_core/test_spool.py @@ -377,6 +377,26 @@ def test_process_client(self, random_spool, proc_client): assert len(out) == len(random_spool) assert dc.spool(out) == random_spool + def test_map_docstring(self, random_spool): + """Ensure the docstring examples work.""" + results_list = list( + random_spool.chunk(time=5).map(lambda x: np.std(x.data, axis=0)) + ) + out = np.stack(results_list, axis=-1) + assert out.size + + def test_map_docs(self, random_spool): + """Test the doc code for map.""" + + def get_dist_max(patch): + """Function which will be mapped to each patch in spool.""" + return patch.aggregate("time", "max") + + out = list(random_spool.chunk(time=5, overlap=1).map(get_dist_max)) + new_spool = dc.spool(out) + merged = new_spool.chunk(time=None) + assert isinstance(merged[0], dc.Patch) + class TestGetSpool: """Test getting spool from various sources.""" From ca519d2815ab7c7477876550459dbd1d32928c49 Mon Sep 17 00:00:00 2001 From: derrick chambers Date: Wed, 23 Aug 2023 13:37:03 -0600 Subject: [PATCH 6/7] rework split --- dascore/core/spool.py | 44 +++++++++++++++++------------------ dascore/utils/misc.py | 40 +++++++++++++++++++++---------- tests/test_core/test_spool.py | 6 ++--- 3 files changed, 53 insertions(+), 37 deletions(-) diff --git a/dascore/core/spool.py b/dascore/core/spool.py index f9e827a7..cf2ad966 100644 --- a/dascore/core/spool.py +++ b/dascore/core/spool.py @@ -146,21 +146,21 @@ def sort(self, attribute) -> Self: def split( self, - spool_size: int | None = None, - spool_count: int | None = None, + size: int | None = None, + count: int | None = None, ) -> Generator[Self, None, None]: """ Yield sub-patches based on specified parameters. Parameters ---------- - spool_size + size The number of patches desired in each output spool. The last spool may have fewer patches. - spool_count - The number of spools to include. If spool_count is greater than + count + The number of spools to include. If count is greater than the length of the spool then the output will be smaller than - spool_count, with one patch per spool. + count, with one patch per spool. """ msg = f"spool of type {self.__class__} has no split implementation" raise NotImplementedError(msg) @@ -174,10 +174,10 @@ def map( func: Callable[[dc.Patch, ...], T], *, client: ExecutorType | None = None, - spool_count: int | None = None, - spool_size: int | None = None, + size: int | None = None, + progress: bool = True, **kwargs, - ) -> Generator[T]: + ) -> list[T]: """ Map a function of all the contents of the spool. @@ -187,12 +187,12 @@ def map( A callable which takes a patch as its first argument. client A client, or executor, which has a `map` method. - spool_count - The total number of spools which get mapped to a client. - Does nothing if client is None and can't be used with spool_size. - spool_size + size The number of patches in each spool mapped to a client. - Does nothing if client is None and can't be used with spool_count. + If not set, defaults to the number of processors on the host. + Does nothing unless client is defined. + progress + If True, display a progress bar. 
**kwargs kwargs passed to func. @@ -218,12 +218,12 @@ def map( >>> # stack back into array. dims are (distance, time chunk) >>> out = np.stack(results_list, axis=-1) """ - yield from _spool_map( + return _spool_map( self, func, client=client, - spool_count=spool_count, - spool_size=spool_size, + size=size, + progress=progress, **kwargs, ) @@ -240,7 +240,7 @@ class DataFrameSpool(BaseSpool): # kwargs for filtering contents _select_kwargs: Mapping | None = FrozenDict() # attributes which effect merge groups for internal patches - _group_columns = ("network", "station", "dims", "data_type", "tag") + _group_columns = ("network", "station", "dims", "data_type", "tag", "history") _drop_columns = ("patch",) def _get_df(self): @@ -443,15 +443,15 @@ def sort(self, attribute) -> Self: @compose_docstring(doc=BaseSpool.split.__doc__) def split( self, - spool_size: int | None = None, - spool_count: int | None = None, + size: int | None = None, + count: int | None = None, ) -> Generator[Self, None, None]: """{doc}""" - if not ((spool_count is not None) ^ (spool_size is not None)): + if not ((count is not None) ^ (size is not None)): msg = "Spool.split requires either spool_count or spool_size." raise ParameterError(msg) start = 0 - step = int(np.ceil(len(self) / spool_count) if spool_count else spool_size) + step = int(np.ceil(len(self) / count if count else size)) while start < len(self): yield self[start : start + step] start += step diff --git a/dascore/utils/misc.py b/dascore/utils/misc.py index 3bb9725f..c8663b8b 100644 --- a/dascore/utils/misc.py +++ b/dascore/utils/misc.py @@ -21,6 +21,7 @@ import dascore as dc from dascore.exceptions import MissingOptionalDependency +from dascore.utils.progress import track class _Sentinel: @@ -554,28 +555,43 @@ def wrapper(self, *args, **kwargs): class _MapFuncWrapper: """A class for unwrapping spools to base applies.""" - def __init__(self, func, kwargs): + def __init__(self, func, kwargs, progress=True): self._func = func self._kwargs = kwargs + self._progress = progress def __call__(self, spool): - return [self._func(x, **self._kwargs) for x in spool] + desc = f"Applying {self._func.__name__} to {spool}" + iterable = track(spool, desc) if self._progress else spool + return [self._func(x, **self._kwargs) for x in iterable] -def _spool_map(spool, func, spool_count=None, spool_size=None, client=None, **kwargs): +def _spool_map(spool, func, size=None, client=None, progress=True, **kwargs): """ Map a func over a spool. + + Parameters + ---------- + spool + The spool object ot apply func to + size + The number of patches for each spool (ie chunksize) + client + An object with a map method for applying concurrency. + progress + If True, display a progress bar. + **kwargs + Keywords passed to func. """ # no client; simple for loop. + desc = f"Applying {func.__name__} to {spool}" if client is None: - for patch in spool: - yield func(patch, **kwargs) - return + iterable = track(spool, desc) if progress else spool + return [func(patch, **kwargs) for patch in iterable] # Now things get interesting. We need to split the spool here # so that patches don't get serialized. 
- if spool_count is None and spool_size is None: - spool_count = os.cpu_count() - spools = spool.split(spool_count=spool_count, spool_size=spool_size) - new_func = _MapFuncWrapper(func, kwargs) - for out in client.map(new_func, spools): - yield from out + if size is None: + size = len(spool) / os.cpu_count() + spools = spool.split(size=size) + new_func = _MapFuncWrapper(func, kwargs, progress=progress) + return [x for y in client.map(new_func, spools) for x in y] diff --git a/tests/test_core/test_spool.py b/tests/test_core/test_spool.py index 274f4cd6..6ed1e762 100644 --- a/tests/test_core/test_spool.py +++ b/tests/test_core/test_spool.py @@ -297,14 +297,14 @@ class TestSplit: @pytest.fixture(scope="class") def split_10(self, random_spool_len_10): """Split the spools using spool size.""" - spools = tuple(random_spool_len_10.split(spool_size=3)) + spools = tuple(random_spool_len_10.split(size=3)) return spools def test_both_parameters_raises(self, random_spool): """Ensure split raises when both spool_size and spool_count are defined.""" msg = "requires either spool_count or spool_size" with pytest.raises(ParameterError, match=msg): - list(random_spool.split(spool_size=1, spool_count=2)) + list(random_spool.split(size=1, count=2)) def test_spool_size(self, split_10): """Ensure spool size can be split.""" @@ -322,7 +322,7 @@ def test_yielded_spools_indexable(self, split_10): def test_spool_count(self, random_spool): """Ensure we can split based on desired size of spool.""" - split = list(random_spool.split(spool_size=2)) + split = list(random_spool.split(size=2)) assert len(split) == 2 assert len(split[0]) == 2 assert len(split[1]) == 1 From 5ed70530bee07b7e4aa4fa142d5af1bbc31d6966 Mon Sep 17 00:00:00 2001 From: derrick chambers Date: Wed, 23 Aug 2023 15:05:30 -0600 Subject: [PATCH 7/7] add better docs to spool methods --- dascore/clients/dirspool.py | 19 ++---- dascore/clients/filespool.py | 8 ++- dascore/core/spool.py | 105 +++++++++++++++++++++++++++------- dascore/proc/basic.py | 2 +- dascore/utils/chunk.py | 2 +- dascore/utils/misc.py | 4 +- docs/tutorial/spool.qmd | 19 +++--- tests/test_core/test_spool.py | 1 + 8 files changed, 111 insertions(+), 49 deletions(-) diff --git a/dascore/clients/dirspool.py b/dascore/clients/dirspool.py index 5290be6d..1e5c6cdc 100644 --- a/dascore/clients/dirspool.py +++ b/dascore/clients/dirspool.py @@ -13,9 +13,10 @@ from typing_extensions import Self import dascore as dc -from dascore.core.spool import DataFrameSpool +from dascore.core.spool import DataFrameSpool, BaseSpool from dascore.io.indexer import AbstractIndexer, DirectoryIndexer from dascore.utils.pd import adjust_segments +from dascore.utils.docs import compose_docstring class DirectorySpool(DataFrameSpool): @@ -91,24 +92,16 @@ def spool_path(self): """Return the path in which the spool contents are found.""" return self.indexer.path + @compose_docstring(doc=BaseSpool.get_contents.__doc__) def get_contents(self) -> pd.DataFrame: """ - Return a dataframe of the contents of the data files. - - Parameters - ---------- - time - If not None, a tuple of start/end time where either can be None - indicating an open interval. + {doc} """ return self._df + @compose_docstring(doc=BaseSpool.update.__doc__) def update(self) -> Self: - """ - Updates the contents of the spool and returns a spool. - - Resets any previous selection. 
- """ + """{doc}""" out = self.__class__( base_path=self.indexer.update(), preferred_format=self._preferred_format, diff --git a/dascore/clients/filespool.py b/dascore/clients/filespool.py index 9f68ad55..599ae541 100644 --- a/dascore/clients/filespool.py +++ b/dascore/clients/filespool.py @@ -9,8 +9,9 @@ import dascore as dc from dascore.constants import SpoolType -from dascore.core.spool import DataFrameSpool +from dascore.core.spool import DataFrameSpool, BaseSpool from dascore.io.core import FiberIO +from dascore.utils.docs import compose_docstring class FileSpool(DataFrameSpool): @@ -65,11 +66,12 @@ def _load_patch(self, kwargs) -> Self: """Given a row from the managed dataframe, return a patch.""" return dc.read(**kwargs)[0] + @compose_docstring(doc=BaseSpool.update.__doc__) def update(self: SpoolType) -> Self: """ - Update the spool. + {doc} - If the file format supports indexing (e.g. DASDAE) this will + Note: If the file format supports indexing (e.g. DASDAE) this will trigger an indexing of the file. """ formater = FiberIO.manager.get_fiberio(self._file_format, self._file_version) diff --git a/dascore/core/spool.py b/dascore/core/spool.py index cf2ad966..a215deac 100644 --- a/dascore/core/spool.py +++ b/dascore/core/spool.py @@ -106,7 +106,7 @@ def chunk( ---------- overlap The amount of overlap between each segment, starting with the end of - first patch. Negative values can be used to induce gaps. + first patch. Negative values can be used to create gaps. keep_partial If True, keep the segments which are smaller than chunk size. This often occurs because of data gaps or at end of chunks. @@ -119,15 +119,55 @@ def chunk( kwargs kwargs are used to specify the dimension along which to chunk, eg: `time=10` chunks along the time axis in 10 second increments. + + Examples + -------- + >>> import dascore as dc + >>> from dascore.units import s + >>> + >>> spool = dc.get_example_spool("random_das") + >>> # get spools with time duration of 10 seconds + >>> time_chunked = spool.chunk(time=10, overlap=1) + >>> # merge along time axis + >>> time_merged = spool.chunk(time=...) """ @abc.abstractmethod def select(self, **kwargs) -> Self: - """Select only part of the data.""" + """ + Sub-select parts of the spool. + + Can be used to specify dimension ranges, or unix-style matches + on string attributes. + + Parameters + ---------- + **kwargs + Specifies query. Can be of the form {dim_name=(start, stop)} + or {attr_name=query}. + + Examples + -------- + >>> import dascore as dc + >>> spool = dc.get_example_spool("diverse_das") + >>> # subselect data in a particular time range + >>> time = ('2020-01-03', '2020-01-03T00:00:10') + >>> time_spool = spool.select(time=time) + >>> # subselect based on matching tag parameter + >>> tag_spool = spool.select(tag='some*') + """ @abc.abstractmethod def get_contents(self) -> pd.DataFrame: - """Get a dataframe of the patches that will be returned by the spool.""" + """ + Get a dataframe of the spool contents. + + Examples + -------- + >>> import dascore as dc + >>> spool = dc.get_example_spool("random_das") + >>> df = spool.get_contents() + """ # --- optional methods @@ -136,10 +176,19 @@ def sort(self, attribute) -> Self: Sort the Spool based on a specific attribute. Parameters - --------------- + ---------- attribute The attribute or coordinate used for sorting. If a coordinate name is used, the sorting will be based on the minimum value. 
+ + Examples + -------- + >>> import dascore as dc + >>> spool = dc.get_example_spool() + >>> # sort spool based on values in time coordinate. + >>> spool_time_sorted = spool.sort("time") + >>> # sort spool based on values in tag + >>> spool_tag_sorted = spool.sort("tag") """ msg = f"spool of type {self.__class__} has no sort implementation" raise NotImplementedError(msg) @@ -161,12 +210,23 @@ def split( The number of spools to include. If count is greater than the length of the spool then the output will be smaller than count, with one patch per spool. + + Examples + -------- + >>> import dascore as dc + >>> spool = dc.get_example_spool("diverse_das") + >>> # split spool into list of spools each with 3 patches. + >>> split = spool.split(size=3) + >>> # split spool into 3 evenly sized (if possible) spools + >>> split = spool.split(count=3) """ msg = f"spool of type {self.__class__} has no split implementation" raise NotImplementedError(msg) def update(self) -> Self: - """Updates the contents of the spool and returns a spool.""" + """ + Updates the contents of the spool and returns a spool. + """ return self def map( @@ -205,18 +265,18 @@ def map( Examples -------- - >>> import numpy as np - >>> import dascore as dc - >>> - >>> spool = dc.get_example_spool("random_das") - >>> - >>> # Calculate the std for each channel in 5 second chunks - >>> results_list = list( - ... spool.chunk(time=5) - ... .map(lambda x: np.std(x.data, axis=0)) - ... ) - >>> # stack back into array. dims are (distance, time chunk) - >>> out = np.stack(results_list, axis=-1) + import numpy as np + import dascore as dc + + spool = dc.get_example_spool("random_das") + + # Calculate the std for each channel in 5 second chunks + results = ( + spool.chunk(time=5) + .map(lambda x: np.std(x.data, axis=0)) + ) + # stack back into array. 
dims are (distance, time chunk) + out = np.stack(results, axis=-1) """ return _spool_map( self, @@ -390,8 +450,9 @@ def new_from_df(self, df, source_df=None, instruction_df=None, select_kwargs=Non new._select_kwargs.update(select_kwargs or {}) return new + @compose_docstring(doc=BaseSpool.select.__doc__) def select(self, **kwargs) -> Self: - """Sub-select certain dimensions for Spool.""" + """{doc}""" _, _, extra_kwargs = split_df_query(kwargs, self._df, ignore_bad_kwargs=True) filtered_df = adjust_segments(self._df, ignore_bad_kwargs=True, **kwargs) inst = adjust_segments( @@ -504,7 +565,7 @@ def spool(obj: Path | str | BaseSpool | Sequence[PatchType], **kwargs) -> BaseSp @spool.register(str) @spool.register(Path) -def spool_from_str(path, **kwargs): +def _spool_from_str(path, **kwargs): """Get a spool from a path.""" path = Path(path) # A directory was passed, create Directory Spool @@ -533,19 +594,19 @@ def spool_from_str(path, **kwargs): @spool.register(BaseSpool) -def spool_from_spool(spool, **kwargs): +def _spool_from_spool(spool, **kwargs): """Return a spool from a spool.""" return spool @spool.register(list) @spool.register(tuple) -def spool_from_patch_list(patch_list, **kwargs): +def _spool_from_patch_list(patch_list, **kwargs): """Return a spool from a sequence of patches.""" return MemorySpool(patch_list) @spool.register(dc.Patch) -def spool_from_patch(patch): +def _spool_from_patch(patch): """Get a spool from a single patch.""" return MemorySpool([patch]) diff --git a/dascore/proc/basic.py b/dascore/proc/basic.py index 513daeb6..938fa635 100644 --- a/dascore/proc/basic.py +++ b/dascore/proc/basic.py @@ -248,7 +248,7 @@ def angle(patch: PatchType) -> PatchType: return patch.new(data=np.angle(patch.data)) -@patch_function() +@patch_function(history=None) def transpose(self: PatchType, *dims: str) -> PatchType: """ Transpose the data array to any dimension order desired. diff --git a/dascore/utils/chunk.py b/dascore/utils/chunk.py index 239c6ff0..80be0337 100644 --- a/dascore/utils/chunk.py +++ b/dascore/utils/chunk.py @@ -218,7 +218,7 @@ def _create_df(self, df, name, start_stop, gnum): out = pd.DataFrame(start_stop, columns=list(cols)) out[f"{name}_step"] = get_middle_value(df[f"{name}_step"].values) merger = df.drop(columns=out.columns) - for col in set(merger.columns) & set(self._group_columns): + for col in set(merger.columns): vals = merger[col].unique() if len(vals) > 1: msg = ( diff --git a/dascore/utils/misc.py b/dascore/utils/misc.py index c8663b8b..441fdd69 100644 --- a/dascore/utils/misc.py +++ b/dascore/utils/misc.py @@ -561,7 +561,7 @@ def __init__(self, func, kwargs, progress=True): self._progress = progress def __call__(self, spool): - desc = f"Applying {self._func.__name__} to {spool}" + desc = f"Applying {self._func.__name__} to spool" iterable = track(spool, desc) if self._progress else spool return [self._func(x, **self._kwargs) for x in iterable] @@ -584,7 +584,7 @@ def _spool_map(spool, func, size=None, client=None, progress=True, **kwargs): Keywords passed to func. """ # no client; simple for loop. - desc = f"Applying {func.__name__} to {spool}" + desc = f"Applying {func.__name__} to spool" if client is None: iterable = track(spool, desc) if progress else spool return [func(patch, **kwargs) for patch in iterable] diff --git a/docs/tutorial/spool.qmd b/docs/tutorial/spool.qmd index 3d05cc3d..c09d3676 100644 --- a/docs/tutorial/spool.qmd +++ b/docs/tutorial/spool.qmd @@ -97,7 +97,7 @@ new_spool = spool[1:] # get_contents -Returns a dataframe listing contents. 
This method may not be supported on all spools, especially those interfacing with remote resources. +The [`get_contents`](`dascore.core.spool.BaseSpool.get_contents`) method returns a dataframe listing the spool contents. This method may not be supported on all spools, especially those interfacing with large remote resources. ```{python} #| output: false @@ -118,8 +118,7 @@ display(contents.drop(columns=['patch'])) # select -Selects a subset of a spool and returns a new spool. `get_contents` will now -reflect a subset of the original data requested by the select operation. +The [select](`dascore.core.spool.BaseSpool.select`) method selects a subset of a spool and returns a new spool. [`get_contents`](`dascore.core.spool.BaseSpool.get_contents`) will now reflect a subset of the original data requested by the select operation. ```{python} import dascore as dc @@ -129,8 +128,7 @@ spool = dc.get_example_spool() subspool = spool.select(time=('2020-01-03T00:00:09', None)) ``` -In addition to trimming the data along a specified dimension (as shown above), -select can be used to filter patches that meet a specified criteria. +In addition to trimming the data along a specified dimension (as shown above), select can be used to filter patches that meet a specified criteria. ```{python} @@ -167,13 +165,20 @@ The [`map`](`dascore.core.spool.BaseSpool.map`) method applies a function to all For example, imagine we want to calculate the maximum value for each channel (distance) for 4 second increments with 1 second overlap. -```{.python} +```{python} import dascore as dc spool = dc.get_example_spool() +# define function for mapping to each patch def get_dist_max(patch): """Function which will be mapped to each patch in spool.""" return patch.aggregate("time", "max") -out = spool.chunk(time=5, overlap=1).map(get_dist_max) +# chunk and apply function +map_out = spool.chunk(time=5, overlap=1).map(get_dist_max) + +# combine output back into a single patch +agg_patch = dc.spool(map_out).chunk(time=None)[0] + +print(agg_patch) ``` diff --git a/tests/test_core/test_spool.py b/tests/test_core/test_spool.py index 6ed1e762..54e21d4c 100644 --- a/tests/test_core/test_spool.py +++ b/tests/test_core/test_spool.py @@ -395,6 +395,7 @@ def get_dist_max(patch): out = list(random_spool.chunk(time=5, overlap=1).map(get_dist_max)) new_spool = dc.spool(out) merged = new_spool.chunk(time=None) + assert merged assert isinstance(merged[0], dc.Patch)
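Taken together, the series leaves the spool with a split/map workflow; below is a minimal usage sketch (not part of the diffs above), assuming the final PATCH 6/7 signatures (`split(size=..., count=...)`, `map(func, client=..., size=...)`), the example helpers already used in the tests and docs, and a hypothetical `channel_std` function defined only for illustration.

```python
# Minimal usage sketch of the split/map API added in this series.
# Assumes the post-PATCH 6/7 signatures; `channel_std` is a hypothetical
# helper, not something defined in the patches.
from concurrent.futures import ThreadPoolExecutor

import numpy as np

import dascore as dc


def channel_std(patch):
    """Return the standard deviation of each channel (distance) in a patch."""
    return np.std(patch.data, axis=0)


# Chunk the example data into 5 second patches.
spool = dc.get_example_spool("random_das").chunk(time=5)

# Serial: with no client, map simply loops over the patches.
serial_results = spool.map(channel_std)

# Parallel: with a client, the spool is first split into sub-spools which
# are passed to client.map (this is what avoids serializing loaded patches
# when a process pool is used); size controls patches per sub-spool.
with ThreadPoolExecutor() as executor:
    parallel_results = spool.map(channel_std, client=executor, size=2)

assert len(serial_results) == len(parallel_results)

# split can also be called directly to get sub-spools.
sub_spools = list(spool.split(count=2))
```

A process pool (as exercised in `TestMap.test_process_client`) works the same way, provided the mapped function is defined at module level so it can be pickled.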