Skip to content

Commit

Permalink
ruler: Add support for simple relabelling in form of label drop. (#662)
Browse files Browse the repository at this point in the history
This allows setting up Thanos Ruler in HA while avoiding duplicated alerts.

Signed-off-by: Bartek Plotka <[email protected]>
  • Loading branch information
bwplotka authored Dec 12, 2018
1 parent 52ea0ad commit 6a82ca2
Show file tree
Hide file tree
Showing 5 changed files with 89 additions and 16 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@ We use *breaking* word for marking changes that are not backward compatible (rel

## Unreleased

### Added

- Label drop (simple relabelling) for Thanos Ruler, enabling the replica label to be dropped so that alerts are deduplicated on the Alertmanager side.

### Fixed

- DNS SD bug when having SRV results with different ports.
Expand Down
9 changes: 7 additions & 2 deletions cmd/thanos/rule.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func registerRule(m map[string]setupFunc, app *kingpin.Application, name string)

grpcBindAddr, httpBindAddr, cert, key, clientCA, newPeerFn := regCommonServerFlags(cmd)

labelStrs := cmd.Flag("label", "Labels to be applied to all generated metrics (repeated).").
labelStrs := cmd.Flag("label", "Labels to be applied to all generated metrics (repeated). Similar to external labels for Prometheus, used to identify ruler and its blocks as unique source.").
PlaceHolder("<name>=\"<value>\"").Strings()

dataDir := cmd.Flag("data-dir", "data directory").Default("data/").String()
Expand All @@ -79,6 +79,9 @@ func registerRule(m map[string]setupFunc, app *kingpin.Application, name string)

alertQueryURL := cmd.Flag("alert.query-url", "The external Thanos Query URL that would be set in all alerts 'Source' field").String()

alertExcludeLabels := cmd.Flag("alert.label-drop", "Labels by name to drop before sending to alertmanager. This allows alert to be deduplicated on replica label (repeated). Similar Prometheus alert relabelling").
Strings()

objStoreConfig := regCommonObjStoreFlags(cmd, "")

queries := cmd.Flag("query", "Addresses of statically configured query API servers (repeatable). The scheme may be prefixed with 'dns+' or 'dnssrv+' to detect query API servers through respective DNS lookups.").
Expand Down Expand Up @@ -152,6 +155,7 @@ func registerRule(m map[string]setupFunc, app *kingpin.Application, name string)
tsdbOpts,
name,
alertQueryURL,
*alertExcludeLabels,
*queries,
fileSD,
time.Duration(*dnsSDInterval),
Expand Down Expand Up @@ -181,6 +185,7 @@ func runRule(
tsdbOpts *tsdb.Options,
component string,
alertQueryURL *url.URL,
alertExcludeLabels []string,
queryAddrs []string,
fileSD *file.Discovery,
dnsSDInterval time.Duration,
Expand Down Expand Up @@ -274,7 +279,7 @@ func runRule(
// Run rule evaluation and alert notifications.
var (
alertmgrs = newAlertmanagerSet(alertmgrURLs)
alertQ = alert.NewQueue(logger, reg, 10000, 100, labelsTSDBToProm(lset))
alertQ = alert.NewQueue(logger, reg, 10000, 100, labelsTSDBToProm(lset), alertExcludeLabels)
mgr *rules.Manager
)
{
Expand Down
9 changes: 8 additions & 1 deletion docs/components/rule.md
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,9 @@ Flags:
related server will be started.
--label=<name>="<value>" ...
Labels to be applied to all generated metrics
(repeated).
(repeated). Similar to external labels for
Prometheus, used to identify ruler and its
blocks as unique source.
--data-dir="data/" data directory
--rule-file=rules/ ... Rule files that should be used by rule manager.
Can be in glob format (repeated).
Expand All @@ -130,6 +132,11 @@ Flags:
--alert.query-url=ALERT.QUERY-URL
The external Thanos Query URL that would be set
in all alerts 'Source' field
--alert.label-drop=ALERT.LABEL-DROP ...
Labels by name to drop before sending to
alertmanager. This allows alert to be
deduplicated on replica label (repeated).
Similar Prometheus alert relabelling
--objstore.config-file=<bucket.config-yaml-path>
Path to YAML file that contains object store
configuration.
Expand Down
56 changes: 43 additions & 13 deletions pkg/alert/alert.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,10 +78,11 @@ func (a *Alert) ResolvedAt(ts time.Time) bool {
// Queue is a queue of alert notifications waiting to be sent. The queue is consumed in batches
// and entries are dropped at the front if it runs full.
type Queue struct {
logger log.Logger
maxBatchSize int
capacity int
labels labels.Labels
logger log.Logger
maxBatchSize int
capacity int
toAddLset labels.Labels
toExcludeLabels labels.Labels

mtx sync.Mutex
queue []*Alert
Expand All @@ -92,17 +93,39 @@ type Queue struct {
dropped prometheus.Counter
}

// relabelLabels prepares the two label sets used when relabelling alerts.
//
// toExclude contains one label per entry of excludeLset, carrying only the name (no value).
// toAdd contains a copy of every label from lset whose name is not present in toExclude,
// so labels that are going to be dropped anyway are never attached in the first place.
// Either result stays nil when nothing is appended to it.
func relabelLabels(lset labels.Labels, excludeLset []string) (toAdd labels.Labels, toExclude labels.Labels) {
	for i := range excludeLset {
		toExclude = append(toExclude, labels.Label{Name: excludeLset[i]})
	}

	for _, lbl := range lset {
		if !toExclude.Has(lbl.Name) {
			// Only labels that survive the exclusion are kept for adding.
			toAdd = append(toAdd, labels.Label{Name: lbl.Name, Value: lbl.Value})
		}
	}
	return toAdd, toExclude
}

// NewQueue returns a new queue. The given label set is attached to all alerts pushed to the queue.
func NewQueue(logger log.Logger, reg prometheus.Registerer, capacity, maxBatchSize int, lset labels.Labels) *Queue {
// The given exclude label names are dropped from the alerts' own labels as well as from the external labels.
func NewQueue(logger log.Logger, reg prometheus.Registerer, capacity, maxBatchSize int, externalLset labels.Labels, excludeLabels []string) *Queue {
toAdd, toExclude := relabelLabels(externalLset, excludeLabels)

if logger == nil {
logger = log.NewNopLogger()
}
q := &Queue{
logger: logger,
capacity: capacity,
morec: make(chan struct{}, 1),
maxBatchSize: maxBatchSize,
labels: lset,
logger: logger,
capacity: capacity,
morec: make(chan struct{}, 1),
maxBatchSize: maxBatchSize,
toAddLset: toAdd,
toExcludeLabels: toExclude,

dropped: prometheus.NewCounter(prometheus.CounterOpts{
Name: "thanos_alert_queue_alerts_dropped_total",
Expand Down Expand Up @@ -179,10 +202,17 @@ func (q *Queue) Push(alerts []*Alert) {

q.pushed.Add(float64(len(alerts)))

// Attach external labels before relabelling and sending.
// Attach external labels and drop excluded labels before sending.
// TODO(bwplotka): Use proper relabelling with https://github.com/improbable-eng/thanos/issues/660
for _, a := range alerts {
lb := labels.NewBuilder(a.Labels)
for _, l := range q.labels {
lb := labels.NewBuilder(labels.Labels{})
for _, l := range a.Labels {
if q.toExcludeLabels.Has(l.Name) {
continue
}
lb.Set(l.Name, l.Value)
}
for _, l := range q.toAddLset {
lb.Set(l.Name, l.Value)
}
a.Labels = lb.Labels()
Expand Down
27 changes: 27 additions & 0 deletions pkg/alert/alert_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package alert

import (
"testing"

"github.com/improbable-eng/thanos/pkg/testutil"
"github.com/prometheus/prometheus/pkg/labels"
)

func TestQueue_Push_Relabelled(t *testing.T) {
q := NewQueue(
nil, nil, 10, 10,
labels.FromStrings("a", "1", "replica", "A"), // Labels to be added.
[]string{"b", "replica"}, // Labels to be dropped (excluding those added).
)

q.Push([]*Alert{
{Labels: labels.FromStrings("b", "2", "c", "3")},
{Labels: labels.FromStrings("c", "3")},
{Labels: labels.FromStrings("a", "2")},
})

testutil.Equals(t, 3, len(q.queue))
testutil.Equals(t, labels.FromStrings("a", "1", "c", "3"), q.queue[0].Labels)
testutil.Equals(t, labels.FromStrings("a", "1", "c", "3"), q.queue[1].Labels)
testutil.Equals(t, labels.FromStrings("a", "1"), q.queue[2].Labels)
}

0 comments on commit 6a82ca2

Please sign in to comment.