Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(sql_views): added views as datasets for SQLAlchemy DBs #2663

Merged
merged 7 commits into from
Jun 12, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions metadata-ingestion/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -194,8 +194,8 @@ We have two options for the underlying library used to connect to SQL Server: (1

Extracts:

- List of databases, schema, and tables
- Column types associated with each table
- List of databases, schemas, tables, and views
- Column types associated with each table/view

```yml
source:
Expand All @@ -205,6 +205,7 @@ source:
password: pass
host_port: localhost:1433
database: DemoDatabase
include_views: True
table_pattern:
deny:
- "^.*\\.sys_.*" # deny all tables that start with sys_
Expand Down
183 changes: 134 additions & 49 deletions metadata-ingestion/src/datahub/ingestion/source/sql_common.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
import logging
import time
import warnings
from abc import abstractmethod
from dataclasses import dataclass, field
from typing import Any, Dict, Iterable, List, Optional, Set, Tuple, Type

from sqlalchemy import create_engine
from sqlalchemy.engine import reflection
from sqlalchemy import create_engine, inspect
from sqlalchemy.sql import sqltypes as types

from datahub.configuration.common import AllowDenyPattern, ConfigModel
Expand Down Expand Up @@ -38,13 +38,28 @@
@dataclass
class SQLSourceReport(SourceReport):
    """Ingestion report for SQL sources: counts scanned tables/views and
    records entities dropped by allow/deny patterns."""

    tables_scanned: int = 0
    views_scanned: int = 0
    # Fully-qualified names of entities filtered out by schema/table/view patterns.
    filtered: List[str] = field(default_factory=list)

    def report_table_scanned(self, table_name: str) -> None:
        """Deprecated shim; delegates to report_entity_scanned(ent_type="table")."""
        warnings.warn(
            "report_table_scanned is deprecated, please use report_entity_scanned with argument `table`"
        )
        self.report_entity_scanned(table_name, ent_type="table")

    def report_entity_scanned(self, name: str, ent_type: str = "table") -> None:
        """Record that an entity was scanned.

        Entity could be a view or a table.

        Raises:
            KeyError: if ent_type is neither "table" nor "view".
        """
        if ent_type == "table":
            self.tables_scanned += 1
        elif ent_type == "view":
            self.views_scanned += 1
        else:
            raise KeyError(f"Unknown entity {ent_type}.")

    def report_dropped(self, ent_name: str) -> None:
        """Record an entity (table or view) that was filtered out by a pattern."""
        self.filtered.append(ent_name)


class SQLAlchemyConfig(ConfigModel):
Expand All @@ -56,6 +71,10 @@ class SQLAlchemyConfig(ConfigModel):
# them out afterwards via the table_pattern.
schema_pattern: AllowDenyPattern = AllowDenyPattern.allow_all()
table_pattern: AllowDenyPattern = AllowDenyPattern.allow_all()
view_pattern: AllowDenyPattern = AllowDenyPattern.allow_all()

include_views: Optional[bool] = False
include_tables: Optional[bool] = True

@abstractmethod
def get_sql_alchemy_url(self):
Expand All @@ -65,11 +84,11 @@ def get_identifier(self, schema: str, table: str) -> str:
return f"{schema}.{table}"

def standardize_schema_table_names(
self, schema: str, table: str
self, schema: str, entity: str
) -> Tuple[str, str]:
# Some SQLAlchemy dialects need a standardization step to clean the schema
# and table names. See BigQuery for an example of when this is useful.
return schema, table
return schema, entity


class BasicSQLAlchemyConfig(SQLAlchemyConfig):
Expand Down Expand Up @@ -207,56 +226,122 @@ def get_workunits(self) -> Iterable[SqlWorkUnit]:
url = sql_config.get_sql_alchemy_url()
logger.debug(f"sql_alchemy_url={url}")
engine = create_engine(url, **sql_config.options)
inspector = reflection.Inspector.from_engine(engine)
inspector = inspect(engine)
for schema in inspector.get_schema_names():
if not sql_config.schema_pattern.allowed(schema):
self.report.report_dropped(schema)
continue

for table in inspector.get_table_names(schema):
schema, table = sql_config.standardize_schema_table_names(schema, table)
dataset_name = sql_config.get_identifier(schema, table)
self.report.report_table_scanned(dataset_name)

if not sql_config.table_pattern.allowed(dataset_name):
self.report.report_dropped(dataset_name)
continue

columns = inspector.get_columns(table, schema)
try:
table_info: dict = inspector.get_table_comment(table, schema)
except NotImplementedError:
description: Optional[str] = None
properties: Dict[str, str] = {}
else:
description = table_info["text"]

# The "properties" field is a non-standard addition to SQLAlchemy's interface.
properties = table_info.get("properties", {})

# TODO: capture inspector.get_pk_constraint
# TODO: capture inspector.get_sorted_table_and_fkc_names

dataset_snapshot = DatasetSnapshot(
urn=f"urn:li:dataset:(urn:li:dataPlatform:{self.platform},{dataset_name},{self.config.env})",
aspects=[],
)
if description is not None or properties:
dataset_properties = DatasetPropertiesClass(
description=description,
customProperties=properties,
# uri=dataset_name,
)
dataset_snapshot.aspects.append(dataset_properties)
schema_metadata = get_schema_metadata(
self.report, dataset_name, self.platform, columns
if sql_config.include_tables:
yield from self.loop_tables(inspector, schema, sql_config)

if sql_config.include_views:
yield from self.loop_views(inspector, schema, sql_config)

def loop_tables(
    self,
    inspector: Any,
    schema: str,
    sql_config: SQLAlchemyConfig,
) -> Iterable[SqlWorkUnit]:
    """Yield one SqlWorkUnit per table in `schema` that passes table_pattern.

    Each work unit carries a DatasetSnapshot with (optionally) a
    DatasetPropertiesClass aspect and always a schema-metadata aspect.
    """
    for table in inspector.get_table_names(schema):
        # Some dialects (e.g. BigQuery) need schema/table name standardization.
        schema, table = sql_config.standardize_schema_table_names(schema, table)
        dataset_name = sql_config.get_identifier(schema, table)
        self.report.report_entity_scanned(dataset_name, ent_type="table")

        if not sql_config.table_pattern.allowed(dataset_name):
            self.report.report_dropped(dataset_name)
            continue

        columns = inspector.get_columns(table, schema)
        try:
            table_info: dict = inspector.get_table_comment(table, schema)
        except NotImplementedError:
            # Dialect does not support table comments.
            description: Optional[str] = None
            properties: Dict[str, str] = {}
        else:
            description = table_info["text"]

            # The "properties" field is a non-standard addition to SQLAlchemy's interface.
            properties = table_info.get("properties", {})

        # TODO: capture inspector.get_pk_constraint
        # TODO: capture inspector.get_sorted_table_and_fkc_names

        dataset_snapshot = DatasetSnapshot(
            urn=f"urn:li:dataset:(urn:li:dataPlatform:{self.platform},{dataset_name},{self.config.env})",
            aspects=[],
        )
        if description is not None or properties:
            dataset_properties = DatasetPropertiesClass(
                description=description,
                customProperties=properties,
                # uri=dataset_name,
            )
            dataset_snapshot.aspects.append(dataset_properties)
        # schema_metadata must be built before it is appended — the stray
        # premature append (a leftover deleted-diff line) was removed here.
        schema_metadata = get_schema_metadata(
            self.report, dataset_name, self.platform, columns
        )
        dataset_snapshot.aspects.append(schema_metadata)

        mce = MetadataChangeEvent(proposedSnapshot=dataset_snapshot)
        wu = SqlWorkUnit(id=dataset_name, mce=mce)
        self.report.report_workunit(wu)
        yield wu

def loop_views(
    self,
    inspector: Any,
    schema: str,
    sql_config: SQLAlchemyConfig,
) -> Iterable[SqlWorkUnit]:
    """Yield one SqlWorkUnit per view in `schema` that passes view_pattern.

    Views are emitted as datasets; the view definition (when available) is
    attached via the custom properties "view_definition" and "is_view".
    """
    for view in inspector.get_view_names(schema):
        schema, view = sql_config.standardize_schema_table_names(schema, view)
        dataset_name = sql_config.get_identifier(schema, view)
        self.report.report_entity_scanned(dataset_name, ent_type="view")

        if not sql_config.view_pattern.allowed(dataset_name):
            self.report.report_dropped(dataset_name)
            continue

        # Stray pre-definition uses of dataset_snapshot/mce (leftover
        # deleted-diff lines) were removed here; they would raise NameError.
        columns = inspector.get_columns(view, schema)
        try:
            view_info: dict = inspector.get_table_comment(view, schema)
        except NotImplementedError:
            # Dialect does not support table/view comments.
            description: Optional[str] = None
            properties: Dict[str, str] = {}
        else:
            description = view_info["text"]

            # The "properties" field is a non-standard addition to SQLAlchemy's interface.
            properties = view_info.get("properties", {})

        # NOTE(review): SQLAlchemy's get_view_definition also accepts a schema
        # argument; without it, same-named views in other schemas may resolve
        # incorrectly — confirm against the dialects in use.
        view_definition = inspector.get_view_definition(view)
        if view_definition is None:
            view_definition = ""
        properties["view_definition"] = view_definition
        properties["is_view"] = "True"

        dataset_snapshot = DatasetSnapshot(
            urn=f"urn:li:dataset:(urn:li:dataPlatform:{self.platform},{dataset_name},{self.config.env})",
            aspects=[],
        )
        if description is not None or properties:
            dataset_properties = DatasetPropertiesClass(
                description=description,
                customProperties=properties,
                # uri=dataset_name,
            )
            dataset_snapshot.aspects.append(dataset_properties)
        schema_metadata = get_schema_metadata(
            self.report, dataset_name, self.platform, columns
        )
        dataset_snapshot.aspects.append(schema_metadata)

        mce = MetadataChangeEvent(proposedSnapshot=dataset_snapshot)
        wu = SqlWorkUnit(id=dataset_name, mce=mce)
        self.report.report_workunit(wu)
        yield wu

def get_report(self):
    """Return the report object accumulated during ingestion."""
    return self.report
Expand Down