+
+function getFromArr(arr, id) {
+ let obj;
+ arr.forEach((o) => {
+ if (o.id === id) {
+ obj = o;
+ }
+ });
+ return obj;
+}
+
function addToArr(state, arrKey, obj) {
const newObj = Object.assign({}, obj);
if (!newObj.id) {
@@ -87,9 +97,16 @@ export const sqlLabReducer = function (state, action) {
let newState = removeFromArr(state, 'queryEditors', action.queryEditor);
// List of remaining queryEditor ids
const qeIds = newState.queryEditors.map((qe) => qe.id);
- let th = state.tabHistory.slice();
- th = th.filter((id) => qeIds.includes(id));
- newState = Object.assign({}, newState, { tabHistory: th });
+ const queries = {};
+ Object.keys(state.queries).forEach((k) => {
+ const query = state.queries[k];
+ if (qeIds.includes(query.sqlEditorId)) {
+ queries[k] = query;
+ }
+ });
+ let tabHistory = state.tabHistory.slice();
+ tabHistory = tabHistory.filter((id) => qeIds.includes(id));
+ newState = Object.assign({}, newState, { tabHistory, queries });
return newState;
},
[actions.REMOVE_QUERY]() {
@@ -113,7 +130,14 @@ export const sqlLabReducer = function (state, action) {
return removeFromArr(state, 'tables', action.table);
},
[actions.START_QUERY]() {
- const newState = addToObject(state, 'queries', action.query);
+ const qe = getFromArr(state.queryEditors, action.query.sqlEditorId);
+ let newState = Object.assign({}, state);
+ if (qe.latestQueryId) {
+ const q = Object.assign({}, state.queries[qe.latestQueryId], { results: null });
+ const queries = Object.assign({}, state.queries, { [q.id]: q });
+ newState = Object.assign({}, state, { queries });
+ }
+ newState = addToObject(newState, 'queries', action.query);
const sqlEditor = { id: action.query.sqlEditorId };
return alterInArr(newState, 'queryEditors', sqlEditor, { latestQueryId: action.query.id });
},
@@ -121,12 +145,16 @@ export const sqlLabReducer = function (state, action) {
return alterInObject(state, 'queries', action.query, { state: 'stopped' });
},
[actions.QUERY_SUCCESS]() {
+ let rows;
+ if (action.results.data) {
+ rows = action.results.data.length;
+ }
const alts = {
- state: 'success',
- results: action.results,
- rows: action.results.data.length,
- progress: 100,
endDttm: now(),
+ progress: 100,
+ results: action.results,
+ rows,
+ state: 'success',
};
return alterInObject(state, 'queries', action.query, alts);
},
@@ -158,12 +186,6 @@ export const sqlLabReducer = function (state, action) {
[actions.QUERY_EDITOR_SET_AUTORUN]() {
return alterInArr(state, 'queryEditors', action.queryEditor, { autorun: action.autorun });
},
- [actions.ADD_WORKSPACE_QUERY]() {
- return addToArr(state, 'workspaceQueries', action.query);
- },
- [actions.REMOVE_WORKSPACE_QUERY]() {
- return removeFromArr(state, 'workspaceQueries', action.query);
- },
[actions.ADD_ALERT]() {
return addToArr(state, 'alerts', action.alert);
},
diff --git a/caravel/assets/visualizations/nvd3_vis.css b/caravel/assets/visualizations/nvd3_vis.css
index d92440e7ef7cc..20dbd65a74ca6 100644
--- a/caravel/assets/visualizations/nvd3_vis.css
+++ b/caravel/assets/visualizations/nvd3_vis.css
@@ -8,7 +8,6 @@ g.caravel path {
}
text.nv-axislabel {
- // font-weight: bold;
font-size: 14px;
}
diff --git a/caravel/config.py b/caravel/config.py
index 4856c07fa72c1..46eb55936f8e5 100644
--- a/caravel/config.py
+++ b/caravel/config.py
@@ -213,6 +213,9 @@ class CeleryConfig(object):
# The db id here results in selecting this one as a default in SQL Lab
DEFAULT_DB_ID = None
+# Timeout duration for SQL Lab synchronous queries (in seconds)
+SQLLAB_TIMEOUT = 30
+
try:
from caravel_config import * # noqa
except ImportError:
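Because `SQLLAB_TIMEOUT` is defined before the `caravel_config` import hook just below it, deployments can override the value without patching `config.py`. A minimal sketch of such an override; the 120 is illustrative, not a recommendation:

    # caravel_config.py -- local overrides picked up by config.py's
    # `from caravel_config import *`
    SQLLAB_TIMEOUT = 120  # give synchronous SQL Lab queries two minutes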
diff --git a/caravel/migrations/versions/4500485bde7d_allow_run_sync_async.py b/caravel/migrations/versions/4500485bde7d_allow_run_sync_async.py
new file mode 100644
index 0000000000000..0695e2cda59b6
--- /dev/null
+++ b/caravel/migrations/versions/4500485bde7d_allow_run_sync_async.py
@@ -0,0 +1,28 @@
+"""allow_run_sync_async
+
+Revision ID: 4500485bde7d
+Revises: 41f6a59a61f2
+Create Date: 2016-09-12 23:33:14.789632
+
+"""
+
+# revision identifiers, used by Alembic.
+revision = '4500485bde7d'
+down_revision = '41f6a59a61f2'
+
+from alembic import op
+import sqlalchemy as sa
+
+
+def upgrade():
+ op.add_column('dbs', sa.Column('allow_run_async', sa.Boolean(), nullable=True))
+ op.add_column('dbs', sa.Column('allow_run_sync', sa.Boolean(), nullable=True))
+
+
+def downgrade():
+ try:
+ op.drop_column('dbs', 'allow_run_sync')
+ op.drop_column('dbs', 'allow_run_async')
+ except Exception:
+ pass  # some backends (e.g. SQLite) cannot drop columns; ignore
+
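Since both new columns are nullable, the model-level defaults (`allow_run_sync=True`, `allow_run_async=False`) apply only to rows created after this change; pre-existing `dbs` rows read NULL (falsy) until edited. A hedged sketch of an explicit backfill, had one been wanted — this helper is hypothetical and not part of the change:

    from alembic import op
    from sqlalchemy.sql import column, table
    import sqlalchemy as sa

    def backfill_allow_run_sync():
        """Hypothetical: set the model's default explicitly for
        pre-existing rows, which otherwise read NULL."""
        dbs = table('dbs', column('allow_run_sync', sa.Boolean()))
        op.execute(
            dbs.update()
               .where(dbs.c.allow_run_sync.is_(None))
               .values(allow_run_sync=True))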
diff --git a/caravel/migrations/versions/65903709c321_allow_dml.py b/caravel/migrations/versions/65903709c321_allow_dml.py
new file mode 100644
index 0000000000000..d14c6a98235cd
--- /dev/null
+++ b/caravel/migrations/versions/65903709c321_allow_dml.py
@@ -0,0 +1,26 @@
+"""allow_dml
+
+Revision ID: 65903709c321
+Revises: 4500485bde7d
+Create Date: 2016-09-15 08:48:27.284752
+
+"""
+
+# revision identifiers, used by Alembic.
+revision = '65903709c321'
+down_revision = '4500485bde7d'
+
+from alembic import op
+import sqlalchemy as sa
+import logging
+
+
+def upgrade():
+ op.add_column('dbs', sa.Column('allow_dml', sa.Boolean(), nullable=True))
+
+
+def downgrade():
+ try:
+ op.drop_column('dbs', 'allow_dml')
+ except Exception as e:
+ logging.exception(e)
diff --git a/caravel/models.py b/caravel/models.py
index 126d19d33a868..e885c95ff1caf 100644
--- a/caravel/models.py
+++ b/caravel/models.py
@@ -416,7 +416,10 @@ class Database(Model, AuditMixinNullable):
cache_timeout = Column(Integer)
select_as_create_table_as = Column(Boolean, default=False)
expose_in_sqllab = Column(Boolean, default=False)
+ allow_run_sync = Column(Boolean, default=True)
+ allow_run_async = Column(Boolean, default=False)
allow_ctas = Column(Boolean, default=False)
+ allow_dml = Column(Boolean, default=False)
force_ctas_schema = Column(String(250))
extra = Column(Text, default=textwrap.dedent("""\
{
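Read together, the four flags partition what a given database connection permits: `allow_run_sync`/`allow_run_async` select execution paths, while `allow_ctas` and `allow_dml` gate statement types. A purely illustrative helper combining them; only the `allow_dml` check mirrors logic that actually appears in this diff (in `sql_lab.py` below):

    def can_execute(database, sql, run_async):
        """Hypothetical gatekeeper over the new Database flags."""
        is_select = sql.strip().lower().startswith('select')  # crude check
        if not is_select and not database.allow_dml:
            return False  # non-SELECT statements require allow_dml
        if run_async:
            return database.allow_run_async
        return database.allow_run_sync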
diff --git a/caravel/sql_lab.py b/caravel/sql_lab.py
index b1d64a9c09079..13009f5634978 100644
--- a/caravel/sql_lab.py
+++ b/caravel/sql_lab.py
@@ -2,8 +2,10 @@
from datetime import datetime
import pandas as pd
import logging
-from caravel import app, db, models, utils
import time
+import json
+
+from caravel import app, cache, db, models, utils
QueryStatus = models.QueryStatus
@@ -44,92 +46,106 @@ def create_table_as(sql, table_name, schema=None, override=False):
@celery_app.task
def get_sql_results(query_id, return_results=True):
"""Executes the sql query returns the results."""
- db.session.commit() # HACK
- query = db.session.query(models.Query).filter_by(id=query_id).one()
+ session = db.session()
+ session.commit() # HACK
+ query = session.query(models.Query).filter_by(id=query_id).one()
database = query.database
executed_sql = query.sql.strip().strip(';')
+
+ def handle_error(msg):
+ """Local method handling error while processing the SQL"""
+ query.error_message = msg
+ query.status = QueryStatus.FAILED
+ query.tmp_table_name = None
+ session.commit()
+ raise Exception(query.error_message)
+
# Limit enforced only for retrieving the data, not for the CTA queries.
- if is_query_select(executed_sql):
- if query.select_as_cta:
- if not query.tmp_table_name:
- start_dttm = datetime.fromtimestamp(query.start_time)
- query.tmp_table_name = 'tmp_{}_table_{}'.format(
- query.user_id,
- start_dttm.strftime('%Y_%m_%d_%H_%M_%S'))
- executed_sql = create_table_as(
- executed_sql, query.tmp_table_name, database.force_ctas_schema)
- query.select_as_cta_used = True
- elif query.limit:
- executed_sql = database.wrap_sql_limit(executed_sql, query.limit)
- query.limit_used = True
- engine = database.get_sqla_engine(schema=query.schema)
- try:
- query.executed_sql = executed_sql
- logging.info("Running query: \n{}".format(executed_sql))
- result_proxy = engine.execute(query.executed_sql, schema=query.schema)
- except Exception as e:
- logging.exception(e)
- query.error_message = utils.error_msg_from_exception(e)
- query.status = QueryStatus.FAILED
- query.tmp_table_name = None
- db.session.commit()
- raise Exception(query.error_message)
-
- cursor = result_proxy.cursor
- query.status = QueryStatus.RUNNING
- db.session.flush()
- if database.backend == 'presto':
+ is_select = is_query_select(executed_sql)
+ if not is_select and not database.allow_dml:
+ handle_error(
+ "Only `SELECT` statements are allowed against this database")
+ if query.select_as_cta:
+ if not is_select:
+ handle_error(
+ "Only `SELECT` statements can be used with the CREATE TABLE "
+ "feature.")
+ if not query.tmp_table_name:
+ start_dttm = datetime.fromtimestamp(query.start_time)
+ query.tmp_table_name = 'tmp_{}_table_{}'.format(
+ query.user_id,
+ start_dttm.strftime('%Y_%m_%d_%H_%M_%S'))
+ executed_sql = create_table_as(
+ executed_sql, query.tmp_table_name, database.force_ctas_schema)
+ query.select_as_cta_used = True
+ elif query.limit and is_select:
+ executed_sql = database.wrap_sql_limit(executed_sql, query.limit)
+ query.limit_used = True
+ engine = database.get_sqla_engine(schema=query.schema)
+ try:
+ query.executed_sql = executed_sql
+ logging.info("Running query: \n{}".format(executed_sql))
+ result_proxy = engine.execute(query.executed_sql, schema=query.schema)
+ except Exception as e:
+ logging.exception(e)
+ handle_error(utils.error_msg_from_exception(e))
+
+ cursor = result_proxy.cursor
+ query.status = QueryStatus.RUNNING
+ session.flush()
+ if database.backend == 'presto':
+ polled = cursor.poll()
+ # poll returns dict -- JSON status information or ``None``
+ # if the query is done
+ # https://github.com/dropbox/PyHive/blob/
+ # b34bdbf51378b3979eaf5eca9e956f06ddc36ca0/pyhive/presto.py#L178
+ while polled:
+ # Update the object and wait for the kill signal.
+ stats = polled.get('stats', {})
+ if stats:
+ completed_splits = float(stats.get('completedSplits'))
+ total_splits = float(stats.get('totalSplits'))
+ if total_splits and completed_splits:
+ progress = 100 * (completed_splits / total_splits)
+ if progress > query.progress:
+ query.progress = progress
+ session.commit()
+ time.sleep(1)
polled = cursor.poll()
- # poll returns dict -- JSON status information or ``None``
- # if the query is done
- # https://github.com/dropbox/PyHive/blob/
- # b34bdbf51378b3979eaf5eca9e956f06ddc36ca0/pyhive/presto.py#L178
- while polled:
- # Update the object and wait for the kill signal.
- stats = polled.get('stats', {})
- if stats:
- completed_splits = float(stats.get('completedSplits'))
- total_splits = float(stats.get('totalSplits'))
- if total_splits and completed_splits:
- progress = 100 * (completed_splits / total_splits)
- if progress > query.progress:
- query.progress = progress
- db.session.commit()
- time.sleep(1)
- polled = cursor.poll()
-
- columns = None
- data = None
- if result_proxy.cursor:
- columns = [col[0] for col in result_proxy.cursor.description]
- data = result_proxy.fetchall()
- df = pd.DataFrame(data, columns=columns)
- df = df.where((pd.notnull(df)), None)
- # TODO consider generating tuples instead of dicts to send
- # less data through the wire. The command bellow does that,
- # but we'd need to align on the client side.
- # data = df.values.tolist()
- data = df.to_dict(orient='records')
-
- query.rows = result_proxy.rowcount
- query.progress = 100
- query.status = QueryStatus.SUCCESS
- if query.rows == -1 and data:
- # Presto doesn't provide result_proxy.row_count
- query.rows = len(data)
-
- # CTAs queries result in 1 cell having the # of the added rows.
- if query.select_as_cta:
- query.select_sql = '{}'.format(database.select_star(
- query.tmp_table_name, limit=query.limit))
-
- query.end_time = utils.now_as_float()
- db.session.commit()
+
+ columns = None
+ data = None
+ if result_proxy.cursor:
+ columns = [col[0] for col in result_proxy.cursor.description]
+ data = result_proxy.fetchall()
+ df = pd.DataFrame(data, columns=columns)
+ df = df.where((pd.notnull(df)), None)
+ # TODO consider generating tuples instead of dicts to send
+ # less data through the wire. The command below does that,
+ # but we'd need to align on the client side.
+ # data = df.values.tolist()
+ data = df.to_dict(orient='records')
+
+ query.rows = result_proxy.rowcount
+ query.progress = 100
+ query.status = QueryStatus.SUCCESS
+ if query.rows == -1 and data:
+ # Presto doesn't provide result_proxy.row_count
+ query.rows = len(data)
+
+ # CTAs queries result in 1 cell having the # of the added rows.
+ if query.select_as_cta:
+ query.select_sql = '{}'.format(database.select_star(
+ query.tmp_table_name, limit=query.limit))
+
+ query.end_time = utils.now_as_float()
+ session.commit()
payload = {
'query_id': query.id,
'status': query.status,
+ 'data': [],
}
if query.status == models.QueryStatus.SUCCESS:
payload['data'] = data
@@ -138,3 +154,9 @@ def get_sql_results(query_id, return_results=True):
payload['error'] = query.error_message
if return_results:
return payload
+ '''
+ # Hack testing using a kv store for results
+ key = "query_id={}".format(query.id)
+ logging.info("Storing results in key=[{}]".format(key))
+ cache.set(key, json.dumps(payload, default=utils.json_iso_dttm_ser))
+ '''
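The Presto branch above polls the cursor once per second, writing monotonically increasing progress back to the Query row so the UI can render a progress bar. The same pattern as a standalone sketch, assuming a PyHive Presto cursor and the `query`/`session` objects from this task:

    import time

    def track_presto_progress(cursor, query, session):
        """Condensed version of the polling loop in get_sql_results."""
        polled = cursor.poll()  # status dict, or None once the query is done
        while polled:
            stats = polled.get('stats', {})
            completed = float(stats.get('completedSplits') or 0)
            total = float(stats.get('totalSplits') or 0)
            if total:
                progress = 100 * (completed / total)
                if progress > query.progress:  # never move the bar backwards
                    query.progress = progress
            session.commit()
            time.sleep(1)
            polled = cursor.poll()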
diff --git a/caravel/utils.py b/caravel/utils.py
index 04b0ee28a2642..0e65767e7ab6e 100644
--- a/caravel/utils.py
+++ b/caravel/utils.py
@@ -4,13 +4,14 @@
from __future__ import print_function
from __future__ import unicode_literals
+from builtins import object
from datetime import date, datetime
import decimal
import functools
import json
import logging
import numpy
-import time
+import signal
import uuid
import parsedatetime
@@ -30,6 +31,10 @@ class CaravelException(Exception):
pass
+class CaravelTimeoutException(Exception):
+ pass
+
+
class CaravelSecurityException(CaravelException):
pass
@@ -345,7 +350,7 @@ def datetime_to_epoch(dttm):
def now_as_float():
- return datetime_to_epoch(datetime.now())
+ return datetime_to_epoch(datetime.utcnow())
def json_int_dttm_ser(obj):
@@ -414,3 +419,31 @@ def generic_find_constraint_name(table, columns, referenced, db):
fk.referred_table.name == referenced and
set(fk.column_keys) == columns):
return fk.name
+
+
+class timeout(object):
+ """
+ To be used in a ``with`` block to time out its contents.
+ """
+ def __init__(self, seconds=1, error_message='Timeout'):
+ self.seconds = seconds
+ self.error_message = error_message
+
+ def handle_timeout(self, signum, frame):
+ logging.error("Process timed out")
+ raise CaravelTimeoutException(self.error_message)
+
+ def __enter__(self):
+ try:
+ signal.signal(signal.SIGALRM, self.handle_timeout)
+ signal.alarm(self.seconds)
+ except ValueError as e:
+ logging.warning("timeout can't be used in the current context")
+ logging.exception(e)
+
+ def __exit__(self, type, value, traceback):
+ try:
+ signal.alarm(0)
+ except ValueError as e:
+ logging.warning("timeout can't be used in the current context")
+ logging.exception(e)
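Usage is a plain `with` block; note that `signal.SIGALRM` exists only on Unix and fires only in the main thread, which is exactly what the `ValueError` handling above tolerates. A short usage sketch (`slow_call` stands in for any long-running function):

    from caravel import utils

    def fetch_with_deadline(slow_call, seconds=5):
        try:
            with utils.timeout(seconds=seconds, error_message="took too long"):
                return slow_call()
        except utils.CaravelTimeoutException:
            return None  # the caller decides how to surface the timeout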
diff --git a/caravel/views.py b/caravel/views.py
index c4fa7d8bc826b..acc48dead9848 100755
--- a/caravel/views.py
+++ b/caravel/views.py
@@ -33,7 +33,7 @@
import caravel
from caravel import (
- appbuilder, db, models, viz, utils, app, sm, ascii_art, sql_lab
+ appbuilder, cache, db, models, viz, utils, app, sm, ascii_art, sql_lab
)
config = app.config
@@ -456,7 +456,8 @@ class DatabaseView(CaravelModelView, DeleteMixin): # noqa
list_columns = ['database_name', 'creator', 'changed_on_']
add_columns = [
'database_name', 'sqlalchemy_uri', 'cache_timeout', 'extra',
- 'expose_in_sqllab', 'allow_ctas', 'force_ctas_schema']
+ 'expose_in_sqllab', 'allow_run_sync', 'allow_run_async',
+ 'allow_ctas', 'allow_dml', 'force_ctas_schema']
search_exclude_columns = ('password',)
edit_columns = add_columns
show_columns = [
@@ -479,7 +480,19 @@ class DatabaseView(CaravelModelView, DeleteMixin): # noqa
"to structure your URI here: "
"http://docs.sqlalchemy.org/en/rel_1_0/core/engines.html"),
'expose_in_sqllab': _("Expose this DB in SQL Lab"),
+ 'allow_run_sync': _(
+ "Allow users to run synchronous queries, this is the default "
+ "and should work well for queries that can be executed "
+ "within a web request scope (<~1 minute)"),
+ 'allow_run_async': _(
+ "Allow users to run queries, against an async backend. "
+ "This assumes that you have a Celery worker setup as well "
+ "as a results backend."),
'allow_ctas': _("Allow CREATE TABLE AS option in SQL Lab"),
+ 'allow_dml': _(
+ "Allow users to run non-SELECT statements "
+ "(UPDATE, DELETE, CREATE, ...) "
+ "in SQL Lab"),
'force_ctas_schema': _(
"When allowing CREATE TABLE AS option in SQL Lab, "
"this option forces the table to be created in this schema"),
@@ -496,6 +509,7 @@ class DatabaseView(CaravelModelView, DeleteMixin): # noqa
label_columns = {
'expose_in_sqllab': _("Expose in SQL Lab"),
'allow_ctas': _("Allow CREATE TABLE AS"),
+ 'allow_dml': _("Allow DML"),
'force_ctas_schema': _("CTAS Schema"),
'database_name': _("Database"),
'creator': _("Creator"),
@@ -525,7 +539,8 @@ def pre_update(self, db):
class DatabaseAsync(DatabaseView):
list_columns = [
'id', 'database_name',
- 'expose_in_sqllab', 'allow_ctas', 'force_ctas_schema'
+ 'expose_in_sqllab', 'allow_ctas', 'force_ctas_schema',
+ 'allow_run_async', 'allow_run_sync', 'allow_dml',
]
appbuilder.add_view_no_menu(DatabaseAsync)
@@ -1453,15 +1468,19 @@ def sqllab_viz(self):
table.sql = data.get('sql')
db.session.add(table)
cols = []
+ dims = []
metrics = []
for column_name, config in data.get('columns').items():
is_dim = config.get('is_dim', False)
- cols.append(models.TableColumn(
+ col = models.TableColumn(
column_name=column_name,
filterable=is_dim,
groupby=is_dim,
is_dttm=config.get('is_date', False),
- ))
+ )
+ cols.append(col)
+ if is_dim:
+ dims.append(col)
agg = config.get('agg')
if agg:
metrics.append(models.SqlMetric(
@@ -1476,8 +1495,15 @@ def sqllab_viz(self):
table.columns = cols
table.metrics = metrics
db.session.commit()
- url = '/caravel/explore/table/{table.id}/?viz_type={viz_type}'
- return redirect(url.format(**locals()))
+ params = {
+ 'viz_type': viz_type,
+ 'groupby': dims[0].column_name if dims else '',
+ 'metrics': metrics[0].metric_name if metrics else '',
+ 'metric': metrics[0].metric_name if metrics else '',
+ }
+ params = "&".join([k + '=' + v for k, v in params.items()])
+ url = '/caravel/explore/table/{table.id}/?{params}'.format(**locals())
+ return redirect(url)
@has_access
@expose("/sql//")
@@ -1558,12 +1584,22 @@ def select_star(self, database_id, table_name):
def theme(self):
return self.render_template('caravel/theme.html')
- @has_access
+ @has_access_api
+ @expose("/cached_key//")
+ @log_this
+ def cached_key(self, key):
+ """Returns a key from the cache"""
+ resp = cache.get(key)
+ if resp:
+ return resp
+ return "nope"
+
+ @has_access_api
@expose("/sql_json/", methods=['POST', 'GET'])
@log_this
def sql_json(self):
"""Runs arbitrary sql and returns and json"""
- async = request.form.get('async') == 'true'
+ async = request.form.get('runAsync') == 'true'
sql = request.form.get('sql')
database_id = request.form.get('database_id')
@@ -1600,7 +1636,7 @@ def sql_json(self):
# Async request.
if async:
# Ignore the celery future object and the request may time out.
- sql_lab.get_sql_results.delay(query_id)
+ sql_lab.get_sql_results.delay(query_id, return_results=False)
return Response(
json.dumps({'query': query.to_dict()},
default=utils.json_int_dttm_ser,
@@ -1610,7 +1646,15 @@ def sql_json(self):
# Sync request.
try:
- data = sql_lab.get_sql_results(query_id)
+ SQLLAB_TIMEOUT = config.get("SQLLAB_TIMEOUT")
+ with utils.timeout(
+ seconds=SQLLAB_TIMEOUT,
+ error_message=(
+ "The query exceeded the {SQLLAB_TIMEOUT} seconds "
+ "timeout. You may want to run your query as a "
+ "`CREATE TABLE AS` to prevent timeouts."
+ ).format(**locals())):
+ data = sql_lab.get_sql_results(query_id, return_results=True)
except Exception as e:
logging.exception(e)
return Response(
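From a client's perspective, the renamed `runAsync` form field selects between the two paths. A hypothetical sketch using `requests`; the session is assumed to be already authenticated and the URL is a placeholder:

    import requests

    def submit_async_query(session, base_url, database_id, sql):
        """POST to /sql_json/ with runAsync=true; the server returns the
        pending query's metadata immediately instead of blocking."""
        resp = session.post(
            base_url + '/caravel/sql_json/',
            data={'runAsync': 'true', 'sql': sql, 'database_id': database_id})
        return resp.json()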
diff --git a/docs/installation.rst b/docs/installation.rst
index c70cc0082ead6..0d94baa7f5e39 100644
--- a/docs/installation.rst
+++ b/docs/installation.rst
@@ -294,6 +294,25 @@ Upgrading should be as straightforward as running::
pip install caravel --upgrade
caravel db upgrade
+SQL Lab
+-------
+SQL Lab is a powerful SQL IDE that works with all SQLAlchemy-compatible
+databases out there. By default, queries run in a web request and may
+eventually time out as they exceed the maximum duration of a web request
+in your environment, whether that limit is imposed by a reverse proxy or
+by the Caravel server itself.
+
+In the modern analytics world, it's not uncommon to run large queries
+that take minutes or hours to complete.
+To enable support for long-running queries that
+execute beyond the typical web request's timeout (30-60 seconds), it is
+necessary to deploy an asynchronous backend, which consists of one or
+more Caravel workers (implemented as Celery workers) and a Celery
+broker, for which we recommend Redis or RabbitMQ.
+
+It's also preferable to set up an async results backend, a key-value
+store that can hold the long-running query results for a period of time.
+More details on how to set this up will be added here soon.
Making your own build
---------------------
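Until those details land in the docs, here is a plausible sketch of the Celery wiring, modeled on the `CeleryConfig` class already referenced in `caravel/config.py`; the Redis URLs are placeholders for your own broker and results backend:

    class CeleryConfig(object):
        BROKER_URL = 'redis://localhost:6379/0'
        CELERY_IMPORTS = ('caravel.sql_lab',)
        CELERY_RESULT_BACKEND = 'redis://localhost:6379/1'

    CELERY_CONFIG = CeleryConfig

A worker can then be started against the `celery_app` defined in `caravel.sql_lab` (the `@celery_app.task` decorator above), e.g. `celery worker --app=caravel.sql_lab:celery_app`.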
diff --git a/tests/celery_tests.py b/tests/celery_tests.py
index e712a157e98f1..1cd3401c7e361 100644
--- a/tests/celery_tests.py
+++ b/tests/celery_tests.py
@@ -226,7 +226,8 @@ def test_run_async_query(self):
sql_where = "SELECT name FROM ab_role WHERE name='Admin'"
result1 = self.run_sql(
1, sql_where, async='true', tmp_table='tmp_async_1', cta='true')
- self.assertEqual(QueryStatus.PENDING, result1['query']['state'])
+ assert result1['query']['state'] in (
+ QueryStatus.PENDING, QueryStatus.RUNNING, QueryStatus.SUCCESS)
time.sleep(1)