diff --git a/influxgraph/classes/finder.py b/influxgraph/classes/finder.py
index ccae928..6f5706d 100644
--- a/influxgraph/classes/finder.py
+++ b/influxgraph/classes/finder.py
@@ -27,7 +27,6 @@
 import datetime
 import logging
 from logging.handlers import WatchedFileHandler
-import itertools
 from collections import deque
 
 try:
@@ -41,7 +40,7 @@
     DEFAULT_AGGREGATIONS, _MEMCACHE_FIELDS_KEY, FILL_PARAMS
 from ..utils import NullStatsd, calculate_interval, read_influxdb_values, \
     get_aggregation_func, gen_memcache_key, gen_memcache_pattern_key, \
-    Query, get_retention_policy, _compile_aggregation_patterns, parse_series, \
+    get_retention_policy, _compile_aggregation_patterns, parse_series, \
     make_memcache_client
 from ..templates import parse_influxdb_graphite_templates, apply_template, \
     TemplateMatchError
@@ -96,13 +95,13 @@ def __init__(self, config):
             influxdb_config.get('aggregation_functions', DEFAULT_AGGREGATIONS))
         self.fill_param = influxdb_config.get('fill', 'previous')
         if self.fill_param not in FILL_PARAMS and not (
-                type(self.fill_param) == int or type(self.fill_param) == float):
+                isinstance(self.fill_param, int) or isinstance(self.fill_param, float)):
             raise Exception("Configured fill param %s is not a valid parameter "
                             "nor integer or float number", self.fill_param,)
         series_loader_interval = influxdb_config.get('series_loader_interval', 900)
         reindex_interval = influxdb_config.get('reindex_interval', 900)
         self.loader_limit = influxdb_config.get('loader_limit', LOADER_LIMIT)
-        if type(self.loader_limit) != int:
+        if not isinstance(self.loader_limit, int):
             raise Exception("Configured loader limit %s is not an integer",
                             self.loader_limit)
         self.deltas = influxdb_config.get('deltas', None)
@@ -128,7 +127,8 @@ def _start_loader(self, series_loader_interval):
         if not self.memcache:
             return
         # Run series loader in main thread if due to run to not allow
-        # requests to be served before series loader has completed at least once.
+        # requests to be served before series loader has completed at
+        # least once.
         if _SERIES_LOADER_LOCK.acquire(block=False):
             if self.memcache.get(SERIES_LOADER_MUTEX_KEY):
                 logger.debug("Series loader mutex exists %s - "
@@ -162,7 +162,8 @@ def _start_reindexer(self, reindex_interval):
         if not self.index:
             self.build_index()
             new_index = True
-        logger.debug("Starting reindexer thread with interval %s", reindex_interval)
+        logger.debug("Starting reindexer thread with interval %s",
+                     reindex_interval)
         reindexer = threading.Thread(target=self._reindex,
                                      kwargs={'interval': reindex_interval,
                                              'new_index': new_index})
@@ -176,7 +177,8 @@ def _setup_logger(self, level, log_file):
         if logger.handlers:
             return
         formatter = logging.Formatter(
-            '[%(levelname)s] %(asctime)s - %(module)s.%(funcName)s() - %(message)s')
+            '[%(levelname)s] %(asctime)s - %(module)s.%(funcName)s() '
+            '- %(message)s')
         level = getattr(logging, level.upper())
         logger.setLevel(level)
         handler = logging.StreamHandler()
@@ -249,6 +251,7 @@ def _store_last_offset(self, query_pattern, limit, offset):
     def get_all_series(self, cache=True,
                        offset=0, _data=None, **kwargs):
         """Retrieve all series"""
+        # pylint: disable=unused-argument
         data = self.get_series(
             cache=cache, offset=offset)
         return self._pagination_runner(data, '*', self.get_all_series,
@@ -259,10 +262,12 @@ def get_all_series(self, cache=True,
     def get_all_series_list(self, offset=0, _data=None,
                             *args, **kwargs):
         """Retrieve all series for series loader"""
+        # pylint: disable=unused-argument
         query_pattern = '*'
         data = self._get_series(offset=offset)
-        return self._pagination_runner(data, query_pattern, self.get_all_series_list,
-                                       limit=self.loader_limit, offset=offset)
+        return self._pagination_runner(
+            data, query_pattern, self.get_all_series_list,
+            limit=self.loader_limit, offset=offset)
 
     def _pagination_runner(self, data, query_pattern, get_series_func,
                            limit=None, offset=None, _data=None,
@@ -284,14 +289,16 @@ def _series_loader(self, interval=900):
         """Loads influxdb series list into memcache at a rate of no more
         than once per interval
         """
-        logger.info("Starting background series loader with interval %s", interval)
+        logger.info("Starting background series loader with interval %s",
+                    interval)
         while True:
             time.sleep(interval)
             if _SERIES_LOADER_LOCK.acquire(block=False):
                 _SERIES_LOADER_LOCK.release()
                 if self.memcache.get(SERIES_LOADER_MUTEX_KEY):
                     logger.debug("Series loader mutex exists %s - "
-                                 "skipping series load", SERIES_LOADER_MUTEX_KEY)
+                                 "skipping series load",
+                                 SERIES_LOADER_MUTEX_KEY)
                     time.sleep(interval)
                     continue
             self.memcache.set(SERIES_LOADER_MUTEX_KEY, 1, time=interval)
@@ -332,10 +339,11 @@ def find_nodes(self, query):
                 yield BranchNode(path)
 
     def _gen_aggregation_func(self, paths):
-        aggregation_funcs = list(set(get_aggregation_func(path, self.aggregation_functions)
-                                     for path in paths))
+        aggregation_funcs = list(set(get_aggregation_func(
+            path, self.aggregation_functions) for path in paths))
         if len(aggregation_funcs) > 1:
-            logger.warning("Got multiple aggregation functions %s for paths %s - Using '%s'",
+            logger.warning("Got multiple aggregation functions %s for paths %s "
+                           "- Using '%s'",
                            aggregation_funcs, paths, aggregation_funcs[0])
         aggregation_func = aggregation_funcs[0]
         return aggregation_func
@@ -422,9 +430,10 @@ def _gen_query_values_from_templates(self, paths, retention):
     def _gen_query_values(self, paths, retention):
         if self.graphite_templates:
             return self._gen_query_values_from_templates(paths, retention)
-        measurement = ', '.join(('"%s"."%s"' % (retention, path,) for path in paths)) \
-            if retention \
-            else ', '.join(('"%s"' % (path,) for path in paths))
+        measurement = ', '.join(('"%s"."%s"' % (retention, path,)
+                                 for path in paths)) if retention \
+            else ', '.join(('"%s"' % (path,)
+                            for path in paths))
         return measurement, None, ['value'], None, None
 
     def _gen_infl_stmt(self, measurements, tags, fields, groupings, start_time,
@@ -523,6 +532,7 @@ def _read_static_data(self, data_file):
         return [d for k in data for d in k if d]
 
     def _reindex(self, new_index=False, interval=900):
+        """Perform re-index"""
        save_thread = threading.Thread(target=self.save_index)
         if new_index:
             save_thread.start()
@@ -579,6 +589,7 @@ def save_index(self):
         if not self.index_path:
             return
         logger.info("Saving index to file %s", self.index_path,)
+        start_time = datetime.datetime.now()
         try:
             index_fh = open(self.index_path, 'wt')
             self._save_index_file(index_fh)
@@ -592,7 +603,8 @@ def save_index(self):
             raise
         else:
             index_fh.close()
-        logger.info("Wrote index file to %s", self.index_path)
+        dt = datetime.datetime.now() - start_time
+        logger.info("Wrote index file to %s in %s", self.index_path, dt)
 
     def load_index(self):
         """Load index from file"""
@@ -602,7 +614,8 @@ def load_index(self):
         try:
             index_fh = open(self.index_path, 'rt')
         except Exception as ex:
-            logger.error("Error reading index file %s - %s", self.index_path, ex)
+            logger.error("Error reading index file %s - %s",
+                         self.index_path, ex)
             return
         try:
             index = NodeTreeIndex.from_file(index_fh)
diff --git a/influxgraph/classes/leaf.py b/influxgraph/classes/leaf.py
index ee6bede..80af52c 100644
--- a/influxgraph/classes/leaf.py
+++ b/influxgraph/classes/leaf.py
@@ -18,6 +18,6 @@ from __future__ import absolute_import
 from graphite_api.node import LeafNode
 
 
-class InfluxDBLeafNode(LeafNode):
+class InfluxDBLeafNode(LeafNode):  # pylint: disable=too-few-public-methods
     """Tell Graphite-Api that our leaf node supports multi-fetch"""
     __fetch_multi__ = 'influxdb'
diff --git a/influxgraph/classes/reader.py b/influxgraph/classes/reader.py
index 038ffdd..1f3be4a 100644
--- a/influxgraph/classes/reader.py
+++ b/influxgraph/classes/reader.py
@@ -25,7 +25,7 @@
 logger = logging.getLogger('influxgraph')
 
 
-class Interval(object):
+class Interval(object):  # pylint: disable=too-few-public-methods
     """No-op Interval class used by Graphite-API for whisper backends"""
     intervals = set()
 
diff --git a/influxgraph/utils.py b/influxgraph/utils.py
index ad6e497..0e2738d 100644
--- a/influxgraph/utils.py
+++ b/influxgraph/utils.py
@@ -96,29 +96,29 @@ def get_retention_policy(interval, retention_policies):
     # return policy for max interval
     return retention_policies[max(sorted(retention_policies.keys()))]
 
-class Query(object):
+class Query(object):  # pylint: disable=too-few-public-methods
     """Graphite-API compatible query class"""
 
     def __init__(self, pattern):
         self.pattern = pattern
 
-class NullStatsd(object):
+class NullStatsd(object):  # pragma: no cover
     """Fake StatsClient compatible class to use
     when statsd is not configured"""
 
     def __enter__(self):
-        return self
+        return self  # pragma: no cover
 
     def __exit__(self, _type, value, traceback):
-        pass
+        pass  # pragma: no cover
 
-    def timer(self, key, val=None):
+    def timer(self, key, val=None):  # pylint: disable=unused-argument
         """No-Op"""
-        return self
+        return self  # pragma: no cover
 
-    def timing(self, key, val):
+    def timing(self, key, val):  # pylint: disable=unused-argument
         """No-Op"""
-        pass
+        pass  # pragma: no cover
 
     def start(self):
         """No-Op"""