Skip to content

Commit

Permalink
Made queries for multiple unique tag values across multiple measurements run in a single statement - resolves #15
Browse files Browse the repository at this point in the history
  • Loading branch information
Dan committed Jan 18, 2017
1 parent b470fcd commit 8f994b0
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 94 deletions.
119 changes: 50 additions & 69 deletions influxgraph/classes/finder.py
Original file line number Diff line number Diff line change
Expand Up @@ -359,108 +359,89 @@ def _get_template_values_from_paths(self, paths, _filter, template,
'fields', []):
path_measurements[measurement].setdefault(
'fields', []).append(field)
path_measurements[measurement]['tags'] = list(
itertools.product(*_tags.values()))
path_measurements[measurement].setdefault(
'template', template)
return _measurements, _tags, _fields, matched_paths

def _get_all_template_values(self, paths):
paths = paths[:]
query_data = deque()
path_measurements = {}
measurements, tags, fields = deque(), deque(), set()
for (_filter, template, default_tags, separator) in self.graphite_templates:
# One influx query statement per template
# One influx measurement queried per template
if not paths:
break
_measurements, _tags, _fields, matched_paths = \
self._get_template_values_from_paths(
paths, _filter, template, default_tags, separator,
path_measurements)
if _measurements:
# Found template match for path, add query data and
# Found template match for path, append query data and
# remove matched paths so we do not try to match them again
query_data.append((_measurements, _tags, _fields))
measurements.extend(_measurements)
tags.append(_tags)
fields = fields.union(_fields)
for path in matched_paths:
del paths[paths.index(path)]
return query_data, path_measurements
return measurements, tags, fields, path_measurements

def _gen_query(self, _measurements, _tags, _fields, retention):
# import ipdb; ipdb.set_trace()
def _gen_query(self, measurements, tags, fields, retention):
groupings = set([k for t in tags for k in t.keys()])
measurements = ', '.join(
('"%s"."%s"' % (retention, measure,) for measure in _measurements)) \
('"%s"."%s"' % (retention, measure,) for measure in measurements)) \
if retention \
else ', '.join(('"%s"' % (measure,) for measure in _measurements))
groupings = deque()
tags = deque()
for tag in _tags:
if len(_tags[tag]) > 1:
groupings.append(tag)
continue
tags.append(""""%s" = '%s'""" % (tag, _tags[tag],))
# tag_sets = [[""""%s" = '%s'""" % (tag, tag_val,)
# for tag_val in _tags[tag]]
# for tag in _tags] \
# if _tags else None
####
# tags = [' AND '.join(['(%s)' % ' OR '.join(
# [""""%s" = '%s'""" % (tag, tag_val,) for tag_val in _tags[tag]])
# for tag in _tags])] if _tags else None
####
# tag_pairs = itertools.product(*tag_sets) if tag_sets else None
# tags = [" AND ".join(t) for t in tag_pairs] if tag_pairs else None
fields = _fields if _fields else ['value']
return measurements, tags, fields, groupings
else ', '.join(('"%s"' % (measure,) for measure in measurements))
_tags = ' OR '.join(['(%s)' % (tag_set,) for tag_set in [
' AND '.join(['(%s)' % ' OR '.join([
""""%s" = '%s'""" % (tag, tag_val,)
for tag_val in __tags[tag]])
for tag in __tags])
for __tags in tags]]) if tags else None
fields = fields if fields else ['value']
return measurements, _tags, fields, groupings

def _gen_query_values_from_templates(self, paths, retention):
_query_data, path_measurements = self._get_all_template_values(paths)
if len(_query_data) == 0:
return
query_data = deque()
for (_measurements, _tags, _fields) in _query_data:
measurements, tags, fields, groupings = self._gen_query(
_measurements, _tags, _fields, retention)
query_data.append((measurements, tags, fields, groupings),)
# import ipdb; ipdb.set_trace()
return query_data, path_measurements
measurements, tags, fields, path_measurements = \
self._get_all_template_values(paths)
measurements, tags, fields, groupings = self._gen_query(
measurements, tags, fields, retention)
return measurements, tags, fields, groupings, path_measurements

def _gen_query_values(self, paths, retention):
if self.graphite_templates:
return self._gen_query_values_from_templates(paths, retention)
measurement = ', '.join(('"%s"."%s"' % (retention, path,) for path in paths)) \
if retention \
else ', '.join(('"%s"' % (path,) for path in paths))
return ((measurement, None, ['value']),), None

def _gen_unique_infl_queries(self, query_data, start_time, end_time,
aggregation_func, interval):
queries = deque()
for (measurement, tags, fields) in query_data:
query_fields = ', '.join(['%s("%s") as "%s"' % (
aggregation_func, field, field) for field in fields])
query = 'select %s from %s where (time > %ds and time <= %ds)' % (
query_fields, measurement, start_time, end_time,)
group_by = 'GROUP BY time(%ss) fill(previous)' % (interval,)
if tags:
# Add sub queries
queries.append(';'.join(
["%s AND %s %s" % (query, tag, group_by,)
for tag in tags]))
else:
queries.append(" ".join([query, group_by]))
return queries
return measurement, None, ['value'], None, None

def _gen_infl_stmt(self, measurements, tags, fields, groupings, start_time,
end_time, aggregation_func, interval):
time_clause = "(time > %ds and time <= %ds)" % (start_time, end_time,)
query_fields = ', '.join(['%s("%s") as "%s"' % (
aggregation_func, field, field) for field in fields])
groupings = ['"%s"' % (grouping,) for grouping in groupings] \
if groupings else []
groupings.insert(0, 'time(%ss)' % (interval,))
groupings = ', '.join(groupings)
where_clause = "%s AND %s" % (time_clause, tags,) if tags else \
time_clause
group_by = '%s fill(previous)' % (groupings,)
query = 'select %s from %s where %s GROUP BY %s' % (
query_fields, measurements, where_clause, group_by,)
return query

def _gen_influxdb_stmt(self, start_time, end_time, paths, interval):
retention = get_retention_policy(interval, self.retention_policies) \
if self.retention_policies else None
aggregation_func = self._gen_aggregation_func(paths)
memcache_key = gen_memcache_key(start_time, end_time, aggregation_func,
paths)
try:
query_data, path_measurements = self._gen_query_values(paths, retention)
except TypeError:
return
queries = self._gen_unique_infl_queries(
query_data, start_time, end_time, aggregation_func, interval)
query = ';'.join(queries)
measurements, tags, fields, groupings, path_measurements = \
self._gen_query_values(paths, retention)
query = self._gen_infl_stmt(measurements, tags, fields, groupings,
start_time, end_time, aggregation_func,
interval)
return query, memcache_key, path_measurements

def _make_empty_multi_fetch_result(self, time_info, paths):
Expand Down Expand Up @@ -488,7 +469,8 @@ def fetch_multi(self, nodes, start_time, end_time):
try:
query, memcache_key, path_measurements = self._gen_influxdb_stmt(
start_time, end_time, paths, interval)
except TypeError:
except TypeError as ex:
logger.error("Type error generating query statement - %s", ex)
return self._make_empty_multi_fetch_result(time_info, paths)
data = self.memcache.get(memcache_key) if self.memcache else None
if data:
Expand All @@ -506,7 +488,6 @@ def fetch_multi(self, nodes, start_time, end_time):
logger.debug("Calling influxdb multi fetch with query - %s", query)
data = self.client.query(query, params=_INFLUXDB_CLIENT_PARAMS)
logger.debug('fetch_multi() - Retrieved %d result set(s)', len(data))
# import ipdb; ipdb.set_trace()
data = read_influxdb_values(data, paths, path_measurements)
timer.stop()
# Graphite API requires that data contain keys for
Expand Down
42 changes: 19 additions & 23 deletions influxgraph/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,10 @@
from .constants import INFLUXDB_AGGREGATIONS
try:
from .ext.classes.tree import NodeTreeIndex
from .ext.templates import get_series_with_tags
from .ext.templates import get_series_with_tags, heapsort, _make_path_from_template
except ImportError:
from .classes.tree import NodeTreeIndex
from .templates import get_series_with_tags
from .templates import get_series_with_tags, heapsort, _make_path_from_template

def calculate_interval(start_time, end_time, deltas=None):
"""Calculates wanted data series interval according to start and end times
Expand Down Expand Up @@ -162,39 +162,34 @@ def get_aggregation_func(path, aggregation_functions):
return aggregation_functions[pattern]
return 'mean'

def _find_metric_name(measurement_paths, tag_sets, field, fields):
for tag_set in tag_sets:
for path in measurement_paths:
if field in fields \
and field in path \
and len([t for t in tag_set if t in path]) == len(tag_set):
del measurement_paths[measurement_paths.index(path)]
return path

def _retrieve_named_field_data(infl_data, path_measurements, measurement, _data):
def _retrieve_named_field_data(infl_data, path_measurements, measurement, tags, _data):
measurement_paths = path_measurements[measurement]['paths'][:]
tag_sets = path_measurements[measurement]['tags'][:]
field_keys = next(infl_data.get_points(measurement)).keys()
field_keys = next(infl_data.get_points(measurement, tags)).keys()
point_fields = sorted([k for k in field_keys if k != 'time'])
for field in point_fields:
metric = _find_metric_name(
measurement_paths, tag_sets, field,
path_measurements[measurement]['fields'])
if not metric:
split_path = []
_make_path_from_template(
split_path, measurement,
path_measurements[measurement]['template'], tags.items())
split_path = [p[1] for p in heapsort(split_path)]
split_path.append(field)
metric = '.'.join(split_path)
if not metric in measurement_paths:
continue
del measurement_paths[measurement_paths.index(metric)]
_data[metric] = [d[field]
for d in infl_data.get_points(measurement)]
for d in infl_data.get_points(measurement, tags)]
path_measurements[measurement]['paths'] = measurement_paths

def _retrieve_field_data(infl_data, path_measurements, measurement,
metric, _data):
metric, tags, _data):
# Retrieve value field data
if 'value' in path_measurements[measurement]['fields']:
_data[metric] = [d['value']
for d in infl_data.get_points(measurement)]
for d in infl_data.get_points(measurement, tags)]
# Retrieve non value named field data with fields from path_measurements
_retrieve_named_field_data(infl_data, path_measurements,
measurement, _data)
measurement, tags, _data)

def read_influxdb_values(influxdb_data, paths, path_measurements):
"""Return key -> values dict for values from InfluxDB data"""
Expand All @@ -206,6 +201,7 @@ def read_influxdb_values(influxdb_data, paths, path_measurements):
for infl_data in influxdb_data:
for infl_keys in infl_data.keys():
measurement = infl_keys[0]
tags = infl_keys[1]
if not path_measurements:
if not measurement in paths:
continue
Expand All @@ -224,7 +220,7 @@ def read_influxdb_values(influxdb_data, paths, path_measurements):
if metric not in paths:
continue
_retrieve_field_data(infl_data, path_measurements,
measurement, metric, _data)
measurement, metric, tags, _data)
return _data

def gen_memcache_pattern_key(pattern):
Expand Down
5 changes: 3 additions & 2 deletions tests/test_influxdb_templates_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -373,7 +373,6 @@ def test_template_multi_tags_multi_templ_multi_nodes_no_fields(self):
int(self.end_time.strftime("%s")))
self.assertTrue(data[cpu_metric_nodes[0].path][-1] == idle_data['value'],
msg="Got incorrect data from multi-tag query")
# import ipdb; ipdb.set_trace()

def test_template_multi_tags_multi_templ_multi_nodes(self):
self.client.drop_database(self.db_name)
Expand Down Expand Up @@ -767,7 +766,9 @@ def test_multi_tag_values_multi_measurements(self):
elif tags_env2_h2['env'] in metric and tags_env2_h2['host'] in metric:
fields = env2_h2_fields
field = [f for f in list(fields.keys()) if metric.endswith(f)][0]
self.assertTrue(data[metric][-1] == fields[field])
self.assertTrue(data[metric][-1] == fields[field],
msg="Incorrect data for metric %s. Should be %s, got %s" % (
metric, fields[field], data[metric][-1]))

def test_field_data_part_or_no_template_match(self):
del self.finder
Expand Down

0 comments on commit 8f994b0

Please sign in to comment.