Skip to content

Commit

Permalink
Further optimisations on reading named field data - cleanups
Browse files Browse the repository at this point in the history
  • Loading branch information
Dan committed Jan 19, 2017
1 parent 8f994b0 commit 2cdcde8
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 40 deletions.
28 changes: 14 additions & 14 deletions influxgraph/classes/finder.py
Original file line number Diff line number Diff line change
Expand Up @@ -330,7 +330,7 @@ def _gen_aggregation_func(self, paths):

def _get_template_values_from_paths(self, paths, _filter, template,
default_tags, separator,
path_measurements):
measurement_data):
_measurements = deque()
_tags = {}
_fields = deque()
Expand All @@ -353,19 +353,19 @@ def _get_template_values_from_paths(self, paths, _filter, template,
if not field in _fields:
_fields.append(field)
matched_paths.append(path)
path_measurements.setdefault(measurement, {}).setdefault(
measurement_data.setdefault(measurement, {}).setdefault(
'paths', []).append(path)
if not field in path_measurements[measurement].setdefault(
if not field in measurement_data[measurement].setdefault(
'fields', []):
path_measurements[measurement].setdefault(
measurement_data[measurement].setdefault(
'fields', []).append(field)
path_measurements[measurement].setdefault(
measurement_data[measurement].setdefault(
'template', template)
return _measurements, _tags, _fields, matched_paths

def _get_all_template_values(self, paths):
paths = paths[:]
path_measurements = {}
measurement_data = {}
measurements, tags, fields = deque(), deque(), set()
for (_filter, template, default_tags, separator) in self.graphite_templates:
# One influx measurement queried per template
Expand All @@ -374,7 +374,7 @@ def _get_all_template_values(self, paths):
_measurements, _tags, _fields, matched_paths = \
self._get_template_values_from_paths(
paths, _filter, template, default_tags, separator,
path_measurements)
measurement_data)
if _measurements:
# Found template match for path, append query data and
# remove matched paths so we do not try to match them again
Expand All @@ -383,7 +383,7 @@ def _get_all_template_values(self, paths):
fields = fields.union(_fields)
for path in matched_paths:
del paths[paths.index(path)]
return measurements, tags, fields, path_measurements
return measurements, tags, fields, measurement_data

def _gen_query(self, measurements, tags, fields, retention):
groupings = set([k for t in tags for k in t.keys()])
Expand All @@ -401,11 +401,11 @@ def _gen_query(self, measurements, tags, fields, retention):
return measurements, _tags, fields, groupings

def _gen_query_values_from_templates(self, paths, retention):
measurements, tags, fields, path_measurements = \
measurements, tags, fields, measurement_data = \
self._get_all_template_values(paths)
measurements, tags, fields, groupings = self._gen_query(
measurements, tags, fields, retention)
return measurements, tags, fields, groupings, path_measurements
return measurements, tags, fields, groupings, measurement_data

def _gen_query_values(self, paths, retention):
if self.graphite_templates:
Expand Down Expand Up @@ -437,12 +437,12 @@ def _gen_influxdb_stmt(self, start_time, end_time, paths, interval):
aggregation_func = self._gen_aggregation_func(paths)
memcache_key = gen_memcache_key(start_time, end_time, aggregation_func,
paths)
measurements, tags, fields, groupings, path_measurements = \
measurements, tags, fields, groupings, measurement_data = \
self._gen_query_values(paths, retention)
query = self._gen_infl_stmt(measurements, tags, fields, groupings,
start_time, end_time, aggregation_func,
interval)
return query, memcache_key, path_measurements
return query, memcache_key, measurement_data

def _make_empty_multi_fetch_result(self, time_info, paths):
data = {}
Expand All @@ -467,7 +467,7 @@ def fetch_multi(self, nodes, start_time, end_time):
return self._make_empty_multi_fetch_result(
time_info, [n.path for n in nodes])
try:
query, memcache_key, path_measurements = self._gen_influxdb_stmt(
query, memcache_key, measurement_data = self._gen_influxdb_stmt(
start_time, end_time, paths, interval)
except TypeError as ex:
logger.error("Type error generating query statement - %s", ex)
Expand All @@ -488,7 +488,7 @@ def fetch_multi(self, nodes, start_time, end_time):
logger.debug("Calling influxdb multi fetch with query - %s", query)
data = self.client.query(query, params=_INFLUXDB_CLIENT_PARAMS)
logger.debug('fetch_multi() - Retrieved %d result set(s)', len(data))
data = read_influxdb_values(data, paths, path_measurements)
data = read_influxdb_values(data, paths, measurement_data)
timer.stop()
# Graphite API requires that data contain keys for
# all requested paths even if they have no datapoints
Expand Down
56 changes: 30 additions & 26 deletions influxgraph/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,36 +162,44 @@ def get_aggregation_func(path, aggregation_functions):
return aggregation_functions[pattern]
return 'mean'

def _retrieve_named_field_data(infl_data, path_measurements, measurement, tags, _data):
measurement_paths = path_measurements[measurement]['paths'][:]
field_keys = next(infl_data.get_points(measurement, tags)).keys()
point_fields = sorted([k for k in field_keys if k != 'time'])
for field in point_fields:
def _retrieve_named_field_data(infl_data, measurement_data, measurement, tags, _data):
measurement_paths = measurement_data[measurement]['paths'][:]
for field in measurement_data[measurement]['fields']:
split_path = []
_make_path_from_template(
split_path, measurement,
path_measurements[measurement]['template'], tags.items())
measurement_data[measurement]['template'], tags.items())
split_path = [p[1] for p in heapsort(split_path)]
split_path.append(field)
metric = '.'.join(split_path)
if not metric in measurement_paths:
continue
del measurement_paths[measurement_paths.index(metric)]
_data[metric] = [d[field]
for d in infl_data.get_points(measurement, tags)]
path_measurements[measurement]['paths'] = measurement_paths
for d in infl_data.get_points(measurement=measurement,
tags=tags)]
measurement_data[measurement]['paths'] = measurement_paths

def _retrieve_field_data(infl_data, path_measurements, measurement,
def _retrieve_field_data(infl_data, measurement_data, measurement,
metric, tags, _data):
# Retrieve value field data
if 'value' in path_measurements[measurement]['fields']:
if 'value' in measurement_data[measurement]['fields']:
_data[metric] = [d['value']
for d in infl_data.get_points(measurement, tags)]
# Retrieve non value named field data with fields from path_measurements
_retrieve_named_field_data(infl_data, path_measurements,
measurement, tags, _data)
for d in infl_data.get_points(measurement=measurement,
tags=tags)]
return
# Retrieve non value named field data with fields from measurement_data
_retrieve_named_field_data(infl_data, measurement_data,
measurement, tags, _data)

def read_measurement_metric_values(infl_data, measurement, paths, _data):
    """Copy the ``value`` field of a measurement's points into ``_data``.

    Does nothing when ``measurement`` is not one of the requested ``paths``.
    Otherwise ``_data[measurement]`` is set to the list of ``point['value']``
    for every point returned by ``infl_data.get_points``, in the order the
    points are yielded. Any existing entry for ``measurement`` is replaced.

    :param infl_data: Object providing ``get_points(measurement=...)`` that
      yields mapping-like points with a ``'value'`` key (presumably an
      InfluxDB client result set - confirm against the caller).
    :param measurement: Measurement name; also used as the key in ``_data``.
    :param paths: Container of requested paths, used for a membership test.
    :param _data: Dictionary updated in place with the values read.
    :returns: ``None``; results are written to ``_data``.
    :raises KeyError: If a returned point has no ``'value'`` key.
    """
    if measurement not in paths:
        return
    _data[measurement] = [
        point['value']
        for point in infl_data.get_points(measurement=measurement)]

def read_influxdb_values(influxdb_data, paths, path_measurements):
def read_influxdb_values(influxdb_data, paths, measurement_data):
"""Return key -> values dict for values from InfluxDB data"""
_data = {}
if not isinstance(influxdb_data, list):
Expand All @@ -202,24 +210,20 @@ def read_influxdb_values(influxdb_data, paths, path_measurements):
for infl_keys in infl_data.keys():
measurement = infl_keys[0]
tags = infl_keys[1]
if not path_measurements:
if not measurement in paths:
continue
_data[measurement] = [d['value']
for d in infl_data.get_points(measurement)]
if not measurement_data:
read_measurement_metric_values(infl_data, measurement,
paths, _data)
continue
elif not measurement in path_measurements:
elif not measurement in measurement_data:
continue
if measurement not in seen_measurements:
seen_measurements = set(tuple(seen_measurements) + (measurement,))
m_path_ind = 0
elif m_path_ind >= len(path_measurements[measurement]['paths']):
elif m_path_ind >= len(measurement_data[measurement]['paths']):
m_path_ind = 0
metric = path_measurements[measurement]['paths'][m_path_ind]
metric = measurement_data[measurement]['paths'][m_path_ind]
m_path_ind += 1
if metric not in paths:
continue
_retrieve_field_data(infl_data, path_measurements,
_retrieve_field_data(infl_data, measurement_data,
measurement, metric, tags, _data)
return _data

Expand Down

0 comments on commit 2cdcde8

Please sign in to comment.