Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(ingest/sql): add _get_view_definition helper method #12033

Merged
merged 2 commits into from
Dec 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@

import pydantic
import sqlalchemy.dialects.mssql

# This import verifies that the dependencies are available.
from pydantic.fields import Field
from sqlalchemy import create_engine, inspect
from sqlalchemy.engine.base import Connection
Expand Down
53 changes: 32 additions & 21 deletions metadata-ingestion/src/datahub/ingestion/source/sql/sql_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -582,6 +582,8 @@ def get_view_lineage(self) -> Iterable[MetadataWorkUnit]:
generate_operations=False,
)
for dataset_name in self._view_definition_cache.keys():
# TODO: Ensure that the lineage generated from the view definition
# matches the dataset_name.
view_definition = self._view_definition_cache[dataset_name]
result = self._run_sql_parser(
dataset_name,
Expand Down Expand Up @@ -1059,6 +1061,20 @@ def loop_views(
exc=e,
)

def _get_view_definition(self, inspector: Inspector, schema: str, view: str) -> str:
try:
view_definition = inspector.get_view_definition(view, schema)
if view_definition is None:
view_definition = ""
else:
# Some dialects return a TextClause instead of a raw string,
# so we need to convert them to a string.
view_definition = str(view_definition)
except NotImplementedError:
view_definition = ""

return view_definition

def _process_view(
self,
dataset_name: str,
Expand All @@ -1077,7 +1093,10 @@ def _process_view(
columns = inspector.get_columns(view, schema)
except KeyError:
# For certain types of views, we are unable to fetch the list of columns.
self.warn(logger, dataset_name, "unable to get schema for this view")
self.report.warning(
message="Unable to get schema for a view",
context=f"{dataset_name}",
)
schema_metadata = None
else:
schema_fields = self.get_schema_fields(dataset_name, columns, inspector)
Expand All @@ -1091,19 +1110,12 @@ def _process_view(
if self._save_schema_to_resolver():
self.schema_resolver.add_schema_metadata(dataset_urn, schema_metadata)
self.discovered_datasets.add(dataset_name)

description, properties, _ = self.get_table_properties(inspector, schema, view)
try:
view_definition = inspector.get_view_definition(view, schema)
if view_definition is None:
view_definition = ""
else:
# Some dialects return a TextClause instead of a raw string,
# so we need to convert them to a string.
view_definition = str(view_definition)
except NotImplementedError:
view_definition = ""
properties["view_definition"] = view_definition
properties["is_view"] = "True"

view_definition = self._get_view_definition(inspector, schema, view)
properties["view_definition"] = view_definition
if view_definition and self.config.include_view_lineage:
self._view_definition_cache[dataset_name] = view_definition

Expand Down Expand Up @@ -1135,15 +1147,14 @@ def _process_view(
entityUrn=dataset_urn,
aspect=SubTypesClass(typeNames=[DatasetSubTypes.VIEW]),
).as_workunit()
if "view_definition" in properties:
view_definition_string = properties["view_definition"]
view_properties_aspect = ViewPropertiesClass(
materialized=False, viewLanguage="SQL", viewLogic=view_definition_string
)
yield MetadataChangeProposalWrapper(
entityUrn=dataset_urn,
aspect=view_properties_aspect,
).as_workunit()

view_properties_aspect = ViewPropertiesClass(
materialized=False, viewLanguage="SQL", viewLogic=view_definition
)
yield MetadataChangeProposalWrapper(
entityUrn=dataset_urn,
aspect=view_properties_aspect,
).as_workunit()

if self.config.domain and self.domain_registry:
yield from get_domain_wu(
Expand Down
Loading