From b026e876a9cb376a0fc49d2ce4a0d5e93f0b8071 Mon Sep 17 00:00:00 2001 From: shubhamjagtap639 Date: Thu, 14 Mar 2024 13:09:09 +0530 Subject: [PATCH 01/12] Add support of BigQuery Owner Label to Datahub Owner ingestion --- .../src/datahub/emitter/mce_builder.py | 4 + .../ingestion/source/bigquery_v2/bigquery.py | 56 ++++++++- .../source/bigquery_v2/bigquery_config.py | 26 +++++ .../tests/unit/test_bigquery_source.py | 107 ++++++++++++++++++ 4 files changed, 187 insertions(+), 6 deletions(-) diff --git a/metadata-ingestion/src/datahub/emitter/mce_builder.py b/metadata-ingestion/src/datahub/emitter/mce_builder.py index d9933db67f66a5..1c29d38273d823 100644 --- a/metadata-ingestion/src/datahub/emitter/mce_builder.py +++ b/metadata-ingestion/src/datahub/emitter/mce_builder.py @@ -246,6 +246,10 @@ def make_owner_urn(owner: str, owner_type: OwnerType) -> str: return f"urn:li:{owner_type.value}:{owner}" +def make_ownership_type_urn(type: str) -> str: + return f"urn:li:ownershipType:{type}" + + def make_term_urn(term: str) -> str: """ Makes a term urn if the input is not a term urn already diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py index bcc0aa50ed22e6..280a0d9bbe1063 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py +++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py @@ -15,7 +15,9 @@ make_data_platform_urn, make_dataplatform_instance_urn, make_dataset_urn, + make_ownership_type_urn, make_tag_urn, + make_user_urn, ) from datahub.emitter.mcp import MetadataChangeProposalWrapper from datahub.emitter.mcp_builder import BigQueryDatasetKey, ContainerKey, ProjectIdKey @@ -111,6 +113,9 @@ from datahub.metadata.schema_classes import ( DataPlatformInstanceClass, GlobalTagsClass, + OwnerClass, + OwnershipClass, + OwnershipTypeClass, TagAssociationClass, ) from datahub.sql_parsing.schema_resolver import 
SchemaResolver @@ -976,12 +981,19 @@ def gen_table_dataset_workunits( custom_properties["is_sharded"] = str(True) sub_types = ["sharded table"] + sub_types - tags_to_add = None - if table.labels and self.config.capture_table_label_as_tag: - tags_to_add = [] - tags_to_add.extend( - [make_tag_urn(f"""{k}:{v}""") for k, v in table.labels.items()] - ) + tags_to_add = [] + owners_to_add = {} + if table.labels: + if self.config.capture_table_label_as_tag: + tags_to_add.extend( + [make_tag_urn(f"""{k}:{v}""") for k, v in table.labels.items()] + ) + # Capture ownership from table lables if present + for k, v in table.labels.items(): + if self.config.owner_key_pattern in k: + owners_to_add[self.convert_owner_label_value(v)] = k.split( + self.config.owner_key_pattern + )[0] yield from self.gen_dataset_workunits( table=table, @@ -990,6 +1002,7 @@ def gen_table_dataset_workunits( dataset_name=dataset_name, sub_types=sub_types, tags_to_add=tags_to_add, + owners_to_add=owners_to_add, custom_properties=custom_properties, ) @@ -1055,6 +1068,7 @@ def gen_dataset_workunits( dataset_name: str, sub_types: List[str], tags_to_add: Optional[List[str]] = None, + owners_to_add: Optional[Dict[str, str]] = None, custom_properties: Optional[Dict[str, str]] = None, ) -> Iterable[MetadataWorkUnit]: dataset_urn = self.gen_dataset_urn( @@ -1133,6 +1147,9 @@ def gen_dataset_workunits( domain_config=self.config.domain, ) + if owners_to_add: + yield self.gen_owners_aspect_aspect(dataset_urn, owners_to_add) + def gen_tags_aspect_workunit( self, dataset_urn: str, tags_to_add: List[str] ) -> MetadataWorkUnit: @@ -1143,6 +1160,24 @@ def gen_tags_aspect_workunit( entityUrn=dataset_urn, aspect=tags ).as_workunit() + def gen_owners_aspect_aspect( + self, dataset_urn: str, owners_to_add: Dict[str, str] + ) -> MetadataWorkUnit: + owners = OwnershipClass( + owners=[ + OwnerClass( + owner=make_user_urn(owner), + type=OwnershipTypeClass.CUSTOM, + typeUrn=make_ownership_type_urn(type), + ) + for owner, type in 
owners_to_add.items() + ] + ) + return MetadataChangeProposalWrapper( + entityUrn=dataset_urn, + aspect=owners, + ).as_workunit() + def gen_dataset_urn(self, project_id: str, dataset_name: str, table: str) -> str: datahub_dataset_name = BigqueryTableIdentifier(project_id, dataset_name, table) return make_dataset_urn( @@ -1386,3 +1421,12 @@ def add_config_to_report(self): self.config.start_time, self.config.end_time, ) + + def convert_owner_label_value(self, label_value: str) -> str: + for key in sorted( + self.config.owner_lable_character_mapping.keys(), key=len, reverse=True + ): + label_value = label_value.replace( + key, self.config.owner_lable_character_mapping[key] + ) + return label_value diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_config.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_config.py index 2f4978d49e6870..c26fb4d98677d0 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_config.py +++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_config.py @@ -22,6 +22,15 @@ logger = logging.getLogger(__name__) +DEFAULT_OWNER_LABEL_CHAR_MAPPING = { + "_": ".", + "-": "@", + "__": "_", + "--": "-", + "_-": "#", + "-_": " ", +} + class BigQueryUsageConfig(BaseUsageConfig): _query_log_delay_removed = pydantic_removed_field("query_log_delay") @@ -287,6 +296,17 @@ def validate_column_lineage(cls, v: bool, values: Dict[str, Any]) -> bool: description="Option to exclude empty projects from being ingested.", ) + owner_lable_character_mapping: Dict[str, str] = Field( + default={}, + description="A mapping of bigquery owner label character to datahub owner character." 
+ "Provided mapping will get added to default mapping.", + ) + + owner_key_pattern: str = Field( + default="_owner_email", + description="A pattern which defines what identifies an owner label.", + ) + @root_validator(skip_on_failure=True) def profile_default_settings(cls, values: Dict) -> Dict: # Extra default SQLAlchemy option for better connection pooling and threading. @@ -370,6 +390,12 @@ def backward_compatibility_configs_set(cls, values: Dict) -> Dict: return values + @root_validator(pre=False) + def update_owner_lable_character_mapping(cls, values: Dict) -> Dict: + DEFAULT_OWNER_LABEL_CHAR_MAPPING.update(values["owner_lable_character_mapping"]) + values["owner_lable_character_mapping"] = DEFAULT_OWNER_LABEL_CHAR_MAPPING + return values + def get_table_pattern(self, pattern: List[str]) -> str: return "|".join(pattern) if pattern else "" diff --git a/metadata-ingestion/tests/unit/test_bigquery_source.py b/metadata-ingestion/tests/unit/test_bigquery_source.py index 42d65fdf02683f..3e53272611b690 100644 --- a/metadata-ingestion/tests/unit/test_bigquery_source.py +++ b/metadata-ingestion/tests/unit/test_bigquery_source.py @@ -28,6 +28,7 @@ BigqueryDataset, BigqueryProject, BigQuerySchemaApi, + BigqueryTable, BigqueryTableSnapshot, BigqueryView, ) @@ -35,10 +36,18 @@ LineageEdge, LineageEdgeColumnMapping, ) +from datahub.ingestion.source.common.subtypes import DatasetSubTypes from datahub.metadata.com.linkedin.pegasus2avro.dataset import ViewProperties from datahub.metadata.schema_classes import ( + ContainerClass, + DataPlatformInstanceClass, DatasetPropertiesClass, MetadataChangeProposalClass, + OwnershipClass, + SchemaMetadataClass, + StatusClass, + SubTypesClass, + TimeStampClass, ) @@ -352,6 +361,104 @@ def test_get_projects_list_fully_filtered(get_projects_mock, get_bq_client_mock) assert projects == [] +@pytest.fixture +def bigquery_table() -> BigqueryTable: + now = datetime.now(tz=timezone.utc) + return BigqueryTable( + name="table1", + 
comment="comment1", + created=now, + last_altered=now, + size_in_bytes=2400, + rows_count=2, + expires=now - timedelta(days=10), + labels={"data_producer_owner_email": "games_team-nytimes_com"}, + num_partitions=1, + max_partition_id="1", + max_shard_id="1", + active_billable_bytes=2400, + long_term_billable_bytes=2400, + ) + + +@patch.object(BigQueryV2Config, "get_bigquery_client") +def test_gen_table_dataset_workunits(get_bq_client_mock, bigquery_table): + project_id = "test-project" + dataset_name = "test-dataset" + config = BigQueryV2Config.parse_obj( + { + "project_id": project_id, + } + ) + source: BigqueryV2Source = BigqueryV2Source( + config=config, ctx=PipelineContext(run_id="test") + ) + + gen = source.gen_table_dataset_workunits( + bigquery_table, [], project_id, dataset_name + ) + mcp = cast(MetadataChangeProposalClass, next(iter(gen)).metadata) + assert mcp.aspect == StatusClass(removed=False) + + mcp = cast(MetadataChangeProposalClass, next(iter(gen)).metadata) + assert isinstance(mcp.aspect, SchemaMetadataClass) + assert mcp.aspect.schemaName == f"{project_id}.{dataset_name}.{bigquery_table.name}" + assert mcp.aspect.fields == [] + + mcp = cast(MetadataChangeProposalClass, next(iter(gen)).metadata) + dataset_properties = cast(DatasetPropertiesClass, mcp.aspect) + assert dataset_properties.name == bigquery_table.name + assert ( + dataset_properties.qualifiedName + == f"{project_id}.{dataset_name}.{bigquery_table.name}" + ) + assert dataset_properties.description == bigquery_table.comment + assert dataset_properties.created == TimeStampClass( + time=int(bigquery_table.created.timestamp() * 1000) + ) + assert dataset_properties.lastModified == TimeStampClass( + time=int(bigquery_table.last_altered.timestamp() * 1000) + ) + assert dataset_properties.tags == [] + assert dataset_properties.customProperties["expiration_date"] == str( + bigquery_table.expires + ) + assert dataset_properties.customProperties["size_in_bytes"] == str( + 
bigquery_table.size_in_bytes + ) + assert dataset_properties.customProperties["billable_bytes_active"] == str( + bigquery_table.active_billable_bytes + ) + assert dataset_properties.customProperties["billable_bytes_long_term"] == str( + bigquery_table.long_term_billable_bytes + ) + assert dataset_properties.customProperties["number_of_partitions"] == str( + bigquery_table.num_partitions + ) + assert dataset_properties.customProperties["max_partition_id"] == str( + bigquery_table.max_partition_id + ) + assert dataset_properties.customProperties["max_shard_id"] == str( + bigquery_table.max_shard_id + ) + + mcp = cast(MetadataChangeProposalClass, next(iter(gen)).metadata) + assert isinstance(mcp.aspect, ContainerClass) + + mcp = cast(MetadataChangeProposalClass, next(iter(gen)).metadata) + assert isinstance(mcp.aspect, DataPlatformInstanceClass) + + mcp = cast(MetadataChangeProposalClass, next(iter(gen)).metadata) + assert isinstance(mcp.aspect, SubTypesClass) + assert mcp.aspect.typeNames[1] == DatasetSubTypes.TABLE + + mcp = cast(MetadataChangeProposalClass, next(iter(gen)).metadata) + assert isinstance(mcp.aspect, OwnershipClass) + assert mcp.aspect.owners[0].owner == "urn:li:corpuser:games.team@nytimes.com" + assert mcp.aspect.owners[0].type == "CUSTOM" + assert mcp.aspect.owners[0].typeUrn == "urn:li:ownershipType:data_producer" + + @patch.object(BigQueryV2Config, "get_bigquery_client") def test_simple_upstream_table_generation(get_bq_client_mock): a: BigQueryTableRef = BigQueryTableRef( From 1bbace7447a05668fa5473e030061e5cea8d1a86 Mon Sep 17 00:00:00 2001 From: shubhamjagtap639 Date: Fri, 15 Mar 2024 13:52:43 +0530 Subject: [PATCH 02/12] Address review comments --- .../ingestion/source/bigquery_v2/bigquery.py | 24 ++++++++---- .../source/bigquery_v2/bigquery_config.py | 39 +++++++++++-------- .../tests/unit/test_bigquery_source.py | 14 +++++++ 3 files changed, 52 insertions(+), 25 deletions(-) diff --git 
a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py index 280a0d9bbe1063..d524e117463d28 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py +++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py @@ -988,12 +988,15 @@ def gen_table_dataset_workunits( tags_to_add.extend( [make_tag_urn(f"""{k}:{v}""") for k, v in table.labels.items()] ) - # Capture ownership from table lables if present - for k, v in table.labels.items(): - if self.config.owner_key_pattern in k: - owners_to_add[self.convert_owner_label_value(v)] = k.split( - self.config.owner_key_pattern - )[0] + if self.config.capture_table_owner_label_as_owner.enabled: + for k, v in table.labels.items(): + if ( + self.config.capture_table_owner_label_as_owner.owner_key_pattern + in k + ): + owners_to_add[self.convert_owner_label_value(v)] = k.split( + self.config.capture_table_owner_label_as_owner.owner_key_pattern + )[0] yield from self.gen_dataset_workunits( table=table, @@ -1424,9 +1427,14 @@ def add_config_to_report(self): def convert_owner_label_value(self, label_value: str) -> str: for key in sorted( - self.config.owner_lable_character_mapping.keys(), key=len, reverse=True + self.config.capture_table_owner_label_as_owner.owner_lable_character_mapping.keys(), + key=len, + reverse=True, ): label_value = label_value.replace( - key, self.config.owner_lable_character_mapping[key] + key, + self.config.capture_table_owner_label_as_owner.owner_lable_character_mapping[ + key + ], ) return label_value diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_config.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_config.py index c26fb4d98677d0..143a018ab32982 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_config.py +++ 
b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_config.py @@ -46,6 +46,23 @@ class BigQueryUsageConfig(BaseUsageConfig): ) +class TableOwnerLableConfig(ConfigModel): + enabled: bool = Field( + default=False, description="Whether to capture table owner from label." + ) + + owner_lable_character_mapping: Dict[str, str] = Field( + default=DEFAULT_OWNER_LABEL_CHAR_MAPPING, + description="A mapping of bigquery owner label character to datahub owner character." + "Provided mapping will override default mapping.", + ) + + owner_key_pattern: str = Field( + default="_owner_email", + description="A pattern which defines what identifies an owner label.", + ) + + class BigQueryConnectionConfig(ConfigModel): credential: Optional[BigQueryCredential] = Field( default=None, description="BigQuery credential informations" @@ -130,6 +147,11 @@ class BigQueryV2Config( description="Capture BigQuery table labels as DataHub tag", ) + capture_table_owner_label_as_owner: TableOwnerLableConfig = Field( + default=TableOwnerLableConfig(), + description="Capture BigQuery table labels as DataHub ownership", + ) + capture_dataset_label_as_tag: bool = Field( default=False, description="Capture BigQuery dataset labels as DataHub tag", @@ -296,17 +318,6 @@ def validate_column_lineage(cls, v: bool, values: Dict[str, Any]) -> bool: description="Option to exclude empty projects from being ingested.", ) - owner_lable_character_mapping: Dict[str, str] = Field( - default={}, - description="A mapping of bigquery owner label character to datahub owner character." - "Provided mapping will get added to default mapping.", - ) - - owner_key_pattern: str = Field( - default="_owner_email", - description="A pattern which defines what identifies an owner label.", - ) - @root_validator(skip_on_failure=True) def profile_default_settings(cls, values: Dict) -> Dict: # Extra default SQLAlchemy option for better connection pooling and threading. 
@@ -390,12 +401,6 @@ def backward_compatibility_configs_set(cls, values: Dict) -> Dict: return values - @root_validator(pre=False) - def update_owner_lable_character_mapping(cls, values: Dict) -> Dict: - DEFAULT_OWNER_LABEL_CHAR_MAPPING.update(values["owner_lable_character_mapping"]) - values["owner_lable_character_mapping"] = DEFAULT_OWNER_LABEL_CHAR_MAPPING - return values - def get_table_pattern(self, pattern: List[str]) -> str: return "|".join(pattern) if pattern else "" diff --git a/metadata-ingestion/tests/unit/test_bigquery_source.py b/metadata-ingestion/tests/unit/test_bigquery_source.py index 3e53272611b690..51e32e6c1cf1ef 100644 --- a/metadata-ingestion/tests/unit/test_bigquery_source.py +++ b/metadata-ingestion/tests/unit/test_bigquery_source.py @@ -42,11 +42,13 @@ ContainerClass, DataPlatformInstanceClass, DatasetPropertiesClass, + GlobalTagsClass, MetadataChangeProposalClass, OwnershipClass, SchemaMetadataClass, StatusClass, SubTypesClass, + TagAssociationClass, TimeStampClass, ) @@ -388,6 +390,10 @@ def test_gen_table_dataset_workunits(get_bq_client_mock, bigquery_table): config = BigQueryV2Config.parse_obj( { "project_id": project_id, + "capture_table_label_as_tag": True, + "capture_table_owner_label_as_owner": { + "enabled": True, + }, } ) source: BigqueryV2Source = BigqueryV2Source( @@ -442,6 +448,14 @@ def test_gen_table_dataset_workunits(get_bq_client_mock, bigquery_table): bigquery_table.max_shard_id ) + mcp = cast(MetadataChangeProposalClass, next(iter(gen)).metadata) + assert isinstance(mcp.aspect, GlobalTagsClass) + assert mcp.aspect.tags == [ + TagAssociationClass( + "urn:li:tag:data_producer_owner_email:games_team-nytimes_com" + ) + ] + mcp = cast(MetadataChangeProposalClass, next(iter(gen)).metadata) assert isinstance(mcp.aspect, ContainerClass) From 077ee7cf32c3281099cef7e44f632c5c29d39ef7 Mon Sep 17 00:00:00 2001 From: shubhamjagtap639 Date: Thu, 21 Mar 2024 15:43:47 +0530 Subject: [PATCH 03/12] Revert bigquery owner label changes --- 
.../ingestion/source/bigquery_v2/bigquery.py | 64 ++----------------- .../source/bigquery_v2/bigquery_config.py | 31 --------- .../tests/unit/test_bigquery_source.py | 10 --- 3 files changed, 6 insertions(+), 99 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py index d524e117463d28..bcc0aa50ed22e6 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py +++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py @@ -15,9 +15,7 @@ make_data_platform_urn, make_dataplatform_instance_urn, make_dataset_urn, - make_ownership_type_urn, make_tag_urn, - make_user_urn, ) from datahub.emitter.mcp import MetadataChangeProposalWrapper from datahub.emitter.mcp_builder import BigQueryDatasetKey, ContainerKey, ProjectIdKey @@ -113,9 +111,6 @@ from datahub.metadata.schema_classes import ( DataPlatformInstanceClass, GlobalTagsClass, - OwnerClass, - OwnershipClass, - OwnershipTypeClass, TagAssociationClass, ) from datahub.sql_parsing.schema_resolver import SchemaResolver @@ -981,22 +976,12 @@ def gen_table_dataset_workunits( custom_properties["is_sharded"] = str(True) sub_types = ["sharded table"] + sub_types - tags_to_add = [] - owners_to_add = {} - if table.labels: - if self.config.capture_table_label_as_tag: - tags_to_add.extend( - [make_tag_urn(f"""{k}:{v}""") for k, v in table.labels.items()] - ) - if self.config.capture_table_owner_label_as_owner.enabled: - for k, v in table.labels.items(): - if ( - self.config.capture_table_owner_label_as_owner.owner_key_pattern - in k - ): - owners_to_add[self.convert_owner_label_value(v)] = k.split( - self.config.capture_table_owner_label_as_owner.owner_key_pattern - )[0] + tags_to_add = None + if table.labels and self.config.capture_table_label_as_tag: + tags_to_add = [] + tags_to_add.extend( + [make_tag_urn(f"""{k}:{v}""") for k, v in table.labels.items()] + ) yield from 
self.gen_dataset_workunits( table=table, @@ -1005,7 +990,6 @@ def gen_table_dataset_workunits( dataset_name=dataset_name, sub_types=sub_types, tags_to_add=tags_to_add, - owners_to_add=owners_to_add, custom_properties=custom_properties, ) @@ -1071,7 +1055,6 @@ def gen_dataset_workunits( dataset_name: str, sub_types: List[str], tags_to_add: Optional[List[str]] = None, - owners_to_add: Optional[Dict[str, str]] = None, custom_properties: Optional[Dict[str, str]] = None, ) -> Iterable[MetadataWorkUnit]: dataset_urn = self.gen_dataset_urn( @@ -1150,9 +1133,6 @@ def gen_dataset_workunits( domain_config=self.config.domain, ) - if owners_to_add: - yield self.gen_owners_aspect_aspect(dataset_urn, owners_to_add) - def gen_tags_aspect_workunit( self, dataset_urn: str, tags_to_add: List[str] ) -> MetadataWorkUnit: @@ -1163,24 +1143,6 @@ def gen_tags_aspect_workunit( entityUrn=dataset_urn, aspect=tags ).as_workunit() - def gen_owners_aspect_aspect( - self, dataset_urn: str, owners_to_add: Dict[str, str] - ) -> MetadataWorkUnit: - owners = OwnershipClass( - owners=[ - OwnerClass( - owner=make_user_urn(owner), - type=OwnershipTypeClass.CUSTOM, - typeUrn=make_ownership_type_urn(type), - ) - for owner, type in owners_to_add.items() - ] - ) - return MetadataChangeProposalWrapper( - entityUrn=dataset_urn, - aspect=owners, - ).as_workunit() - def gen_dataset_urn(self, project_id: str, dataset_name: str, table: str) -> str: datahub_dataset_name = BigqueryTableIdentifier(project_id, dataset_name, table) return make_dataset_urn( @@ -1424,17 +1386,3 @@ def add_config_to_report(self): self.config.start_time, self.config.end_time, ) - - def convert_owner_label_value(self, label_value: str) -> str: - for key in sorted( - self.config.capture_table_owner_label_as_owner.owner_lable_character_mapping.keys(), - key=len, - reverse=True, - ): - label_value = label_value.replace( - key, - self.config.capture_table_owner_label_as_owner.owner_lable_character_mapping[ - key - ], - ) - return label_value 
diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_config.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_config.py index 143a018ab32982..2f4978d49e6870 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_config.py +++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_config.py @@ -22,15 +22,6 @@ logger = logging.getLogger(__name__) -DEFAULT_OWNER_LABEL_CHAR_MAPPING = { - "_": ".", - "-": "@", - "__": "_", - "--": "-", - "_-": "#", - "-_": " ", -} - class BigQueryUsageConfig(BaseUsageConfig): _query_log_delay_removed = pydantic_removed_field("query_log_delay") @@ -46,23 +37,6 @@ class BigQueryUsageConfig(BaseUsageConfig): ) -class TableOwnerLableConfig(ConfigModel): - enabled: bool = Field( - default=False, description="Whether to capture table owner from label." - ) - - owner_lable_character_mapping: Dict[str, str] = Field( - default=DEFAULT_OWNER_LABEL_CHAR_MAPPING, - description="A mapping of bigquery owner label character to datahub owner character." 
- "Provided mapping will override default mapping.", - ) - - owner_key_pattern: str = Field( - default="_owner_email", - description="A pattern which defines what identifies an owner label.", - ) - - class BigQueryConnectionConfig(ConfigModel): credential: Optional[BigQueryCredential] = Field( default=None, description="BigQuery credential informations" @@ -147,11 +121,6 @@ class BigQueryV2Config( description="Capture BigQuery table labels as DataHub tag", ) - capture_table_owner_label_as_owner: TableOwnerLableConfig = Field( - default=TableOwnerLableConfig(), - description="Capture BigQuery table labels as DataHub ownership", - ) - capture_dataset_label_as_tag: bool = Field( default=False, description="Capture BigQuery dataset labels as DataHub tag", diff --git a/metadata-ingestion/tests/unit/test_bigquery_source.py b/metadata-ingestion/tests/unit/test_bigquery_source.py index 51e32e6c1cf1ef..2af9f9478bb938 100644 --- a/metadata-ingestion/tests/unit/test_bigquery_source.py +++ b/metadata-ingestion/tests/unit/test_bigquery_source.py @@ -44,7 +44,6 @@ DatasetPropertiesClass, GlobalTagsClass, MetadataChangeProposalClass, - OwnershipClass, SchemaMetadataClass, StatusClass, SubTypesClass, @@ -391,9 +390,6 @@ def test_gen_table_dataset_workunits(get_bq_client_mock, bigquery_table): { "project_id": project_id, "capture_table_label_as_tag": True, - "capture_table_owner_label_as_owner": { - "enabled": True, - }, } ) source: BigqueryV2Source = BigqueryV2Source( @@ -466,12 +462,6 @@ def test_gen_table_dataset_workunits(get_bq_client_mock, bigquery_table): assert isinstance(mcp.aspect, SubTypesClass) assert mcp.aspect.typeNames[1] == DatasetSubTypes.TABLE - mcp = cast(MetadataChangeProposalClass, next(iter(gen)).metadata) - assert isinstance(mcp.aspect, OwnershipClass) - assert mcp.aspect.owners[0].owner == "urn:li:corpuser:games.team@nytimes.com" - assert mcp.aspect.owners[0].type == "CUSTOM" - assert mcp.aspect.owners[0].typeUrn == "urn:li:ownershipType:data_producer" - 
@patch.object(BigQueryV2Config, "get_bigquery_client") def test_simple_upstream_table_generation(get_bq_client_mock): From 7c3e818468d3cef43a1c429b7803fff547f4e071 Mon Sep 17 00:00:00 2001 From: shubhamjagtap639 Date: Thu, 21 Mar 2024 15:44:43 +0530 Subject: [PATCH 04/12] Add capture_owners_from_tags transformer --- metadata-ingestion/setup.py | 1 + .../ingestion/transformer/base_transformer.py | 6 +- .../transformer/capture_ownership_from_tag.py | 120 ++++++++++++++++++ 3 files changed, 124 insertions(+), 3 deletions(-) create mode 100644 metadata-ingestion/src/datahub/ingestion/transformer/capture_ownership_from_tag.py diff --git a/metadata-ingestion/setup.py b/metadata-ingestion/setup.py index 8e8118d6c4e42d..541baf1597879e 100644 --- a/metadata-ingestion/setup.py +++ b/metadata-ingestion/setup.py @@ -669,6 +669,7 @@ "add_dataset_dataproduct = datahub.ingestion.transformer.add_dataset_dataproduct:AddDatasetDataProduct", "simple_add_dataset_dataproduct = datahub.ingestion.transformer.add_dataset_dataproduct:SimpleAddDatasetDataProduct", "pattern_add_dataset_dataproduct = datahub.ingestion.transformer.add_dataset_dataproduct:PatternAddDatasetDataProduct", + "capture_owners_from_tags = datahub.ingestion.transformer.capture_ownership_from_tag:CaptureOwnersFromTagsTransformer", ], "datahub.ingestion.sink.plugins": [ "file = datahub.ingestion.sink.file:FileSink", diff --git a/metadata-ingestion/src/datahub/ingestion/transformer/base_transformer.py b/metadata-ingestion/src/datahub/ingestion/transformer/base_transformer.py index fb776ca8d23281..1b0a0687fcc92f 100644 --- a/metadata-ingestion/src/datahub/ingestion/transformer/base_transformer.py +++ b/metadata-ingestion/src/datahub/ingestion/transformer/base_transformer.py @@ -17,7 +17,7 @@ log = logging.getLogger(__name__) -def _update_work_unit_id( +def update_work_unit_id( envelope: RecordEnvelope, urn: str, aspect_name: str ) -> Dict[Any, Any]: structured_urn = Urn.from_string(urn) @@ -216,7 +216,7 @@ def 
_handle_end_of_stream( ): # to silent the lint error continue - record_metadata = _update_work_unit_id( + record_metadata = update_work_unit_id( envelope=envelope, aspect_name=mcp.aspectName, urn=mcp.entityUrn, @@ -277,7 +277,7 @@ def transform( ) ) - record_metadata = _update_work_unit_id( + record_metadata = update_work_unit_id( envelope=envelope, aspect_name=mcp.aspect.get_aspect_name(), # type: ignore urn=mcp.entityUrn, diff --git a/metadata-ingestion/src/datahub/ingestion/transformer/capture_ownership_from_tag.py b/metadata-ingestion/src/datahub/ingestion/transformer/capture_ownership_from_tag.py new file mode 100644 index 00000000000000..eb2e603af9405d --- /dev/null +++ b/metadata-ingestion/src/datahub/ingestion/transformer/capture_ownership_from_tag.py @@ -0,0 +1,120 @@ +from typing import Dict, Iterable + +from pydantic import Field + +from datahub.configuration.common import TransformerSemanticsConfigModel +from datahub.emitter.mce_builder import make_ownership_type_urn, make_user_urn +from datahub.emitter.mcp import MetadataChangeProposalWrapper +from datahub.ingestion.api.common import PipelineContext, RecordEnvelope +from datahub.ingestion.api.transform import Transformer +from datahub.ingestion.transformer.base_transformer import update_work_unit_id +from datahub.metadata.schema_classes import ( + GlobalTagsClass, + OwnerClass, + OwnershipClass, + OwnershipTypeClass, +) +from datahub.utilities.urns.tag_urn import TagUrn +from datahub.utilities.urns.urn import Urn + +DEFAULT_TAG_CHAR_MAPPING = { + "_": ".", + "-": "@", + "__": "_", + "--": "-", + "_-": "#", + "-_": " ", +} + + +class CaptureOwnersFromTagsConfig(TransformerSemanticsConfigModel): + tag_character_mapping: Dict[str, str] = Field( + default=DEFAULT_TAG_CHAR_MAPPING, + description="A mapping of tag character to datahub owner character." 
+ "Provided mapping will override default mapping.", + ) + + owner_key_pattern: str = Field( + default="_owner_email", + description="A pattern which defines what identifies an owner label.", + ) + + +class CaptureOwnersFromTagsTransformer(Transformer): + """Transformer that captures owners from tag aspect""" + + ctx: PipelineContext + config: CaptureOwnersFromTagsConfig + + def __init__(self, config: CaptureOwnersFromTagsConfig, ctx: PipelineContext): + super().__init__() + self.ctx = ctx + self.config = config + + @classmethod + def create( + cls, config_dict: dict, ctx: PipelineContext + ) -> "CaptureOwnersFromTagsTransformer": + config = CaptureOwnersFromTagsConfig.parse_obj(config_dict) + return cls(config, ctx) + + def _convert_tag_value(self, value: str) -> str: + for key in sorted( + self.config.tag_character_mapping.keys(), + key=len, + reverse=True, + ): + value = value.replace(key, self.config.tag_character_mapping[key]) + return value + + def transform( + self, record_envelopes: Iterable[RecordEnvelope] + ) -> Iterable[RecordEnvelope]: + for envelope in record_envelopes: + if isinstance(envelope.record, MetadataChangeProposalWrapper): + assert envelope.record.entityUrn + if ( + envelope.record.aspectName == "globalTags" + and envelope.record.aspect + and isinstance(envelope.record.aspect, GlobalTagsClass) + ): + owners_to_add: Dict[str, str] = {} + + for tag_associate in envelope.record.aspect.tags: + tag_urn = Urn.from_string(tag_associate.tag) + assert isinstance(tag_urn, TagUrn) + tag = tag_urn.name.split(":") + if len(tag) != 2: # check if tag is key value pair + continue + if self.config.owner_key_pattern in tag[0]: + owners_to_add[self._convert_tag_value(tag[1])] = tag[ + 0 + ].split(self.config.owner_key_pattern)[0] + + if owners_to_add: + mcp = MetadataChangeProposalWrapper( + entityUrn=envelope.record.entityUrn, + aspect=OwnershipClass( + owners=[ + OwnerClass( + owner=make_user_urn(owner), + type=OwnershipTypeClass.CUSTOM, + 
typeUrn=make_ownership_type_urn(type), + ) + for owner, type in owners_to_add.items() + ] + ), + ) + + if mcp.aspectName and mcp.entityUrn: # to silent the lint error + record_metadata = update_work_unit_id( + envelope=envelope, + aspect_name=mcp.aspectName, + urn=mcp.entityUrn, + ) + yield RecordEnvelope( + record=mcp, + metadata=record_metadata, + ) + + yield envelope From 863651f399637a6e12342862771333ef7a1a64b7 Mon Sep 17 00:00:00 2001 From: shubhamjagtap639 Date: Fri, 22 Mar 2024 19:58:06 +0530 Subject: [PATCH 05/12] Remove new capture_owners_from_tags and instead extended old extract_ownership_from_tags --- .../docs/transformer/dataset_transformer.md | 6 +- metadata-ingestion/setup.py | 1 - .../ingestion/transformer/base_transformer.py | 6 +- .../transformer/capture_ownership_from_tag.py | 120 ------------------ .../extract_ownership_from_tags.py | 93 ++++++++++---- .../tests/unit/test_transform_dataset.py | 22 ++++ 6 files changed, 99 insertions(+), 149 deletions(-) delete mode 100644 metadata-ingestion/src/datahub/ingestion/transformer/capture_ownership_from_tag.py diff --git a/metadata-ingestion/docs/transformer/dataset_transformer.md b/metadata-ingestion/docs/transformer/dataset_transformer.md index 33ff722a0d0dd6..360fd3ba98ab1d 100644 --- a/metadata-ingestion/docs/transformer/dataset_transformer.md +++ b/metadata-ingestion/docs/transformer/dataset_transformer.md @@ -21,9 +21,11 @@ The below table shows transformer which can transform aspects of entity [Dataset | Field | Required | Type | Default | Description | |-----------------------------|----------|---------|---------------|---------------------------------------------| | `semantics` | | enum | `OVERWRITE` | Whether to OVERWRITE or PATCH the entity present on DataHub GMS. | -| `tag_prefix` | | str | | Regex to use for tags to match against. Supports Regex to match a prefix which is used to remove content. Rest of string is considered owner ID for creating owner URN. 
| -| `is_user` | | bool | `true` | Whether should be consider a user or not. If `false` then considered a group. | +| `tag_pattern` | | str | | Regex to use for tags to match against. Supports Regex to match a pattern which is used to remove content. Rest of string is considered owner ID for creating owner URN. | +| `is_user` | | bool | `true` | Whether should be consider a user or not. If `false` then considered a group. | +| `owner_character_mapping` | | dict[str, str] | | A mapping of extracted owner character to datahub owner character. | | `email_domain` | | str | | If set then this is appended to create owner URN. | +| `extract_owner_type_from_tag_pattern` | | str | `false` | Whether to extract an owner type from provided tag pattern first group. If `true`, no need to provide owner_type and owner_type_urn config.| | `owner_type` | | str | `TECHNICAL_OWNER` | Ownership type. | | `owner_type_urn` | | str | `None` | Set to a custom ownership type's URN if using custom ownership. | diff --git a/metadata-ingestion/setup.py b/metadata-ingestion/setup.py index 541baf1597879e..8e8118d6c4e42d 100644 --- a/metadata-ingestion/setup.py +++ b/metadata-ingestion/setup.py @@ -669,7 +669,6 @@ "add_dataset_dataproduct = datahub.ingestion.transformer.add_dataset_dataproduct:AddDatasetDataProduct", "simple_add_dataset_dataproduct = datahub.ingestion.transformer.add_dataset_dataproduct:SimpleAddDatasetDataProduct", "pattern_add_dataset_dataproduct = datahub.ingestion.transformer.add_dataset_dataproduct:PatternAddDatasetDataProduct", - "capture_owners_from_tags = datahub.ingestion.transformer.capture_ownership_from_tag:CaptureOwnersFromTagsTransformer", ], "datahub.ingestion.sink.plugins": [ "file = datahub.ingestion.sink.file:FileSink", diff --git a/metadata-ingestion/src/datahub/ingestion/transformer/base_transformer.py b/metadata-ingestion/src/datahub/ingestion/transformer/base_transformer.py index 1b0a0687fcc92f..fb776ca8d23281 100644 --- 
a/metadata-ingestion/src/datahub/ingestion/transformer/base_transformer.py +++ b/metadata-ingestion/src/datahub/ingestion/transformer/base_transformer.py @@ -17,7 +17,7 @@ log = logging.getLogger(__name__) -def update_work_unit_id( +def _update_work_unit_id( envelope: RecordEnvelope, urn: str, aspect_name: str ) -> Dict[Any, Any]: structured_urn = Urn.from_string(urn) @@ -216,7 +216,7 @@ def _handle_end_of_stream( ): # to silent the lint error continue - record_metadata = update_work_unit_id( + record_metadata = _update_work_unit_id( envelope=envelope, aspect_name=mcp.aspectName, urn=mcp.entityUrn, @@ -277,7 +277,7 @@ def transform( ) ) - record_metadata = update_work_unit_id( + record_metadata = _update_work_unit_id( envelope=envelope, aspect_name=mcp.aspect.get_aspect_name(), # type: ignore urn=mcp.entityUrn, diff --git a/metadata-ingestion/src/datahub/ingestion/transformer/capture_ownership_from_tag.py b/metadata-ingestion/src/datahub/ingestion/transformer/capture_ownership_from_tag.py deleted file mode 100644 index eb2e603af9405d..00000000000000 --- a/metadata-ingestion/src/datahub/ingestion/transformer/capture_ownership_from_tag.py +++ /dev/null @@ -1,120 +0,0 @@ -from typing import Dict, Iterable - -from pydantic import Field - -from datahub.configuration.common import TransformerSemanticsConfigModel -from datahub.emitter.mce_builder import make_ownership_type_urn, make_user_urn -from datahub.emitter.mcp import MetadataChangeProposalWrapper -from datahub.ingestion.api.common import PipelineContext, RecordEnvelope -from datahub.ingestion.api.transform import Transformer -from datahub.ingestion.transformer.base_transformer import update_work_unit_id -from datahub.metadata.schema_classes import ( - GlobalTagsClass, - OwnerClass, - OwnershipClass, - OwnershipTypeClass, -) -from datahub.utilities.urns.tag_urn import TagUrn -from datahub.utilities.urns.urn import Urn - -DEFAULT_TAG_CHAR_MAPPING = { - "_": ".", - "-": "@", - "__": "_", - "--": "-", - "_-": "#", - 
"-_": " ", -} - - -class CaptureOwnersFromTagsConfig(TransformerSemanticsConfigModel): - tag_character_mapping: Dict[str, str] = Field( - default=DEFAULT_TAG_CHAR_MAPPING, - description="A mapping of tag character to datahub owner character." - "Provided mapping will override default mapping.", - ) - - owner_key_pattern: str = Field( - default="_owner_email", - description="A pattern which defines what identifies an owner label.", - ) - - -class CaptureOwnersFromTagsTransformer(Transformer): - """Transformer that captures owners from tag aspect""" - - ctx: PipelineContext - config: CaptureOwnersFromTagsConfig - - def __init__(self, config: CaptureOwnersFromTagsConfig, ctx: PipelineContext): - super().__init__() - self.ctx = ctx - self.config = config - - @classmethod - def create( - cls, config_dict: dict, ctx: PipelineContext - ) -> "CaptureOwnersFromTagsTransformer": - config = CaptureOwnersFromTagsConfig.parse_obj(config_dict) - return cls(config, ctx) - - def _convert_tag_value(self, value: str) -> str: - for key in sorted( - self.config.tag_character_mapping.keys(), - key=len, - reverse=True, - ): - value = value.replace(key, self.config.tag_character_mapping[key]) - return value - - def transform( - self, record_envelopes: Iterable[RecordEnvelope] - ) -> Iterable[RecordEnvelope]: - for envelope in record_envelopes: - if isinstance(envelope.record, MetadataChangeProposalWrapper): - assert envelope.record.entityUrn - if ( - envelope.record.aspectName == "globalTags" - and envelope.record.aspect - and isinstance(envelope.record.aspect, GlobalTagsClass) - ): - owners_to_add: Dict[str, str] = {} - - for tag_associate in envelope.record.aspect.tags: - tag_urn = Urn.from_string(tag_associate.tag) - assert isinstance(tag_urn, TagUrn) - tag = tag_urn.name.split(":") - if len(tag) != 2: # check if tag is key value pair - continue - if self.config.owner_key_pattern in tag[0]: - owners_to_add[self._convert_tag_value(tag[1])] = tag[ - 0 - 
].split(self.config.owner_key_pattern)[0] - - if owners_to_add: - mcp = MetadataChangeProposalWrapper( - entityUrn=envelope.record.entityUrn, - aspect=OwnershipClass( - owners=[ - OwnerClass( - owner=make_user_urn(owner), - type=OwnershipTypeClass.CUSTOM, - typeUrn=make_ownership_type_urn(type), - ) - for owner, type in owners_to_add.items() - ] - ), - ) - - if mcp.aspectName and mcp.entityUrn: # to silent the lint error - record_metadata = update_work_unit_id( - envelope=envelope, - aspect_name=mcp.aspectName, - urn=mcp.entityUrn, - ) - yield RecordEnvelope( - record=mcp, - metadata=record_metadata, - ) - - yield envelope diff --git a/metadata-ingestion/src/datahub/ingestion/transformer/extract_ownership_from_tags.py b/metadata-ingestion/src/datahub/ingestion/transformer/extract_ownership_from_tags.py index c4eba0e011de3d..1d95323c2d90cb 100644 --- a/metadata-ingestion/src/datahub/ingestion/transformer/extract_ownership_from_tags.py +++ b/metadata-ingestion/src/datahub/ingestion/transformer/extract_ownership_from_tags.py @@ -1,10 +1,12 @@ import logging import re from functools import lru_cache -from typing import List, Optional, Sequence, Union, cast +from typing import Dict, List, Optional, Sequence, Union, cast + +from pydantic.class_validators import root_validator from datahub.configuration.common import TransformerSemanticsConfigModel -from datahub.emitter.mce_builder import Aspect +from datahub.emitter.mce_builder import Aspect, make_ownership_type_urn from datahub.emitter.mcp import MetadataChangeProposalWrapper from datahub.ingestion.api.common import PipelineContext from datahub.ingestion.transformer.dataset_transformer import DatasetTagsTransformer @@ -23,12 +25,33 @@ class ExtractOwnersFromTagsConfig(TransformerSemanticsConfigModel): - tag_prefix: str + tag_prefix: Optional[str] + tag_pattern: Optional[str] is_user: bool = True + owner_character_mapping: Optional[Dict[str, str]] = None email_domain: Optional[str] = None + 
extract_owner_type_from_tag_pattern: bool = False owner_type: str = "TECHNICAL_OWNER" owner_type_urn: Optional[str] = None + @root_validator(pre=True) + def raise_error_for_tag_prefix(cls, values: Dict) -> Dict: + if ( + values.get("tag_prefix") is not None + and values.get("tag_pattern") is not None + ): + raise ValueError( + "Cannot provide both tag_prefix and tag_pattern parameter. tag_prefix is deprecated in favor of tag_pattern." + ) + if values.get("tag_pattern") is None and values.get("tag_prefix") is None: + raise ValueError("tag_pattern is required") + if values.get("tag_prefix") is not None: + logger.warning( + "The tag_prefix argument is deprecated. Use tag_pattern instead." + ) + values["tag_pattern"] = values["tag_prefix"] + return values + @lru_cache(maxsize=10) def get_owner_type(owner_type_str: str) -> str: @@ -63,6 +86,16 @@ def get_owner_urn(self, owner_str: str) -> str: return owner_str + "@" + self.config.email_domain return owner_str + def convert_owner_as_per_mapping(self, owner: str) -> str: + if self.config.owner_character_mapping: + for key in sorted( + self.config.owner_character_mapping.keys(), + key=len, + reverse=True, + ): + owner = owner.replace(key, self.config.owner_character_mapping[key]) + return owner + def handle_end_of_stream( self, ) -> Sequence[Union[MetadataChangeProposalWrapper, MetadataChangeProposalClass]]: @@ -79,29 +112,41 @@ def transform_aspect( owners: List[OwnerClass] = [] for tag_class in tags: - tag_urn = TagUrn.from_string(tag_class.tag) - tag_str = tag_urn.entity_ids[0] - re_match = re.search(self.config.tag_prefix, tag_str) + tag_str = TagUrn.from_string(tag_class.tag).name + re_match = re.search(cast(str, self.config.tag_pattern), tag_str) if re_match: owner_str = tag_str[re_match.end() :].strip() + owner_str = self.convert_owner_as_per_mapping(owner_str) owner_urn_str = self.get_owner_urn(owner_str) - if self.config.is_user: - owner_urn = str(CorpuserUrn(owner_urn_str)) + owner_urn = ( + 
str(CorpuserUrn(owner_urn_str)) + if self.config.is_user + else str(CorpGroupUrn(owner_urn_str)) + ) + + if self.config.extract_owner_type_from_tag_pattern: + if re_match.groups(): + owners.append( + OwnerClass( + owner=owner_urn, + type=OwnershipTypeClass.CUSTOM, + typeUrn=make_ownership_type_urn(re_match.group(1)), + ) + ) else: - owner_urn = str(CorpGroupUrn(owner_urn_str)) - owner_type = get_owner_type(self.config.owner_type) - if owner_type == OwnershipTypeClass.CUSTOM: - assert ( - self.config.owner_type_urn is not None - ), "owner_type_urn must be set if owner_type is CUSTOM" - - owners.append( - OwnerClass( - owner=owner_urn, - type=owner_type, - typeUrn=self.config.owner_type_urn, + owner_type = get_owner_type(self.config.owner_type) + if owner_type == OwnershipTypeClass.CUSTOM: + assert ( + self.config.owner_type_urn is not None + ), "owner_type_urn must be set if owner_type is CUSTOM" + + owners.append( + OwnerClass( + owner=owner_urn, + type=owner_type, + typeUrn=self.config.owner_type_urn, + ) ) - ) self.owner_mcps.append( MetadataChangeProposalWrapper( @@ -111,5 +156,7 @@ def transform_aspect( ), ) ) - - return None + if not self.config.replace_existing: + return aspect + else: + return None diff --git a/metadata-ingestion/tests/unit/test_transform_dataset.py b/metadata-ingestion/tests/unit/test_transform_dataset.py index 2a6176906a0c3e..69761a2a4cb76c 100644 --- a/metadata-ingestion/tests/unit/test_transform_dataset.py +++ b/metadata-ingestion/tests/unit/test_transform_dataset.py @@ -640,6 +640,7 @@ def _test_owner( config: Dict, expected_owner: str, expected_owner_type: Optional[str] = None, + expected_owner_type_urn: Optional[str] = None, ) -> None: dataset = make_generic_dataset( aspects=[ @@ -679,6 +680,8 @@ def _test_owner( assert owner.owner == expected_owner + assert owner.typeUrn == expected_owner_type_urn + _test_owner( tag="owner:foo", config={ @@ -733,6 +736,25 @@ def _test_owner( }, expected_owner="urn:li:corpuser:foo@example.com", 
expected_owner_type=OwnershipTypeClass.CUSTOM, + expected_owner_type_urn="urn:li:ownershipType:ad8557d6-dcb9-4d2a-83fc-b7d0d54f3e0f", + ) + _test_owner( + tag="data_producer_owner_email:games.team@nytimes.com", + config={ + "tag_pattern": "(.*)_owner_email:", + "owner_character_mapping": { + "_": ".", + "-": "@", + "__": "_", + "--": "-", + "_-": "#", + "-_": " ", + }, + "extract_owner_type_from_tag_pattern": True, + }, + expected_owner="urn:li:corpuser:games.team@nytimes.com", + expected_owner_type=OwnershipTypeClass.CUSTOM, + expected_owner_type_urn="urn:li:ownershipType:data_producer", ) From 23060861c92df7bcab2b256fc531554270ae07f4 Mon Sep 17 00:00:00 2001 From: shubhamjagtap639 Date: Tue, 26 Mar 2024 13:41:01 +0530 Subject: [PATCH 06/12] Address review comments --- .../docs/transformer/dataset_transformer.md | 3 +- .../extract_ownership_from_tags.py | 9 ++-- .../tests/unit/test_bigquery_source.py | 48 ++++++++----------- 3 files changed, 23 insertions(+), 37 deletions(-) diff --git a/metadata-ingestion/docs/transformer/dataset_transformer.md b/metadata-ingestion/docs/transformer/dataset_transformer.md index 360fd3ba98ab1d..0cc2d548c7d003 100644 --- a/metadata-ingestion/docs/transformer/dataset_transformer.md +++ b/metadata-ingestion/docs/transformer/dataset_transformer.md @@ -20,12 +20,11 @@ The below table shows transformer which can transform aspects of entity [Dataset ### Config Details | Field | Required | Type | Default | Description | |-----------------------------|----------|---------|---------------|---------------------------------------------| -| `semantics` | | enum | `OVERWRITE` | Whether to OVERWRITE or PATCH the entity present on DataHub GMS. | | `tag_pattern` | | str | | Regex to use for tags to match against. Supports Regex to match a pattern which is used to remove content. Rest of string is considered owner ID for creating owner URN. | | `is_user` | | bool | `true` | Whether should be consider a user or not. 
If `false` then considered a group. | | `owner_character_mapping` | | dict[str, str] | | A mapping of extracted owner character to datahub owner character. | | `email_domain` | | str | | If set then this is appended to create owner URN. | -| `extract_owner_type_from_tag_pattern` | | str | `false` | Whether to extract an owner type from provided tag pattern first group. If `true`, no need to provide owner_type and owner_type_urn config.| +| `extract_owner_type_from_tag_pattern` | | str | `false` | Whether to extract an owner type from provided tag pattern first group. If `true`, no need to provide owner_type and owner_type_urn config. For example: if provided tag pattern is `(.*)_owner_email:` and actual tag is `developer_owner_email`, then extracted owner type will be `developer`.| | `owner_type` | | str | `TECHNICAL_OWNER` | Ownership type. | | `owner_type_urn` | | str | `None` | Set to a custom ownership type's URN if using custom ownership. | diff --git a/metadata-ingestion/src/datahub/ingestion/transformer/extract_ownership_from_tags.py b/metadata-ingestion/src/datahub/ingestion/transformer/extract_ownership_from_tags.py index 1d95323c2d90cb..bfd299507136ed 100644 --- a/metadata-ingestion/src/datahub/ingestion/transformer/extract_ownership_from_tags.py +++ b/metadata-ingestion/src/datahub/ingestion/transformer/extract_ownership_from_tags.py @@ -5,7 +5,7 @@ from pydantic.class_validators import root_validator -from datahub.configuration.common import TransformerSemanticsConfigModel +from datahub.configuration.common import ConfigModel from datahub.emitter.mce_builder import Aspect, make_ownership_type_urn from datahub.emitter.mcp import MetadataChangeProposalWrapper from datahub.ingestion.api.common import PipelineContext @@ -24,7 +24,7 @@ logger = logging.getLogger(__name__) -class ExtractOwnersFromTagsConfig(TransformerSemanticsConfigModel): +class ExtractOwnersFromTagsConfig(ConfigModel): tag_prefix: Optional[str] tag_pattern: Optional[str] is_user: bool = 
True @@ -156,7 +156,4 @@ def transform_aspect( ), ) ) - if not self.config.replace_existing: - return aspect - else: - return None + return aspect diff --git a/metadata-ingestion/tests/unit/test_bigquery_source.py b/metadata-ingestion/tests/unit/test_bigquery_source.py index 2af9f9478bb938..426d4dc12f2086 100644 --- a/metadata-ingestion/tests/unit/test_bigquery_source.py +++ b/metadata-ingestion/tests/unit/test_bigquery_source.py @@ -408,41 +408,31 @@ def test_gen_table_dataset_workunits(get_bq_client_mock, bigquery_table): assert mcp.aspect.fields == [] mcp = cast(MetadataChangeProposalClass, next(iter(gen)).metadata) - dataset_properties = cast(DatasetPropertiesClass, mcp.aspect) - assert dataset_properties.name == bigquery_table.name + assert isinstance(mcp.aspect, DatasetPropertiesClass) + assert mcp.aspect.name == bigquery_table.name assert ( - dataset_properties.qualifiedName - == f"{project_id}.{dataset_name}.{bigquery_table.name}" + mcp.aspect.qualifiedName == f"{project_id}.{dataset_name}.{bigquery_table.name}" ) - assert dataset_properties.description == bigquery_table.comment - assert dataset_properties.created == TimeStampClass( + assert mcp.aspect.description == bigquery_table.comment + assert mcp.aspect.created == TimeStampClass( time=int(bigquery_table.created.timestamp() * 1000) ) - assert dataset_properties.lastModified == TimeStampClass( + assert mcp.aspect.lastModified == TimeStampClass( time=int(bigquery_table.last_altered.timestamp() * 1000) ) - assert dataset_properties.tags == [] - assert dataset_properties.customProperties["expiration_date"] == str( - bigquery_table.expires - ) - assert dataset_properties.customProperties["size_in_bytes"] == str( - bigquery_table.size_in_bytes - ) - assert dataset_properties.customProperties["billable_bytes_active"] == str( - bigquery_table.active_billable_bytes - ) - assert dataset_properties.customProperties["billable_bytes_long_term"] == str( - bigquery_table.long_term_billable_bytes - ) - assert 
dataset_properties.customProperties["number_of_partitions"] == str( - bigquery_table.num_partitions - ) - assert dataset_properties.customProperties["max_partition_id"] == str( - bigquery_table.max_partition_id - ) - assert dataset_properties.customProperties["max_shard_id"] == str( - bigquery_table.max_shard_id - ) + assert mcp.aspect.tags == [] + + assert mcp.aspect.customProperties == { + "expiration_date": str(bigquery_table.expires), + "size_in_bytes": str(bigquery_table.size_in_bytes), + "billable_bytes_active": str(bigquery_table.active_billable_bytes), + "billable_bytes_long_term": str(bigquery_table.long_term_billable_bytes), + "number_of_partitions": str(bigquery_table.num_partitions), + "max_partition_id": str(bigquery_table.max_partition_id), + "is_partitioned": "True", + "max_shard_id": str(bigquery_table.max_shard_id), + "is_sharded": "True", + } mcp = cast(MetadataChangeProposalClass, next(iter(gen)).metadata) assert isinstance(mcp.aspect, GlobalTagsClass) From 4beb576d0aab0ad5275bcd39651cecb02eab3ae8 Mon Sep 17 00:00:00 2001 From: shubhamjagtap639 Date: Tue, 26 Mar 2024 13:47:07 +0530 Subject: [PATCH 07/12] Modify test case data --- metadata-ingestion/tests/unit/test_transform_dataset.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/metadata-ingestion/tests/unit/test_transform_dataset.py b/metadata-ingestion/tests/unit/test_transform_dataset.py index 69761a2a4cb76c..01ec6f07e545f8 100644 --- a/metadata-ingestion/tests/unit/test_transform_dataset.py +++ b/metadata-ingestion/tests/unit/test_transform_dataset.py @@ -739,7 +739,7 @@ def _test_owner( expected_owner_type_urn="urn:li:ownershipType:ad8557d6-dcb9-4d2a-83fc-b7d0d54f3e0f", ) _test_owner( - tag="data_producer_owner_email:games.team@nytimes.com", + tag="data_producer_owner_email:abc_xyz-email_com", config={ "tag_pattern": "(.*)_owner_email:", "owner_character_mapping": { @@ -752,7 +752,7 @@ def _test_owner( }, "extract_owner_type_from_tag_pattern": True, }, - 
expected_owner="urn:li:corpuser:games.team@nytimes.com", + expected_owner="urn:li:corpuser:abc.xyz@email.com", expected_owner_type=OwnershipTypeClass.CUSTOM, expected_owner_type_urn="urn:li:ownershipType:data_producer", ) From 5bbff7b5258f097280738f7e7281dfe83c18aaaf Mon Sep 17 00:00:00 2001 From: shubhamjagtap639 Date: Fri, 29 Mar 2024 00:27:05 +0530 Subject: [PATCH 08/12] Address review comments --- .../docs/transformer/dataset_transformer.md | 55 +++++++++++++++++-- .../extract_ownership_from_tags.py | 31 +++-------- 2 files changed, 59 insertions(+), 27 deletions(-) diff --git a/metadata-ingestion/docs/transformer/dataset_transformer.md b/metadata-ingestion/docs/transformer/dataset_transformer.md index 0cc2d548c7d003..6d53293bd1cdac 100644 --- a/metadata-ingestion/docs/transformer/dataset_transformer.md +++ b/metadata-ingestion/docs/transformer/dataset_transformer.md @@ -20,7 +20,7 @@ The below table shows transformer which can transform aspects of entity [Dataset ### Config Details | Field | Required | Type | Default | Description | |-----------------------------|----------|---------|---------------|---------------------------------------------| -| `tag_pattern` | | str | | Regex to use for tags to match against. Supports Regex to match a pattern which is used to remove content. Rest of string is considered owner ID for creating owner URN. | +| `tag_pattern` | ✅ | str | | Regex to use for tags to match against. Supports Regex to match a pattern which is used to remove content. Rest of string is considered owner ID for creating owner URN. | | `is_user` | | bool | `true` | Whether should be consider a user or not. If `false` then considered a group. | | `owner_character_mapping` | | dict[str, str] | | A mapping of extracted owner character to datahub owner character. | | `email_domain` | | str | | If set then this is appended to create owner URN. 
| @@ -28,17 +28,62 @@ The below table shows transformer which can transform aspects of entity [Dataset | `owner_type` | | str | `TECHNICAL_OWNER` | Ownership type. | | `owner_type_urn` | | str | `None` | Set to a custom ownership type's URN if using custom ownership. | -Matches against a tag prefix and considers string in tags after that prefix as owner to create ownership. +Let’s suppose we’d like to add a dataset ownerships based on part of dataset tags. To do so, we can use the `extract_ownership_from_tags` transformer that’s included in the ingestion framework. + +The config, which we’d append to our ingestion recipe YAML, would look like this: ```yaml transformers: - type: "extract_ownership_from_tags" config: - tag_prefix: "dbt:techno-genie:" - is_user: true - email_domain: "coolcompany.com" + tag_pattern: "owner_email:" ``` +So if we have input dataset tag like +- `urn:li:tag:dataset_owner_email:abc@email.com` +- `urn:li:tag:dataset_owner_email:xyz@email.com` + +String after the matched tag pattern will be considered as owner to create ownership. Hence an owners called `abc@email.com` and `xyz@email.com` will be added to them respectively. + +`extract_ownership_from_tags` can be configured in below different way + +- Add owners, however owner should be considered as group and also email domain not provided in tag string + ```yaml + transformers: + - type: "extract_ownership_from_tags" + config: + tag_pattern: "owner_email:" + is_user: false + email_domain: "email.com" + ``` +- Add owners, however owner type and owner type urn wanted to provide externally + ```yaml + transformers: + - type: "extract_ownership_from_tags" + config: + tag_pattern: "owner_email:" + owner_type: "CUSTOM" + owner_type_urn: "urn:li:ownershipType:data_product" + ``` +- Add owners, however some owner characters needs to replace with some other characters before ingestion. For example: `abc_xyz-email_com` owner should get convert to `abc.xyz@email.com`. 
In this case the config would look like this: + ```yaml + transformers: + - type: "extract_ownership_from_tags" + config: + tag_pattern: "owner_email:" + owner_character_mapping: + "_": ".", + "-": "@", + ``` +- Add owners, however owner type also need to extracted from tag pattern. For example: from tag urn `urn:li:tag:data_producer_owner_email:abc@email.com` owner type `data_producer` should get extracted. In this case the config would look like this: + ```yaml + transformers: + - type: "extract_ownership_from_tags" + config: + tag_pattern: "(.*)_owner_email:" + extract_owner_type_from_tag_pattern: true + ``` + ## Mark Dataset Status ### Config Details | Field | Required | Type | Default | Description | diff --git a/metadata-ingestion/src/datahub/ingestion/transformer/extract_ownership_from_tags.py b/metadata-ingestion/src/datahub/ingestion/transformer/extract_ownership_from_tags.py index bfd299507136ed..d356aebc808f53 100644 --- a/metadata-ingestion/src/datahub/ingestion/transformer/extract_ownership_from_tags.py +++ b/metadata-ingestion/src/datahub/ingestion/transformer/extract_ownership_from_tags.py @@ -3,9 +3,8 @@ from functools import lru_cache from typing import Dict, List, Optional, Sequence, Union, cast -from pydantic.class_validators import root_validator - from datahub.configuration.common import ConfigModel +from datahub.configuration.validate_field_rename import pydantic_renamed_field from datahub.emitter.mce_builder import Aspect, make_ownership_type_urn from datahub.emitter.mcp import MetadataChangeProposalWrapper from datahub.ingestion.api.common import PipelineContext @@ -25,8 +24,7 @@ class ExtractOwnersFromTagsConfig(ConfigModel): - tag_prefix: Optional[str] - tag_pattern: Optional[str] + tag_pattern: str is_user: bool = True owner_character_mapping: Optional[Dict[str, str]] = None email_domain: Optional[str] = None @@ -34,23 +32,9 @@ class ExtractOwnersFromTagsConfig(ConfigModel): owner_type: str = "TECHNICAL_OWNER" owner_type_urn: 
Optional[str] = None - @root_validator(pre=True) - def raise_error_for_tag_prefix(cls, values: Dict) -> Dict: - if ( - values.get("tag_prefix") is not None - and values.get("tag_pattern") is not None - ): - raise ValueError( - "Cannot provide both tag_prefix and tag_pattern parameter. tag_prefix is deprecated in favor of tag_pattern." - ) - if values.get("tag_pattern") is None and values.get("tag_prefix") is None: - raise ValueError("tag_pattern is required") - if values.get("tag_prefix") is not None: - logger.warning( - "The tag_prefix argument is deprecated. Use tag_pattern instead." - ) - values["tag_pattern"] = values["tag_prefix"] - return values + _rename_tag_prefix_to_tag_pattern = pydantic_renamed_field( + "tag_prefix", "tag_pattern" + ) @lru_cache(maxsize=10) @@ -88,6 +72,9 @@ def get_owner_urn(self, owner_str: str) -> str: def convert_owner_as_per_mapping(self, owner: str) -> str: if self.config.owner_character_mapping: + # Sort the provided mapping by its length. + # Eg: Suppose we have {"_":".", "__":"#"} character mapping. + # In this case "__" character should get replace first compare to "_" character. 
for key in sorted( self.config.owner_character_mapping.keys(), key=len, @@ -113,7 +100,7 @@ def transform_aspect( for tag_class in tags: tag_str = TagUrn.from_string(tag_class.tag).name - re_match = re.search(cast(str, self.config.tag_pattern), tag_str) + re_match = re.search(self.config.tag_pattern, tag_str) if re_match: owner_str = tag_str[re_match.end() :].strip() owner_str = self.convert_owner_as_per_mapping(owner_str) From d0cec72667a03cd807126e420a2edd97c1509206 Mon Sep 17 00:00:00 2001 From: shubhamjagtap639 Date: Fri, 29 Mar 2024 01:14:50 +0530 Subject: [PATCH 09/12] Address review comments --- .../docs/transformer/dataset_transformer.md | 16 ++++++++-------- .../transformer/extract_ownership_from_tags.py | 2 +- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/metadata-ingestion/docs/transformer/dataset_transformer.md b/metadata-ingestion/docs/transformer/dataset_transformer.md index 8136bea08f8472..0acc134d4ef00b 100644 --- a/metadata-ingestion/docs/transformer/dataset_transformer.md +++ b/metadata-ingestion/docs/transformer/dataset_transformer.md @@ -20,7 +20,7 @@ The below table shows transformer which can transform aspects of entity [Dataset ### Config Details | Field | Required | Type | Default | Description | |-----------------------------|----------|---------|---------------|---------------------------------------------| -| `tag_pattern` | ✅ | str | | Regex to use for tags to match against. Supports Regex to match a pattern which is used to remove content. Rest of string is considered owner ID for creating owner URN. | +| `tag_pattern` | | str | | Regex to use for tags to match against. Supports Regex to match a pattern which is used to remove content. Rest of string is considered owner ID for creating owner URN. | | `is_user` | | bool | `true` | Whether should be consider a user or not. If `false` then considered a group. 
| | `owner_character_mapping` | | dict[str, str] | | A mapping of extracted owner character to datahub owner character. | | `email_domain` | | str | | If set then this is appended to create owner URN. | @@ -43,20 +43,20 @@ So if we have input dataset tag like - `urn:li:tag:dataset_owner_email:abc@email.com` - `urn:li:tag:dataset_owner_email:xyz@email.com` -String after the matched tag pattern will be considered as owner to create ownership. Hence an owners called `abc@email.com` and `xyz@email.com` will be added to them respectively. +The portion of the tag after the matched tag pattern will be converted into an owner. Hence users `abc@email.com` and `xyz@email.com` will be added as owners. -`extract_ownership_from_tags` can be configured in below different way +### Examples -- Add owners, however owner should be considered as group and also email domain not provided in tag string +- Add owners, however owner should be considered as group and also email domain not provided in tag string. For example: from tag urn `urn:li:tag:dataset_owner:abc` extracted owner urn should be `urn:li:corpGroup:abc@email.com` then config would look like this: ```yaml transformers: - type: "extract_ownership_from_tags" config: - tag_pattern: "owner_email:" + tag_pattern: "owner:" is_user: false email_domain: "email.com" ``` -- Add owners, however owner type and owner type urn wanted to provide externally +- Add owners, however owner type and owner type urn wanted to provide externally. 
For example: from tag urn `urn:li:tag:dataset_owner_email:abc@email.com` owner type should be `CUSTOM` and owner type urn as `"urn:li:ownershipType:data_product"` then config would look like this: ```yaml transformers: - type: "extract_ownership_from_tags" @@ -65,7 +65,7 @@ String after the matched tag pattern will be considered as owner to create owner owner_type: "CUSTOM" owner_type_urn: "urn:li:ownershipType:data_product" ``` -- Add owners, however some owner characters needs to replace with some other characters before ingestion. For example: `abc_xyz-email_com` owner should get convert to `abc.xyz@email.com`. In this case the config would look like this: +- Add owners, however some owner characters needs to replace with some other characters before ingestion. For example: from tag urn `urn:li:tag:dataset_owner_email:abc_xyz-email_com` extracted owner urn should be `urn:li:corpGroup:abc.xyz@email.com` then config would look like this: ```yaml transformers: - type: "extract_ownership_from_tags" @@ -75,7 +75,7 @@ String after the matched tag pattern will be considered as owner to create owner "_": ".", "-": "@", ``` -- Add owners, however owner type also need to extracted from tag pattern. For example: from tag urn `urn:li:tag:data_producer_owner_email:abc@email.com` owner type `data_producer` should get extracted. In this case the config would look like this: +- Add owners, however owner type also need to extracted from tag pattern. 
For example: from tag urn `urn:li:tag:data_producer_owner_email:abc@email.com` extracted owner type should be `data_producer` then config would look like this: ```yaml transformers: - type: "extract_ownership_from_tags" diff --git a/metadata-ingestion/src/datahub/ingestion/transformer/extract_ownership_from_tags.py b/metadata-ingestion/src/datahub/ingestion/transformer/extract_ownership_from_tags.py index d356aebc808f53..e509b4b719166b 100644 --- a/metadata-ingestion/src/datahub/ingestion/transformer/extract_ownership_from_tags.py +++ b/metadata-ingestion/src/datahub/ingestion/transformer/extract_ownership_from_tags.py @@ -24,7 +24,7 @@ class ExtractOwnersFromTagsConfig(ConfigModel): - tag_pattern: str + tag_pattern: str = "" is_user: bool = True owner_character_mapping: Optional[Dict[str, str]] = None email_domain: Optional[str] = None From cadb48934193059ca74445b7f82ff21333afa6ee Mon Sep 17 00:00:00 2001 From: shubhamjagtap639 Date: Thu, 4 Apr 2024 13:02:35 +0530 Subject: [PATCH 10/12] Add code to handle overlapping replacement while converting tag as per mapping --- .../extract_ownership_from_tags.py | 39 +++++++++++++------ .../tests/unit/test_transform_dataset.py | 8 ++-- 2 files changed, 31 insertions(+), 16 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/transformer/extract_ownership_from_tags.py b/metadata-ingestion/src/datahub/ingestion/transformer/extract_ownership_from_tags.py index e509b4b719166b..397a74f38a3161 100644 --- a/metadata-ingestion/src/datahub/ingestion/transformer/extract_ownership_from_tags.py +++ b/metadata-ingestion/src/datahub/ingestion/transformer/extract_ownership_from_tags.py @@ -26,7 +26,7 @@ class ExtractOwnersFromTagsConfig(ConfigModel): tag_pattern: str = "" is_user: bool = True - owner_character_mapping: Optional[Dict[str, str]] = None + tag_character_mapping: Optional[Dict[str, str]] = None email_domain: Optional[str] = None extract_owner_type_from_tag_pattern: bool = False owner_type: str = "TECHNICAL_OWNER" 
@@ -70,18 +70,35 @@ def get_owner_urn(self, owner_str: str) -> str: return owner_str + "@" + self.config.email_domain return owner_str - def convert_owner_as_per_mapping(self, owner: str) -> str: - if self.config.owner_character_mapping: - # Sort the provided mapping by its length. - # Eg: Suppose we have {"_":".", "__":"#"} character mapping. - # In this case "__" character should get replace first compare to "_" character. - for key in sorted( - self.config.owner_character_mapping.keys(), + def convert_tag_as_per_mapping(self, tag: str) -> str: + """ + Function to modify tag as per provided tag character mapping. It also handles the overlappings in the mapping. + Eg: '--':'-' & '-':'@' should not cause incorrect mapping. + """ + if self.config.tag_character_mapping: + # indices list to keep track of the indices where replacements have been made + indices = [] + for old_char in sorted( + self.config.tag_character_mapping.keys(), key=len, reverse=True, ): - owner = owner.replace(key, self.config.owner_character_mapping[key]) - return owner + new_char = self.config.tag_character_mapping[old_char] + index = tag.find(old_char) + while index != -1: + if index not in indices: + tag = tag[:index] + new_char + tag[index + len(old_char) :] + # Adjust indices for overlapping replacements + indices = [ + each + (len(new_char) - len(old_char)) + if each > index + else each + for each in indices + ] + indices.append(index) + # Find the next occurrence of old_char, starting from the next index + index = tag.find(old_char, index + len(new_char)) + return tag def handle_end_of_stream( self, @@ -100,10 +117,10 @@ def transform_aspect( for tag_class in tags: tag_str = TagUrn.from_string(tag_class.tag).name + tag_str = self.convert_tag_as_per_mapping(tag_str) re_match = re.search(self.config.tag_pattern, tag_str) if re_match: owner_str = tag_str[re_match.end() :].strip() - owner_str = self.convert_owner_as_per_mapping(owner_str) owner_urn_str = self.get_owner_urn(owner_str) 
owner_urn = ( str(CorpuserUrn(owner_urn_str)) diff --git a/metadata-ingestion/tests/unit/test_transform_dataset.py b/metadata-ingestion/tests/unit/test_transform_dataset.py index c31ec12abfbd71..3782eb0e275f31 100644 --- a/metadata-ingestion/tests/unit/test_transform_dataset.py +++ b/metadata-ingestion/tests/unit/test_transform_dataset.py @@ -742,20 +742,18 @@ def _test_owner( expected_owner_type_urn="urn:li:ownershipType:ad8557d6-dcb9-4d2a-83fc-b7d0d54f3e0f", ) _test_owner( - tag="data_producer_owner_email:abc_xyz-email_com", + tag="data__producer__owner__email:abc--xyz-email_com", config={ "tag_pattern": "(.*)_owner_email:", - "owner_character_mapping": { + "tag_character_mapping": { "_": ".", "-": "@", "__": "_", "--": "-", - "_-": "#", - "-_": " ", }, "extract_owner_type_from_tag_pattern": True, }, - expected_owner="urn:li:corpuser:abc.xyz@email.com", + expected_owner="urn:li:corpuser:abc-xyz@email.com", expected_owner_type=OwnershipTypeClass.CUSTOM, expected_owner_type_urn="urn:li:ownershipType:data_producer", ) From bb2e243e73ad3770f36eca374d23a461f0cea793 Mon Sep 17 00:00:00 2001 From: shubhamjagtap639 Date: Thu, 4 Apr 2024 13:03:06 +0530 Subject: [PATCH 11/12] Update the dataset transformer doc --- .../docs/transformer/dataset_transformer.md | 20 ++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/metadata-ingestion/docs/transformer/dataset_transformer.md b/metadata-ingestion/docs/transformer/dataset_transformer.md index 0acc134d4ef00b..5421a932daccee 100644 --- a/metadata-ingestion/docs/transformer/dataset_transformer.md +++ b/metadata-ingestion/docs/transformer/dataset_transformer.md @@ -22,7 +22,7 @@ The below table shows transformer which can transform aspects of entity [Dataset |-----------------------------|----------|---------|---------------|---------------------------------------------| | `tag_pattern` | | str | | Regex to use for tags to match against. Supports Regex to match a pattern which is used to remove content. 
Rest of string is considered owner ID for creating owner URN. | | `is_user` | | bool | `true` | Whether should be consider a user or not. If `false` then considered a group. | -| `owner_character_mapping` | | dict[str, str] | | A mapping of extracted owner character to datahub owner character. | +| `tag_character_mapping` | | dict[str, str] | | A mapping of tag characters to DataHub owner characters. If provided, the `tag_pattern` config is matched against the tag after it has been converted as per the mapping. | | `email_domain` | | str | | If set then this is appended to create owner URN. | | `extract_owner_type_from_tag_pattern` | | str | `false` | Whether to extract an owner type from provided tag pattern first group. If `true`, no need to provide owner_type and owner_type_urn config. For example: if provided tag pattern is `(.*)_owner_email:` and actual tag is `developer_owner_email`, then extracted owner type will be `developer`.| | `owner_type` | | str | `TECHNICAL_OWNER` | Ownership type. | @@ -40,14 +40,14 @@ transformers: ``` So if we have input dataset tag like -- `urn:li:tag:dataset_owner_email:abc@email.com` -- `urn:li:tag:dataset_owner_email:xyz@email.com` +- `urn:li:tag:owner_email:abc@email.com` +- `urn:li:tag:owner_email:xyz@email.com` The portion of the tag after the matched tag pattern will be converted into an owner. Hence users `abc@email.com` and `xyz@email.com` will be added as owners. ### Examples -- Add owners, however owner should be considered as group and also email domain not provided in tag string. For example: from tag urn `urn:li:tag:dataset_owner:abc` extracted owner urn should be `urn:li:corpGroup:abc@email.com` then config would look like this: +- Add owners, however owner should be considered as group and also email domain not provided in tag string.
For example: from tag urn `urn:li:tag:owner:abc` extracted owner urn should be `urn:li:corpGroup:abc@email.com` then config would look like this: ```yaml transformers: - type: "extract_ownership_from_tags" @@ -56,7 +56,7 @@ The portion of the tag after the matched tag pattern will be converted into an o is_user: false email_domain: "email.com" ``` -- Add owners, however owner type and owner type urn wanted to provide externally. For example: from tag urn `urn:li:tag:dataset_owner_email:abc@email.com` owner type should be `CUSTOM` and owner type urn as `"urn:li:ownershipType:data_product"` then config would look like this: +- Add owners, however owner type and owner type urn wanted to provide externally. For example: from tag urn `urn:li:tag:owner_email:abc@email.com` owner type should be `CUSTOM` and owner type urn as `"urn:li:ownershipType:data_product"` then config would look like this: ```yaml transformers: - type: "extract_ownership_from_tags" @@ -65,15 +65,17 @@ The portion of the tag after the matched tag pattern will be converted into an o owner_type: "CUSTOM" owner_type_urn: "urn:li:ownershipType:data_product" ``` -- Add owners, however some owner characters needs to replace with some other characters before ingestion. For example: from tag urn `urn:li:tag:dataset_owner_email:abc_xyz-email_com` extracted owner urn should be `urn:li:corpGroup:abc.xyz@email.com` then config would look like this: +- Add owners, however some tag characters need to be replaced with other characters before extracting the owner. For example: from tag urn `urn:li:tag:owner__email:abc--xyz-email_com` extracted owner urn should be `urn:li:corpuser:abc-xyz@email.com` then config would look like this: ```yaml transformers: - type: "extract_ownership_from_tags" config: tag_pattern: "owner_email:" - owner_character_mapping: - "_": ".", - "-": "@", + tag_character_mapping: + "_": "."
+ "-": "@" + "--": "-" + "__": "_" ``` - Add owners, however owner type also need to extracted from tag pattern. For example: from tag urn `urn:li:tag:data_producer_owner_email:abc@email.com` extracted owner type should be `data_producer` then config would look like this: ```yaml From 4eedafe1303cecc45aa759a55b405c69dc9cf132 Mon Sep 17 00:00:00 2001 From: shubhamjagtap639 Date: Thu, 4 Apr 2024 14:07:13 +0530 Subject: [PATCH 12/12] Fix lint error --- .../ingestion/transformer/extract_ownership_from_tags.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/metadata-ingestion/src/datahub/ingestion/transformer/extract_ownership_from_tags.py b/metadata-ingestion/src/datahub/ingestion/transformer/extract_ownership_from_tags.py index 397a74f38a3161..27311ff998cbf9 100644 --- a/metadata-ingestion/src/datahub/ingestion/transformer/extract_ownership_from_tags.py +++ b/metadata-ingestion/src/datahub/ingestion/transformer/extract_ownership_from_tags.py @@ -77,7 +77,7 @@ def convert_tag_as_per_mapping(self, tag: str) -> str: """ if self.config.tag_character_mapping: # indices list to keep track of the indices where replacements have been made - indices = [] + indices: List[int] = list() for old_char in sorted( self.config.tag_character_mapping.keys(), key=len,