Skip to content

Commit

Permalink
Address comments
Browse files Browse the repository at this point in the history
  • Loading branch information
betodealmeida committed Nov 8, 2023
1 parent d8ff407 commit 8322c30
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 25 deletions.
32 changes: 16 additions & 16 deletions superset/db_engine_specs/trino.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import contextlib
import logging
import threading
import time
from typing import Any, TYPE_CHECKING

import simplejson as json
Expand Down Expand Up @@ -151,13 +152,7 @@ def get_tracking_url(cls, cursor: Cursor) -> str | None:
return None

@classmethod
def handle_cursor_with_query_id(
cls,
cursor: Cursor,
query: Query,
session: Session,
cancel_query_id: str,
) -> None:
def handle_cursor(cls, cursor: Cursor, query: Query, session: Session) -> None:
"""
Handle a trino client cursor.
Expand All @@ -167,6 +162,7 @@ def handle_cursor_with_query_id(
"""

# Adds the executed query id to the extra payload so the query can be cancelled
cancel_query_id = cursor.query_id
logger.debug("Query %d: queryId %s found in cursor", query.id, cancel_query_id)
query.set_extra_json_key(key=QUERY_CANCEL_KEY, value=cancel_query_id)

Expand All @@ -184,15 +180,11 @@ def handle_cursor_with_query_id(
cancel_query_id=cancel_query_id,
)

cls.handle_cursor(cursor=cursor, query=query, session=session)
super().handle_cursor(cursor=cursor, query=query, session=session)

@classmethod
def execute_with_cursor(
cls,
cursor: Cursor,
sql: str,
query: Query,
session: Session,
cls, cursor: Cursor, sql: str, query: Query, session: Session
) -> None:
"""
Trigger execution of a query and handle the resulting cursor.
Expand All @@ -201,7 +193,7 @@ def execute_with_cursor(
in another thread and invoke `handle_cursor` to poll for the query ID
to appear on the cursor in parallel.
"""
# Fetch the query ID before hand, since it might fail inside the thread due to
# Fetch the query ID beforehand, since it might fail inside the thread due to
# how the SQLAlchemy session is handled.
query_id = query.id

Expand All @@ -223,10 +215,18 @@ def _execute(results: dict[str, Any], event: threading.Event) -> None:
args=(execute_result, execute_event),
)
execute_thread.start()
execute_event.wait()

# Wait for a query ID to be available before handling the cursor, as
# it's required by that method; it may never become available on error.
while not cursor.query_id and not execute_event.is_set():
time.sleep(0.1)

logger.debug("Query %d: Handling cursor", query_id)
cls.handle_cursor_with_query_id(cursor, query, session, query_id)
cls.handle_cursor(cursor, query, session)

# Block until the query completes; same behaviour as the client itself
logger.debug("Query %d: Waiting for query to complete", query_id)
execute_event.wait()

# Unfortunately we'll mangle the stack trace due to the thread, but
# throwing the original exception allows mapping database errors as normal
Expand Down
11 changes: 2 additions & 9 deletions tests/unit_tests/db_engine_specs/test_trino.py
Original file line number Diff line number Diff line change
Expand Up @@ -360,12 +360,7 @@ def test_handle_cursor_early_cancel(
if cancel_early:
TrinoEngineSpec.prepare_cancel_query(query=query, session=session_mock)

TrinoEngineSpec.handle_cursor_with_query_id(
cursor=cursor_mock,
query=query,
session=session_mock,
cancel_query_id=query_id,
)
TrinoEngineSpec.handle_cursor(cursor=cursor_mock, query=query, session=session_mock)

if cancel_early:
assert cancel_query_mock.call_args[1]["cancel_query_id"] == query_id
Expand All @@ -383,7 +378,6 @@ def test_execute_with_cursor_in_parallel(mocker: MockerFixture):
mock_cursor.query_id = None

mock_query = mocker.MagicMock()
mock_query.id = query_id
mock_session = mocker.MagicMock()

def _mock_execute(*args, **kwargs):
Expand All @@ -399,6 +393,5 @@ def _mock_execute(*args, **kwargs):
)

mock_query.set_extra_json_key.assert_called_once_with(
key=QUERY_CANCEL_KEY,
value=query_id,
key=QUERY_CANCEL_KEY, value=query_id
)

0 comments on commit 8322c30

Please sign in to comment.