Skip to content

Commit

Permalink
WIP for #24 - Made field key list use pagination and added tests
Browse files Browse the repository at this point in the history
  • Loading branch information
Dan committed Feb 8, 2017
1 parent 5e8b6d0 commit b2c4b38
Show file tree
Hide file tree
Showing 3 changed files with 80 additions and 11 deletions.
4 changes: 3 additions & 1 deletion graphite-api.yaml.example
Original file line number Diff line number Diff line change
Expand Up @@ -199,12 +199,14 @@ influxdb:
#
# Row limit of series loader queries
#
# Should _not_ exceed max-row-limit setting of InfluxDB
#
# Depending on InfluxDB data and machine memory sizes, this may need to be
# lowered to avoid InfluxDB returning partial results due to not enough
# available memory.
#
# Example queries: SHOW SERIES LIMIT <loader_limit> <..>,
#                  SHOW FIELD KEYS LIMIT <loader_limit> OFFSET <offset>
# loader_limit: 100000
# loader_limit: 10000

#
## Fill function parameter to use on data queries
Expand Down
35 changes: 26 additions & 9 deletions influxgraph/classes/finder.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# Copyright (C) [2015-] [Thomson Reuters LLC]
# Copyright (C) [2015-] [Panos Kittenis]
# Copyright (C) [2015-2017] [Thomson Reuters LLC]
# Copyright (C) [2015-2017] [Panos Kittenis]

# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -614,20 +614,37 @@ def load_index(self):
self.index = index
logger.info("Loaded index from disk")

def get_field_keys(self):
"""Get field keys for all measurements"""
field_keys = self.memcache.get(_MEMCACHE_FIELDS_KEY) \
def get_all_field_keys(self, offset=0, field_keys=None):
"""Get all field keys from DB, starting at offset"""
field_keys = {} if field_keys is None else field_keys
cur_field_keys = self.get_field_keys(offset=offset)
if len(cur_field_keys.keys()) == 0:
return field_keys
for key in cur_field_keys:
field_keys.setdefault(key, deque()).extend(cur_field_keys[key])
return self.get_all_field_keys(
offset=self.loader_limit + offset,
field_keys=field_keys)

def get_field_keys(self, offset=0):
"""Get one field keys list at offset"""
memcache_key = gen_memcache_pattern_key("_".join([
_MEMCACHE_FIELDS_KEY, str(self.loader_limit), str(offset)]))
field_keys = self.memcache.get(memcache_key) \
if self.memcache else None
if field_keys:
logger.debug("Found cached field keys")
logger.debug("Found cached field keys list for limit %s, offset %s",
self.loader_limit, offset,)
return field_keys
logger.debug("Calling InfluxDB for field keys")
data = self.client.query('SHOW FIELD KEYS')
logger.debug("Calling InfluxDB for field keys with limit %s, offset %s",
self.loader_limit, offset,)
data = self.client.query('SHOW FIELD KEYS LIMIT %s OFFSET %s' % (
self.loader_limit, offset,))
field_keys = {}
for ((key, _), vals) in data.items():
field_keys[key] = [val['fieldKey'] for val in vals]
if self.memcache:
if not self.memcache.set(_MEMCACHE_FIELDS_KEY, field_keys,
if not self.memcache.set(memcache_key, field_keys,
time=self.memcache_ttl,
min_compress_len=1):
logger.error("Could not add field key list to memcache - "
Expand Down
52 changes: 51 additions & 1 deletion tests/test_influxdb_templates_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,18 @@
import datetime
import time
from random import randint
import logging
import influxdb.exceptions
import influxgraph
from influxgraph.utils import Query
from influxgraph.utils import Query, make_memcache_client, gen_memcache_pattern_key
from influxgraph.constants import SERIES_LOADER_MUTEX_KEY, \
MEMCACHE_SERIES_DEFAULT_TTL, LOADER_LIMIT, _MEMCACHE_FIELDS_KEY
from influxdb import InfluxDBClient
from influxgraph.templates import InvalidTemplateError
from influxgraph.classes.finder import logger as finder_logger

finder_logger.setLevel(logging.DEBUG)
logging.basicConfig()

os.environ['TZ'] = 'UTC'

Expand Down Expand Up @@ -1034,6 +1039,51 @@ def test_multi_tmpl_part_filter(self):
for metric in cpu_data:
self.assertTrue(cpu_data[metric][-1] == fields[metric.split('.')[-1]])

def test_field_list_pagination(self):
del self.finder
measurements = ['m1', 'm2', 'm3']
fields = {'f1': self.randval(),
'f2': self.randval(),
'f3': self.randval(),
'f4': self.randval()}
tags = {'t1': 't1_val'}
self.client.drop_database(self.db_name)
self.client.create_database(self.db_name)
self.write_data(measurements, tags, fields)
m4_measurement = 'm4'
m4_fields = {'f5': self.randval()}
self.write_data([m4_measurement], tags, m4_fields)
m5_measurement= 'm5'
m5_fields = {'f5': self.randval(),
'f6': self.randval(),
'f7': self.randval(),
'f8': self.randval(),
'f9': self.randval(),
'f10': self.randval(),
}
self.write_data([m5_measurement], tags, m5_fields)
self.config['influxdb']['loader_limit'] = 1
self.finder = influxgraph.InfluxDBFinder(self.config)
fields_page1 = self.finder.get_field_keys()
self.assertEqual(len(fields_page1.values()[0]), self.finder.loader_limit)
all_db_fields = self.finder.get_all_field_keys()
for measurement in measurements:
self.assertEqual(len(all_db_fields[measurement]), len(fields.keys()))
self.assertEqual(len(all_db_fields['m4']), len(m4_fields.keys()))
self.assertEqual(len(all_db_fields['m5']), len(m5_fields.keys()))
# Memcached test
self.config['influxdb']['memcache'] = {'host': 'localhost'}
memcache_client = make_memcache_client(
self.config['influxdb']['memcache']['host'])
for offset in range(len(m5_fields.keys())):
memcache_key = gen_memcache_pattern_key("_".join([
_MEMCACHE_FIELDS_KEY, str(
self.config['influxdb']['loader_limit']), str(offset)]))
memcache_client.delete(memcache_key)
self.finder = influxgraph.InfluxDBFinder(self.config)
self.finder.get_all_field_keys()
cached_db_fields = self.finder.get_all_field_keys()
self.assertEqual(all_db_fields, cached_db_fields)

if __name__ == '__main__':
unittest.main()

0 comments on commit b2c4b38

Please sign in to comment.