Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(ingest/snowflake): support more than 10k views in a db #10718

Merged
merged 5 commits into from
Jun 18, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
from datahub.ingestion.source.snowflake.constants import SnowflakeObjectDomain
from datahub.ingestion.source.snowflake.snowflake_config import DEFAULT_TABLES_DENY_LIST

SHOW_VIEWS_MAX_PAGE_SIZE = 10000


def create_deny_regex_sql_filter(
deny_pattern: List[str], filter_cols: List[str]
Expand Down Expand Up @@ -202,48 +204,28 @@ def get_tags_on_columns_with_propagation(
FROM table("{db_name}".information_schema.tag_references_all_columns('{quoted_table_identifier}', '{SnowflakeObjectDomain.TABLE}'));
"""

# The view definition is retrieved by the information_schema query only if the role is the owner of the view. Hence this query is not used.
# https://community.snowflake.com/s/article/Is-it-possible-to-see-the-view-definition-in-information-schema-views-from-a-non-owner-role
@staticmethod
def views_for_database(db_name: Optional[str]) -> str:
db_clause = f'"{db_name}".' if db_name is not None else ""
return f"""
SELECT table_catalog AS "TABLE_CATALOG",
table_schema AS "TABLE_SCHEMA",
table_name AS "TABLE_NAME",
created AS "CREATED",
last_altered AS "LAST_ALTERED",
comment AS "COMMENT",
view_definition AS "VIEW_DEFINITION"
FROM {db_clause}information_schema.views t
WHERE table_schema != 'INFORMATION_SCHEMA'
order by table_schema, table_name"""

# The view definition is retrieved by the information_schema query only if the role is the owner of the view. Hence this query is not used.
# https://community.snowflake.com/s/article/Is-it-possible-to-see-the-view-definition-in-information-schema-views-from-a-non-owner-role
@staticmethod
def views_for_schema(schema_name: str, db_name: Optional[str]) -> str:
db_clause = f'"{db_name}".' if db_name is not None else ""
return f"""
SELECT table_catalog AS "TABLE_CATALOG",
table_schema AS "TABLE_SCHEMA",
table_name AS "TABLE_NAME",
created AS "CREATED",
last_altered AS "LAST_ALTERED",
comment AS "COMMENT",
view_definition AS "VIEW_DEFINITION"
FROM {db_clause}information_schema.views t
where table_schema='{schema_name}'
order by table_schema, table_name"""
def show_views_for_database(
db_name: str,
limit: int = SHOW_VIEWS_MAX_PAGE_SIZE,
view_pagination_marker: Optional[str] = None,
) -> str:
# While there is an information_schema.views view, that only shows the view definition if the role
# is an owner of the view. That doesn't work for us.
# https://community.snowflake.com/s/article/Is-it-possible-to-see-the-view-definition-in-information-schema-views-from-a-non-owner-role

@staticmethod
def show_views_for_database(db_name: str) -> str:
return f"""show views in database "{db_name}";"""
# SHOW VIEWS can return a maximum of 10000 rows.
# https://docs.snowflake.com/en/sql-reference/sql/show-views#usage-notes
assert limit <= SHOW_VIEWS_MAX_PAGE_SIZE

@staticmethod
def show_views_for_schema(schema_name: str, db_name: Optional[str]) -> str:
db_clause = f'"{db_name}".' if db_name is not None else ""
return f"""show views in schema {db_clause}"{schema_name}";"""
# To work around this, we paginate through the results using the FROM clause.
from_clause = (
f"""FROM '{view_pagination_marker}'""" if view_pagination_marker else ""
)
return f"""\
SHOW VIEWS IN DATABASE "{db_name}"
LIMIT {limit} {from_clause};
"""

@staticmethod
def columns_for_schema(schema_name: str, db_name: Optional[str]) -> str:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,6 @@ class SnowflakeV2Report(
# "Information schema query returned too much data. Please repeat query with more selective predicates."
# This will result in overall increase in time complexity
num_get_tables_for_schema_queries: int = 0
num_get_views_for_schema_queries: int = 0
num_get_columns_for_table_queries: int = 0

# these will be non-zero if the user chooses to enable the extract_tags = "with_lineage" option, which requires
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,10 @@

from datahub.ingestion.api.report import SupportsAsObj
from datahub.ingestion.source.snowflake.constants import SnowflakeObjectDomain
from datahub.ingestion.source.snowflake.snowflake_query import SnowflakeQuery
from datahub.ingestion.source.snowflake.snowflake_query import (
SHOW_VIEWS_MAX_PAGE_SIZE,
SnowflakeQuery,
)
from datahub.ingestion.source.snowflake.snowflake_utils import SnowflakeQueryMixin
from datahub.ingestion.source.sql.sql_generic import BaseColumn, BaseTable, BaseView
from datahub.utilities.serialized_lru_cache import serialized_lru_cache
Expand Down Expand Up @@ -324,53 +327,54 @@ def get_tables_for_schema(
return tables

@serialized_lru_cache(maxsize=1)
def get_views_for_database(
self, db_name: str
) -> Optional[Dict[str, List[SnowflakeView]]]:
def get_views_for_database(self, db_name: str) -> Dict[str, List[SnowflakeView]]:
page_limit = SHOW_VIEWS_MAX_PAGE_SIZE

views: Dict[str, List[SnowflakeView]] = {}
try:
cur = self.query(SnowflakeQuery.show_views_for_database(db_name))
except Exception as e:
logger.debug(
f"Failed to get all views for database - {db_name}", exc_info=e
)
# Error - Information schema query returned too much data. Please repeat query with more selective predicates.
return None

for table in cur:
if table["schema_name"] not in views:
views[table["schema_name"]] = []
views[table["schema_name"]].append(
SnowflakeView(
name=table["name"],
created=table["created_on"],
# last_altered=table["last_altered"],
comment=table["comment"],
view_definition=table["text"],
last_altered=table["created_on"],
materialized=table.get("is_materialized", "false").lower()
== "true",
first_iteration = True
view_pagination_marker: Optional[str] = None
while first_iteration or view_pagination_marker is not None:
cur = self.query(
SnowflakeQuery.show_views_for_database(
db_name,
limit=page_limit,
view_pagination_marker=view_pagination_marker,
)
)
return views

def get_views_for_schema(
self, schema_name: str, db_name: str
) -> List[SnowflakeView]:
views: List[SnowflakeView] = []
first_iteration = False
view_pagination_marker = None

result_set_size = 0
for view in cur:
result_set_size += 1

view_name = view["name"]
schema_name = view["schema_name"]
if schema_name not in views:
views[schema_name] = []
views[schema_name].append(
SnowflakeView(
name=view_name,
created=view["created_on"],
# last_altered=table["last_altered"],
comment=view["comment"],
view_definition=view["text"],
last_altered=view["created_on"],
materialized=(
view.get("is_materialized", "false").lower() == "true"
),
)
)

cur = self.query(SnowflakeQuery.show_views_for_schema(schema_name, db_name))
for table in cur:
views.append(
SnowflakeView(
name=table["name"],
created=table["created_on"],
# last_altered=table["last_altered"],
comment=table["comment"],
view_definition=table["text"],
last_altered=table["created_on"],
if result_set_size >= page_limit:
# If we hit the limit, we need to send another request to get the next page.
logger.info(
f"Fetching next page of views for {db_name} - after {view_name}"
)
)
view_pagination_marker = view_name

return views

@serialized_lru_cache(maxsize=SCHEMA_PARALLELISM)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1011,12 +1011,6 @@ def get_views_for_schema(
) -> List[SnowflakeView]:
views = self.data_dictionary.get_views_for_database(db_name)

# get all views for database failed,
# falling back to get views for schema
if views is None:
self.report.num_get_views_for_schema_queries += 1
return self.data_dictionary.get_views_for_schema(schema_name, db_name)

# Some schemas may not have any views
return views.get(schema_name, [])

Expand Down
5 changes: 2 additions & 3 deletions metadata-ingestion/tests/integration/snowflake/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -224,8 +224,6 @@ def default_query_results( # noqa: C901
]
elif query == SnowflakeQuery.tables_for_database("TEST_DB"):
raise Exception("Information schema query returned too much data")
elif query == SnowflakeQuery.show_views_for_database("TEST_DB"):
raise Exception("Information schema query returned too much data")
elif query == SnowflakeQuery.tables_for_schema("TEST_SCHEMA", "TEST_DB"):
return [
{
Expand All @@ -241,7 +239,8 @@ def default_query_results( # noqa: C901
}
for tbl_idx in range(1, num_tables + 1)
]
elif query == SnowflakeQuery.show_views_for_schema("TEST_SCHEMA", "TEST_DB"):
elif query == SnowflakeQuery.show_views_for_database("TEST_DB"):
# TODO: Add tests for view pagination.
return [
{
"schema_name": "TEST_SCHEMA",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ def test_snowflake_no_tables_causes_pipeline_failure(
)
sf_cursor.execute.side_effect = query_permission_response_override(
no_tables_fn,
[SnowflakeQuery.show_views_for_schema("TEST_SCHEMA", "TEST_DB")],
[SnowflakeQuery.show_views_for_database("TEST_DB")],
[],
)

Expand Down
Loading