def get_extra_tags(
    self, inspector: Inspector, schema: str, table: str
) -> Dict[str, List[str]]:
    """Return per-column tags for ``table``, keyed by column name.

    The column reported by ``self.get_latest_partition(schema, table)`` is
    tagged with ``Constants.TAG_PARTITION_KEY``. When that lookup yields
    nothing (a falsy result), an empty mapping is returned.

    ``inspector`` is not used here; it is accepted so the signature matches
    the other ``get_extra_tags`` definition in this change set.
    """
    latest_partition = self.get_latest_partition(schema, table)
    if not latest_partition:
        # No partition information available: nothing to tag.
        return {}
    return {latest_partition.column_name: [Constants.TAG_PARTITION_KEY]}
def get_extra_tags(
    self, inspector: Inspector, schema: str, table: str
) -> Optional[Dict[str, List[str]]]:
    """Hook for supplying extra tags for the columns of ``table``.

    This default implementation contributes no tags and always returns
    ``None``. An override may return a mapping of column name to a list of
    tag names; ``_process_table`` forwards whatever is returned here to
    ``get_schema_fields`` via its ``tags`` argument.
    """
    return None
def get_schema_fields(
    self,
    dataset_name: str,
    columns: List[dict],
    pk_constraints: dict = None,
    tags: Optional[Dict[str, List[str]]] = None,
) -> List[SchemaField]:
    """Build the flat list of schema fields for every column in ``columns``.

    Each column is delegated to ``self.get_schema_fields_for_column`` and the
    resulting fields are concatenated in column order.

    ``tags`` optionally maps a column name (``column["name"]``) to a list of
    tag names. When ``tags`` is empty or ``None``, every column is processed
    with ``tags=None``; otherwise a column missing from the mapping is
    processed with an empty list.
    """
    schema_fields = []
    for col in columns:
        # Only look up per-column tags when a non-empty mapping was supplied.
        per_column_tags: Optional[List[str]] = (
            tags.get(col["name"], []) if tags else None
        )
        schema_fields.extend(
            self.get_schema_fields_for_column(
                dataset_name, col, pk_constraints, tags=per_column_tags
            )
        )
    return schema_fields
a/metadata-ingestion/src/datahub/ingestion/source/sql/trino.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sql/trino.py @@ -1,7 +1,7 @@ import json import uuid from textwrap import dedent -from typing import Any, Dict, List +from typing import Any, Dict, List, Optional import sqlalchemy @@ -175,7 +175,11 @@ def create(cls, config_dict, ctx): return cls(config, ctx) def get_schema_fields_for_column( - self, dataset_name: str, column: dict, pk_constraints: dict = None + self, + dataset_name: str, + column: dict, + pk_constraints: dict = None, + tags: Optional[List[str]] = None, ) -> List[SchemaField]: fields = super().get_schema_fields_for_column( diff --git a/metadata-ingestion/src/datahub/utilities/mapping.py b/metadata-ingestion/src/datahub/utilities/mapping.py index e2b00e3a2ba73..2b6c458db8d1c 100644 --- a/metadata-ingestion/src/datahub/utilities/mapping.py +++ b/metadata-ingestion/src/datahub/utilities/mapping.py @@ -19,6 +19,7 @@ class Constants: USER_OWNER = "user" GROUP_OWNER = "group" OPERAND_DATATYPE_SUPPORTED = [int, bool, str, float] + TAG_PARTITION_KEY = "PARTITION_KEY" class OperationProcessor: