Improve Druid metadata fetching resilience (#1584)
mistercrunch authored Nov 11, 2016
1 parent d6bc354 commit 96d32dd
Showing 2 changed files with 42 additions and 12 deletions.
12 changes: 8 additions & 4 deletions superset/bin/superset
@@ -112,14 +112,18 @@ def load_examples(load_test_data):
print("Loading [Unicode test data]")
data.load_unicode_test_data()

@manager.command
def refresh_druid():
"""Refresh all druid datasources"""
@manager.option(
'-d', '--datasource',
help=(
"Specify which datasource name to load, if omitted, all "
"datasources will be refreshed"))
def refresh_druid(datasource):
"""Refresh druid datasources"""
session = db.session()
from superset import models
for cluster in session.query(models.DruidCluster).all():
try:
cluster.refresh_datasources()
cluster.refresh_datasources(datasource_name=datasource)
except Exception as e:
print(
"Error while processing cluster '{}'\n{}".format(
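With the new Flask-Script option, a single datasource can be refreshed from the command line as superset refresh_druid -d my_datasource (my_datasource being a placeholder name). Below is a minimal sketch of the equivalent programmatic call, assuming an initialized Superset application; the datasource name is hypothetical:

    # Sketch of the refresh the new CLI option drives, assuming an
    # initialized Superset app; 'my_datasource' is a hypothetical name.
    from superset import db, models

    session = db.session()
    for cluster in session.query(models.DruidCluster).all():
        # datasource_name=None (the default) refreshes every datasource;
        # passing a name restricts the refresh to that one datasource.
        cluster.refresh_datasources(datasource_name='my_datasource')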
42 changes: 34 additions & 8 deletions superset/models.py
@@ -1522,11 +1522,16 @@ def get_druid_version(self):
         ).format(obj=self)
         return json.loads(requests.get(endpoint).text)['version']
 
-    def refresh_datasources(self):
+    def refresh_datasources(self, datasource_name=None):
+        """Refresh metadata of all datasources in the cluster
+
+        If ``datasource_name`` is specified, only that datasource is updated
+        """
         self.druid_version = self.get_druid_version()
         for datasource in self.get_datasources():
             if datasource not in config.get('DRUID_DATA_SOURCE_BLACKLIST'):
-                DruidDatasource.sync_to_db(datasource, self)
+                if not datasource_name or datasource_name == datasource:
+                    DruidDatasource.sync_to_db(datasource, self)
 
     @property
     def perm(self):
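The name filter composes with the existing blacklist check: a datasource is synced only if it is not blacklisted, and either no specific name was requested or the requested name matches. A self-contained sketch of that predicate (the blacklist and datasource names are hypothetical examples):

    # Self-contained sketch of the sync predicate introduced above; the
    # blacklist and datasource names are hypothetical examples.
    DRUID_DATA_SOURCE_BLACKLIST = {'internal_metrics'}

    def should_sync(datasource, datasource_name=None):
        # Blacklisted datasources are never synced.
        if datasource in DRUID_DATA_SOURCE_BLACKLIST:
            return False
        # No requested name means "sync everything"; otherwise require a match.
        return not datasource_name or datasource_name == datasource

    assert should_sync('clickstream')
    assert should_sync('clickstream', 'clickstream')
    assert not should_sync('clickstream', 'other_source')
    assert not should_sync('internal_metrics')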
@@ -1670,15 +1675,35 @@ def latest_metadata(self):
         # we need to set this interval to more than 1 day ago to exclude
         # realtime segments, which triggered a bug (fixed in druid 0.8.2).
         # https://groups.google.com/forum/#!topic/druid-user/gVCqqspHqOQ
-        start = (0 if self.version_higher(self.cluster.druid_version, '0.8.2') else 1)
-        intervals = (max_time - timedelta(days=7)).isoformat() + '/'
-        intervals += (max_time - timedelta(days=start)).isoformat()
-        segment_metadata = client.segment_metadata(
-            datasource=self.datasource_name,
-            intervals=intervals)
+        lbound = (max_time - timedelta(days=7)).isoformat()
+        rbound = max_time.isoformat()
+        if not self.version_higher(self.cluster.druid_version, '0.8.2'):
+            rbound = (max_time - timedelta(1)).isoformat()
+        segment_metadata = None
+        try:
+            segment_metadata = client.segment_metadata(
+                datasource=self.datasource_name,
+                intervals=lbound + '/' + rbound)
+        except Exception as e:
+            logging.warning("Failed first attempt to get latest segment")
+            logging.exception(e)
+        if not segment_metadata:
+            # if no segments in the past 7 days, look at all segments
+            lbound = datetime(1901, 1, 1).isoformat()[:10]
+            rbound = datetime(2050, 1, 1).isoformat()[:10]
+            if not self.version_higher(self.cluster.druid_version, '0.8.2'):
+                rbound = datetime.now().isoformat()[:10]
+            try:
+                segment_metadata = client.segment_metadata(
+                    datasource=self.datasource_name,
+                    intervals=lbound + '/' + rbound)
+            except Exception as e:
+                logging.warning("Failed 2nd attempt to get latest segment")
+                logging.exception(e)
         if segment_metadata:
             return segment_metadata[-1]['columns']
 
+
     def generate_metrics(self):
         for col in self.columns:
             col.generate_metrics()
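The resilience gain comes from two changes: each segment_metadata call is now wrapped in try/except so a cluster error degrades to a logged warning instead of aborting the sync, and a failed or empty first attempt over the trailing 7 days falls back to a near-unbounded interval. A standalone sketch of that try-then-widen pattern, where fetch is a stand-in for pydruid's client.segment_metadata and all names are illustrative:

    # Standalone sketch of the fallback pattern used above; fetch() is a
    # stand-in for pydruid's client.segment_metadata, names illustrative.
    import logging
    from datetime import timedelta

    def metadata_with_fallback(fetch, max_time):
        intervals = [
            # First attempt: only segments from the trailing 7 days.
            (max_time - timedelta(days=7)).isoformat()
            + '/' + max_time.isoformat(),
            # Second attempt: a near-unbounded interval, all segments.
            '1901-01-01/2050-01-01',
        ]
        for interval in intervals:
            try:
                result = fetch(intervals=interval)
            except Exception as e:
                logging.warning("segment_metadata failed for %s", interval)
                logging.exception(e)
                continue
            if result:
                # Mirror the original: last segment's column metadata wins.
                return result[-1]['columns']
        return None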
@@ -1774,6 +1799,7 @@ def sync_to_db(cls, name, cluster):

         cols = datasource.latest_metadata()
         if not cols:
+            logging.error("Failed at fetching the latest segment")
             return
         for col in cols:
             col_obj = (
