diff --git a/tempora/__init__.py b/tempora/__init__.py index 04a8b17..d318688 100644 --- a/tempora/__init__.py +++ b/tempora/__init__.py @@ -1,18 +1,21 @@ "Objects and routines pertaining to date and time (tempora)" +from __future__ import annotations + +import contextlib import datetime -import time -import re -import numbers import functools -import contextlib -from numbers import Number -from typing import Union, Tuple, Iterable -from typing import cast +import numbers +import re +import time +from collections.abc import Iterable, Iterator, Sequence +from typing import TYPE_CHECKING, Tuple, Union, cast import dateutil.parser import dateutil.tz +if TYPE_CHECKING: + from typing_extensions import TypeAlias # some useful constants osc_per_year = 290_091_329_207_984_000 @@ -45,8 +48,8 @@ def _needs_year_help() -> bool: return len(datetime.date(900, 1, 1).strftime('%Y')) != 4 -AnyDatetime = Union[datetime.datetime, datetime.date, datetime.time] -StructDatetime = Union[Tuple[int, ...], time.struct_time] +AnyDatetime: TypeAlias = Union[datetime.datetime, datetime.date, datetime.time] +StructDatetime: TypeAlias = Union[Tuple[int, ...], time.struct_time] def ensure_datetime(ob: AnyDatetime) -> datetime.datetime: @@ -65,13 +68,14 @@ def ensure_datetime(ob: AnyDatetime) -> datetime.datetime: return datetime.datetime.combine(date, time) -def infer_datetime(ob: Union[AnyDatetime, StructDatetime]) -> datetime.datetime: +def infer_datetime(ob: AnyDatetime | StructDatetime) -> datetime.datetime: if isinstance(ob, (time.struct_time, tuple)): + # '"int" is not assignable to "tzinfo"', but we don't pass that many parameters ob = datetime.datetime(*ob[:6]) # type: ignore[arg-type] return ensure_datetime(ob) -def strftime(fmt: str, t: Union[AnyDatetime, tuple, time.struct_time]) -> str: +def strftime(fmt: str, t: AnyDatetime | tuple | time.struct_time) -> str: """ Portable strftime. @@ -146,7 +150,7 @@ def doSubs(s): return t.strftime(fmt) -def datetime_mod(dt, period, start=None): +def datetime_mod(dt: datetime.datetime, period, start=None) -> datetime.datetime: """ Find the time which is the specified date/time truncated to the time delta relative to the start date/time. @@ -190,7 +194,7 @@ def get_time_delta_microseconds(td): return result -def datetime_round(dt, period, start=None): +def datetime_round(dt, period: datetime.timedelta, start=None) -> datetime.datetime: """ Find the nearest even period for the specified date/time. @@ -210,7 +214,7 @@ def datetime_round(dt, period, start=None): return result -def get_nearest_year_for_day(day): +def get_nearest_year_for_day(day) -> int: """ Returns the nearest year to now inferred from a Julian date. @@ -235,7 +239,7 @@ def get_nearest_year_for_day(day): return result -def gregorian_date(year, julian_day): +def gregorian_date(year, julian_day) -> datetime.date: """ Gregorian Date is defined as a year and a julian day (1-based index into the days of the year). @@ -248,7 +252,7 @@ def gregorian_date(year, julian_day): return result -def get_period_seconds(period): +def get_period_seconds(period) -> int: """ return the number of seconds in the specified period @@ -279,7 +283,7 @@ def get_period_seconds(period): return result -def get_date_format_string(period): +def get_date_format_string(period) -> str: """ For a given period (e.g. 'month', 'day', or some numeric interval such as 3600 (in secs)), return the format string that can be @@ -307,7 +311,7 @@ def get_date_format_string(period): if isinstance(period, str) and period.lower() == 'month': return '%Y-%m' file_period_secs = get_period_seconds(period) - format_pieces = ('%Y', '-%m-%d', ' %H', '-%M', '-%S') + format_pieces: Sequence[str] = ('%Y', '-%m-%d', ' %H', '-%M', '-%S') seconds_per_second = 1 intervals = ( seconds_per_year, @@ -321,7 +325,7 @@ def get_date_format_string(period): return ''.join(format_pieces) -def calculate_prorated_values(): +def calculate_prorated_values() -> None: """ >>> monkeypatch = getfixture('monkeypatch') >>> import builtins @@ -338,7 +342,7 @@ def calculate_prorated_values(): print(f"per {period}: {value}") -def _prorated_values(rate: str) -> Iterable[Tuple[str, Number]]: +def _prorated_values(rate: str) -> Iterator[tuple[str, float]]: """ Given a rate (a string in units per unit time), and return that same rate for various time periods. @@ -361,7 +365,7 @@ def _prorated_values(rate: str) -> Iterable[Tuple[str, Number]]: yield period, period_value -def parse_timedelta(str): +def parse_timedelta(str) -> datetime.timedelta: """ Take a string representing a span of time and parse it to a time delta. Accepts any string of comma-separated numbers each with a unit indicator. @@ -455,19 +459,19 @@ def parse_timedelta(str): return _parse_timedelta_nanos(str).resolve() -def _parse_timedelta_nanos(str): +def _parse_timedelta_nanos(str) -> _Saved_NS: parts = re.finditer(r'(?P[\d.:]+)\s?(?P[^\W\d_]+)?', str) chk_parts = _check_unmatched(parts, str) deltas = map(_parse_timedelta_part, chk_parts) return sum(deltas, _Saved_NS()) -def _check_unmatched(matches, text): +def _check_unmatched(matches: Iterable[re.Match[str]], text) -> Iterator[re.Match[str]]: """ Ensure no words appear in unmatched text. """ - def check_unmatched(unmatched): + def check_unmatched(unmatched) -> None: found = re.search(r'\w+', unmatched) if found: raise ValueError(f"Unexpected {found.group(0)!r}") @@ -504,14 +508,14 @@ def check_unmatched(unmatched): } -def _resolve_unit(raw_match): +def _resolve_unit(raw_match) -> str: if raw_match is None: return 'second' text = raw_match.lower() return _unit_lookup.get(text, text) -def _parse_timedelta_composite(raw_value, unit): +def _parse_timedelta_composite(raw_value, unit) -> _Saved_NS: if unit != 'seconds': raise ValueError("Cannot specify units with composite delta") values = raw_value.split(':') @@ -520,7 +524,7 @@ def _parse_timedelta_composite(raw_value, unit): return _parse_timedelta_nanos(composed) -def _parse_timedelta_part(match): +def _parse_timedelta_part(match) -> _Saved_NS: unit = _resolve_unit(match.group('unit')) if not unit.endswith('s'): unit += 's' @@ -553,11 +557,11 @@ class _Saved_NS: microseconds=1000, ) - def __init__(self, **kwargs): + def __init__(self, **kwargs) -> None: vars(self).update(kwargs) @classmethod - def derive(cls, unit, value): + def derive(cls, unit, value) -> _Saved_NS: if unit == 'nanoseconds': return _Saved_NS(nanoseconds=value) @@ -588,7 +592,7 @@ def __repr__(self): return f'_Saved_NS(td={self.td!r}, nanoseconds={self.nanoseconds!r})' -def date_range(start=None, stop=None, step=None): +def date_range(start=None, stop=None, step=None) -> Iterator[datetime.datetime]: """ Much like the built-in function range, but works with dates @@ -643,7 +647,7 @@ def date_range(start=None, stop=None, step=None): ) -def parse(*args, **kwargs): +def parse(*args, **kwargs) -> datetime.datetime: """ Parse the input using dateutil.parser.parse with friendly tz support. diff --git a/tempora/schedule.py b/tempora/schedule.py index 7f06fdc..9a726d6 100644 --- a/tempora/schedule.py +++ b/tempora/schedule.py @@ -23,12 +23,19 @@ datetime.datetime(...utc) """ -import datetime -import numbers +from __future__ import annotations + import abc import bisect +import datetime +import numbers +from typing import TYPE_CHECKING, Any + +from .utc import fromtimestamp as from_timestamp +from .utc import now -from .utc import now, fromtimestamp as from_timestamp +if TYPE_CHECKING: + from typing_extensions import Self class DelayedCommand(datetime.datetime): @@ -36,8 +43,11 @@ class DelayedCommand(datetime.datetime): A command to be executed after some delay (seconds or timedelta). """ + delay: datetime.timedelta = datetime.timedelta() + target: Any # Expected type depends on the scheduler used + @classmethod - def from_datetime(cls, other): + def from_datetime(cls, other) -> Self: return cls( other.year, other.month, @@ -50,7 +60,7 @@ def from_datetime(cls, other): ) @classmethod - def after(cls, delay, target): + def after(cls, delay, target) -> Self: if not isinstance(delay, datetime.timedelta): delay = datetime.timedelta(seconds=delay) due_time = now() + delay @@ -71,7 +81,7 @@ def _from_timestamp(input): return from_timestamp(input) @classmethod - def at_time(cls, at, target): + def at_time(cls, at, target) -> Self: """ Construct a DelayedCommand to come due at `at`, where `at` may be a datetime or timestamp. @@ -82,7 +92,7 @@ def at_time(cls, at, target): cmd.target = target return cmd - def due(self): + def due(self) -> bool: return now() >= self @@ -92,19 +102,19 @@ class PeriodicCommand(DelayedCommand): seconds. """ - def _next_time(self): + def _next_time(self) -> Self: """ Add delay to self, localized """ return self + self.delay - def next(self): + def next(self) -> Self: cmd = self.__class__.from_datetime(self._next_time()) cmd.delay = self.delay cmd.target = self.target return cmd - def __setattr__(self, key, value): + def __setattr__(self, key, value) -> None: if key == 'delay' and not value > datetime.timedelta(): raise ValueError("A PeriodicCommand must have a positive, non-zero delay.") super().__setattr__(key, value) @@ -118,7 +128,7 @@ class PeriodicCommandFixedDelay(PeriodicCommand): """ @classmethod - def at_time(cls, at, delay, target): + def at_time(cls, at, delay, target) -> Self: # type: ignore[override] # jaraco/tempora#39 """ >>> cmd = PeriodicCommandFixedDelay.at_time(0, 30, None) >>> cmd.delay.total_seconds() @@ -127,13 +137,13 @@ def at_time(cls, at, delay, target): at = cls._from_timestamp(at) cmd = cls.from_datetime(at) if isinstance(delay, numbers.Number): - delay = datetime.timedelta(seconds=delay) + delay = datetime.timedelta(seconds=delay) # type: ignore[arg-type] # python/mypy#3186#issuecomment-1571512649 cmd.delay = delay cmd.target = target return cmd @classmethod - def daily_at(cls, at, target): + def daily_at(cls, at, target) -> Self: """ Schedule a command to run at a specific time each day. @@ -158,14 +168,13 @@ class Scheduler: and dispatching them on schedule. """ - def __init__(self): - self.queue = [] + def __init__(self) -> None: + self.queue: list[DelayedCommand] = [] - def add(self, command): - assert isinstance(command, DelayedCommand) + def add(self, command: DelayedCommand) -> None: bisect.insort(self.queue, command) - def run_pending(self): + def run_pending(self) -> None: while self.queue: command = self.queue[0] if not command.due(): @@ -176,7 +185,7 @@ def run_pending(self): del self.queue[0] @abc.abstractmethod - def run(self, command): + def run(self, command: DelayedCommand) -> None: """ Run the command """ @@ -187,7 +196,7 @@ class InvokeScheduler(Scheduler): Command targets are functions to be invoked on schedule. """ - def run(self, command): + def run(self, command: DelayedCommand) -> None: command.target() @@ -196,9 +205,9 @@ class CallbackScheduler(Scheduler): Command targets are passed to a dispatch callable on schedule. """ - def __init__(self, dispatch): + def __init__(self, dispatch) -> None: super().__init__() self.dispatch = dispatch - def run(self, command): + def run(self, command: DelayedCommand) -> None: self.dispatch(command.target) diff --git a/tempora/timing.py b/tempora/timing.py index e74b896..45666c4 100644 --- a/tempora/timing.py +++ b/tempora/timing.py @@ -1,12 +1,19 @@ +from __future__ import annotations + import collections.abc import contextlib import datetime import functools import numbers import time +from types import TracebackType +from typing import TYPE_CHECKING import jaraco.functools +if TYPE_CHECKING: + from typing_extensions import Self + class Stopwatch: """ @@ -40,35 +47,40 @@ class Stopwatch: 0 """ - def __init__(self): + def __init__(self) -> None: self.reset() self.start() - def reset(self): + def reset(self) -> None: self.elapsed = datetime.timedelta(0) with contextlib.suppress(AttributeError): del self._start - def _diff(self): + def _diff(self) -> datetime.timedelta: return datetime.timedelta(seconds=time.monotonic() - self._start) - def start(self): + def start(self) -> None: self._start = time.monotonic() - def stop(self): + def stop(self) -> datetime.timedelta: self.elapsed += self._diff() del self._start return self.elapsed - def split(self): + def split(self) -> datetime.timedelta: return self.elapsed + self._diff() # context manager support - def __enter__(self): + def __enter__(self) -> Self: self.start() return self - def __exit__(self, exc_type, exc_value, traceback): + def __exit__( + self, + exc_type: type[BaseException] | None, + exc_value: BaseException | None, + traceback: TracebackType | None, + ) -> None: self.stop() @@ -82,9 +94,9 @@ class IntervalGovernor: 30.0 """ - def __init__(self, min_interval): + def __init__(self, min_interval) -> None: if isinstance(min_interval, numbers.Number): - min_interval = datetime.timedelta(seconds=min_interval) + min_interval = datetime.timedelta(seconds=min_interval) # type: ignore[arg-type] # python/mypy#3186#issuecomment-1571512649 self.min_interval = min_interval self.last_call = None @@ -113,12 +125,12 @@ class Timer(Stopwatch): True """ - def __init__(self, target=float('Inf')): + def __init__(self, target=float('Inf')) -> None: self.target = self._accept(target) super().__init__() @staticmethod - def _accept(target): + def _accept(target: float) -> float: """ Accept None or ∞ or datetime or numeric for target @@ -136,7 +148,7 @@ def _accept(target): return target - def expired(self): + def expired(self) -> bool: return self.split().total_seconds() > self.target @@ -220,45 +232,56 @@ class BackoffDelay(collections.abc.Iterator): True """ - delay = 0 - factor = 1 "Multiplier applied to delay" - jitter = 0 - "Number or callable returning extra seconds to add to delay" + jitter: collections.abc.Callable[[], float] + "Callable returning extra seconds to add to delay" @jaraco.functools.save_method_args - def __init__(self, delay=0, factor=1, limit=float('inf'), jitter=0): + def __init__( + self, + delay: float = 0, + factor=1, + limit: collections.abc.Callable[[float], float] | float = float('inf'), + jitter: collections.abc.Callable[[], float] | float = 0, + ) -> None: self.delay = delay self.factor = factor if isinstance(limit, numbers.Number): limit_ = limit - def limit(n): + def limit_func(n: float, /) -> float: return max(0, min(limit_, n)) - self.limit = limit + else: + # python/mypy#16946 or # python/mypy#13914 + limit_func: collections.abc.Callable[[float], float] = limit # type: ignore[no-redef] + self.limit = limit_func if isinstance(jitter, numbers.Number): jitter_ = jitter - def jitter(): + def jitter_func() -> float: return jitter_ - self.jitter = jitter + else: + # python/mypy#16946 or # python/mypy#13914 + jitter_func: collections.abc.Callable[[], float] = jitter # type: ignore[no-redef] + + self.jitter = jitter_func - def __call__(self): + def __call__(self) -> None: time.sleep(next(self)) - def __next__(self): + def __next__(self) -> int | float: delay = self.delay self.bump() return delay - def __iter__(self): + def __iter__(self) -> Self: return self - def bump(self): + def bump(self) -> None: self.delay = self.limit(self.delay * self.factor + self.jitter()) def reset(self): diff --git a/tempora/utc.py b/tempora/utc.py index 08c5f1f..8d5473e 100644 --- a/tempora/utc.py +++ b/tempora/utc.py @@ -33,11 +33,10 @@ import datetime as std import functools - __all__ = ['now', 'fromtimestamp', 'datetime', 'time'] -def now(): +def now() -> std.datetime: return std.datetime.now(std.timezone.utc)