Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(ingest): add snowflake-queries source #10835

Merged
merged 35 commits into master from snowflake-queries-oss
Jul 12, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
e558177
add preparsed_query abstraction
hsheth2 Jul 2, 2024
ff937f7
add query stats + fixes
hsheth2 Jul 2, 2024
8bed4dc
emit columns in query subjects
hsheth2 Jul 3, 2024
5ad2963
add setup.py
hsheth2 Jul 3, 2024
bc625dc
fix some lint issues
hsheth2 Jul 3, 2024
a8b1c7b
add a SnowflakeConnection wrapper class
hsheth2 Jul 4, 2024
890d63f
remove SnowflakeLoggingProtocol
hsheth2 Jul 4, 2024
ae86094
make snowflake_connection have config info + stop using source_config…
hsheth2 Jul 4, 2024
8827d10
tweak type annotations
hsheth2 Jul 4, 2024
3d21110
add configurability
hsheth2 Jul 8, 2024
b210399
Merge branch 'master' into snowflake-queries-oss
hsheth2 Jul 8, 2024
2205961
improve domain filtering
hsheth2 Jul 8, 2024
4edfce2
add parser exception handling
hsheth2 Jul 8, 2024
8b1eaf1
add additional lines
hsheth2 Jul 9, 2024
605279a
refactor snowsight url generation, create SnowsightUrlBuilder
hsheth2 Jul 9, 2024
de8f108
add schema fields to query subjects
hsheth2 Jul 9, 2024
0de982c
fix unexpected queries in tests
hsheth2 Jul 10, 2024
287d2b6
refactor snowflake configs
hsheth2 Jul 9, 2024
83ba873
tweak ordering of classes
hsheth2 Jul 10, 2024
1794feb
more config refactoring
hsheth2 Jul 10, 2024
c52cf05
more refactoring
hsheth2 Jul 10, 2024
a15f966
add gen urn to identifier mixin
hsheth2 Jul 10, 2024
ca5c0bc
add SnowflakeQueriesExtractor interface
hsheth2 Jul 10, 2024
f0b8e79
improve warnings
hsheth2 Jul 10, 2024
a76e015
add filters support
hsheth2 Jul 10, 2024
b789e3f
add schema fields to aggregator goldens
hsheth2 Jul 10, 2024
69e782a
fix test mock
hsheth2 Jul 11, 2024
0533c83
fix table filtering logic
hsheth2 Jul 11, 2024
c8c9ac5
fix filtering logic
hsheth2 Jul 11, 2024
4c4fb22
fix dup subjects for merge statements
hsheth2 Jul 11, 2024
2e31235
fix lint
hsheth2 Jul 11, 2024
a23f86c
fix(build): upgrade vercel builds to Node 20.x
hsheth2 Jul 11, 2024
9d43343
fix import
hsheth2 Jul 11, 2024
8a1f98d
Merge branch 'master' into snowflake-queries-oss
hsheth2 Jul 11, 2024
95d1ea2
Merge branch 'master' into snowflake-queries-oss
hsheth2 Jul 11, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions metadata-ingestion/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -409,6 +409,7 @@
"salesforce": {"simple-salesforce"},
"snowflake": snowflake_common | usage_common | sqlglot_lib,
"snowflake-summary": snowflake_common | usage_common | sqlglot_lib,
"snowflake-queries": snowflake_common | usage_common | sqlglot_lib,
"sqlalchemy": sql_common,
"sql-queries": usage_common | sqlglot_lib,
"slack": slack,
Expand Down Expand Up @@ -661,6 +662,7 @@
"slack = datahub.ingestion.source.slack.slack:SlackSource",
"snowflake = datahub.ingestion.source.snowflake.snowflake_v2:SnowflakeV2Source",
"snowflake-summary = datahub.ingestion.source.snowflake.snowflake_summary:SnowflakeSummarySource",
"snowflake-queries = datahub.ingestion.source.snowflake.snowflake_queries:SnowflakeQueriesSource",
"superset = datahub.ingestion.source.superset:SupersetSource",
"tableau = datahub.ingestion.source.tableau:TableauSource",
"openapi = datahub.ingestion.source.openapi:OpenApiSource",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
from datahub.sql_parsing.sql_parsing_aggregator import (
ColumnLineageInfo,
ColumnRef,
KnownLineageMapping,
KnownQueryLineageInfo,
SqlParsingAggregator,
UrnStr,
Expand Down Expand Up @@ -264,13 +265,20 @@ def _populate_external_upstreams(self, discovered_tables: List[str]) -> None:
with PerfTimer() as timer:
self.report.num_external_table_edges_scanned = 0

self._populate_external_lineage_from_copy_history(discovered_tables)
for (
known_lineage_mapping
) in self._populate_external_lineage_from_copy_history(discovered_tables):
self.sql_aggregator.add(known_lineage_mapping)
logger.info(
"Done populating external lineage from copy history. "
f"Found {self.report.num_external_table_edges_scanned} external lineage edges so far."
)

self._populate_external_lineage_from_show_query(discovered_tables)
for (
known_lineage_mapping
) in self._populate_external_lineage_from_show_query(discovered_tables):
self.sql_aggregator.add(known_lineage_mapping)

logger.info(
"Done populating external lineage from show external tables. "
f"Found {self.report.num_external_table_edges_scanned} external lineage edges so far."
Expand All @@ -282,7 +290,7 @@ def _populate_external_upstreams(self, discovered_tables: List[str]) -> None:
# NOTE: Snowflake does not log this information to the access_history table.
def _populate_external_lineage_from_show_query(
self, discovered_tables: List[str]
) -> None:
) -> Iterable[KnownLineageMapping]:
external_tables_query: str = SnowflakeQuery.show_external_tables()
try:
for db_row in self.query(external_tables_query):
Expand All @@ -293,11 +301,11 @@ def _populate_external_lineage_from_show_query(
if key not in discovered_tables:
continue
if db_row["location"].startswith("s3://"):
self.sql_aggregator.add_known_lineage_mapping(
downstream_urn=self.dataset_urn_builder(key),
yield KnownLineageMapping(
upstream_urn=make_s3_urn_for_lineage(
db_row["location"], self.config.env
),
downstream_urn=self.dataset_urn_builder(key),
)
self.report.num_external_table_edges_scanned += 1

Expand All @@ -316,7 +324,7 @@ def _populate_external_lineage_from_show_query(
# NOTE: Snowflake does not log this information to the access_history table.
def _populate_external_lineage_from_copy_history(
self, discovered_tables: List[str]
) -> None:
) -> Iterable[KnownLineageMapping]:
query: str = SnowflakeQuery.copy_lineage_history(
start_time_millis=int(self.start_time.timestamp() * 1000),
end_time_millis=int(self.end_time.timestamp() * 1000),
Expand All @@ -325,7 +333,11 @@ def _populate_external_lineage_from_copy_history(

try:
for db_row in self.query(query):
self._process_external_lineage_result_row(db_row, discovered_tables)
known_lineage_mapping = self._process_external_lineage_result_row(
db_row, discovered_tables
)
if known_lineage_mapping:
yield known_lineage_mapping
except Exception as e:
if isinstance(e, SnowflakePermissionError):
error_msg = "Failed to get external lineage. Please grant imported privileges on SNOWFLAKE database. "
Expand All @@ -340,7 +352,7 @@ def _populate_external_lineage_from_copy_history(

def _process_external_lineage_result_row(
self, db_row: dict, discovered_tables: List[str]
) -> None:
) -> Optional[KnownLineageMapping]:
# key is the down-stream table name
key: str = self.get_dataset_identifier_from_qualified_name(
db_row["DOWNSTREAM_TABLE_NAME"]
Expand All @@ -353,11 +365,11 @@ def _process_external_lineage_result_row(

for loc in external_locations:
if loc.startswith("s3://"):
self.sql_aggregator.add_known_lineage_mapping(
downstream_urn=self.dataset_urn_builder(key),
self.report.num_external_table_edges_scanned += 1
return KnownLineageMapping(
upstream_urn=make_s3_urn_for_lineage(loc, self.config.env),
downstream_urn=self.dataset_urn_builder(key),
)
self.report.num_external_table_edges_scanned += 1

def _fetch_upstream_lineages_for_tables(self) -> Iterable[UpstreamLineageEdge]:
query: str = SnowflakeQuery.table_to_table_lineage_history_v2(
Expand Down
Loading
Loading