diff --git a/app/models.py b/app/models.py
index 1a7630a0a3554..89440f90749ee 100644
--- a/app/models.py
+++ b/app/models.py
@@ -1,16 +1,220 @@
 from flask.ext.appbuilder import Model
-from pydruid import client
 from datetime import timedelta
-from flask.ext.appbuilder.models.mixins import AuditMixin, FileColumn
+from flask.ext.appbuilder.models.mixins import AuditMixin
 from sqlalchemy import Column, Integer, String, ForeignKey, Text, Boolean, DateTime
+from sqlalchemy import create_engine, MetaData
+from sqlalchemy import Table as sqlaTable
 from sqlalchemy.orm import relationship
-from app import get_session
 from dateutil.parser import parse
+from pydruid import client
+from pydruid.utils.filters import Dimension, Filter
+from copy import deepcopy, copy
 import logging
 import json
 import requests
+from app import db, get_session
+
+
+class Queryable(object):
+    @property
+    def column_names(self):
+        return sorted([c.column_name for c in self.columns])
+
+    @property
+    def groupby_column_names(self):
+        return sorted([c.column_name for c in self.columns if c.groupby])
+
+    @property
+    def filterable_column_names(self):
+        return sorted([c.column_name for c in self.columns if c.filterable])
+
+
+class Database(Model, AuditMixin):
+    __tablename__ = 'databases'
+    id = Column(Integer, primary_key=True)
+    database_name = Column(String(256), unique=True)
+    sqlalchemy_uri = Column(String(1024))
+
+    def __repr__(self):
+        return self.database_name
+
+    def get_sqla_engine(self):
+        return create_engine(self.sqlalchemy_uri)
+
+    def get_table(self, table_name):
+        meta = MetaData()
+        return sqlaTable(
+            table_name, meta,
+            autoload=True,
+            autoload_with=self.get_sqla_engine())
+
+
+class Table(Model, AuditMixin, Queryable):
+    __tablename__ = 'tables'
+    id = Column(Integer, primary_key=True)
+    table_name = Column(String(256), unique=True)
+    default_endpoint = Column(Text)
+    database_id = Column(
+        Integer, ForeignKey('databases.id'))
+    database = relationship(
+        'Database', backref='tables', foreign_keys=[database_id])
+
+    @property
+    def name(self):
+        return self.table_name
+
+    @property
+    def table_link(self):
+        url = "/panoramix/table/{}/".format(self.id)
+        return '<a href="{url}">{self.table_name}</a>'.format(**locals())
+
+    @property
+    def metrics_combo(self):
+        return sorted(
+            [
+                (m.metric_name, m.verbose_name)
+                for m in self.metrics],
+            key=lambda x: x[1])
+
+    def query(
+            self, groupby, metrics,
+            granularity,
+            from_dttm, to_dttm,
+            limit_spec=None,
+            filter=None,
+            is_timeseries=True,
+            timeseries_limit=15, row_limit=None):
+        from pandas import read_sql_query
+        metrics_exprs = [
+            "{} AS {}".format(m.expression, m.metric_name)
+            for m in self.metrics if m.metric_name in metrics]
+        from_dttm_iso = from_dttm.isoformat()
+        to_dttm_iso = to_dttm.isoformat()
+
+        if metrics:
+            main_metric_expr = [m.expression for m in self.metrics if m.metric_name == metrics[0]][0]
+        else:
+            main_metric_expr = "COUNT(*)"
+
+        select_exprs = []
+        groupby_exprs = []
+
+        if groupby:
+            select_exprs = copy(groupby)
+            groupby_exprs = [s for s in groupby]
+            inner_groupby_exprs = [s for s in groupby]
+        select_exprs += metrics_exprs
+        if granularity != "all":
+            select_exprs += ['ds as timestamp']
+            groupby_exprs += ['ds']
+
+        select_exprs = ",\n".join(select_exprs)
+        groupby_exprs = ",\n".join(groupby_exprs)
+
+        where_clause = [
+            "ds >= '{from_dttm_iso}'",
+            "ds < '{to_dttm_iso}'"
+        ]
+        for col, op, eq in filter or []:
+            if op in ('in', 'not in'):
+                l = ["'{}'".format(s) for s in eq.split(",")]
+                l = ", ".join(l)
+                op = op.upper()
+                where_clause.append(
+                    "{col} {op} ({l})".format(**locals())
+                )
+        where_clause = " AND\n".join(where_clause).format(**locals())
+        on_clause = " AND ".join(["{g} = __{g}".format(g=g) for g in groupby])
+        limiting_join = ""
+        if timeseries_limit and groupby:
+            inner_select = ", ".join(["{g} as __{g}".format(g=g) for g in inner_groupby_exprs])
+            inner_groupby_exprs = ", ".join(inner_groupby_exprs)
+            limiting_join = """
+            JOIN (
+                SELECT {inner_select}
+                FROM {self.table_name}
+                WHERE
+                    {where_clause}
+                GROUP BY {inner_groupby_exprs}
+                ORDER BY {main_metric_expr} DESC
+                LIMIT {timeseries_limit}
+            ) z ON {on_clause}
+            """.format(**locals())
+
+        sql = """
+        SELECT
+            {select_exprs}
+        FROM {self.table_name}
+        {limiting_join}
+        WHERE
+            {where_clause}
+        GROUP BY
+            {groupby_exprs}
+        """.format(**locals())
+        df = read_sql_query(
+            sql=sql,
+            con=self.database.get_sqla_engine()
+        )
+        return df
+
+    def fetch_metadata(self):
+        table = self.database.get_table(self.table_name)
+        TC = TableColumn
+        for col in table.columns:
+            dbcol = (
+                db.session
+                .query(TC)
+                .filter(TC.table == self)
+                .filter(TC.column_name == col.name)
+                .first()
+            )
+            db.session.flush()
+            if not dbcol:
+                dbcol = TableColumn(column_name=col.name)
+                if str(col.type) in ('VARCHAR', 'STRING'):
+                    dbcol.groupby = True
+                    dbcol.filterable = True
+                self.columns.append(dbcol)
+            dbcol.type = str(col.type)
+        db.session.commit()
+
+
+class SqlMetric(Model):
+    __tablename__ = 'sql_metrics'
+    id = Column(Integer, primary_key=True)
+    metric_name = Column(String(512))
+    verbose_name = Column(String(1024))
+    metric_type = Column(String(32))
+    table_id = Column(
+        Integer,
+        ForeignKey('tables.id'))
+    table = relationship(
+        'Table', backref='metrics', foreign_keys=[table_id])
+    expression = Column(Text)
+    description = Column(Text)
+
+
+class TableColumn(Model, AuditMixin):
+    __tablename__ = 'table_columns'
+    id = Column(Integer, primary_key=True)
+    table_id = Column(
+        Integer,
+        ForeignKey('tables.id'))
+    table = relationship('Table', backref='columns', foreign_keys=[table_id])
+    column_name = Column(String(256))
+    is_dttm = Column(Boolean, default=True)
+    is_active = Column(Boolean, default=True)
+    type = Column(String(32), default='')
+    groupby = Column(Boolean, default=False)
+    count_distinct = Column(Boolean, default=False)
+    sum = Column(Boolean, default=False)
+    max = Column(Boolean, default=False)
+    min = Column(Boolean, default=False)
+    filterable = Column(Boolean, default=False)
+    description = Column(Text, default='')
+
 
 class Cluster(Model, AuditMixin):
     __tablename__ = 'clusters'
@@ -40,13 +244,10 @@ def refresh_datasources(self):
         ).format(self=self)
         datasources = json.loads(requests.get(endpoint).text)
         for datasource in datasources:
-            #try:
-                Datasource.sync_to_db(datasource, self)
-            #except Exception as e:
-            #    logging.exception(e)
-            #    logging.error("Failed at syncing " + datasource)
+            Datasource.sync_to_db(datasource, self)
+
 
-class Datasource(Model, AuditMixin):
+class Datasource(Model, AuditMixin, Queryable):
     __tablename__ = 'datasources'
     id = Column(Integer, primary_key=True)
     datasource_name = Column(String(256), unique=True)
@@ -67,6 +268,10 @@ def metrics_combo(self):
         [(m.metric_name, m.verbose_name) for m in self.metrics],
         key=lambda x: x[1])
 
+    @property
+    def name(self):
+        return self.datasource_name
+
     def __repr__(self):
         return self.datasource_name
 
@@ -130,17 +335,102 @@ def sync_to_db(cls, name, cluster):
             col_obj.generate_metrics()
         #session.commit()
 
-    @property
-    def column_names(self):
-        return sorted([c.column_name for c in self.columns])
+    def query(
+            self, groupby, metrics,
+            granularity,
+            from_dttm, to_dttm,
+            limit_spec=None,
+            filter=None,
+            is_timeseries=True,
+            timeseries_limit=15, row_limit=None):
 
-    @property
-    def groupby_column_names(self):
-        return sorted([c.column_name for c in self.columns if c.groupby])
+        aggregations = {
+            m.metric_name: m.json_obj
+            for m in self.metrics if m.metric_name in metrics
+        }
+        if not isinstance(granularity, basestring):
+            granularity = {"type": "duration", "duration": granularity}
 
-    @property
-    def filterable_column_names(self):
-        return sorted([c.column_name for c in self.columns if c.filterable])
+        qry = dict(
+            datasource=self.datasource_name,
+            dimensions=groupby,
+            aggregations=aggregations,
+            granularity=granularity,
+            intervals=from_dttm.isoformat() + '/' + to_dttm.isoformat(),
+        )
+        filters = None
+        for col, op, eq in filter or []:
+            cond = None
+            if op == '==':
+                cond = Dimension(col) == eq
+            elif op == '!=':
+                cond = ~(Dimension(col) == eq)
+            elif op in ('in', 'not in'):
+                fields = []
+                splitted = eq.split(',')
+                if len(splitted) > 1:
+                    for s in splitted:
+                        s = s.strip()
+                        fields.append(Filter.build_filter(Dimension(col) == s))
+                    cond = Filter(type="or", fields=fields)
+                else:
+                    cond = Dimension(col) == eq
+                if op == 'not in':
+                    cond = ~cond
+            if filters:
+                filters = Filter(type="and", fields=[
+                    Filter.build_filter(cond),
+                    Filter.build_filter(filters)
+                ])
+            else:
+                filters = cond
+
+        if filters:
+            qry['filter'] = filters
+
+        client = self.cluster.get_pydruid_client()
+        orig_filters = filters
+        if timeseries_limit:
+            # Limit the number of timeseries by running a two-phase query
+            pre_qry = deepcopy(qry)
+            pre_qry['granularity'] = "all"
+            pre_qry['limit_spec'] = {
+                "type": "default",
+                "limit": timeseries_limit,
+                "columns": [{
+                    "dimension": metrics[0] if metrics else self.metrics[0].metric_name,
+                    "direction": "descending",
+                }],
+            }
+            client.groupby(**pre_qry)
+            df = client.export_pandas()
+            if df is not None and not df.empty:
+                dims = qry['dimensions']
+                filters = []
+                for index, row in df.iterrows():
+                    fields = []
+                    for dim in dims:
+                        f = Filter.build_filter(Dimension(dim) == row[dim])
+                        fields.append(f)
+                    if len(fields) > 1:
+                        filt = Filter(type="and", fields=fields)
+                        filters.append(Filter.build_filter(filt))
+                    elif fields:
+                        filters.append(fields[0])
+
+                if filters:
+                    ff = Filter(type="or", fields=filters)
+                    if not orig_filters:
+                        qry['filter'] = ff
+                    else:
+                        qry['filter'] = Filter(type="and", fields=[
+                            Filter.build_filter(ff),
+                            Filter.build_filter(orig_filters)])
+                    qry['limit_spec'] = None
+
+        client.groupby(**qry)
+        df = client.export_pandas()
+        return df
 
 
 class Metric(Model):
diff --git a/app/templates/panoramix/datasource.html b/app/templates/panoramix/datasource.html
index 0a53cc6543ad0..599d80efb4253 100644
--- a/app/templates/panoramix/datasource.html
+++ b/app/templates/panoramix/datasource.html
@@ -22,7 +22,7 @@
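
For context when reviewing, here is a minimal sketch of how the new SQL path in `Table.query` might be exercised. It is not part of the diff; the table, metric, and column names (`energy_usage`, `sum__value`, `region`) are hypothetical, and it assumes the physical table carries the `ds` partition column that the generated SQL hard-codes.

```python
# Hypothetical usage sketch for Table.query -- the names below are
# assumptions, not part of this change.
from datetime import datetime, timedelta

from app import db
from app.models import Table

tbl = db.session.query(Table).filter_by(table_name='energy_usage').first()
df = tbl.query(
    groupby=['region'],                  # becomes the GROUP BY clause
    metrics=['sum__value'],              # must match SqlMetric.metric_name rows
    granularity='all',                   # anything else adds "ds as timestamp"
    from_dttm=datetime.now() - timedelta(days=7),
    to_dttm=datetime.now(),
    filter=[('region', 'in', 'us,eu')],  # (col, op, eq) tuples
    timeseries_limit=5,                  # triggers the inner limiting JOIN
)
print(df.head())
```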
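Similarly, a sketch of the Druid path in `Datasource.query`, again with hypothetical names. Note that `granularity` may be either a Druid granularity string or a millisecond duration (which the method wraps into a `{"type": "duration", ...}` object), and that `timeseries_limit` triggers the two-phase query: a `granularity="all"` pre-query finds the top series, whose dimension values are then OR'd into the main query's filter.

```python
# Hypothetical usage sketch for Datasource.query -- names are assumptions.
from datetime import datetime, timedelta

from app import db
from app.models import Datasource

datasource = (
    db.session.query(Datasource)
    .filter_by(datasource_name='twitter_demo')
    .first())
df = datasource.query(
    groupby=['user_location'],
    metrics=['count'],
    granularity=3600000,                # one hour, in milliseconds
    from_dttm=datetime.now() - timedelta(days=1),
    to_dttm=datetime.now(),
    filter=[('language', '==', 'en')],  # becomes a pydruid Dimension filter
    timeseries_limit=10,                # keep only the top 10 series
)
```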
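Finally, a sketch of wiring up the new SQL-backed models, since the diff introduces them without a usage site: a `Database` holds a SQLAlchemy URI, a `Table` points at it, and `fetch_metadata` reflects the physical table to populate `TableColumn` rows, flagging string-typed columns as groupable and filterable. The URI and names are hypothetical.

```python
# Hypothetical setup sketch -- URI, table and metric names are assumptions.
from app import db
from app.models import Database, SqlMetric, Table

database = Database(
    database_name='main',
    sqlalchemy_uri='postgresql://user:pass@localhost/examples')
table = Table(table_name='energy_usage', database=database)
table.metrics.append(SqlMetric(
    metric_name='sum__value',
    verbose_name='Sum of value',
    metric_type='sum',
    expression='SUM(value)'))
db.session.add(table)   # save-update cascade also adds the Database row
db.session.commit()

table.fetch_metadata()  # reflects columns into TableColumn rows
```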