diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/queries_extractor.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/queries_extractor.py
index 1e2a12c6adb87..a826f09b9a7c8 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/queries_extractor.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/queries_extractor.py
@@ -247,32 +247,21 @@ def get_workunits_internal(
                     self.report.num_queries_by_project[project.id] += 1
                     queries.append(entry)
         self.report.num_total_queries = len(queries)
+        logger.info(f"Found {self.report.num_total_queries} total queries")
 
         with self.report.audit_log_preprocessing_timer:
             # Preprocessing stage that deduplicates the queries using query hash per usage bucket
-            # Using regular dictionary with
-            # key: usage bucket
-            # value: File backed dictionary with query hash as key and observed query as value
-            # This structure is chosen in order to maintain order of execution of queries
-
-            queries_deduped: Dict[int, FileBackedDict[ObservedQuery]]
+            # Note: FileBackedDict is an ordered dictionary, so the order of execution of
+            # queries is inherently maintained
+            queries_deduped: FileBackedDict[Dict[int, ObservedQuery]]
             queries_deduped = self.deduplicate_queries(queries)
-            self.report.num_unique_queries = len(
-                set(
-                    query_hash
-                    for bucket in queries_deduped.values()
-                    for query_hash in bucket
-                )
-            )
+            self.report.num_unique_queries = len(queries_deduped)
+            logger.info(f"Found {self.report.num_unique_queries} unique queries")
 
         with self.report.audit_log_load_timer:
             i = 0
-            for queries_in_bucket in queries_deduped.values():
-                # Ordering is essential for column-level lineage via temporary table
-                for row in queries_in_bucket.sql_query_iterator(
-                    "select value from data order by last_query_timestamp asc",
-                ):
-                    query = queries_in_bucket.deserializer(row["value"])
+            for _, query_instances in queries_deduped.items():
+                for query in query_instances.values():
                     if i > 0 and i % 10000 == 0:
                         logger.info(f"Added {i} query log entries to SQL aggregator")
 
@@ -283,7 +272,7 @@ def get_workunits_internal(
 
     def deduplicate_queries(
         self, queries: FileBackedList[ObservedQuery]
-    ) -> Dict[int, FileBackedDict[ObservedQuery]]:
+    ) -> FileBackedDict[Dict[int, ObservedQuery]]:
 
         # This fingerprint based deduplication is done here to reduce performance hit due to
         # repetitive sql parsing while adding observed query to aggregator that would otherwise
@@ -291,7 +280,7 @@ def deduplicate_queries(
         # With current implementation, it is possible that "Operation"(e.g. INSERT) is reported
         # only once per day, although it may have happened multiple times throughout the day.
 
-        queries_deduped: Dict[int, FileBackedDict[ObservedQuery]] = dict()
+        queries_deduped: FileBackedDict[Dict[int, ObservedQuery]] = FileBackedDict()
 
         for i, query in enumerate(queries):
             if i > 0 and i % 10000 == 0:
@@ -310,20 +299,14 @@ def deduplicate_queries(
                 query.query, self.identifiers.platform, fast=True
             )
 
-            if time_bucket not in queries_deduped:
-                # TODO: Cleanup, etc as required for file backed dicts after use
-                queries_deduped[time_bucket] = FileBackedDict[ObservedQuery](
-                    extra_columns={"last_query_timestamp": lambda e: e.timestamp}
-                )
+            query_instances = queries_deduped.setdefault(query.query_hash, {})
 
-            observed_query = queries_deduped[time_bucket].get(query.query_hash)
+            observed_query = query_instances.setdefault(time_bucket, query)
 
             # If the query already exists for this time bucket, update its attributes
-            if observed_query is not None:
+            if observed_query is not query:
                 observed_query.usage_multiplier += 1
                 observed_query.timestamp = query.timestamp
-            else:
-                queries_deduped[time_bucket][query.query_hash] = query
 
         return queries_deduped
 
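
To make the new shape of the deduplication structure easier to follow outside the DataHub codebase, here is a minimal sketch under stated assumptions: plain in-memory dicts stand in for FileBackedDict, a stripped-down Query dataclass stands in for ObservedQuery, and deduplicate, bucket_of, and the per-day bucketing are hypothetical helpers for illustration, not code from this PR. It shows the setdefault-per-fingerprint, setdefault-per-bucket pattern and the identity check (is not) that decides whether to bump usage_multiplier.

from dataclasses import dataclass
from datetime import datetime
from typing import Callable, Dict, Iterable


@dataclass
class Query:
    # Simplified stand-in for ObservedQuery: only the fields the dedup logic touches.
    query: str
    query_hash: str
    timestamp: datetime
    usage_multiplier: int = 1


def deduplicate(
    queries: Iterable[Query], bucket_of: Callable[[datetime], int]
) -> Dict[str, Dict[int, Query]]:
    # Outer key: query fingerprint; inner key: usage time bucket.
    # A plain dict preserves insertion order, mirroring the ordered behaviour
    # the diff relies on from FileBackedDict.
    deduped: Dict[str, Dict[int, Query]] = {}
    for q in queries:
        instances = deduped.setdefault(q.query_hash, {})
        kept = instances.setdefault(bucket_of(q.timestamp), q)
        if kept is not q:
            # Same fingerprint seen again in this bucket: bump the usage count
            # and carry the latest timestamp instead of storing a duplicate entry.
            kept.usage_multiplier += 1
            kept.timestamp = q.timestamp
    return deduped


if __name__ == "__main__":
    def day_bucket(ts: datetime) -> int:
        return ts.toordinal()  # toy bucketing: one bucket per calendar day

    observed = [
        Query("SELECT 1", "h1", datetime(2024, 1, 1, 9)),
        Query("SELECT 1", "h1", datetime(2024, 1, 1, 17)),
        Query("SELECT 2", "h2", datetime(2024, 1, 1, 10)),
    ]
    deduped = deduplicate(observed, day_bucket)
    assert len(deduped) == 2  # two unique fingerprints
    assert deduped["h1"][day_bucket(datetime(2024, 1, 1))].usage_multiplier == 2

The len(deduped) == 2 assertion corresponds to the simplified num_unique_queries = len(queries_deduped) reporting introduced in the diff: with the fingerprint as the outer key, the length of the top-level mapping is the unique-query count, so the old set-comprehension over all buckets is no longer needed.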