Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(ingestion/tableau): Fix tableau custom sql lineage gap #10359

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions metadata-ingestion/src/datahub/ingestion/source/tableau.py
Original file line number Diff line number Diff line change
Expand Up @@ -1693,7 +1693,7 @@ def parse_custom_sql(
) -> Optional["SqlParsingResult"]:
database_info = datasource.get(c.DATABASE) or {
c.NAME: c.UNKNOWN.lower(),
c.CONNECTION_TYPE: "databricks",
c.CONNECTION_TYPE: datasource.get(c.CONNECTION_TYPE),
}

if (
Expand All @@ -1703,7 +1703,10 @@ def parse_custom_sql(
logger.debug(f"datasource {datasource_urn} is not created from custom sql")
return None

if c.NAME not in database_info or c.CONNECTION_TYPE not in database_info:
if (
database_info.get(c.NAME) is None
or database_info.get(c.CONNECTION_TYPE) is None
):
logger.debug(
f"database information is missing from datasource {datasource_urn}"
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,7 @@ class MetadataQueryException(Exception):
totalCount
}
}
connectionType
database{
name
connectionType
Expand Down Expand Up @@ -827,6 +828,7 @@ def get_unique_custom_sql(custom_sql_list: List[dict]) -> List[dict]:
# are missing from api result.
"isUnsupportedCustomSql": True if not custom_sql.get("tables") else False,
"query": custom_sql.get("query"),
"connectionType": custom_sql.get("connectionType"),
"columns": custom_sql.get("columns"),
"tables": custom_sql.get("tables"),
"database": custom_sql.get("database"),
Expand Down
108 changes: 73 additions & 35 deletions metadata-ingestion/tests/integration/tableau/test_tableau_ingest.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
)

from datahub.configuration.source_common import DEFAULT_ENV
from datahub.emitter.mce_builder import make_schema_field_urn
from datahub.ingestion.run.pipeline import Pipeline, PipelineContext
from datahub.ingestion.source.tableau import TableauConfig, TableauSource
from datahub.ingestion.source.tableau_common import (
Expand All @@ -24,10 +25,12 @@
)
from datahub.metadata.com.linkedin.pegasus2avro.dataset import (
DatasetLineageType,
FineGrainedLineage,
FineGrainedLineageDownstreamType,
FineGrainedLineageUpstreamType,
UpstreamLineage,
)
from datahub.metadata.schema_classes import MetadataChangeProposalClass, UpstreamClass
from datahub.sql_parsing.sqlglot_lineage import SqlParsingResult
from tests.test_helpers import mce_helpers, test_connection_helpers
from tests.test_helpers.state_helpers import (
get_current_checkpoint_from_pipeline,
Expand Down Expand Up @@ -805,55 +808,90 @@ def test_tableau_signout_timeout(pytestconfig, tmp_path, mock_datahub_graph):
)


def test_tableau_unsupported_csql(mock_datahub_graph):
def test_tableau_unsupported_csql():
context = PipelineContext(run_id="0", pipeline_name="test_tableau")
context.graph = mock_datahub_graph
config = TableauConfig.parse_obj(config_source_default.copy())
config_dict = config_source_default.copy()
del config_dict["stateful_ingestion"]
config = TableauConfig.parse_obj(config_dict)
config.extract_lineage_from_unsupported_custom_sql_queries = True
config.lineage_overrides = TableauLineageOverrides(
database_override_map={"production database": "prod"}
)

with mock.patch(
"datahub.ingestion.source.tableau.create_lineage_sql_parsed_result",
return_value=SqlParsingResult(
in_tables=[
"urn:li:dataset:(urn:li:dataPlatform:bigquery,my_bigquery_project.invent_dw.userdetail,PROD)"
],
out_tables=[],
column_lineage=None,
),
def test_lineage_metadata(
lineage, expected_entity_urn, expected_upstream_table, expected_cll
):
source = TableauSource(config=config, ctx=context)

lineage = source._create_lineage_from_unsupported_csql(
csql_urn="urn:li:dataset:(urn:li:dataPlatform:tableau,09988088-05ad-173c-a2f1-f33ba3a13d1a,PROD)",
csql={
"query": "SELECT user_id, source, user_source FROM (SELECT *, ROW_NUMBER() OVER (partition BY user_id ORDER BY __partition_day DESC) AS rank_ FROM invent_dw.UserDetail ) source_user WHERE rank_ = 1",
"isUnsupportedCustomSql": "true",
"database": {
"name": "my-bigquery-project",
"connectionType": "bigquery",
},
},
out_columns=[],
)

mcp = cast(MetadataChangeProposalClass, next(iter(lineage)).metadata)

assert mcp.aspect == UpstreamLineage(
upstreams=[
UpstreamClass(
dataset="urn:li:dataset:(urn:li:dataPlatform:bigquery,my_bigquery_project.invent_dw.userdetail,PROD)",
dataset=expected_upstream_table,
type=DatasetLineageType.TRANSFORMED,
)
],
fineGrainedLineages=[],
)
assert (
mcp.entityUrn
== "urn:li:dataset:(urn:li:dataPlatform:tableau,09988088-05ad-173c-a2f1-f33ba3a13d1a,PROD)"
fineGrainedLineages=[
FineGrainedLineage(
upstreamType=FineGrainedLineageUpstreamType.FIELD_SET,
upstreams=[
make_schema_field_urn(expected_upstream_table, upstream_column)
],
downstreamType=FineGrainedLineageDownstreamType.FIELD,
downstreams=[
make_schema_field_urn(expected_entity_urn, downstream_column)
],
)
for upstream_column, downstream_column in expected_cll.items()
],
)
assert mcp.entityUrn == expected_entity_urn

csql_urn = "urn:li:dataset:(urn:li:dataPlatform:tableau,09988088-05ad-173c-a2f1-f33ba3a13d1a,PROD)"
expected_upstream_table = "urn:li:dataset:(urn:li:dataPlatform:bigquery,my_bigquery_project.invent_dw.UserDetail,PROD)"
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to fix the URN lowercasing behavior here — it's not really clear to me why that changed to begin with.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We are treating BigQuery as a platform with case-sensitive table names, hence the URN contains the table name without lowercasing.

https://github.com/shubhamjagtap639/datahub/blob/725c85815bfc83e2accf1bbb5fa2c9be637dde88/metadata-ingestion/src/datahub/sql_parsing/sql_parsing_common.py#L8

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok that's fine

expected_cll = {
"user_id": "user_id",
"source": "source",
"user_source": "user_source",
}

source = TableauSource(config=config, ctx=context)

lineage = source._create_lineage_from_unsupported_csql(
csql_urn=csql_urn,
csql={
"query": "SELECT user_id, source, user_source FROM (SELECT *, ROW_NUMBER() OVER (partition BY user_id ORDER BY __partition_day DESC) AS rank_ FROM invent_dw.UserDetail ) source_user WHERE rank_ = 1",
"isUnsupportedCustomSql": "true",
"connectionType": "bigquery",
"database": {
"name": "my_bigquery_project",
"connectionType": "bigquery",
},
},
out_columns=[],
)
test_lineage_metadata(
lineage=lineage,
expected_entity_urn=csql_urn,
expected_upstream_table=expected_upstream_table,
expected_cll=expected_cll,
)

# With database as None
lineage = source._create_lineage_from_unsupported_csql(
csql_urn=csql_urn,
csql={
"query": "SELECT user_id, source, user_source FROM (SELECT *, ROW_NUMBER() OVER (partition BY user_id ORDER BY __partition_day DESC) AS rank_ FROM my_bigquery_project.invent_dw.UserDetail ) source_user WHERE rank_ = 1",
"isUnsupportedCustomSql": "true",
"connectionType": "bigquery",
"database": None,
},
out_columns=[],
)
test_lineage_metadata(
lineage=lineage,
expected_entity_urn=csql_urn,
expected_upstream_table=expected_upstream_table,
expected_cll=expected_cll,
)


@freeze_time(FROZEN_TIME)
Expand Down
Loading