feat(ingest): sql parser perf + asyncio fixes #9119

Merged · 6 commits · Oct 31, 2023

metadata-ingestion/setup.py (1 addition, 1 deletion)
@@ -108,7 +108,7 @@
 sqlglot_lib = {
     # Using an Acryl fork of sqlglot.
     # https://github.com/tobymao/sqlglot/compare/main...hsheth2:sqlglot:hsheth?expand=1
-    "acryl-sqlglot==18.5.2.dev45",
+    "acryl-sqlglot==18.17.1.dev16",
 }

 sql_common = (
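
The fork is pinned to an exact dev build, so parser behaviour tracks that specific release. As a quick sanity check (a minimal sketch, not part of this PR), the installed build can be read back at runtime:

```python
# Minimal sketch: confirm which acryl-sqlglot build is installed, e.g. when
# debugging parser behaviour differences after the version bump.
from importlib.metadata import version

print(version("acryl-sqlglot"))  # expect something like 18.17.1.dev16
```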

metadata-ingestion/src/datahub/cli/docker_cli.py (5 additions, 0 deletions)
@@ -5,6 +5,7 @@
 import os
 import pathlib
 import platform
+import signal
 import subprocess
 import sys
 import tempfile

@@ -770,6 +771,10 @@ def quickstart(  # noqa: C901
         logger.debug("docker compose up still running, sending SIGKILL")
         up_process.kill()
         up_process.wait()
+    else:
+        # If the docker process got a keyboard interrupt, raise one here.
+        if up_process.returncode in {128 + signal.SIGINT, -signal.SIGINT}:
+            raise KeyboardInterrupt

     # Check docker health every few seconds.
     status = check_docker_quickstart()
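
Both encodings of "terminated by SIGINT" are checked because children report it differently: subprocess.Popen exposes death by signal N as a negative return code (-N), while a child that handles the signal and exits on its own conventionally uses 128 + N. A small illustration (not from this PR; the sleep child is just a stand-in):

```python
import signal
import subprocess

# Send SIGINT to a child that does not trap it and observe the return code.
proc = subprocess.Popen(["sleep", "30"])
proc.send_signal(signal.SIGINT)
print(proc.wait())  # typically -2 on POSIX, i.e. -signal.SIGINT
```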

metadata-ingestion/src/datahub/upgrade/upgrade.py (4 additions, 8 deletions)
@@ -1,6 +1,5 @@
 import asyncio
 import contextlib
-import functools
 import logging
 import sys
 from datetime import datetime, timedelta, timezone

@@ -374,17 +373,14 @@ def check_upgrade(func: Callable[..., T]) -> Callable[..., T]:
     @wraps(func)
     def async_wrapper(*args: Any, **kwargs: Any) -> Any:
         async def run_inner_func():
-            loop = asyncio.get_event_loop()
-            return await loop.run_in_executor(
-                None, functools.partial(func, *args, **kwargs)
-            )
+            return func(*args, **kwargs)

         async def run_func_check_upgrade():
             version_stats_future = asyncio.ensure_future(retrieve_version_stats())
-            the_one_future = asyncio.ensure_future(run_inner_func())
-            ret = await the_one_future
+            main_func_future = asyncio.ensure_future(run_inner_func())
+            ret = await main_func_future

-            # the one future has returned
+            # the main future has returned
             # we check the other futures quickly
             try:
                 version_stats = await asyncio.wait_for(version_stats_future, 0.5)
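
The executor indirection was unnecessary: run_inner_func can simply call the wrapped function on the event loop, while the version-stats lookup runs as a separate task that only gets a short grace period after the main work finishes. A stripped-down sketch of that pattern (names and timings here are illustrative, not the PR's exact code):

```python
import asyncio

async def fetch_version_stats():
    await asyncio.sleep(0.1)  # stand-in for the real network lookup
    return {"latest": "0.12.0"}

def with_upgrade_check(func):
    def wrapper(*args, **kwargs):
        async def main():
            stats_task = asyncio.ensure_future(fetch_version_stats())
            result = func(*args, **kwargs)  # the wrapped (sync) function runs directly
            try:
                # Give the background check at most 0.5s once the main work is done.
                stats = await asyncio.wait_for(stats_task, 0.5)
                print("upgrade info:", stats)
            except asyncio.TimeoutError:
                pass
            return result

        return asyncio.run(main())

    return wrapper

@with_upgrade_check
def ingest():
    return "done"

print(ingest())  # prints the upgrade info (if it arrived in time), then "done"
```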

metadata-ingestion/src/datahub/utilities/sqlglot_lineage.py (2 additions, 3 deletions)
@@ -106,6 +106,7 @@ def get_query_type_of_sql(expression: sqlglot.exp.Expression) -> QueryType:
         sqlglot.exp.Update: QueryType.UPDATE,
         sqlglot.exp.Delete: QueryType.DELETE,
         sqlglot.exp.Merge: QueryType.MERGE,
+        sqlglot.exp.Subqueryable: QueryType.SELECT,  # unions, etc. are also selects
     }

     for cls, query_type in mapping.items():
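
In the sqlglot versions this PR targets, set operations (UNION, INTERSECT, EXCEPT/MINUS) parse into nodes that are not exp.Select but do inherit from exp.Subqueryable, so without this entry the isinstance-based lookup fell through to UNKNOWN. A quick check (assuming a sqlglot build of that era; later releases renamed some of these base classes):

```python
from sqlglot import exp, parse_one

parsed = parse_one("select a from t1 union select a from t2")
print(type(parsed).__name__)                 # Union
print(isinstance(parsed, exp.Select))        # False
print(isinstance(parsed, exp.Subqueryable))  # True, hence QueryType.SELECT
```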

@@ -820,10 +821,8 @@ def _extract_select_from_update(
     )

     # Update statements always implicitly have the updated table in context.
-    # TODO: Retain table name alias.
+    # TODO: Retain table name alias, if one was present.
     if select_statement.args.get("from"):
-        # select_statement = sqlglot.parse_one(select_statement.sql(dialect=dialect))
-
         select_statement = select_statement.join(
             statement.this, append=True, join_kind="cross"
         )
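
_extract_select_from_update turns an UPDATE into a SELECT over the same expressions so the normal column-lineage walk can run; since the updated table never appears in the FROM clause, it is cross-joined back into scope. A small exploration of the sqlglot Update node this relies on (an assumed, simplified example, not code from the PR):

```python
from sqlglot import parse_one

update = parse_one(
    "UPDATE my_table SET total = src.amount * 2 FROM src WHERE my_table.id = src.id",
    dialect="snowflake",
)
print(update.this.sql(dialect="snowflake"))                      # my_table (implicitly in scope)
print([e.sql(dialect="snowflake") for e in update.expressions])  # SET expressions -> SELECT projections
from_clause = update.args.get("from")
if from_clause:
    print(from_clause.sql(dialect="snowflake"))                  # FROM src
```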

Golden test file (name not captured in this view): query_type updated
@@ -1,5 +1,5 @@
 {
-  "query_type": "UNKNOWN",
+  "query_type": "SELECT",
   "in_tables": [
     "urn:li:dataset:(urn:li:dataPlatform:snowflake,snowflake_sample_data.tpch_sf10.orders,PROD)",
     "urn:li:dataset:(urn:li:dataPlatform:snowflake,snowflake_sample_data.tpch_sf100.orders,PROD)"

New golden file (46 additions; the expected output for test_teradata_strange_operators below)
@@ -0,0 +1,46 @@
{
  "query_type": "SELECT",
  "in_tables": [
    "urn:li:dataset:(urn:li:dataPlatform:teradata,dbc.table1,PROD)",
    "urn:li:dataset:(urn:li:dataPlatform:teradata,dbc.table2,PROD)"
  ],
  "out_tables": [],
  "column_lineage": [
    {
      "downstream": {
        "table": null,
        "column": "col1",
        "column_type": null,
        "native_column_type": null
      },
      "upstreams": [
        {
          "table": "urn:li:dataset:(urn:li:dataPlatform:teradata,dbc.table1,PROD)",
          "column": "col1"
        },
        {
          "table": "urn:li:dataset:(urn:li:dataPlatform:teradata,dbc.table2,PROD)",
          "column": "col1"
        }
      ]
    },
    {
      "downstream": {
        "table": null,
        "column": "col2",
        "column_type": null,
        "native_column_type": null
      },
      "upstreams": [
        {
          "table": "urn:li:dataset:(urn:li:dataPlatform:teradata,dbc.table1,PROD)",
          "column": "col2"
        },
        {
          "table": "urn:li:dataset:(urn:li:dataPlatform:teradata,dbc.table2,PROD)",
          "column": "col2"
        }
      ]
    }
  ]
}

metadata-ingestion/tests/unit/sql_parsing/test_sqlglot_lineage.py (14 additions, 0 deletions)
@@ -675,6 +675,20 @@ def test_teradata_default_normalization():
     )


+def test_teradata_strange_operators():
+    assert_sql_result(
+        """
+select col1, col2 from dbc.table1
+where col1 eq 'value1'
+minus
+select col1, col2 from dbc.table2
+""",
+        dialect="teradata",
+        default_schema="dbc",
+        expected_file=RESOURCE_DIR / "test_teradata_strange_operators.json",
+    )
+
+
 def test_snowflake_update_hardcoded():
     assert_sql_result(
         """
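
The test exercises Teradata's nonstandard syntax: keyword comparison operators such as `eq`, and MINUS as the set-difference operator. Roughly how that statement looks to the parser (assuming a sqlglot build that includes the Teradata operator support this PR pins; upstream releases may behave differently):

```python
from sqlglot import parse_one

stmt = parse_one(
    "select col1, col2 from dbc.table1 where col1 eq 'value1' "
    "minus select col1, col2 from dbc.table2",
    dialect="teradata",
)
# MINUS parses as a set operation (EXCEPT), so the statement is still query-shaped,
# which is also what the Subqueryable -> SELECT mapping above relies on.
print(repr(stmt))
```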