diff --git a/graphite-api.yaml.example b/graphite-api.yaml.example
index d8a355a..2086664 100644
--- a/graphite-api.yaml.example
+++ b/graphite-api.yaml.example
@@ -199,12 +199,14 @@ influxdb:
   #
   # Row limit of series loader queries
   #
+  # Should _not_ exceed max-row-limit setting of InfluxDB
+  #
   # Depending on InfluxDB data and machine memory sizes, this may need to be
   # lowered to avoid InfluxDB returning partial results due to not enough
   # available memory.
   #
   # Example query: SHOW SERIES LIMIT <..>
-  # loader_limit: 100000
+  # loader_limit: 10000
   #
 
   ## Fill function parameter to use on data queries
diff --git a/influxgraph/classes/finder.py b/influxgraph/classes/finder.py
index ccae928..7cdf958 100644
--- a/influxgraph/classes/finder.py
+++ b/influxgraph/classes/finder.py
@@ -1,5 +1,5 @@
-# Copyright (C) [2015-] [Thomson Reuters LLC]
-# Copyright (C) [2015-] [Panos Kittenis]
+# Copyright (C) [2015-2017] [Thomson Reuters LLC]
+# Copyright (C) [2015-2017] [Panos Kittenis]
 
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -614,20 +614,37 @@ def load_index(self):
         self.index = index
         logger.info("Loaded index from disk")
 
-    def get_field_keys(self):
-        """Get field keys for all measurements"""
-        field_keys = self.memcache.get(_MEMCACHE_FIELDS_KEY) \
+    def get_all_field_keys(self, offset=0, field_keys=None):
+        """Get all field keys from DB, starting at offset"""
+        field_keys = {} if field_keys is None else field_keys
+        cur_field_keys = self.get_field_keys(offset=offset)
+        if len(cur_field_keys.keys()) == 0:
+            return field_keys
+        for key in cur_field_keys:
+            field_keys.setdefault(key, deque()).extend(cur_field_keys[key])
+        return self.get_all_field_keys(
+            offset=self.loader_limit + offset,
+            field_keys=field_keys)
+
+    def get_field_keys(self, offset=0):
+        """Get one field keys list at offset"""
+        memcache_key = gen_memcache_pattern_key("_".join([
+            _MEMCACHE_FIELDS_KEY, str(self.loader_limit), str(offset)]))
+        field_keys = self.memcache.get(memcache_key) \
             if self.memcache else None
         if field_keys:
-            logger.debug("Found cached field keys")
+            logger.debug("Found cached field keys list for limit %s, offset %s",
+                         self.loader_limit, offset,)
             return field_keys
-        logger.debug("Calling InfluxDB for field keys")
-        data = self.client.query('SHOW FIELD KEYS')
+        logger.debug("Calling InfluxDB for field keys with limit %s, offset %s",
+                     self.loader_limit, offset,)
+        data = self.client.query('SHOW FIELD KEYS LIMIT %s OFFSET %s' % (
+            self.loader_limit, offset,))
         field_keys = {}
         for ((key, _), vals) in data.items():
             field_keys[key] = [val['fieldKey'] for val in vals]
         if self.memcache:
-            if not self.memcache.set(_MEMCACHE_FIELDS_KEY, field_keys,
+            if not self.memcache.set(memcache_key, field_keys,
                                      time=self.memcache_ttl,
                                      min_compress_len=1):
                 logger.error("Could not add field key list to memcache - "
diff --git a/tests/test_influxdb_templates_integration.py b/tests/test_influxdb_templates_integration.py
index a5b2b15..211d660 100644
--- a/tests/test_influxdb_templates_integration.py
+++ b/tests/test_influxdb_templates_integration.py
@@ -6,13 +6,18 @@
 import datetime
 import time
 from random import randint
+import logging
 
 import influxdb.exceptions
 import influxgraph
-from influxgraph.utils import Query
+from influxgraph.utils import Query, make_memcache_client, gen_memcache_pattern_key
 from influxgraph.constants import SERIES_LOADER_MUTEX_KEY, \
     MEMCACHE_SERIES_DEFAULT_TTL, LOADER_LIMIT, _MEMCACHE_FIELDS_KEY
 from influxdb import InfluxDBClient
 from influxgraph.templates import InvalidTemplateError
+from influxgraph.classes.finder import logger as finder_logger
+
+finder_logger.setLevel(logging.DEBUG)
+logging.basicConfig()
 
 os.environ['TZ'] = 'UTC'
@@ -1034,6 +1039,51 @@ def test_multi_tmpl_part_filter(self):
         for metric in cpu_data:
             self.assertTrue(cpu_data[metric][-1] == fields[metric.split('.')[-1]])
 
+    def test_field_list_pagination(self):
+        del self.finder
+        measurements = ['m1', 'm2', 'm3']
+        fields = {'f1': self.randval(),
+                  'f2': self.randval(),
+                  'f3': self.randval(),
+                  'f4': self.randval()}
+        tags = {'t1': 't1_val'}
+        self.client.drop_database(self.db_name)
+        self.client.create_database(self.db_name)
+        self.write_data(measurements, tags, fields)
+        m4_measurement = 'm4'
+        m4_fields = {'f5': self.randval()}
+        self.write_data([m4_measurement], tags, m4_fields)
+        m5_measurement = 'm5'
+        m5_fields = {'f5': self.randval(),
+                     'f6': self.randval(),
+                     'f7': self.randval(),
+                     'f8': self.randval(),
+                     'f9': self.randval(),
+                     'f10': self.randval(),
+                     }
+        self.write_data([m5_measurement], tags, m5_fields)
+        self.config['influxdb']['loader_limit'] = 1
+        self.finder = influxgraph.InfluxDBFinder(self.config)
+        fields_page1 = self.finder.get_field_keys()
+        self.assertEqual(len(list(fields_page1.values())[0]), self.finder.loader_limit)
+        all_db_fields = self.finder.get_all_field_keys()
+        for measurement in measurements:
+            self.assertEqual(len(all_db_fields[measurement]), len(fields.keys()))
+        self.assertEqual(len(all_db_fields['m4']), len(m4_fields.keys()))
+        self.assertEqual(len(all_db_fields['m5']), len(m5_fields.keys()))
+        # Memcached test
+        self.config['influxdb']['memcache'] = {'host': 'localhost'}
+        memcache_client = make_memcache_client(
+            self.config['influxdb']['memcache']['host'])
+        for offset in range(len(m5_fields.keys())):
+            memcache_key = gen_memcache_pattern_key("_".join([
+                _MEMCACHE_FIELDS_KEY, str(
+                    self.config['influxdb']['loader_limit']), str(offset)]))
+            memcache_client.delete(memcache_key)
+        self.finder = influxgraph.InfluxDBFinder(self.config)
+        self.finder.get_all_field_keys()
+        cached_db_fields = self.finder.get_all_field_keys()
+        self.assertEqual(all_db_fields, cached_db_fields)
 
 if __name__ == '__main__':
     unittest.main()
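
Note for reviewers: the following is a minimal standalone sketch of the LIMIT/OFFSET pagination pattern the finder change implements; it is not part of the patch. `fake_show_field_keys`, `_FIELDS` and `LOADER_LIMIT` are made-up stand-ins for InfluxDB's `SHOW FIELD KEYS LIMIT <n> OFFSET <m>` query and the `loader_limit` setting. It assumes, as the new `get_all_field_keys` and its test do, that LIMIT/OFFSET apply per measurement and that an empty result set marks the end of pagination.

    from collections import deque

    # Hypothetical stand-in data and loader limit -- not from this patch.
    LOADER_LIMIT = 1
    _FIELDS = {'m1': ['f1', 'f2', 'f3'], 'm2': ['f4']}


    def fake_show_field_keys(limit, offset):
        """Stand-in for SHOW FIELD KEYS LIMIT <limit> OFFSET <offset>.

        Returns at most `limit` field keys per measurement, starting at
        `offset`; measurements with no keys left are omitted from the page.
        """
        page = {}
        for measurement, keys in _FIELDS.items():
            chunk = keys[offset:offset + limit]
            if chunk:
                page[measurement] = chunk
        return page


    def get_all_field_keys(offset=0, field_keys=None):
        """Accumulate pages until an empty page signals the end of the data."""
        field_keys = {} if field_keys is None else field_keys
        page = fake_show_field_keys(LOADER_LIMIT, offset)
        if not page:
            return field_keys
        for key, vals in page.items():
            field_keys.setdefault(key, deque()).extend(vals)
        return get_all_field_keys(offset=LOADER_LIMIT + offset,
                                  field_keys=field_keys)


    if __name__ == '__main__':
        # Expected: {'m1': deque(['f1', 'f2', 'f3']), 'm2': deque(['f4'])}
        print(get_all_field_keys())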