-
Notifications
You must be signed in to change notification settings - Fork 14.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Make schema name for the CTA queries and limit configurable #8867
Changes from 10 commits
357db81
0f4d5a1
763a8ef
93a393f
6e7dc8c
abe1bc7
2df033a
689967c
e2a96e5
6661e39
4cc14b2
802fcae
6c72dcc
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -28,7 +28,7 @@ | |
import sys | ||
from collections import OrderedDict | ||
from datetime import date | ||
from typing import Any, Callable, Dict, List, Optional | ||
from typing import Any, Callable, Dict, List, Optional, TYPE_CHECKING | ||
|
||
from celery.schedules import crontab | ||
from dateutil import tz | ||
|
@@ -41,6 +41,9 @@ | |
|
||
logger = logging.getLogger(__name__) | ||
|
||
if TYPE_CHECKING: | ||
from flask_appbuilder.security.sqla import models # pylint: disable=unused-import | ||
from superset.models.core import Database # pylint: disable=unused-import | ||
|
||
# Realtime stats logger, a StatsD implementation exists | ||
STATS_LOGGER = DummyStatsLogger() | ||
|
@@ -523,6 +526,33 @@ class CeleryConfig: # pylint: disable=too-few-public-methods | |
# timeout. | ||
SQLLAB_QUERY_COST_ESTIMATE_TIMEOUT = 10 # seconds | ||
|
||
# Flag that controls if limit should be enforced on the CTA (create table as queries). | ||
SQLLAB_CTAS_NO_LIMIT = False | ||
|
||
# This allows you to define custom logic around the "CREATE TABLE AS" or CTAS feature | ||
# in SQL Lab that defines where the target schema should be for a given user. | ||
# Database `CTAS Schema` has a precedence over this setting. | ||
# Example below returns a username and CTA queries will write tables into the schema | ||
# name `username` | ||
# SQLLAB_CTA_SCHEMA_NAME_FUNC = lambda database, user, schema, sql: user.username | ||
# This is a more involved example where depending on the database you can leverage data | ||
# available to assign schema for the CTA query: | ||
# def compute_schema_name(database: Database, user: User, schema: str, sql: str) -> str: | ||
# if database.name == 'mysql_payments_slave': | ||
# return 'tmp_superset_schema' | ||
# if database.name == 'presto_gold': | ||
# return user.username | ||
# if database.name == 'analytics': | ||
# if 'analytics' in [r.name for r in user.roles]: | ||
# return 'analytics_cta' | ||
# else: | ||
# return f'tmp_{schema}' | ||
# Function accepts database object, user object, schema name and sql that will be run. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nice comprehensive example 👍 |
||
SQLLAB_CTA_SCHEMA_NAME_FUNC = ( | ||
None | ||
) # type: Optional[Callable[["Database", "models.User", str, str], str]] | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. A bit of a nit, but as Superset has deprecated support for python 2.7, we prefer regular type annotations over type comments, i.e. SQLLAB_CTA_SCHEMA_NAME_FUNC: Optional[
Callable[["Database", "models.User", str, str], str]
] = None |
||
|
||
|
||
# An instantiated derivative of werkzeug.contrib.cache.BaseCache | ||
# if enabled, it can be used to store the results of long-running queries | ||
# in SQL Lab by using the "Run Async" button/feature | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,44 @@ | ||
# Licensed to the Apache Software Foundation (ASF) under one | ||
# or more contributor license agreements. See the NOTICE file | ||
# distributed with this work for additional information | ||
# regarding copyright ownership. The ASF licenses this file | ||
# to you under the Apache License, Version 2.0 (the | ||
# "License"); you may not use this file except in compliance | ||
# with the License. You may obtain a copy of the License at | ||
# | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, | ||
# software distributed under the License is distributed on an | ||
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
# KIND, either express or implied. See the License for the | ||
# specific language governing permissions and limitations | ||
# under the License. | ||
"""Add tmp_schema_name to the query object. | ||
|
||
Revision ID: 72428d1ea401 | ||
Revises: 0a6f12f60c73 | ||
Create Date: 2020-02-20 08:52:22.877902 | ||
|
||
""" | ||
|
||
# revision identifiers, used by Alembic. | ||
revision = "72428d1ea401" | ||
down_revision = "0a6f12f60c73" | ||
|
||
import sqlalchemy as sa | ||
from alembic import op | ||
|
||
|
||
def upgrade(): | ||
op.add_column( | ||
"query", sa.Column("tmp_schema_name", sa.String(length=256), nullable=True) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. (not to other reviewers) At first I was confused by the choice of column name here, but turns out there was already |
||
) | ||
|
||
|
||
def downgrade(): | ||
try: | ||
# sqlite doesn't like dropping the columns | ||
op.drop_column("query", "tmp_schema_name") | ||
except Exception: | ||
pass |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -151,20 +151,28 @@ def __process_tokenlist(self, token_list: TokenList): | |
self._alias_names.add(token_list.tokens[0].value) | ||
self.__extract_from_token(token_list) | ||
|
||
def as_create_table(self, table_name: str, overwrite: bool = False) -> str: | ||
def as_create_table( | ||
self, | ||
table_name: str, | ||
schema_name: Optional[str] = None, | ||
overwrite: bool = False, | ||
) -> str: | ||
"""Reformats the query into the create table as query. | ||
|
||
Works only for the single select SQL statements, in all other cases | ||
the sql query is not modified. | ||
:param table_name: Table that will contain the results of the query execution | ||
:param table_name: table that will contain the results of the query execution | ||
:param schema_name: schema name for the target table | ||
:param overwrite: table_name will be dropped if true | ||
:return: Create table as query | ||
""" | ||
exec_sql = "" | ||
sql = self.stripped() | ||
# TODO(bkyryliuk): quote full_table_name | ||
full_table_name = f"{schema_name}.{table_name}" if schema_name else table_name | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Shouldn’t we address the TODO? Note the quoter needs to be dialect specific. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @john-bodley this would be an additional feature. I kept the logic as it was before. |
||
if overwrite: | ||
exec_sql = f"DROP TABLE IF EXISTS {table_name};\n" | ||
exec_sql += f"CREATE TABLE {table_name} AS \n{sql}" | ||
exec_sql = f"DROP TABLE IF EXISTS {full_table_name};\n" | ||
exec_sql += f"CREATE TABLE {full_table_name} AS \n{sql}" | ||
return exec_sql | ||
|
||
def __extract_from_token(self, token: Token): # pylint: disable=too-many-branches | ||
|
@@ -205,10 +213,12 @@ def __extract_from_token(self, token: Token): # pylint: disable=too-many-branch | |
if not self.__is_identifier(token2): | ||
self.__extract_from_token(item) | ||
|
||
def get_query_with_new_limit(self, new_limit: int) -> str: | ||
""" | ||
returns the query with the specified limit. | ||
Does not change the underlying query | ||
def set_or_update_query_limit(self, new_limit: int) -> str: | ||
"""Returns the query with the specified limit. | ||
|
||
Does not change the underlying query if user did not apply the limit, | ||
otherwise replaces the limit with the lower value between existing limit | ||
in the query and new_limit. | ||
|
||
:param new_limit: Limit to be incorporated into returned query | ||
:return: The original query with new limit | ||
|
@@ -223,7 +233,10 @@ def get_query_with_new_limit(self, new_limit: int) -> str: | |
limit_pos = pos | ||
break | ||
_, limit = statement.token_next(idx=limit_pos) | ||
if limit.ttype == sqlparse.tokens.Literal.Number.Integer: | ||
# Override the limit only when it exceeds the configured value. | ||
if limit.ttype == sqlparse.tokens.Literal.Number.Integer and new_limit < int( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Mmmh, this here in theory changes what the function is expected to do ("returns the query with the specified limit"), so either we change the name/docstring to reflect that, or we move the conditional logic to where the function is called. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Updated the docstring; yes, there was a mismatch. It's hard to move this condition outside of this function, as it needs to get the existing limit value from the query, which would require parsing the query twice. I can't think of a use case where we would want to override a lower user limit with the higher configured value; e.g. I would expect to see 1 row when I query select * from bla limit 1 rather than 100. |
||
limit.value | ||
): | ||
limit.value = new_limit | ||
elif limit.is_group: | ||
limit.value = f"{next(limit.get_identifiers())}, {new_limit}" | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -19,7 +19,7 @@ | |
import re | ||
from contextlib import closing | ||
from datetime import datetime, timedelta | ||
from typing import Any, cast, Dict, List, Optional, Union | ||
from typing import Any, Callable, cast, Dict, List, Optional, Union | ||
from urllib import parse | ||
|
||
import backoff | ||
|
@@ -73,6 +73,7 @@ | |
SupersetTimeoutException, | ||
) | ||
from superset.jinja_context import get_template_processor | ||
from superset.models.core import Database | ||
from superset.models.dashboard import Dashboard | ||
from superset.models.datasource_access_request import DatasourceAccessRequest | ||
from superset.models.slice import Slice | ||
|
@@ -243,6 +244,17 @@ def _deserialize_results_payload( | |
return json.loads(payload) # type: ignore | ||
|
||
|
||
def get_cta_schema_name( | ||
database: Database, user: ab_models.User, schema: str, sql: str | ||
) -> Optional[str]: | ||
func = config.get( | ||
"SQLLAB_CTA_SCHEMA_NAME_FUNC" | ||
) # type: Optional[Callable[[Database, ab_models.User, str, str], str]] | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Same as above. Also, we prefer using square brackets when reading configs, i.e. |
||
if not func: | ||
return None | ||
return func(database, user, schema, sql) | ||
|
||
|
||
class AccessRequestsModelView(SupersetModelView, DeleteMixin): | ||
datamodel = SQLAInterface(DAR) | ||
include_route_methods = RouteMethod.CRUD_SET | ||
|
@@ -2334,9 +2346,14 @@ def sql_json_exec( | |
if not mydb: | ||
return json_error_response(f"Database with id {database_id} is missing.") | ||
|
||
# Set tmp_table_name for CTA | ||
# Set tmp_schema_name for CTA | ||
# TODO(bkyryliuk): consider parsing, splitting tmp_schema_name from tmp_table_name if user enters | ||
# <schema_name>.<table_name> | ||
tmp_schema_name = schema # type: Optional[str] | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Same here |
||
if select_as_cta and mydb.force_ctas_schema: | ||
tmp_table_name = f"{mydb.force_ctas_schema}.{tmp_table_name}" | ||
tmp_schema_name = mydb.force_ctas_schema | ||
elif select_as_cta: | ||
tmp_schema_name = get_cta_schema_name(mydb, g.user, schema, sql) | ||
|
||
# Save current query | ||
query = Query( | ||
|
@@ -2349,6 +2366,7 @@ def sql_json_exec( | |
status=status, | ||
sql_editor_id=sql_editor_id, | ||
tmp_table_name=tmp_table_name, | ||
tmp_schema_name=tmp_schema_name, | ||
user_id=g.user.get_id() if g.user else None, | ||
client_id=client_id, | ||
) | ||
|
@@ -2389,9 +2407,11 @@ def sql_json_exec( | |
f"Query {query_id}: Template rendering failed: {error_msg}" | ||
) | ||
|
||
# set LIMIT after template processing | ||
limits = [mydb.db_engine_spec.get_limit_from_sql(rendered_query), limit] | ||
query.limit = min(lim for lim in limits if lim is not None) | ||
# Limit is not applied to the CTA queries if SQLLAB_CTAS_NO_LIMIT flag is set to True. | ||
if not (config.get("SQLLAB_CTAS_NO_LIMIT") and select_as_cta): | ||
# set LIMIT after template processing | ||
limits = [mydb.db_engine_spec.get_limit_from_sql(rendered_query), limit] | ||
query.limit = min(lim for lim in limits if lim is not None) | ||
|
||
# Flag for whether or not to expand data | ||
# (feature that will expand Presto row objects and arrays) | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It’s not clear from this PR why we need these additional two databases.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@john-bodley this is purely for testing, e.g. there is a need to have 2 different schemas in the mysql & postgres to test the CTA behavior