Skip to content

Commit

Permalink
Merge pull request sinzlab#52 from christoph-blessing/unit_of_work
Browse files Browse the repository at this point in the history
Add unit of work
  • Loading branch information
christoph-blessing authored Oct 24, 2023
2 parents 705da2d + 508f348 commit ba9e0bb
Show file tree
Hide file tree
Showing 11 changed files with 666 additions and 285 deletions.
87 changes: 50 additions & 37 deletions link/domain/link.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from __future__ import annotations

from dataclasses import dataclass
from typing import Any, FrozenSet, Iterable, Mapping, Optional, TypeVar
from typing import Any, Iterable, Iterator, Mapping, Optional, Set, Tuple, TypeVar

from .custom_types import Identifier
from .state import (
Expand Down Expand Up @@ -68,8 +68,9 @@ def create_entity(identifier: Identifier) -> Entity:
return Entity(
identifier,
state=state,
current_process=processes_map.get(identifier),
current_process=processes_map.get(identifier, Processes.NONE),
is_tainted=is_tainted(identifier),
operation_results=tuple(),
)

return {create_entity(identifier) for identifier in assignments[Components.SOURCE]}
Expand All @@ -90,14 +91,60 @@ def assign_to_component(component: Components) -> set[Entity]:
return Link(entity_assignments[Components.SOURCE])


class Link(FrozenSet[Entity]):
class Link(Set[Entity]):
"""The state of a link between two databases."""

def __init__(
self, entities: Iterable[Entity], operation_results: Tuple[LinkOperationResult, ...] = tuple()
) -> None:
"""Initialize the link."""
self._entities = set(entities)
self._operation_results = operation_results

@property
def identifiers(self) -> frozenset[Identifier]:
"""Return the identifiers of all entities in the link."""
return frozenset(entity.identifier for entity in self)

@property
def operation_results(self) -> Tuple[LinkOperationResult, ...]:
"""Return the results of operations performed on this link."""
return self._operation_results

def apply(self, operation: Operations, *, requested: Iterable[Identifier]) -> Link:
"""Apply an operation to the requested entities."""

def create_operation_result(results: Iterable[EntityOperationResult]) -> LinkOperationResult:
"""Create the result of an operation on a link from results of individual entities."""
results = set(results)
operation = next(iter(results)).operation
return LinkOperationResult(
operation,
updates=frozenset(result for result in results if isinstance(result, Update)),
errors=frozenset(result for result in results if isinstance(result, InvalidOperation)),
)

assert requested, "No identifiers requested."
assert set(requested) <= self.identifiers, "Requested identifiers not present in link."
changed = {entity.apply(operation) for entity in self if entity.identifier in requested}
unchanged = {entity for entity in self if entity.identifier not in requested}
operation_results = self.operation_results + (
create_operation_result(entity.operation_results[-1] for entity in changed),
)
return Link(changed | unchanged, operation_results)

def __contains__(self, entity: object) -> bool:
"""Check if the link contains the given entity."""
return entity in self._entities

def __iter__(self) -> Iterator[Entity]:
"""Iterate over all entities in the link."""
return iter(self._entities)

def __len__(self) -> int:
"""Return the number of entities in the link."""
return len(self._entities)


@dataclass(frozen=True)
class LinkOperationResult:
Expand All @@ -112,37 +159,3 @@ def __post_init__(self) -> None:
assert all(
result.operation is self.operation for result in (self.updates | self.errors)
), "Not all results have same operation."


def create_link_operation_result(results: Iterable[EntityOperationResult]) -> LinkOperationResult:
"""Create the result of an operation on a link from results of individual entities."""
results = set(results)
operation = next(iter(results)).operation
return LinkOperationResult(
operation,
updates=frozenset(result for result in results if isinstance(result, Update)),
errors=frozenset(result for result in results if isinstance(result, InvalidOperation)),
)


def process(link: Link, *, requested: Iterable[Identifier]) -> LinkOperationResult:
"""Process all entities in the link producing appropriate updates."""
_validate_requested(link, requested)
return create_link_operation_result(entity.process() for entity in link if entity.identifier in requested)


def _validate_requested(link: Link, requested: Iterable[Identifier]) -> None:
assert requested, "No identifiers requested."
assert set(requested) <= link.identifiers, "Requested identifiers not present in link."


def start_pull(link: Link, *, requested: Iterable[Identifier]) -> LinkOperationResult:
"""Start the pull process on the requested entities."""
_validate_requested(link, requested)
return create_link_operation_result(entity.start_pull() for entity in link if entity.identifier in requested)


def start_delete(link: Link, *, requested: Iterable[Identifier]) -> LinkOperationResult:
"""Start the delete process on the requested entities."""
_validate_requested(link, requested)
return create_link_operation_result(entity.start_delete() for entity in link if entity.identifier in requested)
101 changes: 57 additions & 44 deletions link/domain/state.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
"""Contains everything state related."""
from __future__ import annotations

from dataclasses import dataclass
from dataclasses import dataclass, replace
from enum import Enum, auto
from typing import Optional, Union
from functools import partial
from typing import Union

from .custom_types import Identifier

Expand All @@ -12,35 +13,36 @@ class State:
"""An entity's state."""

@classmethod
def start_pull(cls, entity: Entity) -> EntityOperationResult:
def start_pull(cls, entity: Entity) -> Entity:
"""Return the command needed to start the pull process for the entity."""
return cls._create_invalid_operation_result(Operations.START_PULL, entity.identifier)
return cls._create_invalid_operation(entity, Operations.START_PULL)

@classmethod
def start_delete(cls, entity: Entity) -> EntityOperationResult:
def start_delete(cls, entity: Entity) -> Entity:
"""Return the commands needed to start the delete process for the entity."""
return cls._create_invalid_operation_result(Operations.START_DELETE, entity.identifier)
return cls._create_invalid_operation(entity, Operations.START_DELETE)

@classmethod
def process(cls, entity: Entity) -> EntityOperationResult:
def process(cls, entity: Entity) -> Entity:
"""Return the commands needed to process the entity."""
return cls._create_invalid_operation_result(Operations.PROCESS, entity.identifier)
return cls._create_invalid_operation(entity, Operations.PROCESS)

@classmethod
def _create_invalid_operation_result(cls, operation: Operations, identifier: Identifier) -> EntityOperationResult:
return InvalidOperation(operation, identifier, cls)
@staticmethod
def _create_invalid_operation(entity: Entity, operation: Operations) -> Entity:
updated = entity.operation_results + (InvalidOperation(operation, entity.identifier, entity.state),)
return replace(entity, operation_results=updated)

@classmethod
def _create_valid_operation_result(
cls, operation: Operations, identifier: Identifier, new_state: type[State]
) -> EntityOperationResult:
def _transition_entity(
cls, entity: Entity, operation: Operations, new_state: type[State], *, new_process: Processes | None = None
) -> Entity:
if new_process is None:
new_process = entity.current_process
transition = Transition(cls, new_state)
return Update(
operation,
identifier,
transition,
command=TRANSITION_MAP[transition],
updated_results = entity.operation_results + (
Update(operation, entity.identifier, transition, TRANSITION_MAP[transition]),
)
return replace(entity, state=transition.new, current_process=new_process, operation_results=updated_results)


class States:
Expand All @@ -66,9 +68,9 @@ class Idle(State):
"""The default state of an entity."""

@classmethod
def start_pull(cls, entity: Entity) -> EntityOperationResult:
def start_pull(cls, entity: Entity) -> Entity:
"""Return the command needed to start the pull process for an entity."""
return cls._create_valid_operation_result(Operations.START_PULL, entity.identifier, Activated)
return cls._transition_entity(entity, Operations.START_PULL, Activated, new_process=Processes.PULL)


states.register(Idle)
Expand All @@ -78,16 +80,16 @@ class Activated(State):
"""The state of an activated entity."""

@classmethod
def process(cls, entity: Entity) -> EntityOperationResult:
def process(cls, entity: Entity) -> Entity:
"""Return the commands needed to process an activated entity."""
new_state: type[State]
transition_entity = partial(cls._transition_entity, entity, Operations.PROCESS)
if entity.is_tainted:
new_state = Deprecated
return transition_entity(Deprecated, new_process=Processes.NONE)
elif entity.current_process is Processes.PULL:
new_state = Received
return transition_entity(Received)
elif entity.current_process is Processes.DELETE:
new_state = Idle
return cls._create_valid_operation_result(Operations.PROCESS, entity.identifier, new_state)
return transition_entity(Idle, new_process=Processes.NONE)
raise RuntimeError


states.register(Activated)
Expand All @@ -97,17 +99,17 @@ class Received(State):
"""The state of an received entity."""

@classmethod
def process(cls, entity: Entity) -> EntityOperationResult:
def process(cls, entity: Entity) -> Entity:
"""Return the commands needed to process a received entity."""
new_state: type[State]
transition_entity = partial(cls._transition_entity, entity, Operations.PROCESS)
if entity.current_process is Processes.PULL:
if entity.is_tainted:
new_state = Tainted
return transition_entity(Tainted, new_process=Processes.NONE)
else:
new_state = Pulled
return transition_entity(Pulled, new_process=Processes.NONE)
elif entity.current_process is Processes.DELETE:
new_state = Activated
return cls._create_valid_operation_result(Operations.PROCESS, entity.identifier, new_state)
return transition_entity(Activated)
raise RuntimeError


states.register(Received)
Expand All @@ -117,9 +119,9 @@ class Pulled(State):
"""The state of an entity that has been copied to the local side."""

@classmethod
def start_delete(cls, entity: Entity) -> EntityOperationResult:
def start_delete(cls, entity: Entity) -> Entity:
"""Return the commands needed to start the delete process for the entity."""
return cls._create_valid_operation_result(Operations.START_DELETE, entity.identifier, Received)
return cls._transition_entity(entity, Operations.START_DELETE, Received, new_process=Processes.DELETE)


states.register(Pulled)
Expand All @@ -129,9 +131,9 @@ class Tainted(State):
"""The state of an entity that has been flagged as faulty by the source side."""

@classmethod
def start_delete(cls, entity: Entity) -> EntityOperationResult:
def start_delete(cls, entity: Entity) -> Entity:
"""Return the commands needed to start the delete process for the entity."""
return cls._create_valid_operation_result(Operations.START_DELETE, entity.identifier, Received)
return cls._transition_entity(entity, Operations.START_DELETE, Received, new_process=Processes.DELETE)


states.register(Tainted)
Expand Down Expand Up @@ -214,8 +216,9 @@ class InvalidOperation:
class Processes(Enum):
"""Names for processes that pull/delete entities into/from the local side."""

PULL = 1
DELETE = 2
NONE = auto()
PULL = auto()
DELETE = auto()


class Components(Enum):
Expand Down Expand Up @@ -285,17 +288,27 @@ class Entity:

identifier: Identifier
state: type[State]
current_process: Optional[Processes]
current_process: Processes
is_tainted: bool

def start_pull(self) -> EntityOperationResult:
operation_results: tuple[EntityOperationResult, ...]

def apply(self, operation: Operations) -> Entity:
"""Apply an operation to the entity."""
if operation is Operations.START_PULL:
return self._start_pull()
if operation is Operations.START_DELETE:
return self._start_delete()
if operation is Operations.PROCESS:
return self._process()

def _start_pull(self) -> Entity:
"""Start the pull process for the entity."""
return self.state.start_pull(self)

def start_delete(self) -> EntityOperationResult:
def _start_delete(self) -> Entity:
"""Start the delete process for the entity."""
return self.state.start_delete(self)

def process(self) -> EntityOperationResult:
def _process(self) -> Entity:
"""Process the entity."""
return self.state.process(self)
16 changes: 7 additions & 9 deletions link/infrastructure/link.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
start_delete_process,
start_pull_process,
)
from link.service.uow import UnitOfWork

from . import DJConfiguration, create_tables
from .facade import DJLinkFacade
Expand Down Expand Up @@ -54,17 +55,16 @@ def inner(obj: type) -> Any:
)
facade = DJLinkFacade(tables.source, tables.outbound, tables.local)
gateway = DJLinkGateway(facade, translator)
uow = UnitOfWork(gateway)
source_restriction: IterationCallbackList[PrimaryKey] = IterationCallbackList()
idle_entities_updater = create_idle_entities_updater(translator, create_content_replacer(source_restriction))
operation_presenter = create_operation_response_presenter(translator, create_operation_logger())
process_service = partial(
make_responsive(partial(process, link_gateway=gateway)), output_port=operation_presenter
)
process_service = partial(make_responsive(partial(process, uow=uow)), output_port=operation_presenter)
start_pull_process_service = partial(
make_responsive(partial(start_pull_process, link_gateway=gateway)), output_port=operation_presenter
make_responsive(partial(start_pull_process, uow=uow)), output_port=operation_presenter
)
start_delete_process_service = partial(
make_responsive(partial(start_delete_process, link_gateway=gateway)), output_port=operation_presenter
make_responsive(partial(start_delete_process, uow=uow)), output_port=operation_presenter
)
process_to_completion_service = partial(
make_responsive(partial(process_to_completion, process_service=process_service)), output_port=lambda x: None
Expand All @@ -82,10 +82,8 @@ def inner(obj: type) -> Any:
start_delete_process_service=start_delete_process_service,
output_port=lambda x: None,
),
Services.PROCESS: partial(process, link_gateway=gateway, output_port=operation_presenter),
Services.LIST_IDLE_ENTITIES: partial(
list_idle_entities, link_gateway=gateway, output_port=idle_entities_updater
),
Services.PROCESS: partial(process, uow=uow, output_port=operation_presenter),
Services.LIST_IDLE_ENTITIES: partial(list_idle_entities, uow=uow, output_port=idle_entities_updater),
}
controller = DJController(handlers, translator)
source_restriction.callback = controller.list_idle_entities
Expand Down
Loading

0 comments on commit ba9e0bb

Please sign in to comment.