Skip to content

Commit

Permalink
WIP / POC of dynamic dispatch for connectors
Browse files Browse the repository at this point in the history
  • Loading branch information
Alex Richey authored and Alex Richey committed Jan 6, 2025
1 parent 000ad1e commit d4f5a7e
Show file tree
Hide file tree
Showing 6 changed files with 99 additions and 12 deletions.
32 changes: 32 additions & 0 deletions dcpy/connectors/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
from dcpy.models.connectors import Connector

_connectors = {}


def register(connector: Connector):
_connectors[connector.c_type] = connector


def push(c_type: str, **kwargs) -> str:
connector: Connector = _connectors[c_type]
connector.push(**kwargs)
return ""


def pull(c_type: str, **kwargs) -> str:
connector: Connector = _connectors[c_type]
connector.pull(**kwargs)
return ""


from .socrata.publish import SocrataConnector

register(SocrataConnector())

from .web import WebConnector

register(WebConnector())

from .ftp import FTPConnector

register(FTPConnector())
11 changes: 11 additions & 0 deletions dcpy/connectors/ftp.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
# Placeholder

from dcpy.models.connectors import Connector

class FTPConnector(Connector):
c_type = "ftp"

def push(self, **kwargs):
raise Exception(f"Push not implemented for {self.c_type}")
def pull(self, **kwargs):
raise Exception(f"Pull not implemented for {self.c_type}")
16 changes: 16 additions & 0 deletions dcpy/connectors/socrata/publish.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

from dcpy.utils.logging import logger

from dcpy.models.connectors import Connector
import dcpy.models.product.dataset.metadata as md
from .utils import SOCRATA_USER, SOCRATA_PASSWORD, _socrata_request

Expand Down Expand Up @@ -680,3 +681,18 @@ def push_dataset(

dataset.discard_open_revisions()
return f"Published {metadata.attributes.display_name} - destination: {dataset_destination_id}"

class SocrataConnector(Connector):
c_type = "socrata"

def push(self, **kwargs):
return push_dataset(
metadata=kwargs['metadata'],
dataset_destination_id=kwargs['dataset_destination_id'],
dataset_package_path=kwargs['dataset_package_path'],
publish=kwargs.get('publish', False),
metadata_only=kwargs.get('metadata_only', False),
)

def pull(self, **kwargs):
raise Exception(f"Pull not implemented for Socrata Connector")
11 changes: 11 additions & 0 deletions dcpy/connectors/web.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from pathlib import Path
import requests

from dcpy.models.connectors import Connector
from dcpy.utils.logging import logger


Expand All @@ -14,3 +15,13 @@ def download_file(url: str, path: Path) -> None:
response.raise_for_status()
with open(path, "wb") as f:
f.write(response.content)


class WebConnector(Connector):
c_type="web"

def pull(self, **kwargs):
download_file(kwargs["url"], kwargs["path"])

def push(self, **kwargs):
raise Exception(f"Push not implemented for {self.c_type}")
25 changes: 13 additions & 12 deletions dcpy/lifecycle/distribute/socrata.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@
import dcpy.models.product.dataset.metadata as m
from dcpy.utils.logging import logger
import dcpy.connectors.edm.packaging as packaging
import dcpy.connectors.socrata.publish as soc_pub
from dcpy import connectors


class PublishKwargs(TypedDict):
metadata_path: NotRequired[Path]
metadata: NotRequired[m.Metadata]
publish: Required[bool]
ignore_validation_errors: Required[bool]
skip_validation: Required[bool]
Expand All @@ -20,7 +20,7 @@ def dist_from_local(
package_path: Path,
dataset_destination_id: str,
*,
metadata_path: Path | None = None,
metadata: m.Metadata,
publish: bool = False,
ignore_validation_errors: bool = False,
skip_validation: bool = False,
Expand All @@ -30,16 +30,14 @@ def dist_from_local(
Requires fully rendered template, ie there should be no template variables in the metadata
"""
md = m.Metadata.from_path(metadata_path or (package_path / "metadata.yml"))
validation_errors = md.validate_consistency()
validation_errors = metadata.validate_consistency()
if validation_errors:
logger.error(
f"The metadata file contains inconsistencies that must be fixed before pushing: {str(validation_errors)}"
)
return str(validation_errors)

dest = md.get_destination(dataset_destination_id)
assert dest.type == "socrata"
dest = metadata.get_destination(dataset_destination_id)

if not (skip_validation or metadata_only):
logger.info("Validating package")
Expand All @@ -58,15 +56,16 @@ def dist_from_local(
raise Exception(error_msg)

try:
return soc_pub.push_dataset(
metadata=md,
return connectors.push(
c_type=dest.type,
metadata=metadata,
dataset_destination_id=dataset_destination_id,
dataset_package_path=package_path,
publish=bool(publish),
metadata_only=bool(metadata_only),
)
except Exception as e:
return f"Error pushing {md.attributes.display_name}, destination: {dest.id}: {str(e)}"
return f"Error pushing {metadata.attributes.display_name}, destination: {dest.id}: {str(e)}"


def dist_from_local_all_socrata(
Expand Down Expand Up @@ -147,10 +146,11 @@ def _dist_from_local(
help="Only push metadata (including attachments).",
),
):
metadata = m.Metadata.from_path(metadata_path or (package_path / "metadata.yml"))
result = dist_from_local(
package_path=package_path,
dataset_destination_id=dataset_destination_id,
metadata_path=metadata_path,
metadata=metadata,
publish=publish,
ignore_validation_errors=ignore_validation_errors,
skip_validation=skip_validation,
Expand Down Expand Up @@ -285,11 +285,12 @@ def _dist_from_s3(
package_path = packaging.pull(
packaging.DatasetPackageKey(product_name, version, dataset or product_name)
)
metadata = m.Metadata.from_path(metadata_path or (package_path / "metadata.yml"))

result = dist_from_local(
package_path=package_path,
dataset_destination_id=dataset_destination_id,
metadata_path=metadata_path,
metadata=metadata,
publish=publish,
ignore_validation_errors=ignore_validation_errors,
skip_validation=skip_validation,
Expand Down
16 changes: 16 additions & 0 deletions dcpy/models/connectors/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
import abc

# TODO: question... I implemented methods as abc because w/ concrete methods,
# static checkers like pylance will complain about type-safety of the overridden **kwargs.
# I do wonder if there's a more elegant way.
class Connector(abc.ABC):

c_type: str

@abc.abstractmethod
def push(self, **kwargs):
raise Exception(f"push not implemented for {self.c_type}")

@abc.abstractmethod
def pull(self, **kwargs):
raise Exception(f"pull not implemented for {self.c_type}")

0 comments on commit d4f5a7e

Please sign in to comment.