Skip to content

Commit

Permalink
Add very basic wrapper around proc func outputs
Browse files Browse the repository at this point in the history
  • Loading branch information
Alex Richey authored and Alex Richey committed Dec 12, 2024
1 parent 340ec91 commit 7d02679
Show file tree
Hide file tree
Showing 2 changed files with 161 additions and 69 deletions.
178 changes: 135 additions & 43 deletions dcpy/lifecycle/ingest/transform.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
from functools import partial
import geopandas as gpd
from pydantic import BaseModel
import pandas as pd
from pathlib import Path
from typing import Callable, Literal
from typing import Callable, Literal, NamedTuple, Any
from dataclasses import dataclass

from dcpy.models import file
from dcpy.models import file, base
from dcpy.models.base import SortedSerializedBase
from dcpy.models.lifecycle.ingest import ProcessingStep, Column
from dcpy.utils import data, introspect
from dcpy.utils.geospatial import transform, parquet as geoparquet
Expand All @@ -14,6 +17,36 @@
OUTPUT_GEOM_COLUMN = "geom"


class ProcessingSummary(SortedSerializedBase):
"""Summary of the changes from a data processing function."""

description: str | None = None
row_modifications: dict
column_modifications: dict


class ProcessingResult(SortedSerializedBase, arbitrary_types_allowed=True):
df: pd.DataFrame
summary: ProcessingSummary


def make_generic_change_stats(
before: pd.DataFrame, after: pd.DataFrame, *, description: str | None
) -> ProcessingSummary:
"""Generate a ProcessingSummary by comparing two dataframes before and after processing."""
initial_columns = set(before.columns)
final_columns = set(after.columns)

return ProcessingSummary(
description=description,
row_modifications={"count_before": len(before), "count_after": len(after)},
column_modifications={
"added": list(final_columns - initial_columns),
"removed": list(initial_columns - final_columns),
},
)


def to_parquet(
file_format: file.Format,
local_data_path: Path,
Expand Down Expand Up @@ -72,45 +105,58 @@ class ProcessingFunctions:
def __init__(self, dataset_id: str):
self.dataset_id = dataset_id

def reproject(self, df: gpd.GeoDataFrame, target_crs: str) -> gpd.GeoDataFrame:
return transform.reproject_gdf(df, target_crs=target_crs)
def reproject(self, df: gpd.GeoDataFrame, target_crs: str) -> ProcessingResult:
result = transform.reproject_gdf(df, target_crs=target_crs)
summary = make_generic_change_stats(
df, result, description=f"Reprojected geometries to {target_crs}"
)
return ProcessingResult(df=result, summary=summary)

def sort(self, df: pd.DataFrame, by: list[str], ascending=True) -> pd.DataFrame:
def sort(self, df: pd.DataFrame, by: list[str], ascending=True) -> ProcessingResult:
sorted = df.sort_values(by=by, ascending=ascending)
return sorted.reset_index(drop=True)
summary = make_generic_change_stats(
df, sorted, description=f"Sorted by columns: {', '.join(by)}"
)
return ProcessingResult(df=sorted, summary=summary)

def filter_rows(
self,
df: pd.DataFrame,
type: Literal["equals", "contains"],
column_name: str | int,
val: str | int,
) -> pd.DataFrame:
) -> ProcessingResult:
if type == "contains":
filter = df[column_name].str.contains(str(val))
else:
filter = df[column_name] == val
filtered = df[filter]
return filtered.reset_index(drop=True)
filtered = df[filter].reset_index(drop=True)
summary = make_generic_change_stats(df, filtered, description="Filtered rows")
return ProcessingResult(df=filtered, summary=summary)

def rename_columns(
self, df: pd.DataFrame, map: dict[str, str], drop_others=False
) -> pd.DataFrame:
) -> ProcessingResult:
renamed = df.copy()
if isinstance(renamed, gpd.GeoDataFrame) and renamed.geometry.name in map:
renamed.rename_geometry(map.pop(renamed.geometry.name), inplace=True)
renamed = renamed.rename(columns=map, errors="raise")
if drop_others:
renamed = renamed[list(map.values())]
return renamed
summary = make_generic_change_stats(
df,
renamed,
description=("Renamed columns"),
)
return ProcessingResult(df=renamed, summary=summary)

def clean_column_names(
self,
df: pd.DataFrame,
*,
replace: dict[str, str] | None = None,
lower: bool = False,
) -> pd.DataFrame:
) -> ProcessingResult:
cleaned = df.copy()
replace = replace or {}
columns = list(cleaned.columns)
Expand All @@ -119,22 +165,32 @@ def clean_column_names(
if lower:
columns = [c.lower() for c in columns]
cleaned.columns = pd.Index(columns)
return cleaned
summary = make_generic_change_stats(
df, cleaned, description="Cleaned column names"
)
return ProcessingResult(df=cleaned, summary=summary)

def update_column(
self,
df: pd.DataFrame,
column_name: str,
val: str | int,
) -> pd.DataFrame:
) -> ProcessingResult:
updated = df.copy()
updated[column_name] = val
return updated
summary = make_generic_change_stats(df, updated, description="Updated columns")
return ProcessingResult(df=updated, summary=summary)

def append_prev(self, df: pd.DataFrame, version: str = "latest") -> pd.DataFrame:
def append_prev(
self, df: pd.DataFrame, version: str = "latest"
) -> ProcessingResult:
prev_df = recipes.read_df(recipes.Dataset(id=self.dataset_id, version=version))
appended = pd.concat((prev_df, df))
return appended.reset_index(drop=True)
appended = appended.reset_index(drop=True)
summary = make_generic_change_stats(
prev_df, appended, description="Appended rows"
)
return ProcessingResult(df=appended, summary=summary)

def upsert_column_of_previous_version(
self,
Expand All @@ -143,7 +199,7 @@ def upsert_column_of_previous_version(
version: str = "latest",
insert_behavior: Literal["allow", "ignore", "error"] = "allow",
missing_key_behavior: Literal["null", "coalesce", "error"] = "error",
) -> pd.DataFrame:
) -> ProcessingResult:
assert key, "Must provide non-empty list of columns to be used as keys"
prev_df = recipes.read_df(recipes.Dataset(id=self.dataset_id, version=version))
df = data.upsert_df_columns(
Expand All @@ -153,34 +209,48 @@ def upsert_column_of_previous_version(
insert_behavior=insert_behavior,
missing_key_behavior=missing_key_behavior,
)
return df
summary = make_generic_change_stats(prev_df, df, description="Upserted columns")
return ProcessingResult(df=df, summary=summary)

def deduplicate(
self,
df: pd.DataFrame,
sort_columns: list[str] | None = None,
sort_ascending: bool = True,
by: list[str] | None = None,
) -> pd.DataFrame:
) -> ProcessingResult:
deduped = df.copy()
if sort_columns:
deduped = deduped.sort_values(by=sort_columns, ascending=sort_ascending)
deduped = deduped.drop_duplicates(by)
return deduped.reset_index(drop=True)
deduped = deduped.drop_duplicates(by).reset_index(drop=True)
summary = make_generic_change_stats(
df, deduped, description="Removed duplicates"
)
return ProcessingResult(df=deduped, summary=summary)

def drop_columns(self, df: pd.DataFrame, columns: list[str | int]) -> pd.DataFrame:
def drop_columns(
self, df: pd.DataFrame, columns: list[str | int]
) -> ProcessingResult:
columns = [df.columns[i] if isinstance(i, int) else i for i in columns]
return df.drop(columns, axis=1)
result = df.drop(columns, axis=1)
summary = make_generic_change_stats(df, result, description="Dropped columns")
return ProcessingResult(df=result, summary=summary)

def strip_columns(
self, df: pd.DataFrame, cols: list[str] | None = None
) -> pd.DataFrame:
) -> ProcessingResult:
stripped = df.copy()
if cols:
for col in cols:
df[col] = df[col].str.strip()
stripped[col] = stripped[col].str.strip()
else:
df = df.apply(lambda x: x.str.strip() if x.dtype == "object" else x)
return df
stripped = stripped.apply(
lambda x: x.str.strip() if x.dtype == "object" else x
)
summary = make_generic_change_stats(
df, stripped, description="Stripped whitespace"
)
return ProcessingResult(df=stripped, summary=summary)

def coerce_column_types(
self,
Expand All @@ -189,7 +259,7 @@ def coerce_column_types(
str, Literal["numeric", "integer", "bigint", "string", "date", "datetime"]
],
errors: Literal["raise", "coerce"] = "raise",
):
) -> ProcessingResult:
def to_str(obj):
if isinstance(obj, int):
return str(obj)
Expand All @@ -200,31 +270,42 @@ def to_str(obj):
else:
return str(obj)

df = df.copy()
result = df.copy()
changes = []
for column in column_types:
original_type = result[column].dtype
match column_types[column]:
case "numeric":
df[column] = pd.to_numeric(df[column], errors=errors)
result[column] = pd.to_numeric(result[column], errors=errors)
case "integer" | "bigint" as t:
mapping = {"integer": "Int32", "bigint": "Int64"}
df[column] = pd.array(df[column], dtype=mapping[t]) # type: ignore
result[column] = pd.array(result[column], dtype=mapping[t])
case "string":
df[column] = df[column].apply(to_str)
result[column] = result[column].apply(to_str)
case "date":
df[column] = pd.to_datetime(df[column], errors=errors).dt.date
df[column] = df[column].replace(pd.NaT, None) # type: ignore
result[column] = pd.to_datetime(
result[column], errors=errors
).dt.date
result[column] = result[column].replace(pd.NaT, None)
case "datetime":
df[column] = pd.to_datetime(df[column], errors=errors)
df[column] = df[column].replace(pd.NaT, None) # type: ignore
return df
result[column] = pd.to_datetime(result[column], errors=errors)
result[column] = result[column].replace(pd.NaT, None)
changes.append(f"{column}: {original_type} -> {result[column].dtype}")
summary = make_generic_change_stats(
df, result, description="Coerced column types"
)
return ProcessingResult(df=result, summary=summary)

def multi(self, df: gpd.GeoDataFrame) -> gpd.GeoDataFrame:
def multi(self, df: gpd.GeoDataFrame) -> ProcessingResult:
multi_gdf = df.copy()
multi_gdf.set_geometry(
gpd.GeoSeries([transform.multi(feature) for feature in multi_gdf.geometry]),
inplace=True,
)
return multi_gdf
summary = make_generic_change_stats(
df, multi_gdf, description="Converted geometries"
)
return ProcessingResult(df=multi_gdf, summary=summary)

def pd_series_func(
self,
Expand All @@ -234,7 +315,7 @@ def pd_series_func(
output_column_name: str | None = None,
geo: bool = False, # only used for validation
**kwargs,
) -> pd.DataFrame:
) -> ProcessingResult:
"""
Operates on a given column using a given pandas Series function and supplied kwargs
Expand Down Expand Up @@ -265,8 +346,14 @@ def pd_series_func(
func = transformed[column_name]
for part in parts:
func = func.__getattribute__(part)

transformed[output_column_name or column_name] = func(**kwargs) # type: ignore
return transformed
summary = make_generic_change_stats(
df,
transformed,
description=f"Applied {function_name} to column {column_name}",
)
return ProcessingResult(df=transformed, summary=summary)


def validate_pd_series_func(
Expand Down Expand Up @@ -351,12 +438,17 @@ def process(
df = geoparquet.read_df(input_path)
compiled_steps = validate_processing_steps(dataset_id, processing_steps)

summaries = []
for step in compiled_steps:
df = step(df)
result = step(df)
df = result.df
summaries.append(result.summary)
logger.info(f"Processing step complete: {result.summary}")

validate_columns(df, expected_columns)

if output_csv:
df.to_csv(output_path.parent / f"{dataset_id}.csv")

df.to_parquet(output_path)
logger.info("Processing summary:\n" + "\n".join(f"- {s}" for s in summaries))
Loading

0 comments on commit 7d02679

Please sign in to comment.