fix(ingest/profiling): compute sample row count correctly (datahub-pr…
hsheth2 authored and sleeperdeep committed Jun 25, 2024
1 parent 973f80c commit e94ea5f
Showing 2 changed files with 25 additions and 11 deletions.
34 changes: 24 additions & 10 deletions metadata-ingestion/src/datahub/ingestion/source/ge_data_profiler.py
@@ -689,9 +689,28 @@ def generate_dataset_profile(  # noqa: C901 (complexity)
         logger.debug(f"profiling {self.dataset_name}: flushing stage 1 queries")
         self.query_combiner.flush()
 
+        assert profile.rowCount is not None
+        full_row_count = profile.rowCount
+
         if self.config.use_sampling and not self.config.limit:
             self.update_dataset_batch_use_sampling(profile)
 
+        # Note that this row count may be different from the full_row_count if we are using sampling.
+        row_count: int = profile.rowCount
+        if profile.partitionSpec and "SAMPLE" in profile.partitionSpec.partition:
+            # Querying exact row count of sample using `_get_dataset_rows`.
+            # We are not using `self.config.sample_size` directly as the actual row count
+            # in the sample may be different than configured `sample_size`. For BigQuery,
+            # we've even seen 160k rows returned for a sample size of 10k.
+            logger.debug("Recomputing row count for the sample")
+
+            # Note that we can't just call `self._get_dataset_rows(profile)` here because
+            # there's some sort of caching happening that will return the full table row count
+            # instead of the sample row count.
+            row_count = self.dataset.get_row_count(str(self.dataset._table))
+
+            profile.partitionSpec.partition += f" (sample rows {row_count})"
+
         columns_profiling_queue: List[_SingleColumnSpec] = []
         if columns_to_profile:
             for column in all_columns:
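
The comments in this hunk capture the core of the fix: once sampling kicks in, per-column null counts come from the sample, so the divisor must be the sample's actual row count, not the configured sample size or the full table count. A minimal sketch of the arithmetic (hypothetical numbers, not DataHub code):

full_row_count = 1_000_000        # true table size, kept for the final profile
configured_sample_size = 10_000   # what the sampling clause asked for
actual_sample_rows = 160_000      # what BigQuery actually returned, per the comment above
sample_null_count = 40_000        # nulls observed within the sample

# Dividing by the configured size can produce nonsense (here, a "fraction" of 4.0).
wrong_fraction = sample_null_count / configured_sample_size   # 4.0

# Dividing by the recomputed sample row count gives the real proportion.
null_fraction = sample_null_count / actual_sample_rows        # 0.25
estimated_null_count = round(null_fraction * full_row_count)  # 250_000
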
@@ -708,16 +727,6 @@ def generate_dataset_profile(  # noqa: C901 (complexity)
         logger.debug(f"profiling {self.dataset_name}: flushing stage 2 queries")
         self.query_combiner.flush()
 
-        assert profile.rowCount is not None
-        row_count: int  # used for null counts calculation
-        if profile.partitionSpec and "SAMPLE" in profile.partitionSpec.partition:
-            # Querying exact row count of sample using `_get_dataset_rows`.
-            # We are not using `self.config.sample_size` directly as actual row count
-            # in sample may be slightly different (more or less) than configured `sample_size`.
-            self._get_dataset_rows(profile)
-
-            row_count = profile.rowCount
-
         for column_spec in columns_profiling_queue:
             column = column_spec.column
             column_profile = column_spec.column_profile
@@ -825,6 +834,10 @@ def generate_dataset_profile(  # noqa: C901 (complexity)
 
         logger.debug(f"profiling {self.dataset_name}: flushing stage 3 queries")
         self.query_combiner.flush()
+
+        # Reset the row count to the original value.
+        profile.rowCount = full_row_count
+
         return profile
 
     def init_profile(self):
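
Taken together, the three hunks above implement a save-and-restore pattern: stash the full row count before sampling, let the sample's row count drive the per-column math, and restore the true count before the profile is returned. A stripped-down sketch of that flow (hypothetical names, assuming a profile object with a mutable rowCount field):

class DatasetProfile:
    def __init__(self, row_count: int) -> None:
        self.rowCount = row_count

def profile_with_sample(profile: DatasetProfile, sample_row_count: int) -> DatasetProfile:
    full_row_count = profile.rowCount    # stage 1: remember the true table size
    row_count = sample_row_count         # divisor for per-column proportions
    # ... per-column profiling queries would run against the sampled table here,
    # dividing observed counts by row_count rather than full_row_count ...
    profile.rowCount = full_row_count    # stage 3: report the true size downstream
    return profile
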
@@ -1274,6 +1287,7 @@ def create_bigquery_temp_table(
     try:
         cursor: "BigQueryCursor" = cast("BigQueryCursor", raw_connection.cursor())
         try:
+            logger.debug(f"Creating temporary table for {table_pretty_name}: {bq_sql}")
             cursor.execute(bq_sql)
         except Exception as e:
             if not instance.config.catch_exceptions:
2 changes: 1 addition & 1 deletion
@@ -159,7 +159,7 @@ def get_profile_request(
             rows_count=table.rows_count,
         ):
             logger.debug(
-                f"Dataset {dataset_name} was not eliagable for profiling due to last_altered, size in bytes or count of rows limit"
+                f"Dataset {dataset_name} was not eligible for profiling due to last_altered, size in bytes or count of rows limit"
             )
             # Profile only table level if dataset is filtered from profiling
             # due to size limits alone
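
The corrected log message refers to the profiler's eligibility gate, which skips tables that were not altered recently enough, exceed a size-in-bytes limit, or exceed a row-count limit. A rough sketch of that kind of check (illustrative signature and thresholds; the actual DataHub config fields differ):

from datetime import datetime, timedelta, timezone
from typing import Optional

def is_eligible_for_profiling(
    last_altered: Optional[datetime],
    size_in_bytes: Optional[int],
    rows_count: Optional[int],
    max_staleness: timedelta = timedelta(days=1),
    size_limit_bytes: int = 5 * 1024**3,
    row_limit: int = 5_000_000,
) -> bool:
    if last_altered is not None and datetime.now(timezone.utc) - last_altered > max_staleness:
        return False  # not altered recently enough
    if size_in_bytes is not None and size_in_bytes > size_limit_bytes:
        return False  # table too large in bytes
    if rows_count is not None and rows_count > row_limit:
        return False  # too many rows
    return True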
