Skip to content

Commit

Permalink
feat(ingest): bigquery - enhance logging while processing audit logs (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
rslanka authored Feb 9, 2022
1 parent 0f56bc5 commit 2d7452d
Showing 1 changed file with 21 additions and 0 deletions.
21 changes: 21 additions & 0 deletions metadata-ingestion/src/datahub/ingestion/source/sql/bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,7 @@ def _compute_big_query_lineage(self) -> None:
)

def _compute_bigquery_lineage_via_gcp_logging(self) -> None:
logger.info("Populating lineage info via GCP audit logs")
try:
_clients: List[GCPLoggingClient] = self._make_bigquery_client()
log_entries: Iterable[AuditLogEntry] = self._get_bigquery_log_entries(
Expand All @@ -333,6 +334,7 @@ def _compute_bigquery_lineage_via_gcp_logging(self) -> None:
)

def _compute_bigquery_lineage_via_exported_bigquery_audit_metadata(self) -> None:
logger.info("Populating lineage info via exported GCP audit logs")
try:
_client: BigQueryClient = BigQueryClient(project=self.config.project_id)
exported_bigquery_audit_metadata: Iterable[
Expand Down Expand Up @@ -441,11 +443,15 @@ def _get_exported_bigquery_audit_metadata(
def _parse_bigquery_log_entries(
self, entries: Iterable[AuditLogEntry]
) -> Iterable[QueryEvent]:
num_total_log_entries: int = 0
num_parsed_log_entires: int = 0
for entry in entries:
num_total_log_entries += 1
event: Optional[QueryEvent] = None
try:
if QueryEvent.can_parse_entry(entry):
event = QueryEvent.from_entry(entry)
num_parsed_log_entires += 1
else:
raise RuntimeError("Unable to parse log entry as QueryEvent.")
except Exception as e:
Expand All @@ -456,6 +462,10 @@ def _parse_bigquery_log_entries(
logger.error("Unable to parse GCP log entry.", e)
if event is not None:
yield event
logger.info(
f"Parsing BigQuery log entries: Number of log entries scanned={num_total_log_entries}, "
f"number of log entries successfully parsed={num_parsed_log_entires}"
)

def _parse_exported_bigquery_audit_metadata(
self, audit_metadata_rows: Iterable[BigQueryAuditMetadata]
Expand All @@ -482,18 +492,29 @@ def _parse_exported_bigquery_audit_metadata(

def _create_lineage_map(self, entries: Iterable[QueryEvent]) -> Dict[str, Set[str]]:
    """Build a destination-table -> referenced-tables map from query events.

    An event is skipped (and counted as skipped) when it has no destination
    table, when its destination table is anonymous, when it references no
    tables, or when every referenced table is the destination table itself.
    Table identifiers are the ``str()`` of each table after ``remove_extras()``.

    Args:
        entries: Query events to fold into the lineage map.

    Returns:
        A ``defaultdict(set)`` keyed by destination table string, whose values
        are the sets of referenced table strings that differ from the key.
    """
    lineage_map: Dict[str, Set[str]] = collections.defaultdict(set)
    num_entries: int = 0
    num_skipped_entries: int = 0
    for e in entries:
        num_entries += 1
        if (
            e.destinationTable is None
            or e.destinationTable.is_anonymous()
            or not e.referencedTables
        ):
            num_skipped_entries += 1
            continue
        # The destination does not change per referenced table, so compute
        # its string form once instead of once per inner-loop iteration.
        destination_table_str = str(e.destinationTable.remove_extras())
        upstream_table_strs: Set[str] = {
            str(ref_table.remove_extras()) for ref_table in e.referencedTables
        }
        # A table is not recorded as its own upstream.
        upstream_table_strs.discard(destination_table_str)
        if upstream_table_strs:
            lineage_map[destination_table_str].update(upstream_table_strs)
        else:
            # Only self-references: avoid touching the defaultdict so no
            # empty entry is created for this destination.
            num_skipped_entries += 1
    logger.info(
        f"Creating lineage map: total number of entries={num_entries}, number skipped={num_skipped_entries}."
    )
    return lineage_map

def get_latest_partition(
Expand Down

0 comments on commit 2d7452d

Please sign in to comment.