// Forked from cloudposse/prometheus-to-cloudwatch (prometheus_to_cloudwatch.go).

package main
import (
"bytes"
"compress/gzip"
"context"
"crypto/tls"
"errors"
"fmt"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/request"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/cloudwatch"
"github.com/gobwas/glob"
"github.com/matttproud/golang_protobuf_extensions/pbutil"
dto "github.com/prometheus/client_model/go"
"github.com/prometheus/common/expfmt"
"github.com/prometheus/common/model"
"io"
"log"
"math"
"mime"
"net/http"
"sort"
"time"
)
const (
// batchSize is the number of MetricDatum entries publishMetricsToCloudWatch
// accumulates before sending them in one PutMetricData request.
batchSize = 10
// cwHighResLabel is the metric label whose presence selects a storage
// resolution of 1 second (see getResolution). It is never sent as a dimension.
cwHighResLabel = "__cw_high_res"
// cwUnitLabel is the metric label whose value is used as the CloudWatch
// unit (see getUnit). It is never sent as a dimension.
cwUnitLabel = "__cw_unit"
// acceptHeader is the Accept header sent with every scrape request. It
// prefers the delimited protobuf exposition format (q=0.7) and falls back
// to the text format, version 0.0.4 (q=0.3).
acceptHeader = `application/vnd.google.protobuf;proto=io.prometheus.client.MetricFamily;encoding=delimited;q=0.7,text/plain;version=0.0.4;q=0.3`
)
// Config defines the configuration options accepted by NewBridge.
type Config struct {
// AWS access key Id with permissions to publish CloudWatch metrics.
// Static credentials are only used when both AwsAccessKeyId and
// AwsSecretAccessKey are non-empty; otherwise the AWS SDK's default
// credential chain is used.
AwsAccessKeyId string
// AWS secret access key with permissions to publish CloudWatch metrics
AwsSecretAccessKey string
// AWS session token with permissions to publish CloudWatch metrics.
// Optional; passed along with the static credentials (needed for temporary credentials)
AwsSessionToken string
// Required. The CloudWatch namespace under which metrics should be published
CloudWatchNamespace string
// Required. The AWS Region to use
CloudWatchRegion string
// The frequency with which metrics should be published to Cloudwatch.
// Values <= 0 select the default. Default: 30s
CloudWatchPublishInterval time.Duration
// Timeout of the HTTP client used for sending metrics to Cloudwatch.
// Values <= 0 select the default. Default: 5s
CloudWatchPublishTimeout time.Duration
// Required. Prometheus scrape URL
PrometheusScrapeUrl string
// Path to Certificate file. A client certificate is only loaded when both
// PrometheusCertPath and PrometheusKeyPath are set
PrometheusCertPath string
// Path to Key file
PrometheusKeyPath string
// Accept any certificate during TLS handshake. Insecure, use only for testing
PrometheusSkipServerCertCheck bool
// Additional dimensions to send to CloudWatch (name -> value); they are appended to every datum
AdditionalDimensions map[string]string
// Replace dimensions with the provided label (label name -> replacement value).
// When set, a second datum carrying the replaced values is published next to the
// original one. This allows for aggregating metrics across dimensions so we can
// set CloudWatch Alarms on the metrics
ReplaceDimensions map[string]string
// Only publish the specified metrics (a list of glob patterns, e.g. ["up", "http_*"]).
// An empty list means "publish everything that is not excluded"
IncludeMetrics []glob.Glob
// Never publish the specified metrics (a list of glob patterns, e.g. ["tomcat_*"]).
// Exclusions take priority over IncludeMetrics
ExcludeMetrics []glob.Glob
}
// Bridge pushes metrics to AWS CloudWatch. It scrapes a Prometheus endpoint on
// every tick of Run and publishes the samples through the CloudWatch client.
// Create instances with NewBridge.
type Bridge struct {
// Interval between two scrape-and-publish cycles in Run.
cloudWatchPublishInterval time.Duration
// Namespace set on every PutMetricData request.
cloudWatchNamespace string
// CloudWatch API client created by NewBridge.
cw *cloudwatch.CloudWatch
// URL of the Prometheus endpoint to scrape.
prometheusScrapeUrl string
// Optional client certificate / key paths used for the scrape request.
prometheusCertPath string
prometheusKeyPath string
// When true, the scrape request does not verify the server certificate.
prometheusSkipServerCertCheck bool
// Dimensions appended to every published datum.
additionalDimensions map[string]string
// Label name -> replacement value used to build the extra "replaced" datum.
replaceDimensions map[string]string
// Glob filters applied to metric names (see shouldIgnoreMetric).
includeMetrics []glob.Glob
excludeMetrics []glob.Glob
}
// NewBridge initializes and returns a pointer to a Bridge using the
// supplied configuration, or an error if there is a problem with the configuration.
//
// CloudWatchNamespace, PrometheusScrapeUrl and CloudWatchRegion are required.
// CloudWatchPublishInterval defaults to 30s and CloudWatchPublishTimeout to 5s
// when they are not positive. Static AWS credentials are used only when both
// the access key id and the secret access key are provided; otherwise the SDK
// resolves credentials through its default provider chain.
func NewBridge(c *Config) (*Bridge, error) {
	// Validate everything up front so that a bad configuration has no side effects.
	if c == nil {
		return nil, errors.New("config required")
	}
	if c.CloudWatchNamespace == "" {
		return nil, errors.New("CloudWatchNamespace required")
	}
	if c.PrometheusScrapeUrl == "" {
		return nil, errors.New("PrometheusScrapeUrl required")
	}
	if c.CloudWatchRegion == "" {
		return nil, errors.New("CloudWatchRegion required")
	}

	publishInterval := 30 * time.Second
	if c.CloudWatchPublishInterval > 0 {
		publishInterval = c.CloudWatchPublishInterval
	}

	publishTimeout := 5 * time.Second
	if c.CloudWatchPublishTimeout > 0 {
		publishTimeout = c.CloudWatchPublishTimeout
	}

	// Use a dedicated client: setting the timeout on http.DefaultClient would
	// change the behaviour of every other user of the default client in the process.
	client := &http.Client{Timeout: publishTimeout}

	config := aws.NewConfig().WithHTTPClient(client).WithRegion(c.CloudWatchRegion)

	// https://docs.aws.amazon.com/sdk-for-go/v1/developer-guide/configuring-sdk.html
	// https://docs.aws.amazon.com/sdk-for-go/api/aws/#Config
	// If credentials are not provided in the variables, the chain of credential providers will search for credentials
	// in environment variables, the shared credential file, and EC2 Instance Roles
	if c.AwsAccessKeyId != "" && c.AwsSecretAccessKey != "" {
		// Utilise AWS session token if one is provided (Required for temporary AWS credentials)
		config.Credentials = credentials.NewStaticCredentials(c.AwsAccessKeyId, c.AwsSecretAccessKey, c.AwsSessionToken)
	}

	sess, err := session.NewSession(config)
	if err != nil {
		return nil, err
	}

	return &Bridge{
		cloudWatchPublishInterval:     publishInterval,
		cloudWatchNamespace:           c.CloudWatchNamespace,
		cw:                            cloudwatch.New(sess),
		prometheusScrapeUrl:           c.PrometheusScrapeUrl,
		prometheusCertPath:            c.PrometheusCertPath,
		prometheusKeyPath:             c.PrometheusKeyPath,
		prometheusSkipServerCertCheck: c.PrometheusSkipServerCertCheck,
		additionalDimensions:          c.AdditionalDimensions,
		replaceDimensions:             c.ReplaceDimensions,
		includeMetrics:                c.IncludeMetrics,
		excludeMetrics:                c.ExcludeMetrics,
	}, nil
}
// Run scrapes Prometheus and publishes the result to CloudWatch once per
// configured publish interval. It blocks until ctx is cancelled, at which
// point it logs a message and returns.
func (b *Bridge) Run(ctx context.Context) {
	ticker := time.NewTicker(b.cloudWatchPublishInterval)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			log.Println("prometheus-to-cloudwatch: stopping")
			return

		case <-ticker.C:
			// The fetcher closes the channel once the scrape is complete,
			// which ends the collection loop below.
			families := make(chan *dto.MetricFamily, 1024)
			go fetchMetricFamilies(b.prometheusScrapeUrl, families, b.prometheusCertPath, b.prometheusKeyPath, b.prometheusSkipServerCertCheck)

			var scraped []*dto.MetricFamily
			for {
				mf, open := <-families
				if !open {
					break
				}
				scraped = append(scraped, mf)
			}

			published, err := b.publishMetricsToCloudWatch(scraped)
			if err != nil {
				log.Println("prometheus-to-cloudwatch: error publishing to CloudWatch:", err)
			}
			// The count is logged regardless of whether publishing reported an error.
			log.Println(fmt.Sprintf("prometheus-to-cloudwatch: published %d metrics to CloudWatch", published))
		}
	}
}
// publishMetricsToCloudWatch converts the metric families into samples, drops
// the metrics filtered out by shouldIgnoreMetric and sends the rest to
// CloudWatch in batches of batchSize datums.
//
// It returns the number of datums handed to flush and the error of the final
// flush. Errors from intermediate batches are logged and do not stop the
// remaining batches from being sent. If the samples cannot be extracted,
// nothing is published and the extraction error is returned.
//
// NOTE: The CloudWatch API has the following limitations:
// - Max 40kb request size
// - Single namespace per request
// - Max 10 dimensions per metric
func (b *Bridge) publishMetricsToCloudWatch(mfs []*dto.MetricFamily) (count int, e error) {
	vec, err := expfmt.ExtractSamples(&expfmt.DecodeOptions{Timestamp: model.Now()}, mfs...)
	if err != nil {
		return 0, err
	}

	data := make([]*cloudwatch.MetricDatum, 0, batchSize)
	for _, s := range vec {
		name := getName(s.Metric)
		if b.shouldIgnoreMetric(name) {
			continue
		}
		data = appendDatum(data, name, s, b)

		// appendDatum may add more than one datum per sample, so the buffer can
		// step over batchSize without ever being equal to it. Flush in
		// fixed-size chunks whenever at least one full batch is available.
		for len(data) >= batchSize {
			count += batchSize
			if err := b.flush(data[:batchSize]); err != nil {
				log.Println("prometheus-to-cloudwatch: error publishing to CloudWatch:", err)
			}
			// Carry the overflow into a fresh buffer instead of aliasing the flushed batch.
			rest := make([]*cloudwatch.MetricDatum, 0, batchSize)
			data = append(rest, data[batchSize:]...)
		}
	}

	count += len(data)
	return count, b.flush(data)
}
// flush sends the given datums to CloudWatch in a single PutMetricData
// request under the bridge's namespace. The request body is gzip-compressed
// by the compressPayload build handler. An empty batch is a no-op.
func (b *Bridge) flush(data []*cloudwatch.MetricDatum) error {
	if len(data) == 0 {
		return nil
	}

	input := &cloudwatch.PutMetricDataInput{
		Namespace:  &b.cloudWatchNamespace,
		MetricData: data,
	}
	// The second return value is the (unused) output placeholder, not an error.
	putReq, _ := b.cw.PutMetricDataRequest(input)
	putReq.Handlers.Build.PushBack(compressPayload)
	return putReq.Send()
}
// compressPayload gzips the request body and marks the request with a
// "Content-Encoding: gzip" header. If compression fails, the error is logged
// and the request is left untouched (uncompressed).
//
// According to the documentation:
// "Each PutMetricData request is limited to 40 KB in size for HTTP POST requests.
// You can send a payload compressed by gzip."
func compressPayload(r *request.Request) {
	var compressed bytes.Buffer
	gz := gzip.NewWriter(&compressed)

	_, err := io.Copy(gz, r.GetBody())
	if err == nil {
		// Close flushes the remaining compressed data into the buffer.
		err = gz.Close()
	}
	if err != nil {
		log.Println("prometheus-to-cloudwatch: error compressing HTTP body:", err)
		return
	}

	r.SetBufferBody(compressed.Bytes())
	r.HTTPRequest.Header.Set("Content-Encoding", "gzip")
}
// shouldIgnoreMetric reports whether the metric with the given name must not
// be published. Exclude patterns win over include patterns; with no include
// patterns configured, everything that is not excluded is published.
func (b *Bridge) shouldIgnoreMetric(metricName string) bool {
	switch {
	case anyPatternMatches(b.excludeMetrics, metricName):
		// Blacklist takes priority over the whitelist
		return true
	case len(b.includeMetrics) == 0:
		return false
	default:
		return !anyPatternMatches(b.includeMetrics, metricName)
	}
}
// anyPatternMatches reports whether s matches at least one of the glob
// patterns. It returns false for an empty pattern list.
func anyPatternMatches(patterns []glob.Glob, s string) bool {
	for i := range patterns {
		if patterns[i].Match(s) {
			return true
		}
	}
	return false
}
// appendDatum converts the sample s into CloudWatch datums and appends them
// to data, returning the extended slice.
//
// Samples without labels or with a value rejected by validValue are skipped.
// Otherwise one datum with the metric's own dimensions is appended and, when
// getDimensions returns a non-empty replaced dimension set, a second datum
// using those dimensions. The bridge's additional dimensions are added to both.
func appendDatum(data []*cloudwatch.MetricDatum, name string, s *model.Sample, b *Bridge) []*cloudwatch.MetricDatum {
	labels := s.Metric
	if len(labels) == 0 {
		return data
	}

	// Check value before adding the datum
	v := float64(s.Value)
	if !validValue(v) {
		return data
	}

	// build assembles a datum that differs from its sibling only in its dimensions.
	build := func(dims []*cloudwatch.Dimension) *cloudwatch.MetricDatum {
		d := &cloudwatch.MetricDatum{}
		d.SetMetricName(name)
		d.SetValue(v)
		d.SetTimestamp(s.Timestamp.Time())
		d.SetDimensions(append(dims, getAdditionalDimensions(b)...))
		d.SetStorageResolution(getResolution(labels))
		d.SetUnit(getUnit(labels))
		return d
	}

	// Leave room for the additional dimensions within the 10-dimension budget.
	ownDims, replacedDims := getDimensions(labels, 10-len(b.additionalDimensions), b)
	data = append(data, build(ownDims))

	// Don't add replacement if not configured
	if len(replacedDims) > 0 {
		data = append(data, build(replacedDims))
	}
	return data
}
// Bounds on the magnitude of a non-zero value accepted by CloudWatch.
//
// According to the documentation:
// "CloudWatch rejects values that are either too small or too large.
// Values must be in the range of 8.515920e-109 to 1.174271e+108 (Base 10)
// or 2e-360 to 2e360 (Base 2).
// In addition, special values (for example, NaN, +Infinity, -Infinity) are not supported."
//
// The decimal bounds from that quote are used directly. They were previously
// computed as 2^-260 and 2^260, which rejected a large part of the documented range.
var (
	valueTooSmall = 8.515920e-109
	valueTooLarge = 1.174271e+108
)

// validValue reports whether v can be published to CloudWatch: it must not be
// NaN or infinite and, unless it is zero, its magnitude must lie strictly
// between valueTooSmall and valueTooLarge.
func validValue(v float64) bool {
	if math.IsNaN(v) || math.IsInf(v, 0) {
		return false
	}

	// Check for zero first to avoid tripping on "value too small"
	if v == 0.0 {
		return true
	}

	// Check that a non-zero value is within the range of accepted values
	a := math.Abs(v)
	return a > valueTooSmall && a < valueTooLarge
}
// getName returns the value of the metric's name label, or the empty string
// when the metric has no such label.
func getName(m model.Metric) string {
	// Indexing a missing key yields the zero LabelValue, i.e. "".
	return string(m[model.MetricNameLabel])
}
// getDimensions returns up to num dimensions for the provided metric - one for
// each label with a non-empty value, except the name label and the
// CloudWatch control labels (cwHighResLabel, cwUnitLabel).
// If a metric has more labels than num, it behaves deterministically by
// returning the first num labels in lexical order of the label name.
// A negative num is treated as 0.
//
// The second return value is the same dimension list with the values of the
// labels listed in b.replaceDimensions substituted. It is nil when no
// replacement is configured or when no returned dimension was actually
// changed, so that callers do not publish an identical datum twice.
func getDimensions(m model.Metric, num int, b *Bridge) ([]*cloudwatch.Dimension, []*cloudwatch.Dimension) {
	// Callers derive num by subtraction; a negative budget must not reach the
	// slicing logic, where it would panic.
	if num < 0 {
		num = 0
	}

	names := make([]string, 0, len(m))
	for k := range m {
		if k == model.MetricNameLabel || k == cwHighResLabel || k == cwUnitLabel {
			continue
		}
		names = append(names, string(k))
	}
	if len(names) == 0 {
		return make([]*cloudwatch.Dimension, 0), nil
	}
	// Map iteration order is random; sort for a stable selection.
	sort.Strings(names)

	replaceConfigured := len(b.replaceDimensions) > 0
	dims := make([]*cloudwatch.Dimension, 0, len(names))
	var replacedDims []*cloudwatch.Dimension
	if replaceConfigured {
		replacedDims = make([]*cloudwatch.Dimension, 0, len(names))
	}

	replaced := false
	for _, name := range names {
		if len(dims) >= num {
			break
		}
		if name == "" {
			continue
		}
		val := string(m[model.LabelName(name)])
		if val == "" {
			continue
		}
		dims = append(dims, new(cloudwatch.Dimension).SetName(name).SetValue(val))

		// Don't add replacement if not configured
		if !replaceConfigured {
			continue
		}
		replacedVal := val
		if replacement, ok := b.replaceDimensions[name]; ok && replacement != val {
			replacedVal = replacement
			replaced = true
		}
		replacedDims = append(replacedDims, new(cloudwatch.Dimension).SetName(name).SetValue(replacedVal))
	}

	if !replaced {
		return dims, nil
	}
	return dims, replacedDims
}
// getAdditionalDimensions builds a fresh dimension list from the bridge's
// additional dimensions. The order of the result follows map iteration order
// and is therefore not stable between calls.
func getAdditionalDimensions(b *Bridge) []*cloudwatch.Dimension {
	extra := make([]*cloudwatch.Dimension, 0, len(b.additionalDimensions))
	for dimName, dimValue := range b.additionalDimensions {
		extra = append(extra, &cloudwatch.Dimension{
			Name:  aws.String(dimName),
			Value: aws.String(dimValue),
		})
	}
	return extra
}
// getResolution returns the storage resolution for the metric:
// 1 if the metric carries a __cw_high_res label (whatever its value), 60 otherwise.
func getResolution(m model.Metric) int64 {
	var resolution int64 = 60
	if _, highRes := m[cwHighResLabel]; highRes {
		resolution = 1
	}
	return resolution
}
// getUnit returns the value of the metric's __cw_unit label, or "None" when
// the label is absent.
func getUnit(m model.Metric) string {
	unit, found := m[cwUnitLabel]
	if !found {
		return "None"
	}
	return string(unit)
}
// fetchMetricFamilies retrieves metrics from the provided URL, decodes them into MetricFamily proto messages, and sends them to the provided channel.
// It returns after all MetricFamilies have been sent and always closes ch on return.
//
// When both certificate and key are non-empty they are loaded as the TLS
// client certificate; skipServerCertCheck disables verification of the
// server certificate. A certificate that cannot be loaded terminates the
// process via log.Fatal.
//
// NOTE(review): the HTTP client has no timeout, so a stalled endpoint blocks
// the caller that is draining ch — consider making this configurable.
func fetchMetricFamilies(
	url string, ch chan<- *dto.MetricFamily,
	certificate string, key string,
	skipServerCertCheck bool,
) {
	defer close(ch)

	tlsConfig := &tls.Config{InsecureSkipVerify: skipServerCertCheck}
	if certificate != "" && key != "" {
		cert, err := tls.LoadX509KeyPair(certificate, key)
		if err != nil {
			log.Fatal("prometheus-to-cloudwatch: Error: ", err)
		}
		// The deprecated BuildNameToCertificate call is not needed: the TLS
		// library selects a compatible chain from Certificates on its own.
		tlsConfig.Certificates = []tls.Certificate{cert}
	}

	transport := &http.Transport{TLSClientConfig: tlsConfig}
	// A new transport is created for every scrape; release its idle keep-alive
	// connections when done, otherwise each scrape leaves a connection behind.
	defer transport.CloseIdleConnections()

	client := &http.Client{Transport: transport}
	decodeContent(client, url, ch)
}
// decodeContent issues a GET request for url with the Prometheus Accept
// header, and hands a 200 response to parseResponse, which pushes the decoded
// metric families onto ch. Any failure (building the request, performing it,
// or a non-200 status) terminates the process via log.Fatalf.
func decodeContent(client *http.Client, url string, ch chan<- *dto.MetricFamily) {
	scrapeReq, err := http.NewRequest(http.MethodGet, url, nil)
	if err != nil {
		log.Fatalf("prometheus-to-cloudwatch: Error: creating GET request for URL %q failed: %s", url, err)
	}
	scrapeReq.Header.Set("Accept", acceptHeader)

	scrapeResp, err := client.Do(scrapeReq)
	if err != nil {
		log.Fatalf("prometheus-to-cloudwatch: Error: executing GET request for URL %q failed: %s", url, err)
	}
	defer scrapeResp.Body.Close()

	if code := scrapeResp.StatusCode; code != http.StatusOK {
		log.Fatalf("prometheus-to-cloudwatch: Error: GET request for URL %q returned HTTP status %s", url, scrapeResp.Status)
	}
	parseResponse(scrapeResp, ch)
}
// parseResponse decodes the body of an http.Response and pushes every
// MetricFamily it contains to the channel.
// It returns when all MetricFamilies are parsed and put on the channel.
//
// A response whose Content-Type announces delimited protobuf MetricFamily
// messages is read message by message; anything else is parsed as the
// Prometheus text format. A decoding error terminates the process via log.Fatalln.
func parseResponse(resp *http.Response, ch chan<- *dto.MetricFamily) {
	mediaType, params, err := mime.ParseMediaType(resp.Header.Get("Content-Type"))
	isDelimitedProto := err == nil &&
		mediaType == "application/vnd.google.protobuf" &&
		params["encoding"] == "delimited" &&
		params["proto"] == "io.prometheus.client.MetricFamily"

	if !isDelimitedProto {
		var parser expfmt.TextParser
		families, parseErr := parser.TextToMetricFamilies(resp.Body)
		if parseErr != nil {
			log.Fatalln("reading text format failed:", parseErr)
		}
		for _, mf := range families {
			ch <- mf
		}
		return
	}

	for {
		mf := new(dto.MetricFamily)
		_, readErr := pbutil.ReadDelimited(resp.Body, mf)
		if readErr == io.EOF {
			// End of stream: every message has been forwarded.
			return
		}
		if readErr != nil {
			log.Fatalln("prometheus-to-cloudwatch: Error: reading metric family protocol buffer failed:", readErr)
		}
		ch <- mf
	}
}