From 28c0292920ee6748b7d83097c105e71776ab413f Mon Sep 17 00:00:00 2001
From: Maxime
Date: Mon, 3 Aug 2015 15:34:58 +0000
Subject: [PATCH 1/6] Getting ready to gut panoramix

---
 app/models.py | 111 ++++++++++++++++++++++++++++++++++++++++++++------
 app/views.py  |  80 +++++++++++++++++++++++++++++++++++-
 2 files changed, 177 insertions(+), 14 deletions(-)

diff --git a/app/models.py b/app/models.py
index 1a7630a0a3554..49eb370140cd5 100644
--- a/app/models.py
+++ b/app/models.py
@@ -3,6 +3,8 @@
 from datetime import timedelta
 from flask.ext.appbuilder.models.mixins import AuditMixin, FileColumn
 from sqlalchemy import Column, Integer, String, ForeignKey, Text, Boolean, DateTime
+from sqlalchemy import create_engine, MetaData
+from sqlalchemy import Table as sqlaTable
 from sqlalchemy.orm import relationship
 from app import get_session
 from dateutil.parser import parse
@@ -11,6 +13,102 @@
 import json
 import requests
 
+from app import db
+
+
+class Queryable(object):
+    @property
+    def column_names(self):
+        return sorted([c.column_name for c in self.columns])
+
+    @property
+    def groupby_column_names(self):
+        return sorted([c.column_name for c in self.columns if c.groupby])
+
+    @property
+    def filterable_column_names(self):
+        return sorted([c.column_name for c in self.columns if c.filterable])
+
+
+class Database(Model, AuditMixin):
+    __tablename__ = 'databases'
+    id = Column(Integer, primary_key=True)
+    database_name = Column(String(256), unique=True)
+    sqlalchemy_uri = Column(String(1024))
+
+    def __repr__(self):
+        return self.database_name
+
+
+class Table(Model, AuditMixin, Queryable):
+    __tablename__ = 'tables'
+    id = Column(Integer, primary_key=True)
+    table_name = Column(String(256), unique=True)
+    default_endpoint = Column(Text)
+    database_id = Column(
+        String(256), ForeignKey('databases.id'))
+    database = relationship(
+        'Database', backref='tables', foreign_keys=[database_id])
+
+    @property
+    def table_link(self):
+        url = "/panoramix/table/{}/".format(self.id)
+        return '<a href="{url}">{self.table_name}</a>'.format(**locals())
+
+    @property
+    def metrics_combo(self):
+        return sorted(
+            [
+                (
+                    'sum__{}'.format(m.column_name),
+                    'SUM({})'.format(m.column_name),
+                )
+                for m in self.columns if m.sum],
+            key=lambda x: x[1])
+
+    def fetch_metadata(self):
+        engine = create_engine(self.database.sqlalchemy_uri)
+        meta = MetaData()
+        table = sqlaTable(
+            self.table_name, meta, autoload=True, autoload_with=engine)
+        TC = TableColumn
+        for col in table.columns:
+            dbcol = (
+                db.session
+                .query(TC)
+                .filter(TC.table==self)
+                .filter(TC.column_name==col.name)
+                .first()
+            )
+            db.session.flush()
+            if not dbcol:
+                dbcol = TableColumn(column_name=col.name)
+                if str(col.type) in ('VARCHAR', 'STRING'):
+                    dbcol.groupby = True
+                    dbcol.filterable = True
+                self.columns.append(dbcol)
+
+            dbcol.type = str(col.type)
+        db.session.commit()
+
+
+class TableColumn(Model, AuditMixin):
+    __tablename__ = 'table_columns'
+    id = Column(Integer, primary_key=True)
+    table_id = Column(
+        String(256),
+        ForeignKey('tables.id'))
+    table = relationship('Table', backref='columns')
+    column_name = Column(String(256))
+    is_dttm = Column(Boolean, default=True)
+    is_active = Column(Boolean, default=True)
+    type = Column(String(32), default='')
+    groupby = Column(Boolean, default=False)
+    count_distinct = Column(Boolean, default=False)
+    sum = Column(Boolean, default=False)
+    max = Column(Boolean, default=False)
+    min = Column(Boolean, default=False)
+    filterable = Column(Boolean, default=False)
+    description = Column(Text, default='')
+
 class Cluster(Model, AuditMixin):
     __tablename__ = 'clusters'
 
@@ -46,7 +144,7 @@ def refresh_datasources(self):
             #     logging.exception(e)
             #     logging.error("Failed at syncing " + datasource)
 
-class Datasource(Model, AuditMixin):
+class Datasource(Model, AuditMixin, Queryable):
     __tablename__ = 'datasources'
     id = Column(Integer, primary_key=True)
     datasource_name = Column(String(256), unique=True)
@@ -130,17 +228,6 @@ def sync_to_db(cls, name, cluster):
             col_obj.generate_metrics()
         #session.commit()
 
-    @property
-    def column_names(self):
-        return sorted([c.column_name for c in self.columns])
-
-    @property
-    def groupby_column_names(self):
-        return sorted([c.column_name for c in self.columns if c.groupby])
-
-    @property
-    def filterable_column_names(self):
-        return sorted([c.column_name for c in self.columns if c.filterable])
 
 class Metric(Model):
diff --git a/app/views.py b/app/views.py
index 561802c1828b3..c3ab428a64543 100644
--- a/app/views.py
+++ b/app/views.py
@@ -27,6 +27,19 @@ def muldelete(self, items):
         return redirect(self.get_redirect())
 
 
+class TableColumnInlineView(CompactCRUDMixin, ModelView):
+    datamodel = SQLAInterface(models.TableColumn)
+    can_delete = False
+    edit_columns = [
+        'column_name', 'description', 'table', 'groupby', 'filterable',
+        'count_distinct', 'sum', 'min', 'max']
+    list_columns = [
+        'column_name', 'type', 'groupby', 'count_distinct',
+        'sum', 'min', 'max']
+    page_size = 100
+appbuilder.add_view_no_menu(TableColumnInlineView)
+
+
 class ColumnInlineView(CompactCRUDMixin, ModelView):
     datamodel = SQLAInterface(models.Column)
     edit_columns = [
@@ -80,6 +96,39 @@ class ClusterModelView(ModelView, DeleteMixin):
     category_icon='fa-cogs',)
 
 
+class DatabaseView(ModelView, DeleteMixin):
+    datamodel = SQLAInterface(models.Database)
+    list_columns = ['database_name']
+    add_columns = ['database_name', 'sqlalchemy_uri']
+    edit_columns = add_columns
+
+appbuilder.add_view(
+    DatabaseView,
+    "Databases",
+    icon="fa-database",
+    category="Admin",
+    category_icon='fa-cogs',)
+
+
+class TableView(ModelView, DeleteMixin):
+    datamodel = SQLAInterface(models.Table)
+    list_columns = ['table_link', 'database']
+    add_columns = ['table_name', 'database']
+    edit_columns = add_columns
+    related_views = [TableColumnInlineView]
+
+    def post_insert(self, table):
+        table.fetch_metadata()
+
+    def post_update(self, table):
+        table.fetch_metadata()
+
+appbuilder.add_view(
+    TableView,
+    "Tables",
+    icon='fa-table',)
+
+
 class DatasourceModelView(ModelView, DeleteMixin):
     datamodel = SQLAInterface(models.Datasource)
     list_columns = [
@@ -101,8 +150,7 @@ def post_update(self, datasource):
 appbuilder.add_view(
     DatasourceModelView,
     "Druid Datasources",
-    icon="fa-cube",
-    category_icon='fa-envelope')
+    icon="fa-cube")
 
 
 @app.route('/health')
@@ -116,6 +164,34 @@ def ping():
 
 class Panoramix(BaseView):
 
+    @has_access
+    @permission_name('tables')
+    @expose("/table/<table_id>/")
+    def table(self, table_id):
+
+        table = (
+            db.session
+            .query(models.Table)
+            .filter_by(id=table_id)
+            .first()
+        )
+        viz_type = request.args.get("viz_type")
+        if not viz_type and table.default_endpoint:
+            return redirect(table.default_endpoint)
+        if not viz_type:
+            viz_type = "table"
+        obj = viz.viz_types[viz_type](
+            table,
+            form_data=request.args, view=self)
+        if request.args.get("json"):
+            return Response(
+                json.dumps(obj.get_query(), indent=4),
+                status=200,
+                mimetype="application/json")
+        if obj.df is None or obj.df.empty:
+            return obj.render_no_data()
+        return obj.render()
+
     @has_access
     @permission_name('datasources')
     @expose("/datasource/<datasource_name>/")
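
Note on [PATCH 1/6]: fetch_metadata() hinges on SQLAlchemy table reflection. The column inventory comes from the live database rather than from Python declarations, which is what lets TableView's post_insert/post_update hooks populate TableColumn rows automatically. A minimal standalone sketch of that mechanism (the SQLite URI and the "logs" table are invented for illustration):

```python
from sqlalchemy import create_engine, MetaData
from sqlalchemy import Table as sqlaTable

engine = create_engine("sqlite:///example.db")  # hypothetical sqlalchemy_uri
meta = MetaData()
# autoload/autoload_with make SQLAlchemy introspect the table in the
# database itself; fetch_metadata() iterates over the reflected columns.
logs = sqlaTable("logs", meta, autoload=True, autoload_with=engine)
for col in logs.columns:
    print(col.name, str(col.type))  # e.g. "ds DATETIME", "name VARCHAR(64)"
```
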
From 2d192d1ae85eebb31f9e5350a8486d136ff9fa83 Mon Sep 17 00:00:00 2001
From: Maxime
Date: Mon, 3 Aug 2015 20:37:56 +0000
Subject: [PATCH 2/6] Query obj

---
 app/viz.py | 25 +++++++++++++++++++++++++
 1 file changed, 25 insertions(+)

diff --git a/app/viz.py b/app/viz.py
index 02212639565fb..7bde8a373ce35 100644
--- a/app/viz.py
+++ b/app/viz.py
@@ -15,6 +15,31 @@
     'target_div': 'chart',
 }
 
+class BaseQuery(object):
+    def __init__(
+            self, groupby, metrics, filters,
+            is_timeseries,
+            timeseries_limit=15, row_limit=None):
+        self.groupby = groupby
+        self.metrics = metrics
+        self.filters = filters
+        self.is_timeseries = is_timeseries
+        self.timeseries_limit = timeseries_limit
+        self.row_limit = row_limit
+
+    def run(self):
+        start = datetime.now()
+        self._execute()
+        self.duration = (datetime.now() - start).total_seconds()
+
+    def _execute(self):
+        raise NotImplementedError()
+
+    def pandas_df(self):
+        raise NotImplementedError()
+
+
 class OmgWtForm(Form):
     field_order = (
         'viz_type', 'granularity', 'since', 'group_by', 'limit')
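
Note on [PATCH 2/6]: BaseQuery is only a skeleton at this point (patch 3 deletes it again before anything subclasses it). A sketch of how a concrete backend might have plugged in, assuming viz.py's existing datetime import is in scope; DummyQuery and its canned rows are invented:

```python
class DummyQuery(BaseQuery):
    def _execute(self):
        # A real subclass would hit Druid or a SQL engine here.
        self.result = [("user1", 42), ("user2", 7)]

q = DummyQuery(
    groupby=["user"], metrics=["count"], filters=[], is_timeseries=False)
q.run()  # run() times _execute() and records q.duration in seconds
print(q.result, q.duration)
```
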
From 34acc90f3fa22747e243f4750937cd70ab7aaf09 Mon Sep 17 00:00:00 2001
From: Maxime
Date: Wed, 5 Aug 2015 06:41:00 +0000
Subject: [PATCH 3/6] ALL OF SQL is coming along nicely

---
 app/models.py                           | 136 +++++++++++++++++++++---
 app/templates/panoramix/datasource.html |   2 +-
 app/views.py                            |  12 ++-
 app/viz.py                              |  73 ++++---------
 4 files changed, 150 insertions(+), 73 deletions(-)

diff --git a/app/models.py b/app/models.py
index 49eb370140cd5..cb7e04d3bf4a0 100644
--- a/app/models.py
+++ b/app/models.py
@@ -37,6 +37,16 @@ class Database(Model, AuditMixin):
     def __repr__(self):
         return self.database_name
 
+    def get_sqla_engine(self):
+        return create_engine(self.sqlalchemy_uri)
+
+    def get_table(self):
+        meta = MetaData()
+        return sqlaTable(
+            self.table_name, meta,
+            autoload=True,
+            autoload_with=self.get_sqla_engine())
+
 
 class Table(Model, AuditMixin, Queryable):
     __tablename__ = 'tables'
@@ -48,6 +58,10 @@ class Table(Model, AuditMixin, Queryable):
     database = relationship(
         'Database', backref='tables', foreign_keys=[database_id])
 
+    @property
+    def name(self):
+        return self.table_name
+
     @property
     def table_link(self):
         url = "/panoramix/table/{}/".format(self.id)
@@ -57,18 +71,62 @@ def table_link(self):
     def metrics_combo(self):
         return sorted(
             [
-                (
-                    'sum__{}'.format(m.column_name),
-                    'SUM({})'.format(m.column_name),
-                )
-                for m in self.columns if m.sum],
+                (m.metric_name, m.verbose_name)
+                for m in self.metrics],
             key=lambda x: x[1])
 
+    def query(
+            self, groupby, metrics,
+            granularity,
+            from_dttm, to_dttm,
+            limit_spec=None,
+            filter=None,
+            is_timeseries=True,
+            timeseries_limit=15, row_limit=None):
+        from pandas import read_sql_query
+        metrics_exprs = [
+            "{} AS {}".format(m.expression, m.metric_name)
+            for m in self.metrics if m.metric_name in metrics]
+        from_dttm_iso = from_dttm.isoformat()
+        to_dttm_iso = to_dttm.isoformat()
+
+        select_exprs = []
+        groupby_exprs = []
+
+        if groupby:
+            select_exprs = groupby
+            groupby_exprs = [s for s in groupby]
+        select_exprs += metrics_exprs
+        if granularity != "all":
+            select_exprs += ['ds as timestamp']
+            groupby_exprs += ['ds']
+
+        select_exprs = ",\n".join(select_exprs)
+        groupby_exprs = ",\n".join(groupby_exprs)
+
+        where_clause = [
+            "ds >= '{from_dttm_iso}'",
+            "ds < '{to_dttm_iso}'"
+        ]
+        where_clause = " AND\n".join(where_clause).format(**locals())
+        sql = """
+        SELECT
+            {select_exprs}
+        FROM {self.table_name}
+        WHERE
+            {where_clause}
+        GROUP BY
+            {groupby_exprs}
+        """.format(**locals())
+        df = read_sql_query(
+            sql=sql,
+            con=self.database.get_sqla_engine()
+        )
+        return df
+
     def fetch_metadata(self):
-        engine = create_engine(self.database.sqlalchemy_uri)
-        meta = MetaData()
-        table = sqlaTable(
-            self.table_name, meta, autoload=True, autoload_with=engine)
+        table = self.database.get_table(self.table_name)
         TC = TableColumn
         for col in table.columns:
             dbcol = (
@@ -90,13 +148,28 @@ def fetch_metadata(self):
         db.session.commit()
 
 
+class SqlMetric(Model):
+    __tablename__ = 'sql_metrics'
+    id = Column(Integer, primary_key=True)
+    metric_name = Column(String(512))
+    verbose_name = Column(String(1024))
+    metric_type = Column(String(32))
+    table_id = Column(
+        String(256),
+        ForeignKey('tables.id'))
+    table = relationship(
+        'Table', backref='metrics', foreign_keys=[table_id])
+    expression = Column(Text)
+    description = Column(Text)
+
+
 class TableColumn(Model, AuditMixin):
     __tablename__ = 'table_columns'
     id = Column(Integer, primary_key=True)
     table_id = Column(
         String(256),
         ForeignKey('tables.id'))
-    table = relationship('Table', backref='columns')
+    table = relationship('Table', backref='columns', foreign_keys=[table_id])
     column_name = Column(String(256))
     is_dttm = Column(Boolean, default=True)
     is_active = Column(Boolean, default=True)
@@ -138,11 +211,8 @@ def refresh_datasources(self):
         ).format(self=self)
         datasources = json.loads(requests.get(endpoint).text)
         for datasource in datasources:
-            #try:
-            Datasource.sync_to_db(datasource, self)
-            #except Exception as e:
-            #    logging.exception(e)
-            #    logging.error("Failed at syncing " + datasource)
+            Datasource.sync_to_db(datasource, self)
+
 
 class Datasource(Model, AuditMixin, Queryable):
     __tablename__ = 'datasources'
@@ -165,6 +235,10 @@ def metrics_combo(self):
             [(m.metric_name, m.verbose_name) for m in self.metrics],
             key=lambda x: x[1])
 
+    @property
+    def name(self):
+        return self.datasource_name
+
     def __repr__(self):
         return self.datasource_name
 
@@ -227,7 +301,37 @@ def sync_to_db(cls, name, cluster):
             col_obj.datasource = datasource
             col_obj.generate_metrics()
         #session.commit()
-
+    def query(
+            self, groupby, metrics,
+            granularity,
+            from_dttm, to_dttm,
+            limit_spec=None,
+            filter=None,
+            is_timeseries=True,
+            timeseries_limit=15, row_limit=None):
+
+        aggregations = {
+            m.metric_name: m.json_obj
+            for m in self.metrics if m.metric_name in metrics
+        }
+        if not isinstance(granularity, basestring):
+            granularity = {"type": "duration", "duration": granularity}
+
+        qry = dict(
+            datasource=self.datasource_name,
+            dimensions=groupby,
+            aggregations=aggregations,
+            granularity=granularity,
+            intervals= from_dttm.isoformat() + '/' + to_dttm.isoformat(),
+        )
+        if filter:
+            qry['filter'] = filter
+        if limit_spec:
+            qry['limit_spec'] = limit_spec
+        client = self.cluster.get_pydruid_client()
+        client.groupby(**qry)
+        df = client.export_pandas()
+        return df
 
 class Metric(Model):
diff --git a/app/templates/panoramix/datasource.html b/app/templates/panoramix/datasource.html
index 0a53cc6543ad0..599d80efb4253 100644
--- a/app/templates/panoramix/datasource.html
+++ b/app/templates/panoramix/datasource.html
@@ -22,7 +22,7 @@
-        {{ datasource.datasource_name }}
+        {{ datasource.name }}
         {% if datasource.description %}
         {% endif %}
diff --git a/app/views.py b/app/views.py
index c3ab428a64543..7f6693814570d 100644
--- a/app/views.py
+++ b/app/views.py
@@ -62,6 +62,16 @@ def post_update(self, col):
 appbuilder.add_view_no_menu(ColumnInlineView)
 
 
+class SqlMetricInlineView(CompactCRUDMixin, ModelView):
+    datamodel = SQLAInterface(models.SqlMetric)
+    list_columns = ['metric_name', 'verbose_name', 'metric_type']
+    edit_columns = [
+        'metric_name', 'description', 'verbose_name', 'metric_type',
+        'table', 'expression']
+    add_columns = edit_columns
+    page_size = 100
+appbuilder.add_view_no_menu(SqlMetricInlineView)
+
 
 class MetricInlineView(CompactCRUDMixin, ModelView):
     datamodel = SQLAInterface(models.Metric)
@@ -115,7 +125,7 @@ class TableView(ModelView, DeleteMixin):
     list_columns = ['table_link', 'database']
     add_columns = ['table_name', 'database']
     edit_columns = add_columns
-    related_views = [TableColumnInlineView]
+    related_views = [TableColumnInlineView, SqlMetricInlineView]
 
     def post_insert(self, table):
         table.fetch_metadata()
diff --git a/app/viz.py b/app/viz.py
index 7bde8a373ce35..2bcfcea554c9b 100644
--- a/app/viz.py
+++ b/app/viz.py
@@ -15,30 +15,6 @@
     'target_div': 'chart',
 }
 
-class BaseQuery(object):
-    def __init__(
-            self, groupby, metrics, filters,
-            is_timeseries,
-            timeseries_limit=15, row_limit=None):
-        self.groupby = groupby
-        self.metrics = metrics
-        self.filters = filters
-        self.is_timeseries = is_timeseries
-        self.timeseries_limit = timeseries_limit
-        self.row_limit = row_limit
-
-    def run(self):
-        start = datetime.now()
-        self._execute()
-        self.duration = (datetime.now() - start).total_seconds()
-
-    def _execute(self):
-        raise NotImplementedError()
-
-    def pandas_df(self):
-        raise NotImplementedError()
-
-
 class OmgWtForm(Form):
     field_order = (
         'viz_type', 'granularity', 'since', 'group_by', 'limit')
@@ -146,39 +122,39 @@ def query_filters(self):
             filters = cond
         return filters
 
+    def bake_query(self):
+        return self.datasource.query(**self.query_obj())
+
     def query_obj(self):
         ds = self.datasource
         args = self.form_data
         groupby = args.getlist("groupby") or []
+        metrics = args.getlist("metrics") or ['count']
         granularity = args.get("granularity", "1 day")
-        granularity = utils.parse_human_timedelta(granularity).total_seconds() * 1000
-        aggregations = {
-            m.metric_name: m.json_obj
-            for m in ds.metrics if m.metric_name in self.metrics
-        }
+        granularity = utils.parse_human_timedelta(
+            granularity).total_seconds() * 1000
         limit = int(
             args.get("limit", config.ROW_LIMIT)) or config.ROW_LIMIT
         since = args.get("since", "1 year ago")
         from_dttm = utils.parse_human_datetime(since)
         if from_dttm > datetime.now():
             from_dttm = datetime.now() - (from_dttm-datetime.now())
-        from_dttm = from_dttm.isoformat()
         until = args.get("until", "now")
-        to_dttm = utils.parse_human_datetime(until).isoformat()
+        to_dttm = utils.parse_human_datetime(until)
         if from_dttm >= to_dttm:
             flash("The date range doesn't seem right.", "danger")
             from_dttm = to_dttm  # Making them identical to not raise
 
         d = {
-            'datasource': ds.datasource_name,
-            'granularity': {"type": "duration", "duration": granularity},
-            'intervals': from_dttm + '/' + to_dttm,
-            'dimensions': groupby,
-            'aggregations': aggregations,
+            'granularity': granularity,
+            'from_dttm': from_dttm,
+            'to_dttm': to_dttm,
+            'groupby': groupby,
+            'metrics': metrics,
             'limit_spec': {
                 "type": "default",
                 "limit": limit,
                 "columns": [{
-                    "dimension": self.metrics[0],
+                    "dimension": metrics[0] if metrics else self.metrics[0],
                     "direction": "descending",
                 }],
             },
@@ -188,17 +164,7 @@ def query_obj(self):
             d['filter'] = filters
         return d
 
-    def bake_query(self):
-        client = self.datasource.cluster.get_pydruid_client()
-        client.groupby(**self.query_obj())
-        return client.export_pandas()
-
-    def get_query(self):
-        client = self.datasource.cluster.get_pydruid_client()
-        client.groupby(**self.query_obj())
-        return client.query_dict
-
-    def df_prep(self, ):
+    def df_prep(self):
         pass
 
     def form_prep(self):
@@ -290,14 +256,12 @@ def bake_query(self):
         """
         Doing a 2 phase query where we limit the number of series.
         """
-        client = self.datasource.cluster.get_pydruid_client()
         qry = self.query_obj()
         orig_filter = qry['filter'] if 'filter' in qry else ''
         qry['granularity'] = "all"
-        client.groupby(**qry)
-        df = client.export_pandas()
+        df = self.datasource.query(**qry)
         if not df is None:
-            dims = qry['dimensions']
+            dims = qry['groupby']
             filters = []
             for index, row in df.iterrows():
                 fields = []
@@ -318,9 +282,8 @@ def bake_query(self):
             qry['filter'] = Filter(type="and", fields=[
                 Filter.build_filter(ff),
                 Filter.build_filter(orig_filter)])
-        del qry['limit_spec']
-        client.groupby(**qry)
-        return client.export_pandas()
+        qry['limit_spec'] = None
+        return self.datasource.query(**qry)
 
 class TimeSeriesCompareViz(TimeSeriesViz):
     verbose_name = "Time Series - Percent Change"
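
Note on [PATCH 3/6]: the structural move here is that query_obj() no longer builds a Druid query; it builds a backend-neutral dict that either Table.query() (SQL) or Datasource.query() (Druid) unpacks as keyword arguments. Its approximate shape at this point, with invented values:

```python
from datetime import datetime

query_obj = {
    'granularity': 86400000.0,           # "1 day", in milliseconds
    'from_dttm': datetime(2014, 8, 5),
    'to_dttm': datetime(2015, 8, 5),
    'groupby': ['gender'],
    'metrics': ['count'],
    'limit_spec': {
        "type": "default",
        "limit": 10000,
        "columns": [{"dimension": "count", "direction": "descending"}],
    },
}
# Either backend consumes the same dict:
#   df = table.query(**query_obj)        # SQL path
#   df = datasource.query(**query_obj)   # Druid path
```
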
From f39b24175b4463e80576a0b393e4bf3442bd7695 Mon Sep 17 00:00:00 2001
From: Maxime
Date: Thu, 6 Aug 2015 00:36:33 +0000
Subject: [PATCH 4/6] About to mess with filters

---
 app/models.py | 41 +++++++++++++++++++++++++++++++++++++----
 app/viz.py    | 28 +---------------------------
 2 files changed, 38 insertions(+), 31 deletions(-)

diff --git a/app/models.py b/app/models.py
index cb7e04d3bf4a0..7cde14a588a66 100644
--- a/app/models.py
+++ b/app/models.py
@@ -1,19 +1,20 @@
 from flask.ext.appbuilder import Model
-from pydruid import client
 from datetime import timedelta
-from flask.ext.appbuilder.models.mixins import AuditMixin, FileColumn
+from flask.ext.appbuilder.models.mixins import AuditMixin
 from sqlalchemy import Column, Integer, String, ForeignKey, Text, Boolean, DateTime
 from sqlalchemy import create_engine, MetaData
 from sqlalchemy import Table as sqlaTable
 from sqlalchemy.orm import relationship
-from app import get_session
 from dateutil.parser import parse
+from pydruid import client
+from pydruid.utils.filters import Dimension, Filter
+from copy import deepcopy
 
 import logging
 import json
 import requests
 
-from app import db
+from app import db, get_session
 
 class Queryable(object):
     @property
     def column_names(self):
@@ -301,6 +302,7 @@ def sync_to_db(cls, name, cluster):
             col_obj.datasource = datasource
             col_obj.generate_metrics()
         #session.commit()
+
     def query(
             self, groupby, metrics,
             granularity,
@@ -328,7 +330,38 @@ def query(
             qry['filter'] = filter
         if limit_spec:
             qry['limit_spec'] = limit_spec
-        client = self.cluster.get_pydruid_client()
-        client.groupby(**qry)
+        client = self.cluster.get_pydruid_client()
+        if timeseries_limit:
+            # Limit on the number of timeseries, doing a two-phases query
+            pre_qry = deepcopy(qry)
+            pre_qry['granularity'] = "all"
+            client.groupby(**qry)
+            df = client.export_pandas()
+            if not df is None and not df.empty:
+                dims = qry['dimensions']
+                filters = []
+                for index, row in df.iterrows():
+                    fields = []
+                    for dim in dims:
+                        f = Filter.build_filter(Dimension(dim) == row[dim])
+                        fields.append(f)
+                    if len(fields) > 1:
+                        filt = Filter(type="and", fields=fields)
+                        filters.append(Filter.build_filter(filt))
+                    elif fields:
+                        filters.append(fields[0])
+
+            if filters:
+                ff = Filter(type="or", fields=filters)
+                if not filter:
+                    qry['filter'] = ff
+                else:
+                    qry['filter'] = Filter(type="and", fields=[
+                        Filter.build_filter(ff),
+                        Filter.build_filter(filter)])
+                qry['limit_spec'] = None
+
+        client.groupby(**qry)
         df = client.export_pandas()
         return df
diff --git a/app/viz.py b/app/viz.py
index 2bcfcea554c9b..3ccc3fbf70b61 100644
--- a/app/viz.py
+++ b/app/viz.py
@@ -1,4 +1,3 @@
-from pydruid.utils.filters import Dimension, Filter
 from datetime import datetime
 from flask import flash, request
 import pandas as pd
@@ -7,6 +6,7 @@
 from app.highchart import Highchart
 from wtforms import Form, SelectMultipleField, SelectField, TextField
 import config
+from pydruid.utils.filters import Dimension, Filter
 
 
 CHART_ARGS = {
@@ -257,32 +257,6 @@ def bake_query(self):
         """
         Doing a 2 phase query where we limit the number of series.
         """
         qry = self.query_obj()
-        orig_filter = qry['filter'] if 'filter' in qry else ''
-        qry['granularity'] = "all"
-        df = self.datasource.query(**qry)
-        if not df is None:
-            dims = qry['groupby']
-            filters = []
-            for index, row in df.iterrows():
-                fields = []
-                for dim in dims:
-                    f = Filter.build_filter(Dimension(dim) == row[dim])
-                    fields.append(f)
-                if len(fields) > 1:
-                    filters.append(Filter.build_filter(Filter(type="and", fields=fields)))
-                elif fields:
-                    filters.append(fields[0])
-
-            qry = self.query_obj()
-            if filters:
-                ff = Filter(type="or", fields=filters)
-                if not orig_filter:
-                    qry['filter'] = ff
-                else:
-                    qry['filter'] = Filter(type="and", fields=[
-                        Filter.build_filter(ff),
-                        Filter.build_filter(orig_filter)])
-                qry['limit_spec'] = None
         return self.datasource.query(**qry)
 
 class TimeSeriesCompareViz(TimeSeriesViz):
     verbose_name = "Time Series - Percent Change"
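
Note on [PATCH 4/6]: the "two-phases" trick runs the grouped query once with granularity "all" to find the top series, then reruns the real query with an OR filter pinning those dimension values. Mirroring the patch's own pydruid calls, the second-phase filter for a single "user" dimension would be composed like this (row values invented):

```python
from pydruid.utils.filters import Dimension, Filter

top_rows = [{"user": "alice"}, {"user": "bob"}]   # phase-one results
fields = [Filter.build_filter(Dimension("user") == row["user"])
          for row in top_rows]
# Phase two only keeps rows that belong to one of the top series:
keep_top_series = Filter(type="or", fields=fields)
```
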
From b70b270b1aa85512eb51c2aa7249e7858257c6bc Mon Sep 17 00:00:00 2001
From: Maxime
Date: Thu, 6 Aug 2015 05:42:42 +0000
Subject: [PATCH 5/6] Pivoting into sqla instead of sql

---
 app/models.py | 71 ++++++++++++++++++++++++++++++++++++++++++++-------
 app/views.py  |  2 +-
 app/viz.py    | 40 +++--------------------------
 3 files changed, 67 insertions(+), 46 deletions(-)

diff --git a/app/models.py b/app/models.py
index 7cde14a588a66..95176ba80f56c 100644
--- a/app/models.py
+++ b/app/models.py
@@ -41,10 +41,10 @@ def __repr__(self):
     def get_sqla_engine(self):
         return create_engine(self.sqlalchemy_uri)
 
-    def get_table(self):
+    def get_table(self, table_name):
         meta = MetaData()
         return sqlaTable(
-            self.table_name, meta,
+            table_name, meta,
             autoload=True,
             autoload_with=self.get_sqla_engine())
 
@@ -109,7 +109,26 @@ def query(
             "ds >= '{from_dttm_iso}'",
             "ds < '{to_dttm_iso}'"
         ]
+        for col, op, eq in filter:
+            if op in ('in', 'not in'):
+                l = ["'{}'".format(s) for s in eq.split(",")]
+                l = ", ".join(l)
+                op = op.upper()
+                where_clause.append(
+                    "{col} {op} ({l})".format(**locals())
+                )
         where_clause = " AND\n".join(where_clause).format(**locals())
+        if timeseries_limit:
+            limiting_join = """
+            JOIN (
+                SELECT {groupby_exprs}
+                FROM {self.table_name}
+                GROUP BY {groupby_exprs}
+                ORDER BY {metric} DESC
+                LIMIT {timeseries_limit}
+            ) z ON
+            """
+
         sql = """
         SELECT
             {select_exprs}
@@ -326,17 +345,51 @@ def query(
             granularity=granularity,
             intervals= from_dttm.isoformat() + '/' + to_dttm.isoformat(),
         )
-        if filter:
-            qry['filter'] = filter
-        if limit_spec:
-            qry['limit_spec'] = limit_spec
+        filters = None
+        for col, op, eq in filter:
+            cond = None
+            if op == '==':
+                cond = Dimension(col)==eq
+            elif op == '!=':
+                cond = ~(Dimension(col)==eq)
+            elif op in ('in', 'not in'):
+                fields = []
+                splitted = eq.split(',')
+                if len(splitted) > 1:
+                    for s in eq.split(','):
+                        s = s.strip()
+                        fields.append(Filter.build_filter(Dimension(col)==s))
+                    cond = Filter(type="or", fields=fields)
+                else:
+                    cond = Dimension(col)==eq
+                if op == 'not in':
+                    cond = ~cond
+            if filters:
+                filters = Filter(type="and", fields=[
+                    Filter.build_filter(cond),
+                    Filter.build_filter(filters)
+                ])
+            else:
+                filters = cond
+
+        if filters:
+            qry['filter'] = filters
 
         client = self.cluster.get_pydruid_client()
+        orig_filters = filters
         if timeseries_limit:
             # Limit on the number of timeseries, doing a two-phases query
             pre_qry = deepcopy(qry)
             pre_qry['granularity'] = "all"
-            client.groupby(**qry)
+            pre_qry['limit_spec'] = {
+                "type": "default",
+                "limit": timeseries_limit,
+                "columns": [{
+                    "dimension": metrics[0] if metrics else self.metrics[0],
+                    "direction": "descending",
+                }],
+            }
+            client.groupby(**pre_qry)
             df = client.export_pandas()
             if not df is None and not df.empty:
                 dims = qry['dimensions']
@@ -354,12 +407,12 @@ def query(
 
             if filters:
                 ff = Filter(type="or", fields=filters)
-                if not filter:
+                if not orig_filters:
                     qry['filter'] = ff
                 else:
                     qry['filter'] = Filter(type="and", fields=[
                         Filter.build_filter(ff),
-                        Filter.build_filter(filter)])
+                        Filter.build_filter(orig_filters)])
                 qry['limit_spec'] = None
 
         client.groupby(**qry)
         df = client.export_pandas()
         return df
diff --git a/app/views.py b/app/views.py
index 7f6693814570d..80ed7289cda15 100644
--- a/app/views.py
+++ b/app/views.py
@@ -123,7 +123,7 @@ class TableView(ModelView, DeleteMixin):
     datamodel = SQLAInterface(models.Table)
     list_columns = ['table_link', 'database']
-    add_columns = ['table_name', 'database']
+    add_columns = ['table_name', 'database', 'default_endpoint']
     edit_columns = add_columns
     related_views = [TableColumnInlineView, SqlMetricInlineView]
diff --git a/app/viz.py b/app/viz.py
index 3ccc3fbf70b61..d4929c2df83d7 100644
--- a/app/viz.py
+++ b/app/viz.py
@@ -90,36 +90,13 @@ def form_class(self):
     def query_filters(self):
         args = self.form_data
         # Building filters
-        filters = None
+        filters = []
         for i in range(1, 10):
             col = args.get("flt_col_" + str(i))
             op = args.get("flt_op_" + str(i))
             eq = args.get("flt_eq_" + str(i))
             if col and op and eq:
-                cond = None
-                if op == '==':
-                    cond = Dimension(col)==eq
-                elif op == '!=':
-                    cond = ~(Dimension(col)==eq)
-                elif op in ('in', 'not in'):
-                    fields = []
-                    splitted = eq.split(',')
-                    if len(splitted) > 1:
-                        for s in eq.split(','):
-                            s = s.strip()
-                            fields.append(Filter.build_filter(Dimension(col)==s))
-                        cond = Filter(type="or", fields=fields)
-                    else:
-                        cond = Dimension(col)==eq
-                    if op == 'not in':
-                        cond = ~cond
-                if filters:
-                    filters = Filter(type="and", fields=[
-                        Filter.build_filter(cond),
-                        Filter.build_filter(filters)
-                    ])
-                else:
-                    filters = cond
+                filters.append((col, op, eq))
         return filters
 
     def bake_query(self):
@@ -150,18 +127,9 @@ def query_obj(self):
             'to_dttm': to_dttm,
             'groupby': groupby,
             'metrics': metrics,
-            'limit_spec': {
-                "type": "default",
-                "limit": limit,
-                "columns": [{
-                    "dimension": metrics[0] if metrics else self.metrics[0],
-                    "direction": "descending",
-                }],
-            },
+            'filter': self.query_filters(),
+            'timeseries_limit': limit,
         }
-        filters = self.query_filters()
-        if filters:
-            d['filter'] = filters
         return d
 
     def df_prep(self):
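
Note on [PATCH 5/6]: filters now travel through query_obj() as plain (col, op, eq) triples parsed from the flt_col_N / flt_op_N / flt_eq_N request arguments, and each backend renders them itself. For example (values invented), ?flt_col_1=gender&flt_op_1=in&flt_eq_1=male,female becomes:

```python
filters = [("gender", "in", "male,female")]
# Table.query() renders this as:       gender IN ('male', 'female')
# Datasource.query() renders it as an OR of Dimension("gender") == value
```
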
From 8ea0157f1c084a2edaee40c485f1cf21e2f5bcc6 Mon Sep 17 00:00:00 2001
From: Maxime
Date: Thu, 6 Aug 2015 07:00:17 +0000
Subject: [PATCH 6/6] SQL working !

---
 app/models.py | 29 +++++++++++++++++++++--------
 app/viz.py    |  3 ++-
 2 files changed, 23 insertions(+), 9 deletions(-)

diff --git a/app/models.py b/app/models.py
index 95176ba80f56c..89440f90749ee 100644
--- a/app/models.py
+++ b/app/models.py
@@ -9,7 +9,7 @@
 
 from pydruid import client
 from pydruid.utils.filters import Dimension, Filter
-from copy import deepcopy
+from copy import deepcopy, copy
 
 import logging
 import json
 import requests
@@ -91,12 +91,18 @@ def query(
         from_dttm_iso = from_dttm.isoformat()
         to_dttm_iso = to_dttm.isoformat()
 
+        if metrics:
+            main_metric_expr = [m.expression for m in self.metrics if m.metric_name == metrics[0]][0]
+        else:
+            main_metric_expr = "COUNT(*)"
+
         select_exprs = []
         groupby_exprs = []
 
         if groupby:
-            select_exprs = groupby
+            select_exprs = copy(groupby)
             groupby_exprs = [s for s in groupby]
+            inner_groupby_exprs = [s for s in groupby]
         select_exprs += metrics_exprs
         if granularity != "all":
             select_exprs += ['ds as timestamp']
@@ -118,21 +124,28 @@ def query(
                     "{col} {op} ({l})".format(**locals())
                 )
         where_clause = " AND\n".join(where_clause).format(**locals())
-        if timeseries_limit:
+        on_clause = " AND ".join(["{g} = __{g}".format(g=g) for g in groupby])
+        limiting_join = ""
+        if timeseries_limit and groupby:
+            inner_select = ", ".join(["{g} as __{g}".format(g=g) for g in inner_groupby_exprs])
+            inner_groupby_exprs = ", ".join(inner_groupby_exprs)
             limiting_join = """
             JOIN (
-                SELECT {groupby_exprs}
+                SELECT {inner_select}
                 FROM {self.table_name}
-                GROUP BY {groupby_exprs}
-                ORDER BY {metric} DESC
+                WHERE
+                    {where_clause}
+                GROUP BY {inner_groupby_exprs}
+                ORDER BY {main_metric_expr} DESC
                 LIMIT {timeseries_limit}
-            ) z ON
-            """
+            ) z ON {on_clause}
+            """.format(**locals())
 
         sql = """
         SELECT
             {select_exprs}
         FROM {self.table_name}
+        {limiting_join}
         WHERE
             {where_clause}
         GROUP BY
diff --git a/app/viz.py b/app/viz.py
index d4929c2df83d7..42c9570b4710a 100644
--- a/app/viz.py
+++ b/app/viz.py
@@ -80,7 +80,8 @@ def __init__(self, datasource, form_data, view):
         self.df = self.bake_query()
         self.view = view
         if self.df is not None:
-            self.df.timestamp = pd.to_datetime(self.df.timestamp)
+            if 'timestamp' in self.df.columns:
+                self.df.timestamp = pd.to_datetime(self.df.timestamp)
             self.df_prep()
             self.form_prep()
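
Note on [PATCH 6/6]: with the limiting join completed, Table.query() emits a self-join that restricts the outer aggregation to the top series. A sketch of the statement it would generate for a hypothetical table "events" with a metric sum__num defined as SUM(num), groupby=['gender'], a non-"all" granularity, and timeseries_limit=15 (dates invented, whitespace tidied):

```sql
SELECT
    gender,
    SUM(num) AS sum__num,
    ds as timestamp
FROM events
JOIN (
    SELECT gender as __gender
    FROM events
    WHERE ds >= '2014-08-06T00:00:00' AND ds < '2015-08-06T00:00:00'
    GROUP BY gender
    ORDER BY SUM(num) DESC
    LIMIT 15
) z ON gender = __gender
WHERE ds >= '2014-08-06T00:00:00' AND ds < '2015-08-06T00:00:00'
GROUP BY
    gender,
    ds
```
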