-
Notifications
You must be signed in to change notification settings - Fork 801
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add support for native histograms in OM parser #1040
Changes from 7 commits
977b0b2
fd1b563
e32d2a8
cb013d8
4b1f527
eb6d9de
86f165a
c69a500
c06db3f
d394c71
90cd08e
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -20,6 +20,13 @@ def _sanitize(s): | |
return _INVALID_GRAPHITE_CHARS.sub('_', s) | ||
|
||
|
||
def safe_float_convert(value): | ||
try: | ||
return float(value) | ||
except ValueError: | ||
return None | ||
|
||
|
||
class _RegularPush(threading.Thread): | ||
def __init__(self, pusher, interval, prefix): | ||
super().__init__() | ||
|
@@ -82,7 +89,9 @@ def push(self, prefix: str = '') -> None: | |
for k, v in sorted(s.labels.items())]) | ||
else: | ||
labelstr = '' | ||
output.append(f'{prefixstr}{_sanitize(s.name)}{labelstr} {float(s.value)} {now}\n') | ||
# using a safe float convert on s.value as a temporary workaround while figuring out what to do | ||
# in case value is a native histogram structured value, if that's ever a possibility | ||
output.append(f'{prefixstr}{_sanitize(s.name)}{labelstr} {safe_float_convert(s.value)} {now}\n') | ||
|
||
conn = socket.create_connection(self._address, self._timeout) | ||
conn.sendall(''.join(output).encode('ascii')) | ||
|
@@ -92,3 +101,4 @@ def start(self, interval: float = 60.0, prefix: str = '') -> None: | |
t = _RegularPush(self, interval, prefix) | ||
t.daemon = True | ||
t.start() | ||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nit, could you just remove this line so we don't have a needless diff/history entry? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done! |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -595,6 +595,14 @@ def __init__(self, | |
registry: Optional[CollectorRegistry] = REGISTRY, | ||
_labelvalues: Optional[Sequence[str]] = None, | ||
buckets: Sequence[Union[float, str]] = DEFAULT_BUCKETS, | ||
# native_hist_schema: Optional[int] = None, # create this dynamically? | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I believe the internal code should create the schema, and may even change the schema value on occassion. |
||
# native_hist_bucket_fact: Optional[float] = None, | ||
# native_hist_zero_threshold: Optional[float] = None, | ||
# native_hist_max_bucket_num: Optional[int] = None, | ||
# native_hist_min_reset_dur: Optional[timedelta] = None, | ||
# native_hist_max_zero_threshold: Optional[float] = None, | ||
# native_hist_max_exemplars: Optional[int] = None, | ||
# native_hist_exemplar_TTL: Optional[timedelta] = None, | ||
): | ||
self._prepare_buckets(buckets) | ||
super().__init__( | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,7 +1,7 @@ | ||
import re | ||
from typing import Dict, List, Optional, Sequence, Tuple, Union | ||
|
||
from .samples import Exemplar, Sample, Timestamp | ||
from .samples import Exemplar, NativeHistStructValue, Sample, Timestamp | ||
|
||
METRIC_TYPES = ( | ||
'counter', 'gauge', 'summary', 'histogram', | ||
|
@@ -36,11 +36,14 @@ def __init__(self, name: str, documentation: str, typ: str, unit: str = ''): | |
self.type: str = typ | ||
self.samples: List[Sample] = [] | ||
|
||
def add_sample(self, name: str, labels: Dict[str, str], value: float, timestamp: Optional[Union[Timestamp, float]] = None, exemplar: Optional[Exemplar] = None) -> None: | ||
def add_sample(self, name: str, labels: Dict[str, str], value: Union[float, NativeHistStructValue], timestamp: Optional[Union[Timestamp, float]] = None, exemplar: Optional[Exemplar] = None) -> None: | ||
"""Add a sample to the metric. | ||
|
||
Internal-only, do not use.""" | ||
self.samples.append(Sample(name, labels, value, timestamp, exemplar)) | ||
if not isinstance(value, NativeHistStructValue): | ||
self.samples.append(Sample(name, labels, value, timestamp, exemplar)) | ||
else: | ||
self.samples.append(Sample(name, labels, value)) | ||
|
||
def __eq__(self, other: object) -> bool: | ||
return (isinstance(other, Metric) | ||
|
@@ -236,6 +239,7 @@ def __init__(self, | |
sum_value: Optional[float] = None, | ||
labels: Optional[Sequence[str]] = None, | ||
unit: str = '', | ||
native_hist_bucket_factor: Optional[float] = None | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🤔 Not sure if we need this here at all (it will need to be in There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, I deleted it. |
||
): | ||
Metric.__init__(self, name, documentation, 'histogram', unit) | ||
if sum_value is not None and buckets is None: | ||
|
@@ -284,7 +288,6 @@ def add_metric(self, | |
Sample(self.name + '_sum', dict(zip(self._labelnames, labels)), sum_value, timestamp)) | ||
|
||
|
||
|
||
class GaugeHistogramMetricFamily(Metric): | ||
"""A single gauge histogram and its samples. | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -6,7 +6,9 @@ | |
import re | ||
|
||
from ..metrics_core import Metric, METRIC_LABEL_NAME_RE | ||
from ..samples import Exemplar, Sample, Timestamp | ||
from ..samples import ( | ||
BucketSpan, Exemplar, NativeHistStructValue, Sample, Timestamp, | ||
) | ||
from ..utils import floatToGoString | ||
|
||
|
||
|
@@ -364,6 +366,100 @@ def _parse_remaining_text(text): | |
return val, ts, exemplar | ||
|
||
|
||
def _parse_nh_sample(text, suffixes): | ||
labels_start = text.find("{") | ||
# check if it's a native histogram with labels | ||
re_nh_without_labels = re.compile(r'^[^{} ]+ {[^{}]+}$') | ||
re_nh_with_labels = re.compile(r'[^{} ]+{[^{}]+} {[^{}]+}$') | ||
csmarchbanks marked this conversation as resolved.
Show resolved
Hide resolved
|
||
print('we are matching \'{}\''.format(text)) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We should remove the debug printing before merging, there are a couple other lines in this function as well. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I could see just another one, I hope I removed them all now XD |
||
if re_nh_with_labels.match(text): | ||
nh_value_start = text.rindex("{") | ||
labels_end = nh_value_start - 2 | ||
labelstext = text[labels_start + 1:labels_end] | ||
labels = _parse_labels(labelstext) | ||
name_end = labels_start | ||
name = text[:name_end] | ||
if name.endswith(suffixes): | ||
raise ValueError("the sample name of a native histogram with labels should have no suffixes", name) | ||
nh_value = text[nh_value_start:] | ||
value = _parse_nh_struct(nh_value) | ||
return Sample(name, labels, value) | ||
# check if it's a native histogram | ||
if re_nh_without_labels.match(text): | ||
nh_value_start = labels_start | ||
nh_value = text[nh_value_start:] | ||
name_end = nh_value_start - 1 | ||
name = text[:name_end] | ||
if name.endswith(suffixes): | ||
raise ValueError("the sample name of a native histogram should have no suffixes", name) | ||
value = _parse_nh_struct(nh_value) | ||
return Sample(name, None, value) | ||
else: | ||
# it's not a native histogram | ||
return | ||
|
||
|
||
def _parse_nh_struct(text): | ||
pattern = r'(\w+):\s*([^,}]+)' | ||
|
||
re_spans = re.compile(r'(positive_spans|negative_spans):\[(\d+:\d+,\d+:\d+)\]') | ||
re_deltas = re.compile(r'(positive_deltas|negative_deltas):\[(-?\d+(?:,-?\d+)*)\]') | ||
|
||
items = dict(re.findall(pattern, text)) | ||
spans = dict(re_spans.findall(text)) | ||
deltas = dict(re_deltas.findall(text)) | ||
|
||
count_value = int(items['count']) | ||
sum_value = int(items['sum']) | ||
schema = int(items['schema']) | ||
zero_threshold = float(items['zero_threshold']) | ||
zero_count = int(items['zero_count']) | ||
|
||
try: | ||
pos_spans_text = spans['positive_spans'] | ||
except KeyError: | ||
pos_spans = None | ||
else: | ||
elems = pos_spans_text.split(',') | ||
arg1 = [int(x) for x in elems[0].split(':')] | ||
arg2 = [int(x) for x in elems[1].split(':')] | ||
pos_spans = (BucketSpan(arg1[0], arg1[1]), BucketSpan(arg2[0], arg2[1])) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I would avoid the else block and move these into the try block. That keeps all the happy path/text parsing code together. |
||
try: | ||
neg_spans_text = spans['negative_spans'] | ||
except KeyError: | ||
neg_spans = None | ||
else: | ||
elems = neg_spans_text.split(',') | ||
arg1 = [int(x) for x in elems[0].split(':')] | ||
arg2 = [int(x) for x in elems[1].split(':')] | ||
neg_spans = (BucketSpan(arg1[0], arg1[1]), BucketSpan(arg2[0], arg2[1])) | ||
try: | ||
pos_deltas_text = deltas['positive_deltas'] | ||
except KeyError: | ||
pos_deltas = None | ||
else: | ||
elems = pos_deltas_text.split(',') | ||
pos_deltas = tuple([int(x) for x in elems]) | ||
try: | ||
neg_deltas_text = deltas['negative_deltas'] | ||
except KeyError: | ||
neg_deltas = None | ||
else: | ||
elems = neg_deltas_text.split(',') | ||
neg_deltas = tuple([int(x) for x in elems]) | ||
return NativeHistStructValue( | ||
count_value=count_value, | ||
sum_value=sum_value, | ||
schema=schema, | ||
zero_threshold=zero_threshold, | ||
zero_count=zero_count, | ||
pos_spans=pos_spans, | ||
neg_spans=neg_spans, | ||
pos_deltas=pos_deltas, | ||
neg_deltas=neg_deltas | ||
) | ||
|
||
|
||
def _group_for_sample(sample, name, typ): | ||
if typ == 'info': | ||
# We can't distinguish between groups for info metrics. | ||
|
@@ -406,37 +502,38 @@ def do_checks(): | |
for s in samples: | ||
suffix = s.name[len(name):] | ||
g = _group_for_sample(s, name, 'histogram') | ||
if g != group or s.timestamp != timestamp: | ||
if group is not None: | ||
do_checks() | ||
count = None | ||
bucket = None | ||
has_negative_buckets = False | ||
has_sum = False | ||
has_gsum = False | ||
has_negative_gsum = False | ||
value = 0 | ||
group = g | ||
timestamp = s.timestamp | ||
|
||
if suffix == '_bucket': | ||
b = float(s.labels['le']) | ||
if b < 0: | ||
has_negative_buckets = True | ||
if bucket is not None and b <= bucket: | ||
raise ValueError("Buckets out of order: " + name) | ||
if s.value < value: | ||
raise ValueError("Bucket values out of order: " + name) | ||
bucket = b | ||
value = s.value | ||
elif suffix in ['_count', '_gcount']: | ||
count = s.value | ||
elif suffix in ['_sum']: | ||
has_sum = True | ||
elif suffix in ['_gsum']: | ||
has_gsum = True | ||
if s.value < 0: | ||
has_negative_gsum = True | ||
if len(suffix) != 0: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Rather than nesting everything further in what about a |
||
if g != group or s.timestamp != timestamp: | ||
if group is not None: | ||
do_checks() | ||
count = None | ||
bucket = None | ||
has_negative_buckets = False | ||
has_sum = False | ||
has_gsum = False | ||
has_negative_gsum = False | ||
value = 0 | ||
group = g | ||
timestamp = s.timestamp | ||
|
||
if suffix == '_bucket': | ||
b = float(s.labels['le']) | ||
if b < 0: | ||
has_negative_buckets = True | ||
if bucket is not None and b <= bucket: | ||
raise ValueError("Buckets out of order: " + name) | ||
if s.value < value: | ||
raise ValueError("Bucket values out of order: " + name) | ||
bucket = b | ||
value = s.value | ||
elif suffix in ['_count', '_gcount']: | ||
count = s.value | ||
elif suffix in ['_sum']: | ||
has_sum = True | ||
elif suffix in ['_gsum']: | ||
has_gsum = True | ||
if s.value < 0: | ||
has_negative_gsum = True | ||
|
||
if group is not None: | ||
do_checks() | ||
|
@@ -486,6 +583,8 @@ def build_metric(name, documentation, typ, unit, samples): | |
metric.samples = samples | ||
return metric | ||
|
||
is_nh = True | ||
typ = None | ||
for line in fd: | ||
if line[-1] == '\n': | ||
line = line[:-1] | ||
|
@@ -518,7 +617,7 @@ def build_metric(name, documentation, typ, unit, samples): | |
group_timestamp_samples = set() | ||
samples = [] | ||
allowed_names = [parts[2]] | ||
|
||
if parts[1] == 'HELP': | ||
if documentation is not None: | ||
raise ValueError("More than one HELP for metric: " + line) | ||
|
@@ -537,8 +636,15 @@ def build_metric(name, documentation, typ, unit, samples): | |
else: | ||
raise ValueError("Invalid line: " + line) | ||
else: | ||
sample = _parse_sample(line) | ||
if sample.name not in allowed_names: | ||
if typ == 'histogram': | ||
sample = _parse_nh_sample(line, tuple(type_suffixes['histogram'])) | ||
csmarchbanks marked this conversation as resolved.
Show resolved
Hide resolved
|
||
else: | ||
# It's not a native histogram | ||
sample = None | ||
if sample is None: | ||
is_nh = False | ||
sample = _parse_sample(line) | ||
if sample.name not in allowed_names and not is_nh: | ||
if name is not None: | ||
yield build_metric(name, documentation, typ, unit, samples) | ||
# Start an unknown metric. | ||
|
@@ -570,26 +676,29 @@ def build_metric(name, documentation, typ, unit, samples): | |
or _isUncanonicalNumber(sample.labels['quantile']))): | ||
raise ValueError("Invalid quantile label: " + line) | ||
|
||
g = tuple(sorted(_group_for_sample(sample, name, typ).items())) | ||
if group is not None and g != group and g in seen_groups: | ||
raise ValueError("Invalid metric grouping: " + line) | ||
if group is not None and g == group: | ||
if (sample.timestamp is None) != (group_timestamp is None): | ||
raise ValueError("Mix of timestamp presence within a group: " + line) | ||
if group_timestamp is not None and group_timestamp > sample.timestamp and typ != 'info': | ||
raise ValueError("Timestamps went backwards within a group: " + line) | ||
if not is_nh: | ||
g = tuple(sorted(_group_for_sample(sample, name, typ).items())) | ||
if group is not None and g != group and g in seen_groups: | ||
raise ValueError("Invalid metric grouping: " + line) | ||
if group is not None and g == group: | ||
if (sample.timestamp is None) != (group_timestamp is None): | ||
raise ValueError("Mix of timestamp presence within a group: " + line) | ||
if group_timestamp is not None and group_timestamp > sample.timestamp and typ != 'info': | ||
raise ValueError("Timestamps went backwards within a group: " + line) | ||
else: | ||
group_timestamp_samples = set() | ||
|
||
series_id = (sample.name, tuple(sorted(sample.labels.items()))) | ||
if sample.timestamp != group_timestamp or series_id not in group_timestamp_samples: | ||
# Not a duplicate due to timestamp truncation. | ||
samples.append(sample) | ||
group_timestamp_samples.add(series_id) | ||
|
||
group = g | ||
group_timestamp = sample.timestamp | ||
seen_groups.add(g) | ||
else: | ||
group_timestamp_samples = set() | ||
|
||
series_id = (sample.name, tuple(sorted(sample.labels.items()))) | ||
if sample.timestamp != group_timestamp or series_id not in group_timestamp_samples: | ||
# Not a duplicate due to timestamp truncation. | ||
samples.append(sample) | ||
group_timestamp_samples.add(series_id) | ||
|
||
group = g | ||
group_timestamp = sample.timestamp | ||
seen_groups.add(g) | ||
|
||
if typ == 'stateset' and sample.value not in [0, 1]: | ||
raise ValueError("Stateset samples can only have values zero and one: " + line) | ||
|
@@ -606,7 +715,7 @@ def build_metric(name, documentation, typ, unit, samples): | |
(typ in ['histogram', 'gaugehistogram'] and sample.name.endswith('_bucket')) | ||
or (typ in ['counter'] and sample.name.endswith('_total'))): | ||
raise ValueError("Invalid line only histogram/gaugehistogram buckets and counters can have exemplars: " + line) | ||
|
||
if name is not None: | ||
yield build_metric(name, documentation, typ, unit, samples) | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Graphite doesn't have support for native histograms, if for some reason one shows up in a registry we would probably need to drop it with a warning. For now I would say let's not make any changes here as native histograms will be experimental anyway.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You're absolutely right. The thing is the
mypy
linter fails without this change. That's due to the native histogram value being a union. I noticed that only when I pushed the PR for the first timeThere was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Solved after separating fields