Skip to content

Commit

Permalink
Support week_ending_saturday for Druid. (#1491)
Browse files Browse the repository at this point in the history
* Support week_ending_saturday for Druid.

* Use period granularity

* Use ISO 8601 for period definitions.

* Fix tests

* More flexibility for the freeform choices.
  • Loading branch information
bkyryliuk authored Nov 3, 2016
1 parent 1700a80 commit ae46561
Show file tree
Hide file tree
Showing 4 changed files with 71 additions and 16 deletions.
4 changes: 4 additions & 0 deletions caravel/forms.py
Original file line number Diff line number Diff line change
Expand Up @@ -356,6 +356,10 @@ def __init__(self, viz):
('6 hour', _('6 hour')),
('1 day', _('1 day')),
('7 days', _('7 days')),
('week', _('week')),
('week_starting_sunday', _('week_starting_sunday')),
('week_ending_saturday', _('week_ending_saturday')),
('month', _('month')),
),
"description": _(
"The time granularity for the visualization. Note that you "
Expand Down
80 changes: 66 additions & 14 deletions caravel/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -1604,7 +1604,9 @@ def time_column_grains(self):
return {
"time_columns": [
'all', '5 seconds', '30 seconds', '1 minute',
'5 minutes', '1 hour', '6 hour', '1 day', '7 days'
'5 minutes', '1 hour', '6 hour', '1 day', '7 days',
'week', 'week_starting_sunday', 'week_ending_saturday',
'month',
],
"time_grains": ['now']
}
Expand Down Expand Up @@ -1793,6 +1795,56 @@ def sync_to_db(cls, name, cluster):
col_obj.generate_metrics()
session.flush()

@staticmethod
def time_offset(granularity):
if granularity == 'week_ending_saturday':
return 6 * 24 * 3600 * 1000 # 6 days
return 0

# uses https://en.wikipedia.org/wiki/ISO_8601
# http://druid.io/docs/0.8.0/querying/granularities.html
# TODO: pass origin from the UI
@staticmethod
def granularity(period_name, timezone=None):
if not period_name or period_name == 'all':
return 'all'
iso_8601_dict = {
'5 seconds': 'PT5S',
'30 seconds': 'PT30S',
'1 minute': 'PT1M',
'5 minutes': 'PT5M',
'1 hour': 'PT1H',
'6 hour': 'PT6H',
'one day': 'P1D',
'1 day': 'P1D',
'7 days': 'P7D',
'week': 'P1W',
'week_starting_sunday': 'P1W',
'week_ending_saturday': 'P1W',
'month': 'P1M',
}

granularity = {'type': 'period'}
if timezone:
granularity['timezone'] = timezone

if period_name in iso_8601_dict:
granularity['period'] = iso_8601_dict[period_name]
if period_name in ('week_ending_saturday', 'week_starting_sunday'):
# use Sunday as start of the week
granularity['origin'] = '2016-01-03T00:00:00'
elif not isinstance(period_name, string_types):
granularity['type'] = 'duration'
granularity['duration'] = period_name
elif period_name.startswith('P'):
# identify if the string is the iso_8601 period
granularity['period'] = period_name
else:
granularity['type'] = 'duration'
granularity['duration'] = utils.parse_human_timedelta(
period_name).total_seconds() * 1000
return granularity

def query( # druid
self, groupby, metrics,
granularity,
Expand Down Expand Up @@ -1820,6 +1872,7 @@ def query( # druid
# add tzinfo to native datetime with config
from_dttm = from_dttm.replace(tzinfo=config.get("DRUID_TZ"))
to_dttm = to_dttm.replace(tzinfo=config.get("DRUID_TZ"))
timezone = from_dttm.tzname()

query_str = ""
metrics_dict = {m.metric_name: m for m in self.metrics}
Expand All @@ -1835,7 +1888,6 @@ def recursive_get_fields(_conf):
field_names.append(_f.get('fieldName'))
elif _type == 'arithmetic':
field_names += recursive_get_fields(_f)

return list(set(field_names))

for metric_name in metrics:
Expand Down Expand Up @@ -1874,22 +1926,12 @@ def recursive_get_fields(_conf):
"Access to the metrics denied: " + ', '.join(rejected_metrics)
)

granularity = granularity or "all"
if granularity != "all":
granularity = utils.parse_human_timedelta(
granularity).total_seconds() * 1000
if not isinstance(granularity, string_types):
granularity = {"type": "duration", "duration": granularity}
origin = extras.get('druid_time_origin')
if origin:
dttm = utils.parse_human_datetime(origin)
granularity['origin'] = dttm.isoformat()

qry = dict(
datasource=self.datasource_name,
dimensions=groupby,
aggregations=aggregations,
granularity=granularity,
granularity=DruidDatasource.granularity(
granularity, timezone=timezone),
post_aggregations=post_aggs,
intervals=from_dttm.isoformat() + '/' + to_dttm.isoformat(),
)
Expand Down Expand Up @@ -1995,6 +2037,16 @@ def recursive_get_fields(_conf):
cols += [col for col in groupby if col in df.columns]
cols += [col for col in metrics if col in df.columns]
df = df[cols]

time_offset = DruidDatasource.time_offset(granularity)

def increment_timestamp(ts):
dt = utils.parse_human_datetime(ts).replace(
tzinfo=config.get("DRUID_TZ"))
return dt + timedelta(milliseconds=time_offset)
if 'timestamp' in df.columns and time_offset:
df.timestamp = df.timestamp.apply(increment_timestamp)

return QueryResult(
df=df,
query=query_str,
Expand Down
1 change: 0 additions & 1 deletion tests/base_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,6 @@ def get_druid_ds_by_name(self, name):
return db.session.query(models.DruidDatasource).filter_by(
datasource_name=name).first()


def get_resp(self, url):
"""Shortcut to get the parsed results while following redirects"""
resp = self.client.get(url, follow_redirects=True)
Expand Down
2 changes: 1 addition & 1 deletion tests/druid_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ def test_client(self, PyDruid):

resp = self.get_resp('/caravel/explore/druid/{}/'.format(
datasource_id))
assert "[test_cluster].[test_datasource]" in resp
self.assertIn("[test_cluster].[test_datasource]", resp)

# One groupby
url = (
Expand Down

0 comments on commit ae46561

Please sign in to comment.