-
Notifications
You must be signed in to change notification settings - Fork 14.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Make schema name for the CTA queries and limit configurable #8867
Changes from all commits
357db81
0f4d5a1
763a8ef
93a393f
6e7dc8c
abe1bc7
2df033a
689967c
e2a96e5
6661e39
4cc14b2
802fcae
6c72dcc
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,44 @@ | ||
# Licensed to the Apache Software Foundation (ASF) under one | ||
# or more contributor license agreements. See the NOTICE file | ||
# distributed with this work for additional information | ||
# regarding copyright ownership. The ASF licenses this file | ||
# to you under the Apache License, Version 2.0 (the | ||
# "License"); you may not use this file except in compliance | ||
# with the License. You may obtain a copy of the License at | ||
# | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, | ||
# software distributed under the License is distributed on an | ||
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
# KIND, either express or implied. See the License for the | ||
# specific language governing permissions and limitations | ||
# under the License. | ||
"""Add tmp_schema_name to the query object. | ||
|
||
Revision ID: 72428d1ea401 | ||
Revises: 0a6f12f60c73 | ||
Create Date: 2020-02-20 08:52:22.877902 | ||
|
||
""" | ||
|
||
# revision identifiers, used by Alembic. | ||
revision = "72428d1ea401" | ||
down_revision = "0a6f12f60c73" | ||
|
||
import sqlalchemy as sa | ||
from alembic import op | ||
|
||
|
||
def upgrade(): | ||
op.add_column( | ||
"query", sa.Column("tmp_schema_name", sa.String(length=256), nullable=True) | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. (not to other reviewers) At first I was confused by the choice of column name here, but it turns out there was already |
||
) | ||
|
||
|
||
def downgrade():
    """Drop the ``tmp_schema_name`` column from the ``query`` table.

    The drop is best effort: if the backend rejects it, the failure is
    logged with its traceback and the downgrade continues instead of
    aborting.
    """
    import logging  # local import: the module header only imports sa and op

    try:
        # sqlite doesn't like dropping the columns
        op.drop_column("query", "tmp_schema_name")
    except Exception:  # pylint: disable=broad-except
        # Stay best-effort, but leave a trace instead of hiding the failure.
        logging.getLogger(__name__).warning(
            "Could not drop column query.tmp_schema_name; leaving it in place",
            exc_info=True,
        )
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -151,20 +151,28 @@ def __process_tokenlist(self, token_list: TokenList): | |
self._alias_names.add(token_list.tokens[0].value) | ||
self.__extract_from_token(token_list) | ||
|
||
def as_create_table(self, table_name: str, overwrite: bool = False) -> str: | ||
def as_create_table( | ||
self, | ||
table_name: str, | ||
schema_name: Optional[str] = None, | ||
overwrite: bool = False, | ||
) -> str: | ||
"""Reformats the query into the create table as query. | ||
|
||
Works only for the single select SQL statements, in all other cases | ||
the sql query is not modified. | ||
:param table_name: Table that will contain the results of the query execution | ||
:param table_name: table that will contain the results of the query execution | ||
:param schema_name: schema name for the target table | ||
:param overwrite: table_name will be dropped if true | ||
:return: Create table as query | ||
""" | ||
exec_sql = "" | ||
sql = self.stripped() | ||
# TODO(bkyryliuk): quote full_table_name | ||
full_table_name = f"{schema_name}.{table_name}" if schema_name else table_name | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Shouldn’t we address the TODO? Note that the quoter needs to be dialect-specific. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. @john-bodley this would be an additional feature. I kept the logic as it was before. |
||
if overwrite: | ||
exec_sql = f"DROP TABLE IF EXISTS {table_name};\n" | ||
exec_sql += f"CREATE TABLE {table_name} AS \n{sql}" | ||
exec_sql = f"DROP TABLE IF EXISTS {full_table_name};\n" | ||
exec_sql += f"CREATE TABLE {full_table_name} AS \n{sql}" | ||
return exec_sql | ||
|
||
def __extract_from_token(self, token: Token): # pylint: disable=too-many-branches | ||
|
@@ -205,10 +213,12 @@ def __extract_from_token(self, token: Token): # pylint: disable=too-many-branch | |
if not self.__is_identifier(token2): | ||
self.__extract_from_token(item) | ||
|
||
def get_query_with_new_limit(self, new_limit: int) -> str: | ||
""" | ||
returns the query with the specified limit. | ||
Does not change the underlying query | ||
def set_or_update_query_limit(self, new_limit: int) -> str: | ||
"""Returns the query with the specified limit. | ||
|
||
Does not change the underlying query if user did not apply the limit, | ||
otherwise replaces the limit with the lower value between existing limit | ||
in the query and new_limit. | ||
|
||
:param new_limit: Limit to be incorporated into returned query | ||
:return: The original query with new limit | ||
|
@@ -223,7 +233,10 @@ def get_query_with_new_limit(self, new_limit: int) -> str: | |
limit_pos = pos | ||
break | ||
_, limit = statement.token_next(idx=limit_pos) | ||
if limit.ttype == sqlparse.tokens.Literal.Number.Integer: | ||
# Override the limit only when it exceeds the configured value. | ||
if limit.ttype == sqlparse.tokens.Literal.Number.Integer and new_limit < int( | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Mmmh, this in theory changes what the function is expected to do ("returns the query with the specified limit"), so either we change the name/docstring to reflect that, or we move the conditional logic to where the function is called. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Updated the docstring; yeah, there is a mismatch. It's hard to move this condition outside of this function, as it needs to get the existing limit value from the query and that would require parsing the query twice. I can't think of a use case where we would want to override a lower user limit with the higher configured value, e.g. I would expect to see 1 row when I query select * from bla limit 1 rather than 100. |
||
limit.value | ||
): | ||
limit.value = new_limit | ||
elif limit.is_group: | ||
limit.value = f"{next(limit.get_identifiers())}, {new_limit}" | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It’s not clear from this PR why we need these additional two databases.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@john-bodley this is purely for testing, e.g. there is a need to have 2 different schemas in the mysql & postgres to test the CTA behavior