From 8254d1ee9492d10d1a9a4ef855b855ff49b116ac Mon Sep 17 00:00:00 2001 From: Sebo Kim Date: Tue, 26 Apr 2022 23:54:19 +0900 Subject: [PATCH] fix(ingest): bigquery - Fix BigQuery Datetime/Timestamp type column partition table profile bug (#4658) * fix BigQuery Datetime type column partition table profile bug * inplace datetime replace * extract out 'if' blocks and write a unit-test * parse logic inside get_partition_range func --- .../datahub/ingestion/source/sql/bigquery.py | 62 ++++++++++++++++--- .../tests/unit/test_bq_get_partition_range.py | 56 +++++++++++++++++ 2 files changed, 109 insertions(+), 9 deletions(-) create mode 100644 metadata-ingestion/tests/unit/test_bq_get_partition_range.py diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/bigquery.py b/metadata-ingestion/src/datahub/ingestion/source/sql/bigquery.py index b0d06631a1d501..e135dcc24339a1 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sql/bigquery.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sql/bigquery.py @@ -12,7 +12,7 @@ # This import verifies that the dependencies are available. import sqlalchemy_bigquery -from dateutil import parser +from dateutil.relativedelta import relativedelta from google.cloud.bigquery import Client as BigQueryClient from google.cloud.logging_v2.client import Client as GCPLoggingClient from sqlalchemy import create_engine, inspect @@ -221,6 +221,40 @@ def bigquery_audit_metadata_query_template( return textwrap.dedent(query) +def get_partition_range_from_partition_id( + partition_id: str, partition_datetime: Optional[datetime.datetime] +) -> Tuple[datetime.datetime, datetime.datetime]: + duration: relativedelta + # if yearly partitioned, + if len(partition_id) == 4: + duration = relativedelta(years=1) + if not partition_datetime: + partition_datetime = datetime.datetime.strptime(partition_id, "%Y") + partition_datetime = partition_datetime.replace(month=1, day=1) + # elif monthly partitioned, + elif len(partition_id) == 6: + duration = relativedelta(months=1) + if not partition_datetime: + partition_datetime = datetime.datetime.strptime(partition_id, "%Y%m") + partition_datetime = partition_datetime.replace(day=1) + # elif daily partitioned, + elif len(partition_id) == 8: + duration = relativedelta(days=1) + if not partition_datetime: + partition_datetime = datetime.datetime.strptime(partition_id, "%Y%m%d") + # elif hourly partitioned, + elif len(partition_id) == 10: + duration = relativedelta(hours=1) + if not partition_datetime: + partition_datetime = datetime.datetime.strptime(partition_id, "%Y%m%d%H") + else: + raise ValueError( + f"check your partition_id {partition_id}. It must be yearly/monthly/daily/hourly." + ) + upper_bound_partition_datetime = partition_datetime + duration + return partition_datetime, upper_bound_partition_datetime + + # Handle the GEOGRAPHY type. We will temporarily patch the _type_map # in the get_workunits method of the source. GEOGRAPHY = make_sqlalchemy_type("GEOGRAPHY") @@ -632,14 +666,25 @@ def generate_partition_profiler_query( partition = self.get_latest_partition(schema, table) if partition: - partition_ts: Union[datetime.datetime, datetime.date] - if not partition_datetime: - partition_datetime = parser.parse(partition.partition_id) + partition_where_clause: str logger.debug(f"{table} is partitioned and partition column is {partition}") + ( + partition_datetime, + upper_bound_partition_datetime, + ) = get_partition_range_from_partition_id( + partition.partition_id, partition_datetime + ) if partition.data_type in ("TIMESTAMP", "DATETIME"): - partition_ts = partition_datetime + partition_where_clause = "{column_name} BETWEEN '{partition_id}' AND '{upper_bound_partition_id}'".format( + column_name=partition.column_name, + partition_id=partition_datetime, + upper_bound_partition_id=upper_bound_partition_datetime, + ) elif partition.data_type == "DATE": - partition_ts = partition_datetime.date() + partition_where_clause = "{column_name} = '{partition_id}'".format( + column_name=partition.column_name, + partition_id=partition_datetime.date(), + ) else: logger.warning(f"Not supported partition type {partition.data_type}") return None, None @@ -650,13 +695,12 @@ def generate_partition_profiler_query( FROM `{table_catalog}.{table_schema}.{table_name}` WHERE - {column_name} = '{partition_id}' + {partition_where_clause} """.format( table_catalog=partition.table_catalog, table_schema=partition.table_schema, table_name=partition.table_name, - column_name=partition.column_name, - partition_id=partition_ts, + partition_where_clause=partition_where_clause, ) return (partition.partition_id, custom_sql) diff --git a/metadata-ingestion/tests/unit/test_bq_get_partition_range.py b/metadata-ingestion/tests/unit/test_bq_get_partition_range.py new file mode 100644 index 00000000000000..50863b9f9a8352 --- /dev/null +++ b/metadata-ingestion/tests/unit/test_bq_get_partition_range.py @@ -0,0 +1,56 @@ +import datetime + +from datahub.ingestion.source.sql.bigquery import get_partition_range_from_partition_id + + +def test_get_partition_range_from_partition_id(): + # yearly partition check + assert get_partition_range_from_partition_id( + "2022", datetime.datetime(2022, 1, 1) + ) == (datetime.datetime(2022, 1, 1), datetime.datetime(2023, 1, 1)) + assert get_partition_range_from_partition_id( + "2022", datetime.datetime(2022, 3, 12) + ) == (datetime.datetime(2022, 1, 1), datetime.datetime(2023, 1, 1)) + assert get_partition_range_from_partition_id( + "2022", datetime.datetime(2021, 5, 2) + ) == (datetime.datetime(2021, 1, 1), datetime.datetime(2022, 1, 1)) + assert get_partition_range_from_partition_id("2022", None) == ( + datetime.datetime(2022, 1, 1), + datetime.datetime(2023, 1, 1), + ) + # monthly partition check + assert get_partition_range_from_partition_id( + "202202", datetime.datetime(2022, 2, 1) + ) == (datetime.datetime(2022, 2, 1), datetime.datetime(2022, 3, 1)) + assert get_partition_range_from_partition_id( + "202202", datetime.datetime(2022, 2, 3) + ) == (datetime.datetime(2022, 2, 1), datetime.datetime(2022, 3, 1)) + assert get_partition_range_from_partition_id( + "202202", datetime.datetime(2021, 12, 13) + ) == (datetime.datetime(2021, 12, 1), datetime.datetime(2022, 1, 1)) + assert get_partition_range_from_partition_id("202202", None) == ( + datetime.datetime(2022, 2, 1), + datetime.datetime(2022, 3, 1), + ) + # daily partition check + assert get_partition_range_from_partition_id( + "20220205", datetime.datetime(2022, 2, 5) + ) == (datetime.datetime(2022, 2, 5), datetime.datetime(2022, 2, 6)) + assert get_partition_range_from_partition_id( + "20220205", datetime.datetime(2022, 2, 3) + ) == (datetime.datetime(2022, 2, 3), datetime.datetime(2022, 2, 4)) + assert get_partition_range_from_partition_id("20220205", None) == ( + datetime.datetime(2022, 2, 5), + datetime.datetime(2022, 2, 6), + ) + # hourly partition check + assert get_partition_range_from_partition_id( + "2022020509", datetime.datetime(2022, 2, 5, 9) + ) == (datetime.datetime(2022, 2, 5, 9), datetime.datetime(2022, 2, 5, 10)) + assert get_partition_range_from_partition_id( + "2022020509", datetime.datetime(2022, 2, 3, 1) + ) == (datetime.datetime(2022, 2, 3, 1), datetime.datetime(2022, 2, 3, 2)) + assert get_partition_range_from_partition_id("2022020509", None) == ( + datetime.datetime(2022, 2, 5, 9), + datetime.datetime(2022, 2, 5, 10), + )