Skip to content

Commit

Permalink
fix(bigquery): add dataset_id for bigquery (#4932)
Browse files Browse the repository at this point in the history
  • Loading branch information
anshbansal authored May 19, 2022
1 parent 1de2e2c commit 2bb2c52
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -677,7 +677,10 @@ def _ge_context(self) -> Iterator[GEContext]:
yield GEContext(data_context, datasource_name)

def generate_profiles(
self, requests: List[GEProfilerRequest], max_workers: int
self,
requests: List[GEProfilerRequest],
max_workers: int,
platform: Optional[str] = None,
) -> Iterable[Tuple[GEProfilerRequest, Optional[DatasetProfileClass]]]:
with PerfTimer() as timer, concurrent.futures.ThreadPoolExecutor(
max_workers=max_workers
Expand All @@ -704,6 +707,7 @@ def generate_profiles(
self._generate_profile_from_request,
query_combiner,
request,
platform=platform,
)
for request in requests
]
Expand Down Expand Up @@ -751,10 +755,12 @@ def _generate_profile_from_request(
self,
query_combiner: SQLAlchemyQueryCombiner,
request: GEProfilerRequest,
platform: Optional[str] = None,
) -> Tuple[GEProfilerRequest, Optional[DatasetProfileClass]]:
return request, self._generate_single_profile(
query_combiner=query_combiner,
pretty_name=request.pretty_name,
platform=platform,
**request.batch_kwargs,
)

Expand All @@ -781,6 +787,7 @@ def _generate_single_profile(
table: str = None,
partition: Optional[str] = None,
custom_sql: Optional[str] = None,
platform: Optional[str] = None,
**kwargs: Any,
) -> Optional[DatasetProfileClass]:
bigquery_temp_table: Optional[str] = None
Expand Down Expand Up @@ -820,6 +827,7 @@ def _generate_single_profile(
ge_context,
ge_config,
pretty_name=pretty_name,
platform=platform,
)

profile = _SingleDatasetProfiler(
Expand Down Expand Up @@ -852,6 +860,7 @@ def _get_ge_dataset(
ge_context: GEContext,
batch_kwargs: dict,
pretty_name: str,
platform: Optional[str] = None,
) -> Dataset:
# This is effectively emulating the beginning of the process that
# is followed by GE itself. In particular, we simply want to construct
Expand All @@ -878,4 +887,12 @@ def _get_ge_dataset(
**batch_kwargs,
},
)
if platform is not None and platform == "bigquery":
name_parts = pretty_name.split(".")
if len(name_parts) != 3:
logger.error(
f"Unexpected {pretty_name} while profiling. Should have 3 parts but has {len(name_parts)} parts."
)
else:
batch.engine.dialect.dataset_id = name_parts[1]
return batch
Original file line number Diff line number Diff line change
Expand Up @@ -730,7 +730,9 @@ def get_workunits(self) -> Iterable[Union[MetadataWorkUnit, SqlWorkUnit]]:
)

if profiler and profile_requests:
yield from self.loop_profiler(profile_requests, profiler)
yield from self.loop_profiler(
profile_requests, profiler, platform=self.platform
)

if self.is_stateful_ingestion_configured():
# Clean up stale entities.
Expand Down Expand Up @@ -1329,10 +1331,13 @@ def loop_profiler_requests(
)

def loop_profiler(
self, profile_requests: List["GEProfilerRequest"], profiler: "DatahubGEProfiler"
self,
profile_requests: List["GEProfilerRequest"],
profiler: "DatahubGEProfiler",
platform: Optional[str] = None,
) -> Iterable[MetadataWorkUnit]:
for request, profile in profiler.generate_profiles(
profile_requests, self.config.profiling.max_workers
profile_requests, self.config.profiling.max_workers, platform=platform
):
if profile is None:
continue
Expand Down

0 comments on commit 2bb2c52

Please sign in to comment.