Added index save time to log message. Linting changes
Dan committed Mar 2, 2017
1 parent 1011d4d commit 4ebeee9
Showing 4 changed files with 42 additions and 29 deletions.
51 changes: 32 additions & 19 deletions influxgraph/classes/finder.py
@@ -27,7 +27,6 @@
 import datetime
 import logging
 from logging.handlers import WatchedFileHandler
-import itertools
 from collections import deque
 
 try:
@@ -41,7 +40,7 @@
     DEFAULT_AGGREGATIONS, _MEMCACHE_FIELDS_KEY, FILL_PARAMS
 from ..utils import NullStatsd, calculate_interval, read_influxdb_values, \
     get_aggregation_func, gen_memcache_key, gen_memcache_pattern_key, \
-    Query, get_retention_policy, _compile_aggregation_patterns, parse_series, \
+    get_retention_policy, _compile_aggregation_patterns, parse_series, \
     make_memcache_client
 from ..templates import parse_influxdb_graphite_templates, apply_template, \
     TemplateMatchError
@@ -96,13 +95,13 @@ def __init__(self, config):
             influxdb_config.get('aggregation_functions', DEFAULT_AGGREGATIONS))
         self.fill_param = influxdb_config.get('fill', 'previous')
         if self.fill_param not in FILL_PARAMS and not (
-                type(self.fill_param) == int or type(self.fill_param) == float):
+                isinstance(self.fill_param, int) or isinstance(self.fill_param, float)):
             raise Exception("Configured fill param %s is not a valid parameter "
                             "nor integer or float number", self.fill_param,)
         series_loader_interval = influxdb_config.get('series_loader_interval', 900)
         reindex_interval = influxdb_config.get('reindex_interval', 900)
         self.loader_limit = influxdb_config.get('loader_limit', LOADER_LIMIT)
-        if type(self.loader_limit) != int:
+        if not isinstance(self.loader_limit, int):
             raise Exception("Configured loader limit %s is not an integer",
                             self.loader_limit)
         self.deltas = influxdb_config.get('deltas', None)
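
The two rewritten checks above are the linting changes: pylint flags type(x) == int comparisons because isinstance() also accepts subclasses and reads better. A quick standalone illustration of the difference (not from this repository) - note isinstance can also take a tuple of types, and that bool is itself a subclass of int:

    fill_param = 0.5

    # A type() comparison only matches the exact type:
    print(type(fill_param) == float)             # True, but brittle
    # isinstance() matches the type or any subclass, and accepts a tuple:
    print(isinstance(fill_param, (int, float)))  # True
    # Caveat: bool subclasses int, so a bool value passes an int check.
    print(isinstance(True, int))                 # True
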
@@ -128,7 +127,8 @@ def _start_loader(self, series_loader_interval):
         if not self.memcache:
             return
         # Run series loader in main thread if due to run to not allow
-        # requests to be served before series loader has completed at least once.
+        # requests to be served before series loader has completed at
+        # least once.
         if _SERIES_LOADER_LOCK.acquire(block=False):
             if self.memcache.get(SERIES_LOADER_MUTEX_KEY):
                 logger.debug("Series loader mutex exists %s - "
@@ -162,7 +162,8 @@ def _start_reindexer(self, reindex_interval):
         if not self.index:
             self.build_index()
             new_index = True
-        logger.debug("Starting reindexer thread with interval %s", reindex_interval)
+        logger.debug("Starting reindexer thread with interval %s",
+                     reindex_interval)
         reindexer = threading.Thread(target=self._reindex,
                                      kwargs={'interval': reindex_interval,
                                              'new_index': new_index})
@@ -176,7 +177,8 @@ def _setup_logger(self, level, log_file):
         if logger.handlers:
             return
         formatter = logging.Formatter(
-            '[%(levelname)s] %(asctime)s - %(module)s.%(funcName)s() - %(message)s')
+            '[%(levelname)s] %(asctime)s - %(module)s.%(funcName)s() '
+            '- %(message)s')
         level = getattr(logging, level.upper())
         logger.setLevel(level)
         handler = logging.StreamHandler()
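
This rewrap splits one long format string into two adjacent string literals, which Python concatenates at compile time, so the log output is unchanged. A standalone sketch of the same formatter setup (logger name and format string are taken from the diff; the sample output line is illustrative):

    import logging

    logger = logging.getLogger('influxgraph')
    formatter = logging.Formatter(
        '[%(levelname)s] %(asctime)s - %(module)s.%(funcName)s() '
        '- %(message)s')
    handler = logging.StreamHandler()
    handler.setFormatter(formatter)
    logger.addHandler(handler)
    logger.setLevel(logging.DEBUG)

    logger.debug("Starting reindexer thread with interval %s", 900)
    # e.g. [DEBUG] 2017-03-02 10:00:00,000 - example.<module>() - Starting ...
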
@@ -249,6 +251,7 @@ def _store_last_offset(self, query_pattern, limit, offset):
     def get_all_series(self, cache=True,
                        offset=0, _data=None, **kwargs):
         """Retrieve all series"""
+        # pylint: disable=unused-argument
         data = self.get_series(
             cache=cache, offset=offset)
         return self._pagination_runner(data, '*', self.get_all_series,
@@ -259,10 +262,12 @@ def get_all_series(self, cache=True,
     def get_all_series_list(self, offset=0, _data=None,
                             *args, **kwargs):
         """Retrieve all series for series loader"""
+        # pylint: disable=unused-argument
         query_pattern = '*'
         data = self._get_series(offset=offset)
-        return self._pagination_runner(data, query_pattern, self.get_all_series_list,
-                                       limit=self.loader_limit, offset=offset)
+        return self._pagination_runner(
+            data, query_pattern, self.get_all_series_list,
+            limit=self.loader_limit, offset=offset)
 
     def _pagination_runner(self, data, query_pattern, get_series_func,
                            limit=None, offset=None, _data=None,
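
_pagination_runner itself is unchanged by this commit; the general offset/limit pattern it drives looks roughly like the sketch below (a hypothetical stand-in, not the project's implementation):

    def paginate(fetch_page, limit):
        """Fetch pages of at most `limit` items until a short page
        signals the end of the result set (hypothetical sketch)."""
        offset, results = 0, []
        while True:
            page = fetch_page(limit=limit, offset=offset)
            results.extend(page)
            if len(page) < limit:  # short page - no more data
                return results
            offset += limit

    # Usage with a fake data source:
    data = list(range(25))
    assert paginate(lambda limit, offset: data[offset:offset + limit],
                    limit=10) == data
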
@@ -284,14 +289,16 @@ def _series_loader(self, interval=900):
         """Loads influxdb series list into memcache at a rate of no
         more than once per interval
         """
-        logger.info("Starting background series loader with interval %s", interval)
+        logger.info("Starting background series loader with interval %s",
+                    interval)
         while True:
             time.sleep(interval)
             if _SERIES_LOADER_LOCK.acquire(block=False):
                 _SERIES_LOADER_LOCK.release()
             if self.memcache.get(SERIES_LOADER_MUTEX_KEY):
                 logger.debug("Series loader mutex exists %s - "
-                             "skipping series load", SERIES_LOADER_MUTEX_KEY)
+                             "skipping series load",
+                             SERIES_LOADER_MUTEX_KEY)
                 time.sleep(interval)
                 continue
             self.memcache.set(SERIES_LOADER_MUTEX_KEY, 1, time=interval)
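
The mutex here is a memcache key with a TTL: set(key, 1, time=interval) expires on its own, so a crashed loader cannot hold the lock forever. A minimal sketch of the pattern with python-memcached (server address and key value are assumptions; the real key is imported from the project's constants):

    import memcache

    SERIES_LOADER_MUTEX_KEY = 'influxgraph_series_loader'  # assumed value
    interval = 900
    client = memcache.Client(['127.0.0.1:11211'])

    if client.get(SERIES_LOADER_MUTEX_KEY):
        # Another process loaded series within the last `interval` seconds.
        print("mutex exists - skipping series load")
    else:
        # Key expires after `interval` seconds, so the lock self-releases.
        client.set(SERIES_LOADER_MUTEX_KEY, 1, time=interval)
        # ... perform the series load ...

Strictly speaking the get-then-set here is racy across processes; memcache's atomic add(), which fails if the key already exists, would close that window.
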
@@ -332,10 +339,11 @@ def find_nodes(self, query):
                 yield BranchNode(path)
 
     def _gen_aggregation_func(self, paths):
-        aggregation_funcs = list(set(get_aggregation_func(path, self.aggregation_functions)
-                                     for path in paths))
+        aggregation_funcs = list(set(get_aggregation_func(
+            path, self.aggregation_functions) for path in paths))
         if len(aggregation_funcs) > 1:
-            logger.warning("Got multiple aggregation functions %s for paths %s - Using '%s'",
+            logger.warning("Got multiple aggregation functions %s for paths %s "
+                           "- Using '%s'",
                            aggregation_funcs, paths, aggregation_funcs[0])
         aggregation_func = aggregation_funcs[0]
         return aggregation_func
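
The dedup-via-set logic above picks one aggregation function when the queried paths disagree. A self-contained sketch of the idea (the pattern-to-function mapping and matching rules are simplified assumptions; the project's get_aggregation_func may differ):

    import re

    # Assumed shape: compiled pattern -> aggregation function name.
    aggregation_functions = {re.compile(r'.*\.min$'): 'min',
                             re.compile(r'.*\.max$'): 'max'}

    def get_aggregation_func(path, aggregation_functions):
        # Return the first matching function, defaulting to 'mean'.
        for pattern, func in aggregation_functions.items():
            if pattern.match(path):
                return func
        return 'mean'

    paths = ['cpu.load.min', 'cpu.load.max']
    funcs = list(set(get_aggregation_func(p, aggregation_functions)
                     for p in paths))
    # Two distinct functions survive -> the code warns and uses funcs[0].

Since set ordering is not deterministic, which function wins that tie-break can vary between runs.
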
@@ -422,9 +430,10 @@ def _gen_query_values_from_templates(self, paths, retention):
     def _gen_query_values(self, paths, retention):
         if self.graphite_templates:
             return self._gen_query_values_from_templates(paths, retention)
-        measurement = ', '.join(('"%s"."%s"' % (retention, path,) for path in paths)) \
-            if retention \
-            else ', '.join(('"%s"' % (path,) for path in paths))
+        measurement = ', '.join(('"%s"."%s"' % (retention, path,)
+                                 for path in paths)) if retention \
+            else ', '.join(('"%s"' % (path,)
+                            for path in paths))
         return measurement, None, ['value'], None, None
 
     def _gen_infl_stmt(self, measurements, tags, fields, groupings, start_time,
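
A worked example of what the rewrapped measurement expression evaluates to - the code is exactly the expression from the diff, with sample inputs:

    paths = ['cpu.user', 'cpu.idle']
    retention = '30d'

    measurement = ', '.join(('"%s"."%s"' % (retention, path,)
                             for path in paths)) if retention \
        else ', '.join(('"%s"' % (path,)
                        for path in paths))

    print(measurement)  # "30d"."cpu.user", "30d"."cpu.idle"
    # With retention=None it would be: "cpu.user", "cpu.idle"
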
@@ -523,6 +532,7 @@ def _read_static_data(self, data_file):
         return [d for k in data for d in k if d]
 
     def _reindex(self, new_index=False, interval=900):
+        """Perform re-index"""
         save_thread = threading.Thread(target=self.save_index)
         if new_index:
             save_thread.start()
@@ -579,6 +589,7 @@ def save_index(self):
         if not self.index_path:
             return
         logger.info("Saving index to file %s", self.index_path,)
+        start_time = datetime.datetime.now()
         try:
             index_fh = open(self.index_path, 'wt')
             self._save_index_file(index_fh)
@@ -592,7 +603,8 @@ def save_index(self):
             raise
         else:
             index_fh.close()
-        logger.info("Wrote index file to %s", self.index_path)
+        dt = datetime.datetime.now() - start_time
+        logger.info("Wrote index file to %s in %s", self.index_path, dt)
 
     def load_index(self):
         """Load index from file"""
@@ -602,7 +614,8 @@ def load_index(self):
         try:
             index_fh = open(self.index_path, 'rt')
         except Exception as ex:
-            logger.error("Error reading index file %s - %s", self.index_path, ex)
+            logger.error("Error reading index file %s - %s",
+                         self.index_path, ex)
             return
         try:
             index = NodeTreeIndex.from_file(index_fh)
2 changes: 1 addition & 1 deletion influxgraph/classes/leaf.py
@@ -18,6 +18,6 @@
 from __future__ import absolute_import
 from graphite_api.node import LeafNode
 
-class InfluxDBLeafNode(LeafNode):
+class InfluxDBLeafNode(LeafNode):  # pylint: disable=too-few-public-methods
     """Tell Graphite-Api that our leaf node supports multi-fetch"""
     __fetch_multi__ = 'influxdb'
2 changes: 1 addition & 1 deletion influxgraph/classes/reader.py
@@ -25,7 +25,7 @@
 
 logger = logging.getLogger('influxgraph')
 
-class Interval(object):
+class Interval(object):  # pylint: disable=too-few-public-methods
     """No-op Interval class used by Graphite-API for whisper backends"""
     intervals = set()
 
16 changes: 8 additions & 8 deletions influxgraph/utils.py
@@ -96,29 +96,29 @@ def get_retention_policy(interval, retention_policies):
     # return policy for max interval
     return retention_policies[max(sorted(retention_policies.keys()))]
 
-class Query(object):
+class Query(object):  # pylint: disable=too-few-public-methods
     """Graphite-API compatible query class"""
 
     def __init__(self, pattern):
         self.pattern = pattern
 
 
-class NullStatsd(object):
+class NullStatsd(object):  # pragma: no cover
     """Fake StatsClient compatible class to use when statsd is not configured"""
 
     def __enter__(self):
-        return self
+        return self  # pragma: no cover
 
     def __exit__(self, _type, value, traceback):
-        pass
+        pass  # pragma: no cover
 
-    def timer(self, key, val=None):
+    def timer(self, key, val=None):  # pylint: disable=unused-argument
         """No-Op"""
-        return self
+        return self  # pragma: no cover
 
-    def timing(self, key, val):
+    def timing(self, key, val):  # pylint: disable=unused-argument
         """No-Op"""
-        pass
+        pass  # pragma: no cover
 
     def start(self):
         """No-Op"""
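
NullStatsd is a null object standing in for statsd.StatsClient: it answers the same timer/timing calls and acts as a context manager, so calling code needs no "is statsd configured?" branches. A sketch of how such a stand-in is typically used (the factory function is hypothetical):

    from influxgraph.utils import NullStatsd

    def statsd_client(host=None):
        # Hypothetical factory: real client when configured, no-op otherwise.
        if host:
            from statsd import StatsClient
            return StatsClient(host)
        return NullStatsd()

    # Caller code is identical either way; with no host this times nothing.
    with statsd_client().timer('find_series'):
        pass  # ... work being timed ...
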
