Skip to content

Commit

Permalink
fix(ingest): bigquery - Fix BigQuery Datetime/Timestamp type column p…
Browse files Browse the repository at this point in the history
…artition table profile bug (datahub-project#4658)

* fix BigQuery Datetime type column partition table profile bug

* inplace datetime replace

* extract out 'if' blocks and write a unit-test

* parse logic inside get_partition_range func
  • Loading branch information
sebkim authored and maggiehays committed Aug 1, 2022
1 parent 754aa23 commit 8254d1e
Show file tree
Hide file tree
Showing 2 changed files with 109 additions and 9 deletions.
62 changes: 53 additions & 9 deletions metadata-ingestion/src/datahub/ingestion/source/sql/bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@

# This import verifies that the dependencies are available.
import sqlalchemy_bigquery
from dateutil import parser
from dateutil.relativedelta import relativedelta
from google.cloud.bigquery import Client as BigQueryClient
from google.cloud.logging_v2.client import Client as GCPLoggingClient
from sqlalchemy import create_engine, inspect
Expand Down Expand Up @@ -221,6 +221,40 @@ def bigquery_audit_metadata_query_template(
return textwrap.dedent(query)


def get_partition_range_from_partition_id(
    partition_id: str, partition_datetime: Optional[datetime.datetime]
) -> Tuple[datetime.datetime, datetime.datetime]:
    """Return the start of a time partition and the start of the next one.

    The partition granularity is inferred from the length of ``partition_id``:

    * 4 characters  -> yearly  (``%Y``)
    * 6 characters  -> monthly (``%Y%m``)
    * 8 characters  -> daily   (``%Y%m%d``)
    * 10 characters -> hourly  (``%Y%m%d%H``)

    Args:
        partition_id: Partition identifier; its length selects the
            granularity. It is only parsed into a datetime when
            ``partition_datetime`` is ``None``.
        partition_datetime: Optional datetime that overrides the point in
            time encoded in ``partition_id``. It is truncated down to the
            start of the partition it falls in.

    Returns:
        A ``(lower_bound, upper_bound)`` tuple where ``upper_bound`` is
        ``lower_bound`` plus one partition duration.

    Raises:
        ValueError: If the length of ``partition_id`` matches no supported
            granularity, or if ``partition_id`` has to be parsed and does not
            match the expected format.
    """
    duration: relativedelta
    # if yearly partitioned,
    if len(partition_id) == 4:
        duration = relativedelta(years=1)
        if partition_datetime is None:
            partition_datetime = datetime.datetime.strptime(partition_id, "%Y")
        # Snap to the first instant of the year. The time-of-day fields are
        # reset as well so a caller-supplied datetime cannot shift the range
        # off the partition boundary.
        partition_datetime = partition_datetime.replace(
            month=1, day=1, hour=0, minute=0, second=0, microsecond=0
        )
    # elif monthly partitioned,
    elif len(partition_id) == 6:
        duration = relativedelta(months=1)
        if partition_datetime is None:
            partition_datetime = datetime.datetime.strptime(partition_id, "%Y%m")
        # Snap to the first instant of the month.
        partition_datetime = partition_datetime.replace(
            day=1, hour=0, minute=0, second=0, microsecond=0
        )
    # elif daily partitioned,
    elif len(partition_id) == 8:
        duration = relativedelta(days=1)
        if partition_datetime is None:
            partition_datetime = datetime.datetime.strptime(partition_id, "%Y%m%d")
        # Snap to midnight so the range covers exactly one day partition.
        partition_datetime = partition_datetime.replace(
            hour=0, minute=0, second=0, microsecond=0
        )
    # elif hourly partitioned,
    elif len(partition_id) == 10:
        duration = relativedelta(hours=1)
        if partition_datetime is None:
            partition_datetime = datetime.datetime.strptime(partition_id, "%Y%m%d%H")
        # Snap to the top of the hour.
        partition_datetime = partition_datetime.replace(
            minute=0, second=0, microsecond=0
        )
    else:
        raise ValueError(
            f"check your partition_id {partition_id}. It must be yearly/monthly/daily/hourly."
        )
    upper_bound_partition_datetime = partition_datetime + duration
    return partition_datetime, upper_bound_partition_datetime


# Handle the GEOGRAPHY type. We will temporarily patch the _type_map
# in the get_workunits method of the source.
GEOGRAPHY = make_sqlalchemy_type("GEOGRAPHY")
Expand Down Expand Up @@ -632,14 +666,25 @@ def generate_partition_profiler_query(

partition = self.get_latest_partition(schema, table)
if partition:
partition_ts: Union[datetime.datetime, datetime.date]
if not partition_datetime:
partition_datetime = parser.parse(partition.partition_id)
partition_where_clause: str
logger.debug(f"{table} is partitioned and partition column is {partition}")
(
partition_datetime,
upper_bound_partition_datetime,
) = get_partition_range_from_partition_id(
partition.partition_id, partition_datetime
)
if partition.data_type in ("TIMESTAMP", "DATETIME"):
partition_ts = partition_datetime
partition_where_clause = "{column_name} BETWEEN '{partition_id}' AND '{upper_bound_partition_id}'".format(
column_name=partition.column_name,
partition_id=partition_datetime,
upper_bound_partition_id=upper_bound_partition_datetime,
)
elif partition.data_type == "DATE":
partition_ts = partition_datetime.date()
partition_where_clause = "{column_name} = '{partition_id}'".format(
column_name=partition.column_name,
partition_id=partition_datetime.date(),
)
else:
logger.warning(f"Not supported partition type {partition.data_type}")
return None, None
Expand All @@ -650,13 +695,12 @@ def generate_partition_profiler_query(
FROM
`{table_catalog}.{table_schema}.{table_name}`
WHERE
{column_name} = '{partition_id}'
{partition_where_clause}
""".format(
table_catalog=partition.table_catalog,
table_schema=partition.table_schema,
table_name=partition.table_name,
column_name=partition.column_name,
partition_id=partition_ts,
partition_where_clause=partition_where_clause,
)

return (partition.partition_id, custom_sql)
Expand Down
56 changes: 56 additions & 0 deletions metadata-ingestion/tests/unit/test_bq_get_partition_range.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
import datetime

from datahub.ingestion.source.sql.bigquery import get_partition_range_from_partition_id


def test_get_partition_range_from_partition_id():
    """Check yearly, monthly, daily and hourly partition ranges.

    Each case is ``(partition_id, partition_datetime, expected_range)``.
    Cases with an explicit ``partition_datetime`` verify that it takes
    precedence over the date encoded in ``partition_id``; cases passing
    ``None`` verify that the range is derived from ``partition_id`` itself.
    """
    dt = datetime.datetime
    cases = [
        # yearly partition check
        ("2022", dt(2022, 1, 1), (dt(2022, 1, 1), dt(2023, 1, 1))),
        ("2022", dt(2022, 3, 12), (dt(2022, 1, 1), dt(2023, 1, 1))),
        ("2022", dt(2021, 5, 2), (dt(2021, 1, 1), dt(2022, 1, 1))),
        ("2022", None, (dt(2022, 1, 1), dt(2023, 1, 1))),
        # monthly partition check
        ("202202", dt(2022, 2, 1), (dt(2022, 2, 1), dt(2022, 3, 1))),
        ("202202", dt(2022, 2, 3), (dt(2022, 2, 1), dt(2022, 3, 1))),
        ("202202", dt(2021, 12, 13), (dt(2021, 12, 1), dt(2022, 1, 1))),
        ("202202", None, (dt(2022, 2, 1), dt(2022, 3, 1))),
        # daily partition check
        ("20220205", dt(2022, 2, 5), (dt(2022, 2, 5), dt(2022, 2, 6))),
        ("20220205", dt(2022, 2, 3), (dt(2022, 2, 3), dt(2022, 2, 4))),
        ("20220205", None, (dt(2022, 2, 5), dt(2022, 2, 6))),
        # hourly partition check
        ("2022020509", dt(2022, 2, 5, 9), (dt(2022, 2, 5, 9), dt(2022, 2, 5, 10))),
        ("2022020509", dt(2022, 2, 3, 1), (dt(2022, 2, 3, 1), dt(2022, 2, 3, 2))),
        ("2022020509", None, (dt(2022, 2, 5, 9), dt(2022, 2, 5, 10))),
    ]
    for partition_id, partition_datetime, expected_range in cases:
        actual_range = get_partition_range_from_partition_id(
            partition_id, partition_datetime
        )
        assert actual_range == expected_range

0 comments on commit 8254d1e

Please sign in to comment.