From 8f994b0a77652824f4bde2e988330897c8c1ad1b Mon Sep 17 00:00:00 2001 From: Dan <22e889d8@opayq.com> Date: Wed, 18 Jan 2017 17:27:13 +0000 Subject: [PATCH] Made queries for multiple unique tag values across multiple measurements run in a single statement - resolves #15 --- influxgraph/classes/finder.py | 119 ++++++++----------- influxgraph/utils.py | 42 +++---- tests/test_influxdb_templates_integration.py | 5 +- 3 files changed, 72 insertions(+), 94 deletions(-) diff --git a/influxgraph/classes/finder.py b/influxgraph/classes/finder.py index c523de5..0908240 100644 --- a/influxgraph/classes/finder.py +++ b/influxgraph/classes/finder.py @@ -359,16 +359,16 @@ def _get_template_values_from_paths(self, paths, _filter, template, 'fields', []): path_measurements[measurement].setdefault( 'fields', []).append(field) - path_measurements[measurement]['tags'] = list( - itertools.product(*_tags.values())) + path_measurements[measurement].setdefault( + 'template', template) return _measurements, _tags, _fields, matched_paths def _get_all_template_values(self, paths): paths = paths[:] - query_data = deque() path_measurements = {} + measurements, tags, fields = deque(), deque(), set() for (_filter, template, default_tags, separator) in self.graphite_templates: - # One influx query statement per template + # One influx measurement queried per template if not paths: break _measurements, _tags, _fields, matched_paths = \ @@ -376,51 +376,36 @@ def _get_all_template_values(self, paths): paths, _filter, template, default_tags, separator, path_measurements) if _measurements: - # Found template match for path, add query data and + # Found template match for path, append query data and # remove matched paths so we do not try to match them again - query_data.append((_measurements, _tags, _fields)) + measurements.extend(_measurements) + tags.append(_tags) + fields = fields.union(_fields) for path in matched_paths: del paths[paths.index(path)] - return query_data, path_measurements + return measurements, tags, fields, path_measurements - def _gen_query(self, _measurements, _tags, _fields, retention): - # import ipdb; ipdb.set_trace() + def _gen_query(self, measurements, tags, fields, retention): + groupings = set([k for t in tags for k in t.keys()]) measurements = ', '.join( - ('"%s"."%s"' % (retention, measure,) for measure in _measurements)) \ + ('"%s"."%s"' % (retention, measure,) for measure in measurements)) \ if retention \ - else ', '.join(('"%s"' % (measure,) for measure in _measurements)) - groupings = deque() - tags = deque() - for tag in _tags: - if len(_tags[tag]) > 1: - groupings.append(tag) - continue - tags.append(""""%s" = '%s'""" % (tag, _tags[tag],)) - # tag_sets = [[""""%s" = '%s'""" % (tag, tag_val,) - # for tag_val in _tags[tag]] - # for tag in _tags] \ - # if _tags else None - #### - # tags = [' AND '.join(['(%s)' % ' OR '.join( - # [""""%s" = '%s'""" % (tag, tag_val,) for tag_val in _tags[tag]]) - # for tag in _tags])] if _tags else None - #### - # tag_pairs = itertools.product(*tag_sets) if tag_sets else None - # tags = [" AND ".join(t) for t in tag_pairs] if tag_pairs else None - fields = _fields if _fields else ['value'] - return measurements, tags, fields, groupings + else ', '.join(('"%s"' % (measure,) for measure in measurements)) + _tags = ' OR '.join(['(%s)' % (tag_set,) for tag_set in [ + ' AND '.join(['(%s)' % ' OR '.join([ + """"%s" = '%s'""" % (tag, tag_val,) + for tag_val in __tags[tag]]) + for tag in __tags]) + for __tags in tags]]) if tags else None + fields = fields if fields else ['value'] + return measurements, _tags, fields, groupings def _gen_query_values_from_templates(self, paths, retention): - _query_data, path_measurements = self._get_all_template_values(paths) - if len(_query_data) == 0: - return - query_data = deque() - for (_measurements, _tags, _fields) in _query_data: - measurements, tags, fields, groupings = self._gen_query( - _measurements, _tags, _fields, retention) - query_data.append((measurements, tags, fields, groupings),) - # import ipdb; ipdb.set_trace() - return query_data, path_measurements + measurements, tags, fields, path_measurements = \ + self._get_all_template_values(paths) + measurements, tags, fields, groupings = self._gen_query( + measurements, tags, fields, retention) + return measurements, tags, fields, groupings, path_measurements def _gen_query_values(self, paths, retention): if self.graphite_templates: @@ -428,25 +413,23 @@ def _gen_query_values(self, paths, retention): measurement = ', '.join(('"%s"."%s"' % (retention, path,) for path in paths)) \ if retention \ else ', '.join(('"%s"' % (path,) for path in paths)) - return ((measurement, None, ['value']),), None - - def _gen_unique_infl_queries(self, query_data, start_time, end_time, - aggregation_func, interval): - queries = deque() - for (measurement, tags, fields) in query_data: - query_fields = ', '.join(['%s("%s") as "%s"' % ( - aggregation_func, field, field) for field in fields]) - query = 'select %s from %s where (time > %ds and time <= %ds)' % ( - query_fields, measurement, start_time, end_time,) - group_by = 'GROUP BY time(%ss) fill(previous)' % (interval,) - if tags: - # Add sub queries - queries.append(';'.join( - ["%s AND %s %s" % (query, tag, group_by,) - for tag in tags])) - else: - queries.append(" ".join([query, group_by])) - return queries + return measurement, None, ['value'], None, None + + def _gen_infl_stmt(self, measurements, tags, fields, groupings, start_time, + end_time, aggregation_func, interval): + time_clause = "(time > %ds and time <= %ds)" % (start_time, end_time,) + query_fields = ', '.join(['%s("%s") as "%s"' % ( + aggregation_func, field, field) for field in fields]) + groupings = ['"%s"' % (grouping,) for grouping in groupings] \ + if groupings else [] + groupings.insert(0, 'time(%ss)' % (interval,)) + groupings = ', '.join(groupings) + where_clause = "%s AND %s" % (time_clause, tags,) if tags else \ + time_clause + group_by = '%s fill(previous)' % (groupings,) + query = 'select %s from %s where %s GROUP BY %s' % ( + query_fields, measurements, where_clause, group_by,) + return query def _gen_influxdb_stmt(self, start_time, end_time, paths, interval): retention = get_retention_policy(interval, self.retention_policies) \ @@ -454,13 +437,11 @@ def _gen_influxdb_stmt(self, start_time, end_time, paths, interval): aggregation_func = self._gen_aggregation_func(paths) memcache_key = gen_memcache_key(start_time, end_time, aggregation_func, paths) - try: - query_data, path_measurements = self._gen_query_values(paths, retention) - except TypeError: - return - queries = self._gen_unique_infl_queries( - query_data, start_time, end_time, aggregation_func, interval) - query = ';'.join(queries) + measurements, tags, fields, groupings, path_measurements = \ + self._gen_query_values(paths, retention) + query = self._gen_infl_stmt(measurements, tags, fields, groupings, + start_time, end_time, aggregation_func, + interval) return query, memcache_key, path_measurements def _make_empty_multi_fetch_result(self, time_info, paths): @@ -488,7 +469,8 @@ def fetch_multi(self, nodes, start_time, end_time): try: query, memcache_key, path_measurements = self._gen_influxdb_stmt( start_time, end_time, paths, interval) - except TypeError: + except TypeError as ex: + logger.error("Type error generating query statement - %s", ex) return self._make_empty_multi_fetch_result(time_info, paths) data = self.memcache.get(memcache_key) if self.memcache else None if data: @@ -506,7 +488,6 @@ def fetch_multi(self, nodes, start_time, end_time): logger.debug("Calling influxdb multi fetch with query - %s", query) data = self.client.query(query, params=_INFLUXDB_CLIENT_PARAMS) logger.debug('fetch_multi() - Retrieved %d result set(s)', len(data)) - # import ipdb; ipdb.set_trace() data = read_influxdb_values(data, paths, path_measurements) timer.stop() # Graphite API requires that data contain keys for diff --git a/influxgraph/utils.py b/influxgraph/utils.py index 23931de..b703fda 100644 --- a/influxgraph/utils.py +++ b/influxgraph/utils.py @@ -25,10 +25,10 @@ from .constants import INFLUXDB_AGGREGATIONS try: from .ext.classes.tree import NodeTreeIndex - from .ext.templates import get_series_with_tags + from .ext.templates import get_series_with_tags, heapsort, _make_path_from_template except ImportError: from .classes.tree import NodeTreeIndex - from .templates import get_series_with_tags + from .templates import get_series_with_tags, heapsort, _make_path_from_template def calculate_interval(start_time, end_time, deltas=None): """Calculates wanted data series interval according to start and end times @@ -162,39 +162,34 @@ def get_aggregation_func(path, aggregation_functions): return aggregation_functions[pattern] return 'mean' -def _find_metric_name(measurement_paths, tag_sets, field, fields): - for tag_set in tag_sets: - for path in measurement_paths: - if field in fields \ - and field in path \ - and len([t for t in tag_set if t in path]) == len(tag_set): - del measurement_paths[measurement_paths.index(path)] - return path - -def _retrieve_named_field_data(infl_data, path_measurements, measurement, _data): +def _retrieve_named_field_data(infl_data, path_measurements, measurement, tags, _data): measurement_paths = path_measurements[measurement]['paths'][:] - tag_sets = path_measurements[measurement]['tags'][:] - field_keys = next(infl_data.get_points(measurement)).keys() + field_keys = next(infl_data.get_points(measurement, tags)).keys() point_fields = sorted([k for k in field_keys if k != 'time']) for field in point_fields: - metric = _find_metric_name( - measurement_paths, tag_sets, field, - path_measurements[measurement]['fields']) - if not metric: + split_path = [] + _make_path_from_template( + split_path, measurement, + path_measurements[measurement]['template'], tags.items()) + split_path = [p[1] for p in heapsort(split_path)] + split_path.append(field) + metric = '.'.join(split_path) + if not metric in measurement_paths: continue + del measurement_paths[measurement_paths.index(metric)] _data[metric] = [d[field] - for d in infl_data.get_points(measurement)] + for d in infl_data.get_points(measurement, tags)] path_measurements[measurement]['paths'] = measurement_paths def _retrieve_field_data(infl_data, path_measurements, measurement, - metric, _data): + metric, tags, _data): # Retrieve value field data if 'value' in path_measurements[measurement]['fields']: _data[metric] = [d['value'] - for d in infl_data.get_points(measurement)] + for d in infl_data.get_points(measurement, tags)] # Retrieve non value named field data with fields from path_measurements _retrieve_named_field_data(infl_data, path_measurements, - measurement, _data) + measurement, tags, _data) def read_influxdb_values(influxdb_data, paths, path_measurements): """Return key -> values dict for values from InfluxDB data""" @@ -206,6 +201,7 @@ def read_influxdb_values(influxdb_data, paths, path_measurements): for infl_data in influxdb_data: for infl_keys in infl_data.keys(): measurement = infl_keys[0] + tags = infl_keys[1] if not path_measurements: if not measurement in paths: continue @@ -224,7 +220,7 @@ def read_influxdb_values(influxdb_data, paths, path_measurements): if metric not in paths: continue _retrieve_field_data(infl_data, path_measurements, - measurement, metric, _data) + measurement, metric, tags, _data) return _data def gen_memcache_pattern_key(pattern): diff --git a/tests/test_influxdb_templates_integration.py b/tests/test_influxdb_templates_integration.py index d0a2568..a5b2b15 100644 --- a/tests/test_influxdb_templates_integration.py +++ b/tests/test_influxdb_templates_integration.py @@ -373,7 +373,6 @@ def test_template_multi_tags_multi_templ_multi_nodes_no_fields(self): int(self.end_time.strftime("%s"))) self.assertTrue(data[cpu_metric_nodes[0].path][-1] == idle_data['value'], msg="Got incorrect data from multi-tag query") - # import ipdb; ipdb.set_trace() def test_template_multi_tags_multi_templ_multi_nodes(self): self.client.drop_database(self.db_name) @@ -767,7 +766,9 @@ def test_multi_tag_values_multi_measurements(self): elif tags_env2_h2['env'] in metric and tags_env2_h2['host'] in metric: fields = env2_h2_fields field = [f for f in list(fields.keys()) if metric.endswith(f)][0] - self.assertTrue(data[metric][-1] == fields[field]) + self.assertTrue(data[metric][-1] == fields[field], + msg="Incorrect data for metric %s. Should be %s, got %s" % ( + metric, fields[field], data[metric][-1])) def test_field_data_part_or_no_template_match(self): del self.finder