Skip to content

Commit

Permalink
feat(ingest): Improve lookml sql derived tables detection, add cascad…
Browse files Browse the repository at this point in the history
…ing derived tables to lineage (#2770)
  • Loading branch information
remisalmon authored Jun 30, 2021
1 parent 6fee59e commit 2aa95ec
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 16 deletions.
2 changes: 1 addition & 1 deletion metadata-ingestion/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ def get_long_description():
},
"ldap": {"python-ldap>=2.4"},
"looker": {"looker-sdk==21.6.0"},
"lookml": {"lkml>=1.1.0", "sql-metadata==1.12.0"},
"lookml": {"lkml>=1.1.0", "sql-metadata==2.2.1"},
"mongodb": {"pymongo>=3.11"},
"mssql": sql_common | {"sqlalchemy-pytds>=0.3"},
"mssql-odbc": sql_common | {"pyodbc"},
Expand Down
24 changes: 9 additions & 15 deletions metadata-ingestion/src/datahub/ingestion/source/lookml.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
import lkml
else:
raise ModuleNotFoundError("The lookml plugin requires Python 3.7 or newer.")
from sql_metadata import get_query_tables
from sql_metadata import Parser as SQLParser

from datahub.configuration import ConfigModel
from datahub.configuration.common import AllowDenyPattern
Expand Down Expand Up @@ -197,20 +197,9 @@ class LookerView: # pragma: no cover

@classmethod
def _get_sql_table_names(cls, sql: str) -> List[str]:
sql_tables: List[str] = get_query_tables(sql)

# Remove temporary tables from WITH statements
sql_table_names = [
t
for t in sql_tables
if not re.search(
fr"WITH(.*,)?\s+{t}(\s*\([\w\s,]+\))?\s+AS\s+\(",
sql,
re.IGNORECASE | re.DOTALL,
)
]
sql_table_names: List[str] = SQLParser(sql).tables

# Remove quotes from tables
# Remove quotes from table names
sql_table_names = [t.replace('"', "") for t in sql_table_names]

return sql_table_names
Expand Down Expand Up @@ -383,7 +372,12 @@ def _load_model(self, path: str) -> LookerModel:
def _construct_datalineage_urn(self, sql_table_name: str, connection: str) -> str:
platform = self._get_platform_based_on_connection(connection)

if "." in platform:
# Check if table name matches cascading derived tables pattern (same platform)
if re.fullmatch(r"\w+\.SQL_TABLE_NAME", sql_table_name):
platform_name = self.source_config.platform_name
sql_table_name = sql_table_name.lower().split(".")[0]
# Check if table database is in platform name (upstream platform)
elif "." in platform:
platform_name, database_name = platform.lower().split(".", maxsplit=1)
sql_table_name = f"{database_name}.{sql_table_name}".lower()
else:
Expand Down

0 comments on commit 2aa95ec

Please sign in to comment.