Skip to content

Commit

Permalink
ALL OF SQL is coming along nicely
Browse files Browse the repository at this point in the history
  • Loading branch information
mistercrunch committed Aug 5, 2015
1 parent 2d192d1 commit 34acc90
Show file tree
Hide file tree
Showing 4 changed files with 150 additions and 73 deletions.
136 changes: 120 additions & 16 deletions app/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,16 @@ class Database(Model, AuditMixin):
def __repr__(self):
    """Identify a Database row by its name in logs and the admin UI."""
    return self.database_name

def get_sqla_engine(self):
    """Build a SQLAlchemy engine from this database's stored connection URI.

    NOTE(review): a fresh engine is created on every call -- presumably fine
    at this scale, but worth pooling/caching later.
    """
    return create_engine(self.sqlalchemy_uri)

def get_table(self, table_name):
    """Reflect ``table_name`` from this database and return a SQLAlchemy Table.

    :param table_name: name of the physical table to autoload
    :returns: a reflected ``sqlalchemy.Table`` bound to fresh MetaData

    Fix: the method previously took no argument and read ``self.table_name``,
    which Database does not define; the caller (Table.fetch_metadata) passes
    the table name explicitly -- ``self.database.get_table(self.table_name)``.
    """
    meta = MetaData()
    return sqlaTable(
        table_name, meta,
        autoload=True,
        autoload_with=self.get_sqla_engine())


class Table(Model, AuditMixin, Queryable):
__tablename__ = 'tables'
Expand All @@ -48,6 +58,10 @@ class Table(Model, AuditMixin, Queryable):
database = relationship(
'Database', backref='tables', foreign_keys=[database_id])

@property
def name(self):
    # Generic accessor mirroring Datasource.name so templates can refer to
    # ``.name`` regardless of the queryable's concrete type.
    return self.table_name

@property
def table_link(self):
url = "/panoramix/table/{}/".format(self.id)
Expand All @@ -57,18 +71,62 @@ def table_link(self):
def metrics_combo(self):
    """Return (metric_name, verbose_name) pairs, sorted by verbose name,
    for populating metric select widgets.

    NOTE(review): the pasted diff mashed the removed ``sum__`` column
    comprehension together with the new SqlMetric-backed one; this is the
    post-change body, matching Datasource.metrics_combo.
    """
    return sorted(
        [(m.metric_name, m.verbose_name) for m in self.metrics],
        key=lambda x: x[1])

def query(
        self, groupby, metrics,
        granularity,
        from_dttm, to_dttm,
        limit_spec=None,
        filter=None,
        is_timeseries=True,
        timeseries_limit=15, row_limit=None):
    """Run a GROUP BY aggregate query against the physical table.

    :param groupby: column names to group by (may be empty)
    :param metrics: metric_names to compute, matched against ``self.metrics``
    :param granularity: ``"all"`` suppresses the time bucket; any other
        value selects/groups on the ``ds`` partition column
    :param from_dttm: inclusive lower datetime bound on ``ds``
    :param to_dttm: exclusive upper datetime bound on ``ds``
    :returns: pandas DataFrame, one row per group

    NOTE(review): limit_spec, filter, is_timeseries, timeseries_limit and
    row_limit are accepted for signature parity with Datasource.query but
    are not applied here yet.
    """
    from pandas import read_sql_query

    metrics_exprs = [
        "{} AS {}".format(m.expression, m.metric_name)
        for m in self.metrics if m.metric_name in metrics]

    # Fix: copy ``groupby`` instead of aliasing it -- the original bound
    # select_exprs to the caller's list and then mutated it with ``+=``.
    select_exprs = list(groupby) if groupby else []
    groupby_exprs = list(groupby) if groupby else []
    select_exprs += metrics_exprs
    if granularity != "all":
        select_exprs += ['ds as timestamp']
        groupby_exprs += ['ds']

    # Explicit .format arguments instead of the fragile **locals() trick.
    # NOTE(review): values are interpolated straight into SQL; tolerable here
    # because they come from model metadata and datetime.isoformat(), but
    # confirm none of them can carry untrusted user strings.
    where_clause = " AND\n".join([
        "ds >= '{}'".format(from_dttm.isoformat()),
        "ds < '{}'".format(to_dttm.isoformat()),
    ])
    sql = (
        "SELECT\n"
        "{select_exprs}\n"
        "FROM {table_name}\n"
        "WHERE\n"
        "{where_clause}\n"
    ).format(
        select_exprs=",\n".join(select_exprs),
        table_name=self.table_name,
        where_clause=where_clause,
    )
    group_clause = ",\n".join(groupby_exprs)
    # Fix: emit GROUP BY only when there is something to group on; the
    # original produced invalid SQL (trailing empty GROUP BY) when groupby
    # was empty and granularity == "all".
    if group_clause:
        sql += "GROUP BY\n{}\n".format(group_clause)
    df = read_sql_query(
        sql=sql,
        con=self.database.get_sqla_engine()
    )
    return df


def fetch_metadata(self):
engine = create_engine(self.database.sqlalchemy_uri)
meta = MetaData()
table = sqlaTable(
self.table_name, meta, autoload=True, autoload_with=engine)
table = self.database.get_table(self.table_name)
TC = TableColumn
for col in table.columns:
dbcol = (
Expand All @@ -90,13 +148,28 @@ def fetch_metadata(self):
db.session.commit()


class SqlMetric(Model):
    """A named SQL aggregate expression attached to a Table.

    ``expression`` is raw SQL (e.g. ``SUM(col)``) selected under the alias
    ``metric_name`` by Table.query.
    """
    __tablename__ = 'sql_metrics'
    id = Column(Integer, primary_key=True)
    metric_name = Column(String(512))
    verbose_name = Column(String(1024))
    metric_type = Column(String(32))
    # Fix: tables.id is an Integer primary key, so the FK column must be
    # Integer too (was String(256)).
    table_id = Column(
        Integer,
        ForeignKey('tables.id'))
    table = relationship(
        'Table', backref='metrics', foreign_keys=[table_id])
    expression = Column(Text)
    description = Column(Text)

    def __repr__(self):
        # Consistent with Database/Datasource reprs elsewhere in the module.
        return self.metric_name


class TableColumn(Model, AuditMixin):
__tablename__ = 'table_columns'
id = Column(Integer, primary_key=True)
table_id = Column(
String(256),
ForeignKey('tables.id'))
table = relationship('Table', backref='columns')
table = relationship('Table', backref='columns', foreign_keys=[table_id])
column_name = Column(String(256))
is_dttm = Column(Boolean, default=True)
is_active = Column(Boolean, default=True)
Expand Down Expand Up @@ -138,11 +211,8 @@ def refresh_datasources(self):
).format(self=self)
datasources = json.loads(requests.get(endpoint).text)
for datasource in datasources:
#try:
Datasource.sync_to_db(datasource, self)
#except Exception as e:
# logging.exception(e)
# logging.error("Failed at syncing " + datasource)
Datasource.sync_to_db(datasource, self)


class Datasource(Model, AuditMixin, Queryable):
__tablename__ = 'datasources'
Expand All @@ -165,6 +235,10 @@ def metrics_combo(self):
[(m.metric_name, m.verbose_name) for m in self.metrics],
key=lambda x: x[1])

@property
def name(self):
    # Generic accessor mirroring Table.name so templates can refer to
    # ``.name`` regardless of the queryable's concrete type (used by
    # datasource.html).
    return self.datasource_name

def __repr__(self):
    """Identify a Datasource row by its name in logs and the admin UI."""
    return self.datasource_name

Expand Down Expand Up @@ -227,7 +301,37 @@ def sync_to_db(cls, name, cluster):
col_obj.datasource = datasource
col_obj.generate_metrics()
#session.commit()

def query(
        self, groupby, metrics,
        granularity,
        from_dttm, to_dttm,
        limit_spec=None,
        filter=None,
        is_timeseries=True,
        timeseries_limit=15, row_limit=None):
    """Issue a Druid groupby query and return the result as a DataFrame."""
    # Only ship aggregations for the metrics the caller asked for.
    aggregations = {}
    for metric in self.metrics:
        if metric.metric_name in metrics:
            aggregations[metric.metric_name] = metric.json_obj

    # A non-string granularity is a duration; wrap it the way Druid expects.
    if not isinstance(granularity, basestring):
        granularity = {"type": "duration", "duration": granularity}

    interval = from_dttm.isoformat() + '/' + to_dttm.isoformat()
    qry = {
        'datasource': self.datasource_name,
        'dimensions': groupby,
        'aggregations': aggregations,
        'granularity': granularity,
        'intervals': interval,
    }
    if filter:
        qry['filter'] = filter
    if limit_spec:
        qry['limit_spec'] = limit_spec

    client = self.cluster.get_pydruid_client()
    client.groupby(**qry)
    return client.export_pandas()


class Metric(Model):
Expand Down
2 changes: 1 addition & 1 deletion app/templates/panoramix/datasource.html
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
<div class="container-fluid">
<div class="col-md-3">
<h3>
{{ datasource.datasource_name }}
{{ datasource.name }}
{% if datasource.description %}
<i class="fa fa-info-circle" data-toggle="tooltip" data-placement="bottom" title="{{ datasource.description }}"></i>
{% endif %}
Expand Down
12 changes: 11 additions & 1 deletion app/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,16 @@ def post_update(self, col):

appbuilder.add_view_no_menu(ColumnInlineView)

class SqlMetricInlineView(CompactCRUDMixin, ModelView):
    """CRUD view for SqlMetric rows, rendered inline on the Table page."""
    datamodel = SQLAInterface(models.SqlMetric)
    edit_columns = [
        'metric_name', 'description', 'verbose_name', 'metric_type',
        'table', 'expression']
    add_columns = edit_columns
    list_columns = ['metric_name', 'verbose_name', 'metric_type']
    page_size = 100


appbuilder.add_view_no_menu(SqlMetricInlineView)


class MetricInlineView(CompactCRUDMixin, ModelView):
datamodel = SQLAInterface(models.Metric)
Expand Down Expand Up @@ -115,7 +125,7 @@ class TableView(ModelView, DeleteMixin):
list_columns = ['table_link', 'database']
add_columns = ['table_name', 'database']
edit_columns = add_columns
related_views = [TableColumnInlineView]
related_views = [TableColumnInlineView, SqlMetricInlineView]

def post_insert(self, table):
    # Reflect the physical table's columns right after it is registered.
    table.fetch_metadata()
Expand Down
73 changes: 18 additions & 55 deletions app/viz.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,30 +15,6 @@
'target_div': 'chart',
}

class BaseQuery(object):
    """Abstract base for timed query execution.

    Subclasses implement ``_execute``; ``run`` wraps it with wall-clock
    timing, recorded in ``self.duration`` (seconds).
    """

    def __init__(
            self, groupby, metrics, filters,
            is_timeseries,
            timeseries_limit=15, row_limit=None):
        self.groupby = groupby
        self.metrics = metrics
        self.filters = filters
        self.is_timeseries = is_timeseries
        self.timeseries_limit = timeseries_limit
        self.row_limit = row_limit

    def run(self):
        """Execute the query, recording elapsed seconds in ``self.duration``."""
        start = datetime.now()
        self._execute()
        self.duration = (datetime.now() - start).total_seconds()

    def _execute(self):
        # Fix: was named ``_execution`` while run() calls ``_execute``,
        # and raised ``NotImplemented()`` -- the comparison singleton,
        # not the exception class.
        raise NotImplementedError()

    def pandas_df(self):
        """Return the result as a pandas DataFrame; must be overridden."""
        raise NotImplementedError()



class OmgWtForm(Form):
field_order = (
Expand Down Expand Up @@ -146,39 +122,39 @@ def query_filters(self):
filters = cond
return filters

def bake_query(self):
    """Assemble the query parameters and run them against the datasource."""
    query_params = self.query_obj()
    return self.datasource.query(**query_params)

def query_obj(self):
ds = self.datasource
args = self.form_data
groupby = args.getlist("groupby") or []
metrics = args.getlist("metrics") or ['count']
granularity = args.get("granularity", "1 day")
granularity = utils.parse_human_timedelta(granularity).total_seconds() * 1000
aggregations = {
m.metric_name: m.json_obj
for m in ds.metrics if m.metric_name in self.metrics
}
granularity = utils.parse_human_timedelta(
granularity).total_seconds() * 1000
limit = int(
args.get("limit", config.ROW_LIMIT)) or config.ROW_LIMIT
since = args.get("since", "1 year ago")
from_dttm = utils.parse_human_datetime(since)
if from_dttm > datetime.now():
from_dttm = datetime.now() - (from_dttm-datetime.now())
from_dttm = from_dttm.isoformat()
until = args.get("until", "now")
to_dttm = utils.parse_human_datetime(until).isoformat()
to_dttm = utils.parse_human_datetime(until)
if from_dttm >= to_dttm:
flash("The date range doesn't seem right.", "danger")
from_dttm = to_dttm # Making them identicial to not raise
d = {
'datasource': ds.datasource_name,
'granularity': {"type": "duration", "duration": granularity},
'intervals': from_dttm + '/' + to_dttm,
'dimensions': groupby,
'aggregations': aggregations,
'granularity': granularity,
'from_dttm': from_dttm,
'to_dttm': to_dttm,
'groupby': groupby,
'metrics': metrics,
'limit_spec': {
"type": "default",
"limit": limit,
"columns": [{
"dimension": self.metrics[0],
"dimension": metrics[0] if metrics else self.metrics[0],
"direction": "descending",
}],
},
Expand All @@ -188,17 +164,7 @@ def query_obj(self):
d['filter'] = filters
return d

def bake_query(self):
client = self.datasource.cluster.get_pydruid_client()
client.groupby(**self.query_obj())
return client.export_pandas()

def get_query(self):
    """Build the Druid groupby query and return its dict representation
    (without fetching results)."""
    druid_client = self.datasource.cluster.get_pydruid_client()
    druid_client.groupby(**self.query_obj())
    return druid_client.query_dict

def df_prep(self, ):
def df_prep(self):
    """Hook for subclasses to post-process the query DataFrame; no-op here."""
    pass

def form_prep(self):
Expand Down Expand Up @@ -290,14 +256,12 @@ def bake_query(self):
"""
Doing a 2 phase query where we limit the number of series.
"""
client = self.datasource.cluster.get_pydruid_client()
qry = self.query_obj()
orig_filter = qry['filter'] if 'filter' in qry else ''
qry['granularity'] = "all"
client.groupby(**qry)
df = client.export_pandas()
df = self.datasource.query(**qry)
if not df is None:
dims = qry['dimensions']
dims = qry['groupby']
filters = []
for index, row in df.iterrows():
fields = []
Expand All @@ -318,9 +282,8 @@ def bake_query(self):
qry['filter'] = Filter(type="and", fields=[
Filter.build_filter(ff),
Filter.build_filter(orig_filter)])
del qry['limit_spec']
client.groupby(**qry)
return client.export_pandas()
qry['limit_spec'] = None
return self.datasource.query(**qry)

class TimeSeriesCompareViz(TimeSeriesViz):
verbose_name = "Time Series - Percent Change"
Expand Down

0 comments on commit 34acc90

Please sign in to comment.