fix(GE): fix dependencies for GE DataHubValidationAction, logic for s…
mayurinehate authored and maggiehays committed Aug 1, 2022
1 parent 105fb84 commit 1dced84
Showing 2 changed files with 24 additions and 9 deletions.
8 changes: 3 additions & 5 deletions metadata-ingestion/setup.py
@@ -101,9 +101,7 @@ def get_long_description():
"cryptography",
}

-microsoft_common = {
-    "msal==1.16.0"
-}
+microsoft_common = {"msal==1.16.0"}

data_lake_base = {
*aws_common,
@@ -129,7 +127,7 @@ def get_long_description():
"airflow": {
"apache-airflow >= 1.10.2",
},
"great-expectations": sql_common,
"great-expectations": sql_common | {"sqllineage==1.3.3"},
# Source plugins
# PyAthena is pinned with exact version because we use private method in PyAthena
"athena": sql_common | {"PyAthena[SQLAlchemy]==2.4.1"},
@@ -188,7 +186,7 @@ def get_long_description():
"trino": sql_common | {"trino"},
"starburst-trino-usage": sql_common | {"trino"},
"nifi": {"requests", "packaging"},
"powerbi": {"orderedset"} | microsoft_common
"powerbi": {"orderedset"} | microsoft_common,
}

all_exclude_plugins: Set[str] = {
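For context on the dependency fix above: extras in `setup.py` are plain Python sets of requirement strings combined with `|`, so the `great-expectations` extra now carries the pinned `sqllineage` on top of the SQL stack. A minimal sketch of the pattern — the `sql_common` contents here are illustrative placeholders, and the install command assumes the published `acryl-datahub` package name:

```python
# Illustrative sketch of how setup.py composes extras: each dependency group
# is a set of requirement strings, and `|` (set union) layers groups together.
sql_common = {"sqlalchemy==1.3.24", "sqlparse"}  # placeholder pins, not the real list
microsoft_common = {"msal==1.16.0"}

extras_require = {
    # The GE action parses SQL queries, hence the extra sqllineage pin.
    "great-expectations": sql_common | {"sqllineage==1.3.3"},
    "powerbi": {"orderedset"} | microsoft_common,
}

# Users opt in per plugin, e.g.:
#   pip install 'acryl-datahub[great-expectations]'
print(sorted(extras_require["great-expectations"]))
```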
25 changes: 21 additions & 4 deletions metadata-ingestion/src/datahub/integrations/great_expectations/action.py
@@ -25,6 +25,7 @@
SqlAlchemyExecutionEngine,
)
from great_expectations.validator.validator import Validator
+from sqlalchemy.engine.base import Connection, Engine
from sqlalchemy.engine.url import make_url

import datahub.emitter.mce_builder as builder
@@ -550,13 +551,20 @@ def get_dataset_partitions(self, batch_identifier, data_asset):
data_asset.active_batch_definition.datasource_name
),
}
+sqlalchemy_uri = None
+if isinstance(data_asset.execution_engine.engine, Engine):
+    sqlalchemy_uri = data_asset.execution_engine.engine.url
+# For Snowflake, sqlalchemy_execution_engine.engine is actually an instance of Connection
+elif isinstance(data_asset.execution_engine.engine, Connection):
+    sqlalchemy_uri = data_asset.execution_engine.engine.engine.url
+
if isinstance(ge_batch_spec, SqlAlchemyDatasourceBatchSpec):
# e.g. ConfiguredAssetSqlDataConnector with splitter_method or sampling_method
schema_name = ge_batch_spec.get("schema_name")
table_name = ge_batch_spec.get("table_name")

dataset_urn = make_dataset_urn_from_sqlalchemy_uri(
-data_asset.execution_engine.engine.url,
+sqlalchemy_uri,
schema_name,
table_name,
self.env,
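For context on the `Engine`/`Connection` branch above: SQLAlchemy's `Engine` exposes its URL directly, while a `Connection` only reaches it through its parent engine — which is why the Snowflake case needs the extra `.engine` hop. A self-contained sketch of the same lookup (the in-memory SQLite engine is illustrative; GE would hand over whatever engine the datasource was configured with):

```python
# Minimal sketch of the Engine-vs-Connection URL lookup used in the diff above.
from sqlalchemy import create_engine
from sqlalchemy.engine.base import Connection, Engine


def extract_sqlalchemy_uri(engine_or_connection):
    # Engine carries the URL itself; Connection only points back to its Engine.
    if isinstance(engine_or_connection, Engine):
        return engine_or_connection.url
    elif isinstance(engine_or_connection, Connection):
        return engine_or_connection.engine.url
    return None


engine = create_engine("sqlite:///:memory:")
with engine.connect() as conn:
    assert extract_sqlalchemy_uri(engine) == extract_sqlalchemy_uri(conn)
```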
@@ -609,7 +617,7 @@ def get_dataset_partitions(self, batch_identifier, data_asset):
)
for table in tables:
dataset_urn = make_dataset_urn_from_sqlalchemy_uri(
-data_asset.execution_engine.engine.url,
+sqlalchemy_uri,
None,
table,
self.env,
@@ -640,7 +648,7 @@ def get_dataset_partitions(self, batch_identifier, data_asset):
def get_platform_instance(self, datasource_name):
if self.platform_instance_map and datasource_name in self.platform_instance_map:
return self.platform_instance_map[datasource_name]
-if datasource_name:
+if self.platform_instance_map:
warn(
f"Datasource {datasource_name} is not present in platform_instance_map"
)
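For context on the corrected condition above: `platform_instance_map` maps a GE datasource name to a DataHub platform instance, and after this change the "not present" warning fires only when a map was actually configured. A standalone sketch of that lookup (the map contents and datasource names are illustrative):

```python
# Illustrative sketch of the corrected lookup: warn about a missing entry
# only when the user actually configured a platform_instance_map.
from typing import Dict, Optional
from warnings import warn


def get_platform_instance(
    platform_instance_map: Optional[Dict[str, str]], datasource_name: str
) -> Optional[str]:
    if platform_instance_map and datasource_name in platform_instance_map:
        return platform_instance_map[datasource_name]
    if platform_instance_map:
        warn(f"Datasource {datasource_name} is not present in platform_instance_map")
    return None


# No map configured: silently returns None instead of warning on every datasource.
assert get_platform_instance(None, "my_postgres") is None
# Map configured but entry missing: returns None and emits the warning.
assert get_platform_instance({"my_redshift": "prod"}, "my_postgres") is None
```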
@@ -680,7 +688,15 @@ def make_dataset_urn_from_sqlalchemy_uri(
for {data_platform}."
)
return None
schema_name = "{}.{}".format(url_instance.database, schema_name)
# If data platform is snowflake, we artificially lowercase the Database name.
# This is because DataHub also does this during ingestion.
# Ref: https://github.com/linkedin/datahub/blob/master/metadata-ingestion%2Fsrc%2Fdatahub%2Fingestion%2Fsource%2Fsql%2Fsnowflake.py#L272
schema_name = "{}.{}".format(
url_instance.database.lower()
if data_platform == "snowflake"
else url_instance.database,
schema_name,
)
elif data_platform == "bigquery":
if url_instance.host is None or url_instance.database is None:
warn(
@@ -705,6 +721,7 @@ make_dataset_urn_from_sqlalchemy_uri(
platform_instance=platform_instance,
env=env,
)
+
return dataset_urn


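For context on the Snowflake branch in `make_dataset_urn_from_sqlalchemy_uri` above: DataHub's Snowflake source lowercases database names during ingestion, so the URN built on the GE side must do the same to match. A small sketch of the normalization — the URI, account, and schema values are illustrative:

```python
# Illustrative sketch: build the "<database>.<schema>" prefix the same way the
# patched helper does, lowercasing the database only for Snowflake so the URN
# lines up with what DataHub's Snowflake ingestion emits.
from sqlalchemy.engine.url import make_url

url_instance = make_url("snowflake://user:pass@MY_ACCOUNT/MY_DB")  # illustrative URI
data_platform = url_instance.drivername.split("+")[0]

database = url_instance.database
schema_name = "{}.{}".format(
    database.lower() if data_platform == "snowflake" else database,
    "PUBLIC",  # illustrative schema
)
assert schema_name == "my_db.PUBLIC"
```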
