From 7d026793571762b583dfeace1d40241b7d9f83df Mon Sep 17 00:00:00 2001 From: Alex Richey Date: Thu, 12 Dec 2024 13:31:05 -0500 Subject: [PATCH] Add very basic wrapper around proc func outputs --- dcpy/lifecycle/ingest/transform.py | 178 ++++++++++++++----- dcpy/test/lifecycle/ingest/test_transform.py | 52 +++--- 2 files changed, 161 insertions(+), 69 deletions(-) diff --git a/dcpy/lifecycle/ingest/transform.py b/dcpy/lifecycle/ingest/transform.py index aea3f89d3f..39a9c47213 100644 --- a/dcpy/lifecycle/ingest/transform.py +++ b/dcpy/lifecycle/ingest/transform.py @@ -1,10 +1,13 @@ from functools import partial import geopandas as gpd +from pydantic import BaseModel import pandas as pd from pathlib import Path -from typing import Callable, Literal +from typing import Callable, Literal, NamedTuple, Any +from dataclasses import dataclass -from dcpy.models import file +from dcpy.models import file, base +from dcpy.models.base import SortedSerializedBase from dcpy.models.lifecycle.ingest import ProcessingStep, Column from dcpy.utils import data, introspect from dcpy.utils.geospatial import transform, parquet as geoparquet @@ -14,6 +17,36 @@ OUTPUT_GEOM_COLUMN = "geom" +class ProcessingSummary(SortedSerializedBase): + """Summary of the changes from a data processing function.""" + + description: str | None = None + row_modifications: dict + column_modifications: dict + + +class ProcessingResult(SortedSerializedBase, arbitrary_types_allowed=True): + df: pd.DataFrame + summary: ProcessingSummary + + +def make_generic_change_stats( + before: pd.DataFrame, after: pd.DataFrame, *, description: str | None +) -> ProcessingSummary: + """Generate a ProcessingSummary by comparing two dataframes before and after processing.""" + initial_columns = set(before.columns) + final_columns = set(after.columns) + + return ProcessingSummary( + description=description, + row_modifications={"count_before": len(before), "count_after": len(after)}, + column_modifications={ + "added": list(final_columns - initial_columns), + "removed": list(initial_columns - final_columns), + }, + ) + + def to_parquet( file_format: file.Format, local_data_path: Path, @@ -72,12 +105,19 @@ class ProcessingFunctions: def __init__(self, dataset_id: str): self.dataset_id = dataset_id - def reproject(self, df: gpd.GeoDataFrame, target_crs: str) -> gpd.GeoDataFrame: - return transform.reproject_gdf(df, target_crs=target_crs) + def reproject(self, df: gpd.GeoDataFrame, target_crs: str) -> ProcessingResult: + result = transform.reproject_gdf(df, target_crs=target_crs) + summary = make_generic_change_stats( + df, result, description=f"Reprojected geometries to {target_crs}" + ) + return ProcessingResult(df=result, summary=summary) - def sort(self, df: pd.DataFrame, by: list[str], ascending=True) -> pd.DataFrame: + def sort(self, df: pd.DataFrame, by: list[str], ascending=True) -> ProcessingResult: sorted = df.sort_values(by=by, ascending=ascending) - return sorted.reset_index(drop=True) + summary = make_generic_change_stats( + df, sorted, description=f"Sorted by columns: {', '.join(by)}" + ) + return ProcessingResult(df=sorted, summary=summary) def filter_rows( self, @@ -85,24 +125,30 @@ def filter_rows( type: Literal["equals", "contains"], column_name: str | int, val: str | int, - ) -> pd.DataFrame: + ) -> ProcessingResult: if type == "contains": filter = df[column_name].str.contains(str(val)) else: filter = df[column_name] == val - filtered = df[filter] - return filtered.reset_index(drop=True) + filtered = df[filter].reset_index(drop=True) + summary = make_generic_change_stats(df, filtered, description="Filtered rows") + return ProcessingResult(df=filtered, summary=summary) def rename_columns( self, df: pd.DataFrame, map: dict[str, str], drop_others=False - ) -> pd.DataFrame: + ) -> ProcessingResult: renamed = df.copy() if isinstance(renamed, gpd.GeoDataFrame) and renamed.geometry.name in map: renamed.rename_geometry(map.pop(renamed.geometry.name), inplace=True) renamed = renamed.rename(columns=map, errors="raise") if drop_others: renamed = renamed[list(map.values())] - return renamed + summary = make_generic_change_stats( + df, + renamed, + description=("Renamed columns"), + ) + return ProcessingResult(df=renamed, summary=summary) def clean_column_names( self, @@ -110,7 +156,7 @@ def clean_column_names( *, replace: dict[str, str] | None = None, lower: bool = False, - ) -> pd.DataFrame: + ) -> ProcessingResult: cleaned = df.copy() replace = replace or {} columns = list(cleaned.columns) @@ -119,22 +165,32 @@ def clean_column_names( if lower: columns = [c.lower() for c in columns] cleaned.columns = pd.Index(columns) - return cleaned + summary = make_generic_change_stats( + df, cleaned, description="Cleaned column names" + ) + return ProcessingResult(df=cleaned, summary=summary) def update_column( self, df: pd.DataFrame, column_name: str, val: str | int, - ) -> pd.DataFrame: + ) -> ProcessingResult: updated = df.copy() updated[column_name] = val - return updated + summary = make_generic_change_stats(df, updated, description="Updated columns") + return ProcessingResult(df=updated, summary=summary) - def append_prev(self, df: pd.DataFrame, version: str = "latest") -> pd.DataFrame: + def append_prev( + self, df: pd.DataFrame, version: str = "latest" + ) -> ProcessingResult: prev_df = recipes.read_df(recipes.Dataset(id=self.dataset_id, version=version)) appended = pd.concat((prev_df, df)) - return appended.reset_index(drop=True) + appended = appended.reset_index(drop=True) + summary = make_generic_change_stats( + prev_df, appended, description="Appended rows" + ) + return ProcessingResult(df=appended, summary=summary) def upsert_column_of_previous_version( self, @@ -143,7 +199,7 @@ def upsert_column_of_previous_version( version: str = "latest", insert_behavior: Literal["allow", "ignore", "error"] = "allow", missing_key_behavior: Literal["null", "coalesce", "error"] = "error", - ) -> pd.DataFrame: + ) -> ProcessingResult: assert key, "Must provide non-empty list of columns to be used as keys" prev_df = recipes.read_df(recipes.Dataset(id=self.dataset_id, version=version)) df = data.upsert_df_columns( @@ -153,7 +209,8 @@ def upsert_column_of_previous_version( insert_behavior=insert_behavior, missing_key_behavior=missing_key_behavior, ) - return df + summary = make_generic_change_stats(prev_df, df, description="Upserted columns") + return ProcessingResult(df=df, summary=summary) def deduplicate( self, @@ -161,26 +218,39 @@ def deduplicate( sort_columns: list[str] | None = None, sort_ascending: bool = True, by: list[str] | None = None, - ) -> pd.DataFrame: + ) -> ProcessingResult: deduped = df.copy() if sort_columns: deduped = deduped.sort_values(by=sort_columns, ascending=sort_ascending) - deduped = deduped.drop_duplicates(by) - return deduped.reset_index(drop=True) + deduped = deduped.drop_duplicates(by).reset_index(drop=True) + summary = make_generic_change_stats( + df, deduped, description="Removed duplicates" + ) + return ProcessingResult(df=deduped, summary=summary) - def drop_columns(self, df: pd.DataFrame, columns: list[str | int]) -> pd.DataFrame: + def drop_columns( + self, df: pd.DataFrame, columns: list[str | int] + ) -> ProcessingResult: columns = [df.columns[i] if isinstance(i, int) else i for i in columns] - return df.drop(columns, axis=1) + result = df.drop(columns, axis=1) + summary = make_generic_change_stats(df, result, description="Dropped columns") + return ProcessingResult(df=result, summary=summary) def strip_columns( self, df: pd.DataFrame, cols: list[str] | None = None - ) -> pd.DataFrame: + ) -> ProcessingResult: + stripped = df.copy() if cols: for col in cols: - df[col] = df[col].str.strip() + stripped[col] = stripped[col].str.strip() else: - df = df.apply(lambda x: x.str.strip() if x.dtype == "object" else x) - return df + stripped = stripped.apply( + lambda x: x.str.strip() if x.dtype == "object" else x + ) + summary = make_generic_change_stats( + df, stripped, description="Stripped whitespace" + ) + return ProcessingResult(df=stripped, summary=summary) def coerce_column_types( self, @@ -189,7 +259,7 @@ def coerce_column_types( str, Literal["numeric", "integer", "bigint", "string", "date", "datetime"] ], errors: Literal["raise", "coerce"] = "raise", - ): + ) -> ProcessingResult: def to_str(obj): if isinstance(obj, int): return str(obj) @@ -200,31 +270,42 @@ def to_str(obj): else: return str(obj) - df = df.copy() + result = df.copy() + changes = [] for column in column_types: + original_type = result[column].dtype match column_types[column]: case "numeric": - df[column] = pd.to_numeric(df[column], errors=errors) + result[column] = pd.to_numeric(result[column], errors=errors) case "integer" | "bigint" as t: mapping = {"integer": "Int32", "bigint": "Int64"} - df[column] = pd.array(df[column], dtype=mapping[t]) # type: ignore + result[column] = pd.array(result[column], dtype=mapping[t]) case "string": - df[column] = df[column].apply(to_str) + result[column] = result[column].apply(to_str) case "date": - df[column] = pd.to_datetime(df[column], errors=errors).dt.date - df[column] = df[column].replace(pd.NaT, None) # type: ignore + result[column] = pd.to_datetime( + result[column], errors=errors + ).dt.date + result[column] = result[column].replace(pd.NaT, None) case "datetime": - df[column] = pd.to_datetime(df[column], errors=errors) - df[column] = df[column].replace(pd.NaT, None) # type: ignore - return df + result[column] = pd.to_datetime(result[column], errors=errors) + result[column] = result[column].replace(pd.NaT, None) + changes.append(f"{column}: {original_type} -> {result[column].dtype}") + summary = make_generic_change_stats( + df, result, description="Coerced column types" + ) + return ProcessingResult(df=result, summary=summary) - def multi(self, df: gpd.GeoDataFrame) -> gpd.GeoDataFrame: + def multi(self, df: gpd.GeoDataFrame) -> ProcessingResult: multi_gdf = df.copy() multi_gdf.set_geometry( gpd.GeoSeries([transform.multi(feature) for feature in multi_gdf.geometry]), inplace=True, ) - return multi_gdf + summary = make_generic_change_stats( + df, multi_gdf, description="Converted geometries" + ) + return ProcessingResult(df=multi_gdf, summary=summary) def pd_series_func( self, @@ -234,7 +315,7 @@ def pd_series_func( output_column_name: str | None = None, geo: bool = False, # only used for validation **kwargs, - ) -> pd.DataFrame: + ) -> ProcessingResult: """ Operates on a given column using a given pandas Series function and supplied kwargs @@ -265,8 +346,14 @@ def pd_series_func( func = transformed[column_name] for part in parts: func = func.__getattribute__(part) + transformed[output_column_name or column_name] = func(**kwargs) # type: ignore - return transformed + summary = make_generic_change_stats( + df, + transformed, + description=f"Applied {function_name} to column {column_name}", + ) + return ProcessingResult(df=transformed, summary=summary) def validate_pd_series_func( @@ -351,8 +438,12 @@ def process( df = geoparquet.read_df(input_path) compiled_steps = validate_processing_steps(dataset_id, processing_steps) + summaries = [] for step in compiled_steps: - df = step(df) + result = step(df) + df = result.df + summaries.append(result.summary) + logger.info(f"Processing step complete: {result.summary}") validate_columns(df, expected_columns) @@ -360,3 +451,4 @@ def process( df.to_csv(output_path.parent / f"{dataset_id}.csv") df.to_parquet(output_path) + logger.info("Processing summary:\n" + "\n".join(f"- {s}" for s in summaries)) diff --git a/dcpy/test/lifecycle/ingest/test_transform.py b/dcpy/test/lifecycle/ingest/test_transform.py index 48ff2a0bc8..e1957ca276 100644 --- a/dcpy/test/lifecycle/ingest/test_transform.py +++ b/dcpy/test/lifecycle/ingest/test_transform.py @@ -107,7 +107,7 @@ def test_validate_processing_steps(): } ).set_geometry("col3") for step in compiled_steps: - df = step(df) + df = step(df).df expected = gpd.GeoDataFrame( {"col3": gpd.GeoSeries([None, None, None])} ).set_geometry("col3") @@ -202,60 +202,60 @@ def test_reproject(self): assert self.gdf.crs.to_string() == "EPSG:4326" target = "EPSG:2263" reprojected = self.proc.reproject(self.gdf, target_crs=target) - assert reprojected.crs.to_string() == target + assert reprojected.df.crs.to_string() == target def test_sort(self): sorted = self.proc.sort(self.basic_df, by=["a"]) expected = pd.DataFrame({"a": [1, 2, 3], "b": ["c_3", "b_1", "b_2"]}) - assert sorted.equals(expected) + assert sorted.df.equals(expected) def test_filter_rows_equals(self): filtered = self.proc.filter_rows( self.basic_df, type="equals", column_name="a", val=1 ) expected = pd.DataFrame({"a": [1], "b": ["c_3"]}) - assert filtered.equals(expected) + assert filtered.df.equals(expected) def test_filter_rows_contains(self): filtered = self.proc.filter_rows( self.basic_df, type="contains", column_name="b", val="b_" ) expected = pd.DataFrame({"a": [2, 3], "b": ["b_1", "b_2"]}) - assert filtered.equals(expected) + assert filtered.df.equals(expected) def test_rename_columns(self): - renamed = self.proc.rename_columns(self.basic_df, {"a": "c"}) + renamed = self.proc.rename_columns(self.basic_df, {"a": "c"}).df expected = pd.DataFrame({"c": [2, 3, 1], "b": ["b_1", "b_2", "c_3"]}) assert renamed.equals(expected) def test_rename_columns_drop(self): renamed = self.proc.rename_columns(self.basic_df, {"a": "c"}, drop_others=True) expected = pd.DataFrame({"c": [2, 3, 1]}) - assert renamed.equals(expected) + assert renamed.df.equals(expected) def test_clean_column_names(self): cleaned = self.proc.clean_column_names(self.messy_names_df, replace={"_": "-"}) expected = pd.DataFrame({"Column": [1, 2], "Two-Words": [3, 4]}) - assert cleaned.equals(expected) + assert cleaned.df.equals(expected) def test_clean_column_names_lower(self): cleaned = self.proc.clean_column_names( self.messy_names_df, replace={"_": "-"}, lower=True ) expected = pd.DataFrame({"column": [1, 2], "two-words": [3, 4]}) - assert cleaned.equals(expected) + assert cleaned.df.equals(expected) def test_update_column(self): updated = self.proc.update_column(self.basic_df, column_name="a", val=5) expected = pd.DataFrame({"a": [5, 5, 5], "b": ["b_1", "b_2", "c_3"]}) - assert updated.equals(expected) + assert updated.df.equals(expected) @mock.patch("dcpy.connectors.edm.recipes.read_df") def test_append_prev(self, read_df): read_df.return_value = self.prev_df appended = self.proc.append_prev(self.basic_df) expected = pd.DataFrame({"a": [-1, 2, 3, 1], "b": ["z", "b_1", "b_2", "c_3"]}) - assert appended.equals(expected) + assert appended.df.equals(expected) @mock.patch("dcpy.connectors.edm.recipes.read_df") def test_upsert_column_of_previous_version(self, read_df): @@ -264,35 +264,35 @@ def test_upsert_column_of_previous_version(self, read_df): expected = pd.DataFrame( {"a": [3, 2, 1], "b": ["b_2", "b_1", "c_3"], "c": [True, False, True]} ) - assert upserted.equals(expected) + assert upserted.df.equals(expected) def test_deduplicate(self): deduped = self.proc.deduplicate(self.dupe_df) expected = pd.DataFrame({"a": [1, 1, 2], "b": [3, 1, 2]}) - assert deduped.equals(expected) + assert deduped.df.equals(expected) def test_deduplicate_by(self): deduped = self.proc.deduplicate(self.dupe_df, by="a") expected = pd.DataFrame({"a": [1, 2], "b": [3, 2]}) - assert deduped.equals(expected) + assert deduped.df.equals(expected) def test_deduplicate_by_sort(self): deduped = self.proc.deduplicate(self.dupe_df, by="a", sort_columns="b") expected = pd.DataFrame({"a": [1, 2], "b": [1, 2]}) - assert deduped.equals(expected) + assert deduped.df.equals(expected) def test_drop_columns(self): dropped = self.proc.drop_columns(self.basic_df, columns=["b"]) expected = pd.DataFrame({"a": [2, 3, 1]}) - assert dropped.equals(expected) + assert dropped.df.equals(expected) def test_strip_columns(self): stripped = self.proc.strip_columns(self.whitespace_df, ["b"]) - assert stripped.equals(self.basic_df) + assert stripped.df.equals(self.basic_df) def test_strip_all_columns(self): stripped = self.proc.strip_columns(self.whitespace_df) - assert stripped.equals(self.basic_df) + assert stripped.df.equals(self.basic_df) @pytest.mark.parametrize( "original_column, cast, errors, expected_column", @@ -335,14 +335,14 @@ def test_coerce_column_type(self, original_column, cast, errors, expected_column coerced = self.proc.coerce_column_types( self.coerce_df, {original_column: cast}, errors=errors ) - assert coerced[original_column].equals(self.coerce_df[expected_column]) + assert coerced.df[original_column].equals(self.coerce_df[expected_column]) def test_pd_series_func(self): transformed = self.proc.pd_series_func( self.basic_df, column_name="b", function_name="map", arg={"b_1": "c_1"} ) expected = pd.DataFrame({"a": [2, 3, 1], "b": ["c_1", np.nan, np.nan]}) - assert transformed.equals(expected) + assert transformed.df.equals(expected) def test_pd_series_func_str(self): transformed = self.proc.pd_series_func( @@ -353,7 +353,7 @@ def test_pd_series_func_str(self): repl="B-", ) expected = pd.DataFrame({"a": [2, 3, 1], "b": ["B-1", "B-2", "c_3"]}) - assert transformed.equals(expected) + assert transformed.df.equals(expected) def test_gpd_series_func(self): gdf = gpd.GeoDataFrame( @@ -365,7 +365,7 @@ def test_gpd_series_func(self): transformed = self.proc.pd_series_func( gdf, column_name="wkt", function_name="force_2d", geo=True ) - assert transformed.equals( + assert transformed.df.equals( gpd.GeoDataFrame( { "a": [1, 2], @@ -386,9 +386,9 @@ def test_rename_geodataframe(self): transformed: gpd.GeoDataFrame = self.proc.rename_columns( self.gdf, map={"wkt": "geom"} ) - assert transformed.active_geometry_name == "geom" + assert transformed.df.active_geometry_name == "geom" expected = gpd.read_parquet(RESOURCES / TEST_DATA_DIR / "renamed.parquet") - assert transformed.equals(expected) + assert transformed.df.equals(expected) def test_multi(self): gdf = gpd.GeoDataFrame( @@ -426,7 +426,7 @@ def test_multi(self): ), } ) - assert transformed.equals(expected) + assert transformed.df.equals(expected) def test_processing_no_steps(create_temp_filesystem: Path): @@ -437,7 +437,7 @@ def test_processing_no_steps(create_temp_filesystem: Path): ), "Error in setup of test - output file should not exist yet" transform.process(TEST_DATASET_NAME, [], [], input, output) - assert output.exists() + assert output.exists() # TODO: inconsistent with df def test_processing(create_temp_filesystem: Path):