Skip to content

Commit

Permalink
Add basics of dynamic connector dispatch
Browse files Browse the repository at this point in the history
  • Loading branch information
Alex Richey authored and Alex Richey committed Jan 10, 2025
1 parent 31724e0 commit be2cfcb
Show file tree
Hide file tree
Showing 7 changed files with 156 additions and 1 deletion.
6 changes: 6 additions & 0 deletions dcpy/connectors/ftp.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
class FTPConnector:
    """Placeholder FTP connector; neither direction is implemented yet."""

    def push(self, dest_path: str, user_id: str):
        """Push to an FTP destination (not implemented).

        Args:
            dest_path: Currently unused.
            user_id: Currently unused.

        Raises:
            NotImplementedError: Always. This is a subclass of ``Exception``,
                so callers catching ``Exception`` keep working.
        """
        raise NotImplementedError("Push not implemented for FTP")

Check warning on line 3 in dcpy/connectors/ftp.py

View check run for this annotation

Codecov / codecov/patch

dcpy/connectors/ftp.py#L3

Added line #L3 was not covered by tests

def pull(self, **kwargs):
    """Pull from an FTP source (not implemented).

    Raises:
        NotImplementedError: Always. This is a subclass of ``Exception``,
            so callers catching ``Exception`` keep working.
    """
    raise NotImplementedError("Pull not implemented for FTP")

Check warning on line 6 in dcpy/connectors/ftp.py

View check run for this annotation

Codecov / codecov/patch

dcpy/connectors/ftp.py#L6

Added line #L6 was not covered by tests
17 changes: 16 additions & 1 deletion dcpy/connectors/socrata/publish.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@
from socrata.revisions import Revision as SocrataPyRevision
import time
from typing import TypedDict, Literal, NotRequired, Any
from typing import Unpack

from dcpy.utils.logging import logger

from dcpy.models.lifecycle.distribution import PublisherPushKwargs
import dcpy.models.product.dataset.metadata as md
from .utils import SOCRATA_USER, SOCRATA_PASSWORD, _socrata_request

Expand Down Expand Up @@ -573,10 +575,10 @@ def __init__(self, metadata: md.Metadata, destination_id: str):


def push_dataset(
*,
metadata: md.Metadata,
dataset_destination_id: str,
dataset_package_path: Path,
*,
publish: bool = False,
metadata_only: bool = False,
):
Expand Down Expand Up @@ -680,3 +682,16 @@ def push_dataset(

dataset.discard_open_revisions()
return f"Published {metadata.attributes.display_name} - destination: {dataset_destination_id}"


class SocrataPublishConnector:
    """Connector that exposes the Socrata publishing flow for dispatch."""

    # Type key this connector identifies itself with.
    conn_type = "socrata"

    def push(
        self,
        **kwargs: Unpack[PublisherPushKwargs],
    ) -> Any:
        """Forward every keyword argument to ``push_dataset`` and return its result.

        NOTE(review): ``PublisherPushKwargs`` marks ``dataset_package_path`` as
        ``NotRequired`` while the ``push_dataset`` signature lists it without a
        default - confirm that callers always supply it.
        """
        outcome = push_dataset(**kwargs)
        return outcome

Check warning on line 694 in dcpy/connectors/socrata/publish.py

View check run for this annotation

Codecov / codecov/patch

dcpy/connectors/socrata/publish.py#L694

Added line #L694 was not covered by tests

def pull(self, **kwargs):
    """Pull from Socrata (not implemented).

    Raises:
        NotImplementedError: Always. This is a subclass of ``Exception``,
            so callers catching ``Exception`` keep working.
    """
    raise NotImplementedError("Pull not implemented for Socrata Connector")

Check warning on line 697 in dcpy/connectors/socrata/publish.py

View check run for this annotation

Codecov / codecov/patch

dcpy/connectors/socrata/publish.py#L697

Added line #L697 was not covered by tests
10 changes: 10 additions & 0 deletions dcpy/lifecycle/distribute/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
from dcpy.lifecycle.distribute import connectors as _conn

from dcpy.connectors.socrata.publish import SocrataPublishConnector


# Register all default connectors for `lifecycle.distribute`.
# This runs as a side effect of importing the package: one connector instance
# each for Socrata and FTP is handed to `_conn.register`.
# Third parties can similarly register their own connectors,
# so long as the connector implements the distribute Connector protocol.
_conn.register(SocrataPublishConnector())
_conn.register(_conn.DistributionFTPConnector())
52 changes: 52 additions & 0 deletions dcpy/lifecycle/distribute/connectors.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
from typing import Unpack, Protocol, Any

from dcpy.connectors.ftp import FTPConnector
from dcpy.models.lifecycle.distribution import PublisherPushKwargs


# Module-level registry of connector instances, keyed by `conn_type`.
_connectors: dict[str, "Connector"] = {}


class Connector(Protocol):
    """Structural interface an object must satisfy to be used for dispatch.

    Implementers need a ``conn_type`` string plus ``push`` and ``pull`` methods;
    no inheritance from this class is required.
    """

    conn_type: str

    def push(self, **kwargs: Unpack[PublisherPushKwargs]) -> Any:
        """Push using the keyword arguments described by ``PublisherPushKwargs``."""
        ...

    def pull(self, **kwargs) -> Any:
        """Pull using connector-specific keyword arguments."""
        ...


def register(connector: Connector):
    """Store ``connector`` in the module registry under its ``conn_type``.

    An entry already stored under the same ``conn_type`` is replaced.
    """
    type_key = connector.conn_type
    _connectors[type_key] = connector


def push(dest_type: str, **kwargs: Unpack[PublisherPushKwargs]) -> str:
    """Dispatch a push to the connector registered for ``dest_type``.

    Args:
        dest_type: Registry key (a connector's ``conn_type``).
        **kwargs: Forwarded unchanged to the connector's ``push``.

    Returns:
        Whatever the selected connector's ``push`` returns.

    Raises:
        KeyError: If no connector is registered under ``dest_type``. The
            message names the missing type and lists the registered ones.
    """
    try:
        connector: Connector = _connectors[dest_type]
    except KeyError:
        # Same exception type as the bare dict lookup, but with a message that
        # says what was asked for and what is actually available.
        raise KeyError(
            f"No connector registered for type {dest_type!r}. "
            f"Registered types: {sorted(_connectors)}"
        ) from None
    return connector.push(**kwargs)


def pull(conn_type: str, **kwargs) -> str:
    """Dispatch a pull to the connector registered for ``conn_type``.

    Args:
        conn_type: Registry key (a connector's ``conn_type``).
        **kwargs: Forwarded unchanged to the connector's ``pull``.

    Returns:
        Whatever the selected connector's ``pull`` returns.

    Raises:
        KeyError: If no connector is registered under ``conn_type``. The
            message names the missing type and lists the registered ones.
    """
    try:
        connector: Connector = _connectors[conn_type]
    except KeyError:
        # Same exception type as the bare dict lookup, but with a message that
        # says what was asked for and what is actually available.
        raise KeyError(
            f"No connector registered for type {conn_type!r}. "
            f"Registered types: {sorted(_connectors)}"
        ) from None
    return connector.pull(**kwargs)

Check warning on line 31 in dcpy/lifecycle/distribute/connectors.py

View check run for this annotation

Codecov / codecov/patch

dcpy/lifecycle/distribute/connectors.py#L30-L31

Added lines #L30 - L31 were not covered by tests


# Wrap the FTP Connector to bind it to the `PublisherPushKwargs`
# so that we can register and delegate FTP calls.
# This is the recommended way for third parties to add custom Distribution Connectors.
class DistributionFTPConnector:
    """Adapter translating ``PublisherPushKwargs`` into ``FTPConnector.push`` arguments."""

    conn_type: str

    def __init__(self):
        self.conn_type = "ftp"
        self._base_connector = FTPConnector()

    def push(self, **kwargs: Unpack[PublisherPushKwargs]) -> Any:
        """Resolve the destination from the metadata and delegate to the FTP connector.

        The destination is looked up via ``metadata.get_destination`` using
        ``dataset_destination_id``; its ``custom`` mapping must contain
        ``"destination_path"`` and ``"user_id"``.

        Returns:
            The result of the wrapped connector's ``push``.

        Raises:
            KeyError: If either required key is missing from ``custom``.
        """
        dataset_md = kwargs["metadata"]
        dest = dataset_md.get_destination(kwargs["dataset_destination_id"])
        dest_path = dest.custom["destination_path"]
        user_id = dest.custom["user_id"]
        # Return the delegate's result instead of dropping it, so callers of
        # the dispatcher receive whatever the base connector reports.
        return self._base_connector.push(dest_path=dest_path, user_id=user_id)

Check warning on line 49 in dcpy/lifecycle/distribute/connectors.py

View check run for this annotation

Codecov / codecov/patch

dcpy/lifecycle/distribute/connectors.py#L45-L49

Added lines #L45 - L49 were not covered by tests

def pull(self, **kwargs) -> Any:
    """Pull through a distribution connector (not supported).

    Raises:
        NotImplementedError: Always. This is a subclass of ``Exception``,
            so callers catching ``Exception`` keep working.
    """
    raise NotImplementedError("Pull is not defined for any Distribution Connectors.")

Check warning on line 52 in dcpy/lifecycle/distribute/connectors.py

View check run for this annotation

Codecov / codecov/patch

dcpy/lifecycle/distribute/connectors.py#L52

Added line #L52 was not covered by tests
12 changes: 12 additions & 0 deletions dcpy/models/lifecycle/distribution.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
from typing import TypedDict, NotRequired, Required
from pathlib import Path

import dcpy.models.product.dataset.metadata as ds_md


class PublisherPushKwargs(TypedDict):
    """Keyword arguments accepted by distribution connectors' ``push`` methods."""

    # Dataset metadata object; always required.
    metadata: Required[ds_md.Metadata]
    # Id of the destination to push to; always required.
    dataset_destination_id: Required[str]
    # Optional flag; presumably controls whether the push is published - confirm.
    publish: NotRequired[bool]
    # Optional path to the dataset package.
    dataset_package_path: NotRequired[Path]
    # Optional flag; presumably restricts the push to metadata - confirm.
    metadata_only: NotRequired[bool]
58 changes: 58 additions & 0 deletions dcpy/test/lifecycle/distribute/test_connector_dispatch.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
from pathlib import Path
import pytest
from typing import Unpack, Any

from dcpy.models.lifecycle.distribution import PublisherPushKwargs
from dcpy.models.product import metadata as md
from dcpy.lifecycle.distribute import connectors


@pytest.fixture
def org_metadata(resources_path: Path):
    """Build an ``OrgMetadata`` from the ``test_product_metadata_repo`` resources."""
    # TODO: refactor away, into conftest maybe
    repo_path = resources_path / "test_product_metadata_repo"
    return md.OrgMetadata.from_path(
        repo_path,
        template_vars={
            "version": "24c",
            "lion_prod_level_pub_freq": "monthly",
            "pseudo_lots_pub_freq": "monthly",
            "agency": "fake_agency",
        },
    )


# Type key used by the mock connector below and by the dispatch call in the test.
SNOWFLAKE_CONNECTOR_TYPE = "snowflake"


class MockSnowflakeConnector:
    """Test double that counts how many times ``push`` is dispatched to it."""

    conn_type: str

    def __init__(self):
        self.conn_type = SNOWFLAKE_CONNECTOR_TYPE
        # Number of push calls received so far.
        self.push_counter = 0

    def push(
        self,
        **kwargs: Unpack[PublisherPushKwargs],
    ) -> Any:
        """Record the call; the keyword arguments are ignored."""
        self.push_counter += 1

    def pull(self, **kwargs):
        """Pull is not supported by this mock.

        Raises:
            NotImplementedError: Always.
        """
        # The message previously named the Socrata connector (copy-paste slip).
        raise NotImplementedError("Pull not implemented for Snowflake Connector")


def test_dynamic_dispatch(org_metadata: md.OrgMetadata):
    """A connector registered at runtime receives pushes sent to its type."""
    mock_connector = MockSnowflakeConnector()
    connectors.register(mock_connector)
    assert mock_connector.push_counter == 0

    connectors.push(
        dest_type=SNOWFLAKE_CONNECTOR_TYPE,
        metadata=org_metadata.product("lion").dataset("pseudo_lots"),
        dataset_destination_id="garlic_sftp",
    )

    calls_after_push = mock_connector.push_counter
    assert calls_after_push == 1, "The mock snowflake connector should have been called."
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,5 @@ destinations:
- id: socrata
type: socrata
tags: [prod_tag, pseudo_lots_tag]
- id: garlic_sftp
type: sftp

0 comments on commit be2cfcb

Please sign in to comment.