Skip to content

Commit

Permalink
chore: Migrate /superset/queries/<last_updated_ms> to API v1 (#22611)
Browse files Browse the repository at this point in the history
  • Loading branch information
diegomedina248 authored Jan 27, 2023
1 parent d00ba15 commit 14878a1
Show file tree
Hide file tree
Showing 9 changed files with 1,145 additions and 50 deletions.
976 changes: 934 additions & 42 deletions docs/static/resources/openapi.json

Large diffs are not rendered by default.

20 changes: 15 additions & 5 deletions superset-frontend/src/SqlLab/components/QueryAutoRefresh/index.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,12 @@
*/
import { useState } from 'react';
import { isObject } from 'lodash';
import rison from 'rison';
import {
SupersetClient,
Query,
runningQueryStateList,
QueryResponse,
} from '@superset-ui/core';
import { QueryDictionary } from 'src/SqlLab/types';
import useInterval from 'src/SqlLab/utils/useInterval';
Expand Down Expand Up @@ -62,22 +64,30 @@ function QueryAutoRefresh({
refreshQueries,
queriesLastUpdate,
}: QueryAutoRefreshProps) {
// We do not want to spam requests in the case of slow connections and potentially recieve responses out of order
// We do not want to spam requests in the case of slow connections and potentially receive responses out of order
// pendingRequest check ensures we only have one active http call to check for query statuses
const [pendingRequest, setPendingRequest] = useState(false);

const checkForRefresh = () => {
if (!pendingRequest && shouldCheckForQueries(queries)) {
const params = rison.encode({
last_updated_ms: queriesLastUpdate - QUERY_UPDATE_BUFFER_MS,
});

setPendingRequest(true);
SupersetClient.get({
endpoint: `/superset/queries/${
queriesLastUpdate - QUERY_UPDATE_BUFFER_MS
}`,
endpoint: `/api/v1/query/updated_since?q=${params}`,
timeout: QUERY_TIMEOUT_LIMIT,
})
.then(({ json }) => {
if (json) {
refreshQueries?.(json);
const jsonPayload = json as { result?: QueryResponse[] };
const queries =
jsonPayload?.result?.reduce((acc, current) => {
acc[current.id] = current;
return acc;
}, {}) ?? {};
refreshQueries?.(queries);
}
})
.catch(() => {})
Expand Down
1 change: 1 addition & 0 deletions superset/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ class RouteMethod: # pylint: disable=too-few-public-methods
"get_data": "read",
"samples": "read",
"delete_ssh_tunnel": "write",
"get_updated_since": "read",
"stop_query": "read",
}

Expand Down
62 changes: 61 additions & 1 deletion superset/queries/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,10 @@
# specific language governing permissions and limitations
# under the License.
import logging
from typing import Any

import backoff
from flask_appbuilder.api import expose, protect, request, safe
from flask_appbuilder.api import expose, protect, request, rison, safe
from flask_appbuilder.models.sqla.interface import SQLAInterface

from superset import db, event_logger
Expand All @@ -29,6 +30,7 @@
from superset.queries.filters import QueryFilter
from superset.queries.schemas import (
openapi_spec_methods_override,
queries_get_updated_since_schema,
QuerySchema,
StopQuerySchema,
)
Expand Down Expand Up @@ -59,6 +61,11 @@ class QueryRestApi(BaseSupersetModelRestApi):
RouteMethod.RELATED,
RouteMethod.DISTINCT,
"stop_query",
"get_updated_since",
}

apispec_parameter_schemas = {
"queries_get_updated_since_schema": queries_get_updated_since_schema,
}

list_columns = [
Expand Down Expand Up @@ -142,6 +149,59 @@ class QueryRestApi(BaseSupersetModelRestApi):
allowed_rel_fields = {"database", "user"}
allowed_distinct_fields = {"status"}

@expose("/updated_since")
@protect()
@safe
@rison(queries_get_updated_since_schema)
@statsd_metrics
@event_logger.log_this_with_context(
action=lambda self, *args, **kwargs: f"{self.__class__.__name__}"
f".get_updated_since",
log_to_statsd=False,
)
def get_updated_since(self, **kwargs: Any) -> FlaskResponse:
"""Get a list of queries that changed after last_updated_ms
---
get:
summary: Get a list of queries that changed after last_updated_ms
parameters:
- in: query
name: q
content:
application/json:
schema:
$ref: '#/components/schemas/queries_get_updated_since_schema'
responses:
200:
description: Queries list
content:
application/json:
schema:
type: object
properties:
result:
description: >-
A List of queries that changed after last_updated_ms
type: array
items:
$ref: '#/components/schemas/{{self.__class__.__name__}}.get'
400:
$ref: '#/components/responses/400'
401:
$ref: '#/components/responses/401'
404:
$ref: '#/components/responses/404'
500:
$ref: '#/components/responses/500'
"""
try:
last_updated_ms = kwargs["rison"].get("last_updated_ms", 0)
queries = QueryDAO.get_queries_changed_after(last_updated_ms)
payload = [q.to_dict() for q in queries]
return self.response(200, result=payload)
except SupersetException as ex:
return self.response(ex.status, message=ex.message)

@expose("/stop", methods=["POST"])
@protect()
@safe
Expand Down
14 changes: 13 additions & 1 deletion superset/queries/dao.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
# under the License.
import logging
from datetime import datetime
from typing import Any, Dict
from typing import Any, Dict, List, Union

from superset import sql_lab
from superset.common.db_query_status import QueryStatus
Expand All @@ -25,6 +25,7 @@
from superset.extensions import db
from superset.models.sql_lab import Query, SavedQuery
from superset.queries.filters import QueryFilter
from superset.utils.core import get_user_id
from superset.utils.dates import now_as_float

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -61,6 +62,17 @@ def save_metadata(query: Query, payload: Dict[str, Any]) -> None:
db.session.add(query)
query.set_extra_json_key("columns", columns)

@staticmethod
def get_queries_changed_after(last_updated_ms: Union[float, int]) -> List[Query]:
# UTC date time, same that is stored in the DB.
last_updated_dt = datetime.utcfromtimestamp(last_updated_ms / 1000)

return (
db.session.query(Query)
.filter(Query.user_id == get_user_id(), Query.changed_on >= last_updated_dt)
.all()
)

@staticmethod
def stop_query(client_id: str) -> None:
query = db.session.query(Query).filter_by(client_id=client_id).one_or_none()
Expand Down
8 changes: 8 additions & 0 deletions superset/queries/schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,14 @@
},
}

queries_get_updated_since_schema = {
"type": "object",
"properties": {
"last_updated_ms": {"type": "number"},
},
"required": ["last_updated_ms"],
}


class DatabaseSchema(Schema):
database_name = fields.String()
Expand Down
1 change: 1 addition & 0 deletions superset/views/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -2485,6 +2485,7 @@ def fetch_datasource_metadata(self) -> FlaskResponse: # pylint: disable=no-self
@event_logger.log_this
@expose("/queries/<float:last_updated_ms>")
@expose("/queries/<int:last_updated_ms>")
@deprecated()
def queries(self, last_updated_ms: Union[float, int]) -> FlaskResponse:
"""
Get the updated queries.
Expand Down
57 changes: 56 additions & 1 deletion tests/integration_tests/queries/api_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ def insert_query(
rows: int = 100,
tab_name: str = "",
status: str = "success",
changed_on: datetime = datetime(2020, 1, 1),
) -> Query:
database = db.session.query(Database).get(database_id)
user = db.session.query(security_manager.user_model).get(user_id)
Expand All @@ -67,7 +68,7 @@ def insert_query(
rows=rows,
tab_name=tab_name,
status=status,
changed_on=datetime(2020, 1, 1),
changed_on=changed_on,
)
db.session.add(query)
db.session.commit()
Expand Down Expand Up @@ -394,6 +395,60 @@ def test_get_list_query_no_data_access(self):
db.session.delete(query)
db.session.commit()

def test_get_updated_since(self):
"""
Query API: Test get queries updated since timestamp
"""
now = datetime.utcnow()
client_id = self.get_random_string()

admin = self.get_user("admin")
example_db = get_example_database()

old_query = self.insert_query(
example_db.id,
admin.id,
self.get_random_string(),
sql="SELECT col1, col2 from table1",
select_sql="SELECT col1, col2 from table1",
executed_sql="SELECT col1, col2 from table1 LIMIT 100",
changed_on=now - timedelta(days=3),
)
updated_query = self.insert_query(
example_db.id,
admin.id,
client_id,
sql="SELECT col1, col2 from table1",
select_sql="SELECT col1, col2 from table1",
executed_sql="SELECT col1, col2 from table1 LIMIT 100",
changed_on=now - timedelta(days=1),
)

self.login(username="admin")
timestamp = datetime.timestamp(now - timedelta(days=2)) * 1000
uri = f"api/v1/query/updated_since?q={prison.dumps({'last_updated_ms': timestamp})}"
rv = self.client.get(uri)
self.assertEqual(rv.status_code, 200)

expected_result = updated_query.to_dict()
data = json.loads(rv.data.decode("utf-8"))
self.assertEqual(len(data["result"]), 1)
for key, value in data["result"][0].items():
# We can't assert timestamp
if key not in (
"changedOn",
"changed_on",
"end_time",
"start_running_time",
"start_time",
"id",
):
self.assertEqual(value, expected_result[key])
# rollback changes
db.session.delete(old_query)
db.session.delete(updated_query)
db.session.commit()

@mock.patch("superset.sql_lab.cancel_query")
@mock.patch("superset.views.core.db.session")
def test_stop_query_not_found(
Expand Down
56 changes: 56 additions & 0 deletions tests/unit_tests/dao/queries_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
# specific language governing permissions and limitations
# under the License.
import json
from datetime import datetime, timedelta
from typing import Any, Iterator

import pytest
Expand Down Expand Up @@ -58,6 +59,61 @@ def test_query_dao_save_metadata(session: Session) -> None:
assert query.extra.get("columns", None) == []


def test_query_dao_get_queries_changed_after(session: Session) -> None:
from superset.models.core import Database
from superset.models.sql_lab import Query

engine = session.get_bind()
Query.metadata.create_all(engine) # pylint: disable=no-member

db = Database(database_name="my_database", sqlalchemy_uri="sqlite://")

now = datetime.utcnow()

old_query_obj = Query(
client_id="foo",
database=db,
tab_name="test_tab",
sql_editor_id="test_editor_id",
sql="select * from bar",
select_sql="select * from bar",
executed_sql="select * from bar",
limit=100,
select_as_cta=False,
rows=100,
error_message="none",
results_key="abc",
changed_on=now - timedelta(days=3),
)

updated_query_obj = Query(
client_id="updated_foo",
database=db,
tab_name="test_tab",
sql_editor_id="test_editor_id",
sql="select * from foo",
select_sql="select * from foo",
executed_sql="select * from foo",
limit=100,
select_as_cta=False,
rows=100,
error_message="none",
results_key="abc",
changed_on=now - timedelta(days=1),
)

session.add(db)
session.add(old_query_obj)
session.add(updated_query_obj)

from superset.queries.dao import QueryDAO

timestamp = datetime.timestamp(now - timedelta(days=2)) * 1000
result = QueryDAO.get_queries_changed_after(timestamp)
assert len(result) == 1
assert result[0].client_id == "updated_foo"


def test_query_dao_stop_query_not_found(
mocker: MockFixture, app: Any, session: Session
) -> None:
Expand Down

0 comments on commit 14878a1

Please sign in to comment.