Skip to content

Commit

Permalink
Add dynamic connector dispatcher
Browse files Browse the repository at this point in the history
also add mock dispatcher for FTP to illustrate the concept
  • Loading branch information
Alex Richey authored and Alex Richey committed Jan 15, 2025
1 parent 31724e0 commit 7f2538d
Show file tree
Hide file tree
Showing 9 changed files with 287 additions and 1 deletion.
6 changes: 6 additions & 0 deletions dcpy/connectors/ftp.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
class FTPConnector:
def push(self, dest_path: str, ftp_profile: str):
raise Exception("Push not implemented for FTP")

Check warning on line 3 in dcpy/connectors/ftp.py

View check run for this annotation

Codecov / codecov/patch

dcpy/connectors/ftp.py#L3

Added line #L3 was not covered by tests

def pull(self, **kwargs):
raise Exception("Pull not implemented for FTP")

Check warning on line 6 in dcpy/connectors/ftp.py

View check run for this annotation

Codecov / codecov/patch

dcpy/connectors/ftp.py#L6

Added line #L6 was not covered by tests
5 changes: 4 additions & 1 deletion dcpy/connectors/socrata/publish.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@
from socrata.revisions import Revision as SocrataPyRevision
import time
from typing import TypedDict, Literal, NotRequired, Any
from typing import Unpack

from dcpy.utils.logging import logger

from dcpy.models.lifecycle.distribution import PublisherPushKwargs
import dcpy.models.product.dataset.metadata as md
from .utils import SOCRATA_USER, SOCRATA_PASSWORD, _socrata_request

Expand Down Expand Up @@ -573,10 +575,10 @@ def __init__(self, metadata: md.Metadata, destination_id: str):


def push_dataset(
*,
metadata: md.Metadata,
dataset_destination_id: str,
dataset_package_path: Path,
*,
publish: bool = False,
metadata_only: bool = False,
):
Expand Down Expand Up @@ -680,3 +682,4 @@ def push_dataset(

dataset.discard_open_revisions()
return f"Published {metadata.attributes.display_name} - destination: {dataset_destination_id}"

16 changes: 16 additions & 0 deletions dcpy/lifecycle/distribute/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
from dcpy.lifecycle.distribute.connectors import (
DistributionFTPConnector,
SocrataPublishConnector,
)

from dcpy.models.lifecycle.distribution import PublisherPushKwargs
from dcpy.models.connectors import ConnectorDispatcher


# Register all default connectors for `lifecycle.distribute`.
# Third parties can similarly register their own connectors,
# so long as the connector implements a ConnectorDispatcher protocol.
dispatcher = ConnectorDispatcher[PublisherPushKwargs, dict]()

dispatcher.register(conn_type="socrata", connector=SocrataPublishConnector())
dispatcher.register(conn_type="ftp", connector=DistributionFTPConnector())
42 changes: 42 additions & 0 deletions dcpy/lifecycle/distribute/connectors.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
from typing import Any

from dcpy.connectors.ftp import FTPConnector
from dcpy.connectors.socrata import publish as socrata_pub
from dcpy.models.lifecycle.distribution import PublisherPushKwargs

# Sadly, can't use Unpack on kwarg generics yet.
# https://github.com/python/typing/issues/1399


# Wrap the connectors to bind them to the `PublisherPushKwargs`
# so that we can register and delegate calls.
# This is the recommended way for third parties to add custom Distribution Connectors.
class DistributionFTPConnector:
conn_type: str

def __init__(self):
self.conn_type = "ftp"
self._base_connector = FTPConnector()

def push(self, arg: PublisherPushKwargs) -> Any:
md = arg["metadata"]
dest = md.get_destination(arg["dataset_destination_id"])
dest_path = dest.custom["destination_path"]
user_id = dest.custom["user_id"]
self._base_connector.push(dest_path=dest_path, ftp_profile=user_id)

Check warning on line 26 in dcpy/lifecycle/distribute/connectors.py

View check run for this annotation

Codecov / codecov/patch

dcpy/lifecycle/distribute/connectors.py#L22-L26

Added lines #L22 - L26 were not covered by tests

def pull(self, _: dict) -> Any:
raise Exception("Pull is not defined for any Distribution Connectors.")

Check warning on line 29 in dcpy/lifecycle/distribute/connectors.py

View check run for this annotation

Codecov / codecov/patch

dcpy/lifecycle/distribute/connectors.py#L29

Added line #L29 was not covered by tests


class SocrataPublishConnector:
conn_type = "socrata"

def push(
self,
arg: PublisherPushKwargs,
) -> Any:
return socrata_pub.push_dataset(**arg)

Check warning on line 39 in dcpy/lifecycle/distribute/connectors.py

View check run for this annotation

Codecov / codecov/patch

dcpy/lifecycle/distribute/connectors.py#L39

Added line #L39 was not covered by tests

def pull(self, _: dict):
raise Exception("Pull not implemented for Socrata Connector")

Check warning on line 42 in dcpy/lifecycle/distribute/connectors.py

View check run for this annotation

Codecov / codecov/patch

dcpy/lifecycle/distribute/connectors.py#L42

Added line #L42 was not covered by tests
34 changes: 34 additions & 0 deletions dcpy/models/connectors/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
from typing import Protocol, Any, TypeVar, Generic


_O = TypeVar("_O", contravariant=True)
_I = TypeVar("_I", contravariant=True)


class _ConnectorProtocol(Protocol, Generic[_O, _I]):
conn_type: str

def push(self, arg: _O, /) -> Any:
"""push"""

def pull(self, arg: _I, /) -> Any:
"""pull"""


class ConnectorDispatcher(Generic[_O, _I]):
_connectors: dict[str, _ConnectorProtocol[_O, _I]]

def __init__(self):
self._connectors = {}

def register(self, conn_type: str, connector: _ConnectorProtocol[_O, _I]):
print(f"registering {conn_type}")
self._connectors[conn_type] = connector

def push(self, dest_type: str, arg: _O) -> str:
connector: _ConnectorProtocol = self._connectors[dest_type]
return connector.push(arg)

def pull(self, source_type: str, arg: _I) -> str:
connector: _ConnectorProtocol = self._connectors[source_type]
return connector.pull(arg)

Check warning on line 34 in dcpy/models/connectors/__init__.py

View check run for this annotation

Codecov / codecov/patch

dcpy/models/connectors/__init__.py#L33-L34

Added lines #L33 - L34 were not covered by tests
12 changes: 12 additions & 0 deletions dcpy/models/lifecycle/distribution.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
from typing import TypedDict, NotRequired, Required
from pathlib import Path

import dcpy.models.product.dataset.metadata as ds_md


class PublisherPushKwargs(TypedDict):
metadata: Required[ds_md.Metadata]
dataset_destination_id: Required[str]
publish: NotRequired[bool]
dataset_package_path: NotRequired[Path]
metadata_only: NotRequired[bool]
60 changes: 60 additions & 0 deletions dcpy/test/lifecycle/distribute/test_connector_dispatch.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
from pathlib import Path
import pytest
from typing import Any

from dcpy.models.lifecycle.distribution import PublisherPushKwargs
from dcpy.models.product import metadata as md
from dcpy.lifecycle.distribute import dispatcher


@pytest.fixture
def org_metadata(resources_path: Path):
# TODO: refactor away, into conftest maybe
template_vars = {
"version": "24c",
"lion_prod_level_pub_freq": "monthly",
"pseudo_lots_pub_freq": "monthly",
"agency": "fake_agency",
}
return md.OrgMetadata.from_path(
resources_path / "test_product_metadata_repo", template_vars=template_vars
)


SNOWFLAKE_CONNECTOR_TYPE = "snowflake"


class MockSnowflakeConnector:
conn_type: str

def __init__(self):
self.conn_type = SNOWFLAKE_CONNECTOR_TYPE
self.push_counter = 0

def push(
self,
thing: PublisherPushKwargs,
) -> Any:
print(thing)
self.push_counter += 1

def pull(self, arg: dict) -> Any:
raise Exception("Pull not implemented for Socrata Connector")


def test_dynamic_dispatch(org_metadata: md.OrgMetadata):
snowflake_connector = MockSnowflakeConnector()
dispatcher.register(
conn_type=SNOWFLAKE_CONNECTOR_TYPE, connector=snowflake_connector
)
dispatch_details: PublisherPushKwargs = {
"metadata": org_metadata.product("lion").dataset("pseudo_lots"),
"dataset_destination_id": "garlic_sftp",
}
assert snowflake_connector.push_counter == 0

dispatcher.push(SNOWFLAKE_CONNECTOR_TYPE, dispatch_details)

assert (
snowflake_connector.push_counter == 1
), "The mock snowflake connector should have been called."
111 changes: 111 additions & 0 deletions dcpy/test/models/test_connectors.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
from typing import Any
from dataclasses import dataclass

from dcpy.models.connectors import ConnectorDispatcher


# Define Some Animals
@dataclass
class Animal:
age: int


@dataclass
class Dog(Animal):
name: str

def bark(self):
print(f"bark {self.name}")


class Beagle(Dog):
pass


@dataclass
class Cat(Animal):
remaining_lives: int = 9

def glare(self):
pass


# Define Some Connectors to handle those animals
@dataclass
class AnimalConn:
conn_type: str

def push(self, _: Animal) -> Any:
"""push"""

def pull(self, _: Animal) -> Any:
"""pull"""


@dataclass
class DogConn:
conn_type: str

def push(self, dog: Dog) -> Any:
dog.bark()

def pull(self, _: Dog) -> Any:
"""pull"""


@dataclass
class BeagleConn:
conn_type: str

def push(self, dog: Beagle) -> Any:
dog.bark()

def pull(self, _: Beagle) -> Any:
"""pull"""


@dataclass
class CatConn:
conn_type: str

def push(self, cat: Cat) -> Any:
cat.glare()

def pull(self, _: Cat) -> Any:
"""pull"""


def test_dog_contravariance():
dog_dispatcher = ConnectorDispatcher[Dog, Dog]()

# This is fine! Callables for Animal can also handle a Dog.
dog_dispatcher.register(conn_type="animal", connector=AnimalConn("animal"))
# Also fine. Obviously dogs are dogs.
dog_dispatcher.register(conn_type="dog", connector=DogConn("dog"))

def things_that_fail_the_type_checker_if_you_remove_the_type_ignore():
"""Chucking these in a function to 1) not execute them, 2) indent them"""

# mypy won't allow this because the CatConn might be passed a dog, and dogs can't meow(), for example
dog_dispatcher.register(conn_type="cat", connector=CatConn("Soxx")) # type: ignore

# This one is less obvious. Due to contravariance, subclasses of Dog are not allowed
# for the dog_dispatcher, which takes `Dog` types for the push and pull method.
# This is somewhat counterintuitive, but allowing this in generics breaks type-safety
dog_dispatcher.register(conn_type="beagle", connector=BeagleConn("snoopy")) # type: ignore

### On to the dispatching

# This is fine! Any function that can handle an Animal can handle Dog
dog_dispatcher.push("animal", Dog(age=4, name="rufus"))

def more_things_that_would_fail_the_type_checker():
# Fails for obvious reasons
dog_dispatcher.push("cat", Cat(age=4)) # type: ignore

# This would execute just fine, but given then dynamic nature of dispatch
# mypy can't be sure that an Animal won't be passed to a DogConnector
dog_dispatcher.push("animal", Animal(age=4)) # type: ignore

# This would actually break though, after we called .bark() on the Animal.
dog_dispatcher.push("dog", Animal(age=4)) # type: ignore
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,5 @@ destinations:
- id: socrata
type: socrata
tags: [prod_tag, pseudo_lots_tag]
- id: garlic_sftp
type: sftp

0 comments on commit 7f2538d

Please sign in to comment.