-
Notifications
You must be signed in to change notification settings - Fork 802
/
registry.py
168 lines (139 loc) · 6.05 KB
/
registry.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
from abc import ABC, abstractmethod
import copy
from threading import Lock
from typing import Dict, Iterable, List, Optional
from .metrics_core import Metric
# Ideally this would be a Protocol, but Protocols are only available in Python >= 3.8.
class Collector(ABC):
@abstractmethod
def collect(self) -> Iterable[Metric]:
pass
class _EmptyCollector(Collector):
def collect(self) -> Iterable[Metric]:
return []
class CollectorRegistry(Collector):
"""Metric collector registry.
Collectors must have a no-argument method 'collect' that returns a list of
Metric objects. The returned metrics should be consistent with the Prometheus
exposition formats.
"""
def __init__(self, auto_describe: bool = False, target_info: Optional[Dict[str, str]] = None):
self._collector_to_names: Dict[Collector, List[str]] = {}
self._names_to_collectors: Dict[str, Collector] = {}
self._auto_describe = auto_describe
self._lock = Lock()
self._target_info: Optional[Dict[str, str]] = {}
self.set_target_info(target_info)
def register(self, collector: Collector) -> None:
"""Add a collector to the registry."""
with self._lock:
names = self._get_names(collector)
duplicates = set(self._names_to_collectors).intersection(names)
if duplicates:
raise ValueError(
'Duplicated timeseries in CollectorRegistry: {}'.format(
duplicates))
for name in names:
self._names_to_collectors[name] = collector
self._collector_to_names[collector] = names
def unregister(self, collector: Collector) -> None:
"""Remove a collector from the registry."""
with self._lock:
for name in self._collector_to_names[collector]:
del self._names_to_collectors[name]
del self._collector_to_names[collector]
def _get_names(self, collector):
"""Get names of timeseries the collector produces and clashes with."""
desc_func = None
# If there's a describe function, use it.
try:
desc_func = collector.describe
except AttributeError:
pass
# Otherwise, if auto describe is enabled use the collect function.
if not desc_func and self._auto_describe:
desc_func = collector.collect
if not desc_func:
return []
result = []
type_suffixes = {
'counter': ['_total', '_created'],
'summary': ['_sum', '_count', '_created'],
'histogram': ['_bucket', '_sum', '_count', '_created'],
'gaugehistogram': ['_bucket', '_gsum', '_gcount'],
'info': ['_info'],
}
for metric in desc_func():
result.append(metric.name)
for suffix in type_suffixes.get(metric.type, []):
result.append(metric.name + suffix)
return result
def collect(self) -> Iterable[Metric]:
"""Yields metrics from the collectors in the registry."""
collectors = None
ti = None
with self._lock:
collectors = copy.copy(self._collector_to_names)
if self._target_info:
ti = self._target_info_metric()
if ti:
yield ti
for collector in collectors:
yield from collector.collect()
def restricted_registry(self, names: Iterable[str]) -> "RestrictedRegistry":
"""Returns object that only collects some metrics.
Returns an object which upon collect() will return
only samples with the given names.
Intended usage is:
generate_latest(REGISTRY.restricted_registry(['a_timeseries']))
Experimental."""
names = set(names)
return RestrictedRegistry(names, self)
def set_target_info(self, labels: Optional[Dict[str, str]]) -> None:
with self._lock:
if labels:
if not self._target_info and 'target_info' in self._names_to_collectors:
raise ValueError('CollectorRegistry already contains a target_info metric')
self._names_to_collectors['target_info'] = _EmptyCollector()
elif self._target_info:
self._names_to_collectors.pop('target_info', None)
self._target_info = labels
def get_target_info(self) -> Optional[Dict[str, str]]:
with self._lock:
return self._target_info
def _target_info_metric(self):
m = Metric('target', 'Target metadata', 'info')
m.add_sample('target_info', self._target_info, 1)
return m
def get_sample_value(self, name: str, labels: Optional[Dict[str, str]] = None) -> Optional[float]:
"""Returns the sample value, or None if not found.
This is inefficient, and intended only for use in unittests.
"""
if labels is None:
labels = {}
for metric in self.collect():
for s in metric.samples:
if s.name == name and s.labels == labels:
return s.value
return None
class RestrictedRegistry:
def __init__(self, names: Iterable[str], registry: CollectorRegistry):
self._name_set = set(names)
self._registry = registry
def collect(self) -> Iterable[Metric]:
collectors = set()
target_info_metric = None
with self._registry._lock:
if 'target_info' in self._name_set and self._registry._target_info:
target_info_metric = self._registry._target_info_metric()
for name in self._name_set:
if name != 'target_info' and name in self._registry._names_to_collectors:
collectors.add(self._registry._names_to_collectors[name])
if target_info_metric:
yield target_info_metric
for collector in collectors:
for metric in collector.collect():
m = metric._restricted_metric(self._name_set)
if m:
yield m
REGISTRY = CollectorRegistry(auto_describe=True)