Skip to content

Commit

Permalink
fix: Resolve PR comments
Browse files Browse the repository at this point in the history
  • Loading branch information
sagar-salvi-apptware committed Jun 12, 2024
1 parent c390fbc commit b2dbff8
Show file tree
Hide file tree
Showing 2 changed files with 72 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@
)
from datahub.utilities.mapping import Constants
from datahub.utilities.perf_timer import PerfTimer
from datahub.utilities.ratelimiter import RateLimiter
from datahub.utilities.registries.domain_registry import DomainRegistry

logger: logging.Logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -748,6 +749,12 @@ def _process_schema(

columns = None

rate_limiter: Optional[RateLimiter] = None
if self.config.rate_limit:
rate_limiter = RateLimiter(
max_calls=self.config.requests_per_min, period=60
)

if (
self.config.include_tables
or self.config.include_views
Expand All @@ -759,6 +766,8 @@ def _process_schema(
column_limit=self.config.column_limit,
run_optimized_column_query=self.config.run_optimized_column_query,
extract_policy_tags_from_catalog=self.config.extract_policy_tags_from_catalog,
report=self.report,
rate_limiter=rate_limiter,
)

if self.config.include_tables:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from collections import defaultdict
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Any, Dict, Iterator, List, Optional
from typing import Any, Dict, Iterable, Iterator, List, Optional

from google.cloud import bigquery, datacatalog_v1
from google.cloud.bigquery.table import (
Expand All @@ -22,6 +22,7 @@
BigqueryTableType,
)
from datahub.ingestion.source.sql.sql_generic import BaseColumn, BaseTable, BaseView
from datahub.utilities.ratelimiter import RateLimiter

logger: logging.Logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -353,34 +354,68 @@ def _make_bigquery_view(view: bigquery.Row) -> BigqueryView:
)

def get_policy_tags_for_column(
    self,
    project_id: str,
    dataset_name: str,
    table_name: str,
    column_name: str,
    report: BigQueryV2Report,
    rate_limiter: Optional[RateLimiter] = None,
) -> Iterable[str]:
    """Yield the Data Catalog policy-tag display names attached to one column.

    Looks up the table schema via the BigQuery client, finds the named
    column, and resolves each of its policy-tag resource names through the
    Data Catalog client. A generator is used so callers can stop early and
    so a single failing tag does not abort the rest.

    Args:
        project_id: GCP project containing the table.
        dataset_name: BigQuery dataset containing the table.
        table_name: Table whose column is inspected.
        column_name: Column to read policy tags from.
        report: Ingestion report used to record non-fatal warnings.
        rate_limiter: Optional limiter throttling Data Catalog calls;
            when None, calls are made unthrottled.

    Yields:
        Display name of each policy tag attached to the column.
    """
    # The caller is expected to have configured a Data Catalog client
    # before extracting policy tags.
    assert self.datacatalog_client

    try:
        # Fetch the table schema to locate the column's policy-tag refs.
        table_ref = f"{project_id}.{dataset_name}.{table_name}"
        table = self.bq_client.get_table(table_ref)
        schema = table.schema

        # Find the specific field in the schema.
        field = next((f for f in schema if f.name == column_name), None)
        if not field or not field.policy_tags:
            # Column absent or has no tags: nothing to yield.
            return

        # Resolve each policy-tag resource name to its display name.
        # Failures are reported per-tag so remaining tags still resolve.
        for policy_tag_name in field.policy_tags.names:
            try:
                if rate_limiter:
                    # Throttle the Data Catalog API call.
                    with rate_limiter:
                        policy_tag = self.datacatalog_client.get_policy_tag(
                            name=policy_tag_name
                        )
                else:
                    policy_tag = self.datacatalog_client.get_policy_tag(
                        name=policy_tag_name
                    )
                yield policy_tag.display_name
            except Exception as e:
                logger.warning(
                    f"Unexpected error when retrieving policy tag {policy_tag_name} for column {column_name} in table {table_name}: {e}",
                    exc_info=True,
                )
                report.report_warning(
                    "metadata-extraction",
                    f"Failed to retrieve policy tag {policy_tag_name} for column {column_name} in table {table_name} due to unexpected error: {e}",
                )
    except Exception as e:
        # Schema lookup itself failed; warn and yield nothing rather than
        # aborting ingestion of the whole dataset.
        logger.error(
            f"Unexpected error retrieving schema for table {table_name} in dataset {dataset_name}, project {project_id}: {e}",
            exc_info=True,
        )
        report.report_warning(
            "metadata-extraction",
            f"Failed to retrieve schema for table {table_name} in dataset {dataset_name}, project {project_id} due to unexpected error: {e}",
        )

def get_columns_for_dataset(
self,
project_id: str,
dataset_name: str,
column_limit: int,
report: BigQueryV2Report,
run_optimized_column_query: bool = False,
extract_policy_tags_from_catalog: bool = False,
rate_limiter: Optional[RateLimiter] = None,
) -> Optional[Dict[str, List[BigqueryColumn]]]:
columns: Dict[str, List[BigqueryColumn]] = defaultdict(list)
with self.report.get_columns_for_dataset:
Expand Down Expand Up @@ -425,11 +460,15 @@ def get_columns_for_dataset(
comment=column.comment,
is_partition_column=column.is_partitioning_column == "YES",
cluster_column_position=column.clustering_ordinal_position,
policy_tags=self.get_policy_tags_for_column(
project_id,
dataset_name,
column.table_name,
column.column_name,
policy_tags=list(
self.get_policy_tags_for_column(
project_id,
dataset_name,
column.table_name,
column.column_name,
report,
rate_limiter,
)
)
if extract_policy_tags_from_catalog
else [],
Expand Down

0 comments on commit b2dbff8

Please sign in to comment.