fix(ingestion/bigquery): Table-view-snapshot Lineage Bug fix (#9579)
Co-authored-by: Aseem Bansal <[email protected]>
Co-authored-by: Harshal Sheth <[email protected]>
3 people authored Jan 25, 2024
1 parent 53c7790 commit f83a2fa
Showing 8 changed files with 416 additions and 31 deletions.
133 changes: 123 additions & 10 deletions metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py
@@ -47,6 +47,7 @@
BigqueryProject,
BigQuerySchemaApi,
BigqueryTable,
BigqueryTableSnapshot,
BigqueryView,
)
from datahub.ingestion.source.bigquery_v2.common import (
@@ -234,7 +235,7 @@ def __init__(self, ctx: PipelineContext, config: BigQueryV2Config):
run_id=self.ctx.run_id,
)

# For database, schema, tables, views, etc
# For database, schema, tables, views, snapshots, etc
self.lineage_extractor = BigqueryLineageExtractor(
config,
self.report,
@@ -282,8 +283,12 @@ def __init__(self, ctx: PipelineContext, config: BigQueryV2Config):

# Maps project -> view_ref, so we can find all views in a project
self.view_refs_by_project: Dict[str, Set[str]] = defaultdict(set)
# Maps project -> snapshot_ref, so we can find all snapshots in a project
self.snapshot_refs_by_project: Dict[str, Set[str]] = defaultdict(set)
# Maps view ref -> actual sql
self.view_definitions: FileBackedDict[str] = FileBackedDict()
# Maps snapshot ref -> Snapshot
self.snapshots_by_ref: FileBackedDict[BigqueryTableSnapshot] = FileBackedDict()

self.add_config_to_report()
atexit.register(cleanup, config)
@@ -303,6 +308,10 @@ def connectivity_test(client: bigquery.Client) -> CapabilityReport:
else:
return CapabilityReport(capable=True)

@property
def store_table_refs(self):
return self.config.include_table_lineage or self.config.include_usage_statistics

@staticmethod
def metadata_read_capability_test(
project_ids: List[str], config: BigQueryV2Config
@@ -453,6 +462,7 @@ def _init_schema_resolver(self) -> SchemaResolver:
self.config.include_schema_metadata
and self.config.include_tables
and self.config.include_views
and self.config.include_table_snapshots
)

if schema_resolution_required and not schema_ingestion_enabled:
@@ -567,6 +577,8 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
self.sql_parser_schema_resolver,
self.view_refs_by_project,
self.view_definitions,
self.snapshot_refs_by_project,
self.snapshots_by_ref,
self.table_refs,
)

@@ -603,6 +615,7 @@ def _process_project(
) -> Iterable[MetadataWorkUnit]:
db_tables: Dict[str, List[BigqueryTable]] = {}
db_views: Dict[str, List[BigqueryView]] = {}
db_snapshots: Dict[str, List[BigqueryTableSnapshot]] = {}

project_id = bigquery_project.id
try:
@@ -651,9 +664,9 @@
self.report.report_dropped(f"{bigquery_dataset.name}.*")
continue
try:
# db_tables and db_views are populated in the this method
# db_tables, db_views, and db_snapshots are populated in this method
yield from self._process_schema(
project_id, bigquery_dataset, db_tables, db_views
project_id, bigquery_dataset, db_tables, db_views, db_snapshots
)

except Exception as e:
@@ -684,6 +697,7 @@ def _process_schema(
bigquery_dataset: BigqueryDataset,
db_tables: Dict[str, List[BigqueryTable]],
db_views: Dict[str, List[BigqueryView]],
db_snapshots: Dict[str, List[BigqueryTableSnapshot]],
) -> Iterable[MetadataWorkUnit]:
dataset_name = bigquery_dataset.name

@@ -692,7 +706,11 @@
)

columns = None
if self.config.include_tables or self.config.include_views:
if (
self.config.include_tables
or self.config.include_views
or self.config.include_table_snapshots
):
columns = self.bigquery_data_dictionary.get_columns_for_dataset(
project_id=project_id,
dataset_name=dataset_name,
@@ -713,7 +731,7 @@
project_id=project_id,
dataset_name=dataset_name,
)
elif self.config.include_table_lineage or self.config.include_usage_statistics:
elif self.store_table_refs:
# Need table_refs to calculate lineage and usage
for table_item in self.bigquery_data_dictionary.list_tables(
dataset_name, project_id
@@ -738,7 +756,10 @@
if self.config.include_views:
db_views[dataset_name] = list(
self.bigquery_data_dictionary.get_views_for_dataset(
project_id, dataset_name, self.config.is_profiling_enabled()
project_id,
dataset_name,
self.config.is_profiling_enabled(),
self.report,
)
)

@@ -751,6 +772,25 @@
dataset_name=dataset_name,
)

if self.config.include_table_snapshots:
db_snapshots[dataset_name] = list(
self.bigquery_data_dictionary.get_snapshots_for_dataset(
project_id,
dataset_name,
self.config.is_profiling_enabled(),
self.report,
)
)

for snapshot in db_snapshots[dataset_name]:
snapshot_columns = columns.get(snapshot.name, []) if columns else []
yield from self._process_snapshot(
snapshot=snapshot,
columns=snapshot_columns,
project_id=project_id,
dataset_name=dataset_name,
)

# This method is used to generate the ignore list for datatypes the profiler doesn't support. We have to do it here
# because the profiler doesn't have access to columns.
def generate_profile_ignore_list(self, columns: List[BigqueryColumn]) -> List[str]:
@@ -778,7 +818,7 @@ def _process_table(
self.report.report_dropped(table_identifier.raw_table_name())
return

if self.config.include_table_lineage or self.config.include_usage_statistics:
if self.store_table_refs:
self.table_refs.add(
str(BigQueryTableRef(table_identifier).get_sanitized_table_ref())
)
@@ -827,7 +867,7 @@ def _process_view(
self.report.report_dropped(table_identifier.raw_table_name())
return

if self.config.include_table_lineage or self.config.include_usage_statistics:
if self.store_table_refs:
table_ref = str(
BigQueryTableRef(table_identifier).get_sanitized_table_ref()
)
@@ -849,6 +889,48 @@
dataset_name=dataset_name,
)

def _process_snapshot(
self,
snapshot: BigqueryTableSnapshot,
columns: List[BigqueryColumn],
project_id: str,
dataset_name: str,
) -> Iterable[MetadataWorkUnit]:
table_identifier = BigqueryTableIdentifier(
project_id, dataset_name, snapshot.name
)

self.report.snapshots_scanned += 1

if not self.config.table_snapshot_pattern.allowed(
table_identifier.raw_table_name()
):
self.report.report_dropped(table_identifier.raw_table_name())
return

snapshot.columns = columns
snapshot.column_count = len(columns)
if not snapshot.column_count:
logger.warning(
f"Snapshot doesn't have any column or unable to get columns for table: {table_identifier}"
)

if self.store_table_refs:
table_ref = str(
BigQueryTableRef(table_identifier).get_sanitized_table_ref()
)
self.table_refs.add(table_ref)
if snapshot.base_table_identifier:
self.snapshot_refs_by_project[project_id].add(table_ref)
self.snapshots_by_ref[table_ref] = snapshot

yield from self.gen_snapshot_dataset_workunits(
table=snapshot,
columns=columns,
project_id=project_id,
dataset_name=dataset_name,
)

def gen_table_dataset_workunits(
self,
table: BigqueryTable,
@@ -933,9 +1015,34 @@ def gen_view_dataset_workunits(
aspect=view_properties_aspect,
).as_workunit()

def gen_snapshot_dataset_workunits(
self,
table: BigqueryTableSnapshot,
columns: List[BigqueryColumn],
project_id: str,
dataset_name: str,
) -> Iterable[MetadataWorkUnit]:
custom_properties: Dict[str, str] = {}
if table.ddl:
custom_properties["snapshot_ddl"] = table.ddl
if table.snapshot_time:
custom_properties["snapshot_time"] = str(table.snapshot_time)
if table.size_in_bytes:
custom_properties["size_in_bytes"] = str(table.size_in_bytes)
if table.rows_count:
custom_properties["rows_count"] = str(table.rows_count)
yield from self.gen_dataset_workunits(
table=table,
columns=columns,
project_id=project_id,
dataset_name=dataset_name,
sub_types=[DatasetSubTypes.BIGQUERY_TABLE_SNAPSHOT],
custom_properties=custom_properties,
)

def gen_dataset_workunits(
self,
table: Union[BigqueryTable, BigqueryView],
table: Union[BigqueryTable, BigqueryView, BigqueryTableSnapshot],
columns: List[BigqueryColumn],
project_id: str,
dataset_name: str,
@@ -1041,6 +1148,9 @@ def gen_schema_fields(self, columns: List[BigqueryColumn]) -> List[SchemaField]:
# TODO: Refactor this such that
# converter = HiveColumnToAvroConverter(struct_type_separator=" ");
# converter.get_schema_fields_for_hive_column(...)
original_struct_type_separator = (
HiveColumnToAvroConverter._STRUCT_TYPE_SEPARATOR
)
HiveColumnToAvroConverter._STRUCT_TYPE_SEPARATOR = " "
_COMPLEX_TYPE = re.compile("^(struct|array)")
last_id = -1
@@ -1101,12 +1211,15 @@ def gen_schema_fields(self, columns: List[BigqueryColumn]) -> List[SchemaField]:
)
schema_fields.append(field)
last_id = col.ordinal_position
HiveColumnToAvroConverter._STRUCT_TYPE_SEPARATOR = (
original_struct_type_separator
)
return schema_fields

def gen_schema_metadata(
self,
dataset_urn: str,
table: Union[BigqueryTable, BigqueryView],
table: Union[BigqueryTable, BigqueryView, BigqueryTableSnapshot],
columns: List[BigqueryColumn],
dataset_name: str,
) -> MetadataWorkUnit:
@@ -148,6 +148,15 @@ class BigQueryV2Config(
" because the project id is represented as the top-level container.",
)

include_table_snapshots: Optional[bool] = Field(
default=True, description="Whether table snapshots should be ingested."
)

table_snapshot_pattern: AllowDenyPattern = Field(
default=AllowDenyPattern.allow_all(),
description="Regex patterns for table snapshots to filter in ingestion. Specify regex to match the entire snapshot name in database.schema.snapshot format. e.g. to match all snapshots starting with customer in Customer database and public schema, use the regex 'Customer.public.customer.*'",
)

debug_include_full_payloads: bool = Field(
default=False,
description="Include full payload into events. It is only for debugging and internal use.",
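For reference, a minimal sketch of how the new table_snapshot_pattern option described above filters snapshots. It assumes the AllowDenyPattern helper importable from datahub.configuration.common (as used for the other pattern options in this source); the snapshot names are hypothetical.

# Illustrative sketch only; snapshot names below are made up.
from datahub.configuration.common import AllowDenyPattern

# Allow only snapshots whose fully-qualified name starts with
# "Customer.public.customer", mirroring the regex in the field description.
pattern = AllowDenyPattern(allow=["Customer.public.customer.*"])

print(pattern.allowed("Customer.public.customer_20240101"))  # True
print(pattern.allowed("Customer.public.orders_snapshot"))    # False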
@@ -25,6 +25,7 @@ class BigQuerySchemaApiPerfReport(Report):
get_tables_for_dataset: PerfTimer = field(default_factory=PerfTimer)
list_tables: PerfTimer = field(default_factory=PerfTimer)
get_views_for_dataset: PerfTimer = field(default_factory=PerfTimer)
get_snapshots_for_dataset: PerfTimer = field(default_factory=PerfTimer)


@dataclass
@@ -119,6 +120,8 @@ class BigQueryV2Report(ProfilingSqlReport, IngestionStageReport, BaseTimeWindowR
num_usage_query_hash_collisions: int = 0
num_operational_stats_workunits_emitted: int = 0

snapshots_scanned: int = 0

num_view_definitions_parsed: int = 0
num_view_definitions_failed_parsing: int = 0
num_view_definitions_failed_column_parsing: int = 0