Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Receive: Allow specifying tenant-specific external labels in RouterIngestor #5777

Merged
merged 1 commit into from
May 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
- [#6167](https://github.com/thanos-io/thanos/pull/6195) Receive: add flag `tsdb.too-far-in-future.time-window` to prevent clock skewed samples to pollute TSDB head and block all valid incoming samples.
- [#6273](https://github.com/thanos-io/thanos/pull/6273) Mixin: Allow specifying an instance name filter in dashboards
- [#6163](https://github.com/thanos-io/thanos/pull/6163) Receiver: Add hidden flag `--receive-forward-max-backoff` to configure the max backoff for forwarding requests.
- [#5777](https://github.com/thanos-io/thanos/pull/5777) Receive: Allow specifying tenant-specific external labels in Router Ingestor.

### Fixed

Expand Down
42 changes: 27 additions & 15 deletions cmd/thanos/receive.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,7 @@ func runReceive(

level.Debug(logger).Log("msg", "setting up hashring")
{
if err := setupHashring(g, logger, reg, conf, hashringChangedChan, webHandler, statusProber, enableIngestion); err != nil {
if err := setupHashring(g, logger, reg, conf, hashringChangedChan, webHandler, statusProber, enableIngestion, dbs); err != nil {
return err
}
}
Expand Down Expand Up @@ -451,12 +451,13 @@ func setupHashring(g *run.Group,
webHandler *receive.Handler,
statusProber prober.Probe,
enableIngestion bool,
dbs *receive.MultiTSDB,
) error {
// Note: the hashring configuration watcher
// is the sender and thus closes the chan.
// In the single-node case, which has no configuration
// watcher, we close the chan ourselves.
updates := make(chan receive.Hashring, 1)
updates := make(chan []receive.HashringConfig, 1)
algorithm := receive.HashringAlgorithm(conf.hashringsAlgorithm)

// The Hashrings config file path is given initializing config watcher.
Expand All @@ -475,33 +476,28 @@ func setupHashring(g *run.Group,

ctx, cancel := context.WithCancel(context.Background())
g.Add(func() error {
level.Info(logger).Log("msg", "the hashring initialized with config watcher.")
return receive.HashringFromConfigWatcher(ctx, algorithm, conf.replicationFactor, updates, cw)
return receive.ConfigFromWatcher(ctx, updates, cw)
}, func(error) {
cancel()
})
} else {
var (
ring receive.Hashring
err error
cf []receive.HashringConfig
err error
)
// The Hashrings config file content given initialize configuration from content.
if len(conf.hashringsFileContent) > 0 {
ring, err = receive.HashringFromConfig(algorithm, conf.replicationFactor, conf.hashringsFileContent)
cf, err = receive.ParseConfig([]byte(conf.hashringsFileContent))
if err != nil {
close(updates)
return errors.Wrap(err, "failed to validate hashring configuration file")
return errors.Wrap(err, "failed to validate hashring configuration content")
}
level.Info(logger).Log("msg", "the hashring initialized directly with the given content through the flag.")
} else {
level.Info(logger).Log("msg", "the hashring file is not specified use single node hashring.")
ring = receive.SingleNodeHashring(conf.endpoint)
}

cancel := make(chan struct{})
g.Add(func() error {
defer close(updates)
updates <- ring
updates <- cf
<-cancel
return nil
}, func(error) {
Expand All @@ -518,11 +514,27 @@ func setupHashring(g *run.Group,

for {
select {
case h, ok := <-updates:
case c, ok := <-updates:
if !ok {
return nil
}
webHandler.Hashring(h)

if c == nil {
webHandler.Hashring(receive.SingleNodeHashring(conf.endpoint))
level.Info(logger).Log("msg", "Empty hashring config. Set up single node hashring.")
} else {
h, err := receive.NewMultiHashring(algorithm, conf.replicationFactor, c)
if err != nil {
return errors.Wrap(err, "unable to create new hashring from config")
}
webHandler.Hashring(h)
level.Info(logger).Log("msg", "Set up hashring for the given hashring config.")
}

if err := dbs.SetHashringConfig(c); err != nil {
return errors.Wrap(err, "failed to set hashring config in MultiTSDB")
}

// If ingestion is enabled, send a signal to TSDB to flush.
if enableIngestion {
hashringChangedChan <- struct{}{}
Expand Down
24 changes: 21 additions & 3 deletions pkg/exemplars/tsdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
package exemplars

import (
"sync"

"github.com/gogo/status"
"github.com/pkg/errors"
"github.com/prometheus/prometheus/model/labels"
Expand All @@ -16,8 +18,10 @@ import (

// TSDB allows fetching exemplars from a TSDB instance.
type TSDB struct {
db storage.ExemplarQueryable
db storage.ExemplarQueryable

extLabels labels.Labels
mtx sync.RWMutex
}

// NewTSDB creates new exemplars.TSDB.
Expand All @@ -28,9 +32,23 @@ func NewTSDB(db storage.ExemplarQueryable, extLabels labels.Labels) *TSDB {
}
}

func (t *TSDB) SetExtLabels(extLabels labels.Labels) {
haanhvu marked this conversation as resolved.
Show resolved Hide resolved
t.mtx.Lock()
defer t.mtx.Unlock()

t.extLabels = extLabels
}

func (t *TSDB) getExtLabels() labels.Labels {
t.mtx.RLock()
defer t.mtx.RUnlock()

return t.extLabels
}

// Exemplars returns all specified exemplars from a TSDB instance.
func (t *TSDB) Exemplars(matchers [][]*labels.Matcher, start, end int64, s exemplarspb.Exemplars_ExemplarsServer) error {
match, selectors := selectorsMatchesExternalLabels(matchers, t.extLabels)
match, selectors := selectorsMatchesExternalLabels(matchers, t.getExtLabels())

if !match {
return nil
Expand All @@ -53,7 +71,7 @@ func (t *TSDB) Exemplars(matchers [][]*labels.Matcher, start, end int64, s exemp
for _, e := range exemplars {
exd := exemplarspb.ExemplarData{
SeriesLabels: labelpb.ZLabelSet{
Labels: labelpb.ZLabelsFromPromLabels(labelpb.ExtendSortedLabels(e.SeriesLabels, t.extLabels)),
Labels: labelpb.ZLabelsFromPromLabels(labelpb.ExtendSortedLabels(e.SeriesLabels, t.getExtLabels())),
},
Exemplars: exemplarspb.ExemplarsFromPromExemplars(e.Exemplars),
}
Expand Down
42 changes: 30 additions & 12 deletions pkg/receive/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,11 @@ const (
// HashringConfig represents the configuration for a hashring
// a receive node knows about.
type HashringConfig struct {
Hashring string `json:"hashring,omitempty"`
Tenants []string `json:"tenants,omitempty"`
Endpoints []string `json:"endpoints"`
Algorithm HashringAlgorithm `json:"algorithm,omitempty"`
Hashring string `json:"hashring,omitempty"`
Tenants []string `json:"tenants,omitempty"`
Endpoints []string `json:"endpoints"`
Algorithm HashringAlgorithm `json:"algorithm,omitempty"`
ExternalLabels map[string]string `json:"external_labels,omitempty"`
}

// ConfigWatcher is able to watch a file containing a hashring configuration
Expand Down Expand Up @@ -255,14 +256,38 @@ func (cw *ConfigWatcher) refresh(ctx context.Context) {
}
}

func ConfigFromWatcher(ctx context.Context, updates chan<- []HashringConfig, cw *ConfigWatcher) error {
defer close(updates)
go cw.Run(ctx)

for {
select {
case cfg, ok := <-cw.C():
if !ok {
return errors.New("hashring config watcher stopped unexpectedly")
}
updates <- cfg
case <-ctx.Done():
return ctx.Err()
}
}
}

// ParseConfig parses the raw configuration content and returns a HashringConfig.
func ParseConfig(content []byte) ([]HashringConfig, error) {
var config []HashringConfig
err := json.Unmarshal(content, &config)
return config, err
}

// loadConfig loads raw configuration content and returns a configuration.
func loadConfig(logger log.Logger, path string) ([]HashringConfig, float64, error) {
cfgContent, err := readFile(logger, path)
if err != nil {
return nil, 0, errors.Wrap(err, "failed to read configuration file")
}

config, err := parseConfig(cfgContent)
config, err := ParseConfig(cfgContent)
if err != nil {
return nil, 0, errors.Wrapf(errParseConfigurationFile, "failed to parse configuration file: %v", err)
}
Expand Down Expand Up @@ -290,13 +315,6 @@ func readFile(logger log.Logger, path string) ([]byte, error) {
return io.ReadAll(fd)
}

// parseConfig parses the raw configuration content and returns a HashringConfig.
func parseConfig(content []byte) ([]HashringConfig, error) {
var config []HashringConfig
err := json.Unmarshal(content, &config)
return config, err
}

// hashAsMetricValue generates metric value from hash of data.
func hashAsMetricValue(data []byte) float64 {
sum := md5.Sum(data)
Expand Down
2 changes: 1 addition & 1 deletion pkg/receive/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ func newTestHandlerHashring(appendables []*fakeAppendable, replicationFactor uin
hashringAlgo = AlgorithmHashmod
}

hashring, err := newMultiHashring(hashringAlgo, replicationFactor, cfg)
hashring, err := NewMultiHashring(hashringAlgo, replicationFactor, cfg)
if err != nil {
return nil, nil, err
}
Expand Down
46 changes: 1 addition & 45 deletions pkg/receive/hashring.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
package receive

import (
"context"
"fmt"
"sort"
"strconv"
Expand Down Expand Up @@ -239,7 +238,7 @@ func (m *multiHashring) GetN(tenant string, ts *prompb.TimeSeries, n uint64) (st
// groups.
// Which hashring to use for a tenant is determined
// by the tenants field of the hashring configuration.
func newMultiHashring(algorithm HashringAlgorithm, replicationFactor uint64, cfg []HashringConfig) (Hashring, error) {
func NewMultiHashring(algorithm HashringAlgorithm, replicationFactor uint64, cfg []HashringConfig) (Hashring, error) {
m := &multiHashring{
cache: make(map[string]Hashring),
}
Expand Down Expand Up @@ -268,49 +267,6 @@ func newMultiHashring(algorithm HashringAlgorithm, replicationFactor uint64, cfg
return m, nil
}

// HashringFromConfigWatcher creates multi-tenant hashrings from a
// hashring configuration file watcher.
// The configuration file is watched for updates.
// Hashrings are returned on the updates channel.
// Which hashring to use for a tenant is determined
// by the tenants field of the hashring configuration.
// The updates chan is closed before exiting.
func HashringFromConfigWatcher(ctx context.Context, algorithm HashringAlgorithm, replicationFactor uint64, updates chan<- Hashring, cw *ConfigWatcher) error {
defer close(updates)
go cw.Run(ctx)

for {
select {
case cfg, ok := <-cw.C():
if !ok {
return errors.New("hashring config watcher stopped unexpectedly")
}
h, err := newMultiHashring(algorithm, replicationFactor, cfg)
if err != nil {
return errors.Wrap(err, "unable to create new hashring from config")
}
updates <- h
case <-ctx.Done():
return ctx.Err()
}
}
}

// HashringFromConfig loads raw configuration content and returns a Hashring if the given configuration is not valid.
func HashringFromConfig(algorithm HashringAlgorithm, replicationFactor uint64, content string) (Hashring, error) {
config, err := parseConfig([]byte(content))
if err != nil {
return nil, errors.Wrapf(err, "failed to parse configuration")
}

// If hashring is empty, return an error.
if len(config) == 0 {
return nil, errors.Wrapf(err, "failed to load configuration")
}

return newMultiHashring(algorithm, replicationFactor, config)
}

func newHashring(algorithm HashringAlgorithm, endpoints []string, replicationFactor uint64, hashring string, tenants []string) (Hashring, error) {
switch algorithm {
case AlgorithmHashmod:
Expand Down
2 changes: 1 addition & 1 deletion pkg/receive/hashring_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ func TestHashringGet(t *testing.T) {
},
},
} {
hs, err := newMultiHashring(AlgorithmHashmod, 3, tc.cfg)
hs, err := NewMultiHashring(AlgorithmHashmod, 3, tc.cfg)
require.NoError(t, err)

h, err := hs.Get(tc.tenant, ts)
Expand Down
Loading