Skip to content

Commit

Permalink
feat(bigquery): add partition key tag (datahub-project#4974)
Browse files Browse the repository at this point in the history
  • Loading branch information
anshbansal authored and maggiehays committed Aug 1, 2022
1 parent 785fa41 commit d598103
Show file tree
Hide file tree
Showing 6 changed files with 60 additions and 6 deletions.
12 changes: 12 additions & 0 deletions metadata-ingestion/src/datahub/ingestion/source/sql/bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
UpstreamLineageClass,
)
from datahub.utilities.bigquery_sql_parser import BigQuerySQLParser
from datahub.utilities.mapping import Constants

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -702,6 +703,17 @@ def is_latest_shard(self, project_id: str, schema: str, table: str) -> bool:
else:
return True

def get_extra_tags(
    self, inspector: Inspector, schema: str, table: str
) -> Dict[str, List[str]]:
    """Return extra per-column tags for *table*, keyed by column name.

    BigQuery-specific override: when the table has a latest partition,
    its partition column is tagged with the partition-key tag constant.
    Tables without a partition yield an empty mapping.
    """
    latest: Optional[BigQueryPartitionColumn] = self.get_latest_partition(
        schema, table
    )
    # No (truthy) partition column -> nothing to tag.
    if not latest:
        return {}
    return {latest.column_name: [Constants.TAG_PARTITION_KEY]}

def generate_partition_profiler_query(
self, schema: str, table: str, partition_datetime: Optional[datetime.datetime]
) -> Tuple[Optional[str], Optional[str]]:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ def get_schema_fields_for_column(
dataset_name: str,
column: Dict[Any, Any],
pk_constraints: Optional[Dict[Any, Any]] = None,
tags: Optional[List[str]] = None,
) -> List[SchemaField]:

fields = super().get_schema_fields_for_column(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -450,6 +450,7 @@ def get_schema_fields_for_column(
dataset_name: str,
column: Dict[Any, Any],
pk_constraints: Optional[Dict[Any, Any]] = None,
tags: Optional[List[str]] = None,
) -> List[SchemaField]:
return get_schema_fields_for_hive_column(
column["col_name"], column["col_type"], default_nullable=True
Expand Down
43 changes: 39 additions & 4 deletions metadata-ingestion/src/datahub/ingestion/source/sql/sql_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
make_dataplatform_instance_urn,
make_dataset_urn_with_platform_instance,
make_domain_urn,
make_tag_urn,
)
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.mcp_builder import (
Expand Down Expand Up @@ -83,12 +84,15 @@
DataPlatformInstanceClass,
DatasetLineageTypeClass,
DatasetPropertiesClass,
GlobalTagsClass,
JobStatusClass,
SubTypesClass,
TagAssociationClass,
UpstreamClass,
ViewPropertiesClass,
)
from datahub.telemetry import telemetry
from datahub.utilities.mapping import Constants
from datahub.utilities.sqlalchemy_query_combiner import SQLAlchemyQueryCombinerReport

if TYPE_CHECKING:
Expand Down Expand Up @@ -868,6 +872,11 @@ def loop_tables( # noqa: C901
except Exception as e:
self.report.report_failure(f"{schema}", f"Tables error: {e}")

def get_extra_tags(
    self, inspector: Inspector, schema: str, table: str
) -> Optional[Dict[str, List[str]]]:
    """Hook: map column names to extra tag lists for *table*.

    The generic SQL source attaches no extra tags, so this base
    implementation returns ``None``; platform-specific subclasses
    (e.g. BigQuery) override it to supply per-column tags.
    """
    return None

def _process_table(
self,
dataset_name: str,
Expand Down Expand Up @@ -926,9 +935,12 @@ def _process_table(
self.report.report_workunit(lineage_wu)
yield lineage_wu

extra_tags = self.get_extra_tags(inspector, schema, table)
pk_constraints: dict = inspector.get_pk_constraint(table, schema)
foreign_keys = self._get_foreign_keys(dataset_urn, inspector, schema, table)
schema_fields = self.get_schema_fields(dataset_name, columns, pk_constraints)
schema_fields = self.get_schema_fields(
dataset_name, columns, pk_constraints, tags=extra_tags
)
schema_metadata = get_schema_metadata(
self.report,
dataset_name,
Expand Down Expand Up @@ -1050,26 +1062,49 @@ def _get_foreign_keys(
return foreign_keys

def get_schema_fields(
    self,
    dataset_name: str,
    columns: List[dict],
    pk_constraints: dict = None,
    tags: Optional[Dict[str, List[str]]] = None,
) -> List[SchemaField]:
    """Build the flat ``SchemaField`` list for a dataset's columns.

    For each column (in order), resolves its extra tags from *tags*
    (keyed by column name; missing columns get an empty list, and when
    *tags* is falsy no tags are passed at all) and delegates field
    construction to ``get_schema_fields_for_column``, concatenating the
    per-column results.

    Note: this span in the source was a garbled diff that interleaved
    pre-change and post-change lines; this is the reconstructed
    post-change implementation.
    """
    canonical_schema: List[SchemaField] = []
    for column in columns:
        column_tags: Optional[List[str]] = None
        if tags:
            # Columns absent from the mapping still get an explicit
            # (empty) tag list, distinguishing "no mapping supplied"
            # (None) from "mapping supplied, no tags for this column".
            column_tags = tags.get(column["name"], [])
        fields = self.get_schema_fields_for_column(
            dataset_name, column, pk_constraints, tags=column_tags
        )
        canonical_schema.extend(fields)
    return canonical_schema

def get_schema_fields_for_column(
self, dataset_name: str, column: dict, pk_constraints: dict = None
self,
dataset_name: str,
column: dict,
pk_constraints: dict = None,
tags: Optional[List[str]] = None,
) -> List[SchemaField]:
gtc: Optional[GlobalTagsClass] = None
is_partition_key: Optional[bool] = None
if tags:
if Constants.TAG_PARTITION_KEY in tags:
is_partition_key = True
else:
is_partition_key = False
tags_str = [make_tag_urn(t) for t in tags]
tags_tac = [TagAssociationClass(t) for t in tags_str]
gtc = GlobalTagsClass(tags_tac)
field = SchemaField(
fieldPath=column["name"],
type=get_column_type(self.report, dataset_name, column["type"]),
nativeDataType=column.get("full_type", repr(column["type"])),
description=column.get("comment", None),
nullable=column["nullable"],
recursive=False,
globalTags=gtc,
isPartitioningKey=is_partition_key,
)
if (
pk_constraints is not None
Expand Down
8 changes: 6 additions & 2 deletions metadata-ingestion/src/datahub/ingestion/source/sql/trino.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import json
import uuid
from textwrap import dedent
from typing import Any, Dict, List
from typing import Any, Dict, List, Optional

import sqlalchemy

Expand Down Expand Up @@ -175,7 +175,11 @@ def create(cls, config_dict, ctx):
return cls(config, ctx)

def get_schema_fields_for_column(
self, dataset_name: str, column: dict, pk_constraints: dict = None
self,
dataset_name: str,
column: dict,
pk_constraints: dict = None,
tags: Optional[List[str]] = None,
) -> List[SchemaField]:

fields = super().get_schema_fields_for_column(
Expand Down
1 change: 1 addition & 0 deletions metadata-ingestion/src/datahub/utilities/mapping.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ class Constants:
USER_OWNER = "user"
GROUP_OWNER = "group"
OPERAND_DATATYPE_SUPPORTED = [int, bool, str, float]
TAG_PARTITION_KEY = "PARTITION_KEY"


class OperationProcessor:
Expand Down

0 comments on commit d598103

Please sign in to comment.