Add support for label matchers in LabelNames and LabelValues call to store. (#4107)

* Add matchers to LabelNamesRequest.

Signed-off-by: Peter Štibraný <[email protected]>

* Don't pass the whole SeriesRequest, but only individual parts of it.

Signed-off-by: Peter Štibraný <[email protected]>

* Support series Matchers in LabelNames call.

Signed-off-by: Peter Štibraný <[email protected]>

* Support series Matchers in LabelValues call.

Signed-off-by: Peter Štibraný <[email protected]>

* CHANGELOG.md

Signed-off-by: Peter Štibraný <[email protected]>

* Handle result in the same way in LabelNames and LabelValues.

Signed-off-by: Peter Štibraný <[email protected]>

* Make lint happy.

Signed-off-by: Peter Štibraný <[email protected]>

* Specify that indexheader.Reader returns LabelNames in sorted order, so we can use that.

Signed-off-by: Peter Štibraný <[email protected]>
pstibrany authored Apr 27, 2021
1 parent b053f37 commit d5bd651
Showing 7 changed files with 454 additions and 158 deletions.
5 changes: 4 additions & 1 deletion CHANGELOG.md
@@ -13,7 +13,10 @@ We use _breaking :warning:_ to mark changes that are not backward compatible (re
## Unreleased

### Added
-

- [#4107](https://github.com/thanos-io/thanos/pull/4107) Store: `LabelNames` and `LabelValues` now support label matchers.


### Fixed
-
### Changed
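The CHANGELOG entry above is the user-visible summary: Store API clients may now attach label matchers to `LabelNames` and `LabelValues` requests so that only matching series contribute names or values. A minimal client-side sketch of the new request shape, assuming an already-connected `storepb.StoreClient` (the label name, matcher values, and time range below are illustrative, not taken from this commit):

```go
package example

import (
	"context"

	"github.com/thanos-io/thanos/pkg/store/storepb"
)

// labelValuesWithMatchers asks a store for the values of the "instance" label,
// restricted to series matching job="api". Sketch only: client, mint and maxt
// are assumed to be supplied by the caller.
func labelValuesWithMatchers(ctx context.Context, client storepb.StoreClient, mint, maxt int64) ([]string, error) {
	resp, err := client.LabelValues(ctx, &storepb.LabelValuesRequest{
		Label: "instance",
		Start: mint,
		End:   maxt,
		Matchers: []storepb.LabelMatcher{
			{Type: storepb.LabelMatcher_EQ, Name: "job", Value: "api"},
		},
	})
	if err != nil {
		return nil, err
	}
	// Values arrive deduplicated and sorted across all queried blocks.
	return resp.Values, nil
}
```

`LabelNamesRequest` gains the same `Matchers` field, so the equivalent `LabelNames` call differs only in the request and response types.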
2 changes: 1 addition & 1 deletion pkg/block/indexheader/header.go
@@ -35,6 +35,6 @@ type Reader interface {
// then empty string is returned and no error.
LabelValues(name string) ([]string, error)

// LabelNames returns all label names.
// LabelNames returns all label names in sorted order.
LabelNames() ([]string, error)
}
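The tightened `indexheader.Reader` contract above (label names come back sorted) is what lets `bucket.go` merge a block's index-header names with its already-sorted external label names without re-sorting either side. The following standalone sketch illustrates the kind of sorted, de-duplicating merge being relied on; it is not the `strutil.MergeSlices` helper the store actually calls:

```go
package main

import "fmt"

// mergeSorted merges two ascending, duplicate-free string slices into one
// ascending, duplicate-free slice — the property the store relies on when
// combining index-header label names with external label names.
func mergeSorted(a, b []string) []string {
	out := make([]string, 0, len(a)+len(b))
	i, j := 0, 0
	for i < len(a) && j < len(b) {
		switch {
		case a[i] < b[j]:
			out = append(out, a[i])
			i++
		case a[i] > b[j]:
			out = append(out, b[j])
			j++
		default: // equal: keep one copy
			out = append(out, a[i])
			i++
			j++
		}
	}
	out = append(out, a[i:]...)
	out = append(out, b[j:]...)
	return out
}

func main() {
	fmt.Println(mergeSorted([]string{"__name__", "instance", "job"}, []string{"cluster", "job"}))
	// Output: [__name__ cluster instance job]
}
```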
199 changes: 144 additions & 55 deletions pkg/store/bucket.go
@@ -282,7 +282,8 @@ type BucketStore struct {

// chunksLimiterFactory creates a new limiter used to limit the number of chunks fetched by each Series() call.
chunksLimiterFactory ChunksLimiterFactory
// seriesLimiterFactory creates a new limiter used to limit the number of touched series by each Series() call.
// seriesLimiterFactory creates a new limiter used to limit the number of touched series by each Series() call,
// or LabelName and LabelValues calls when used with matchers.
seriesLimiterFactory SeriesLimiterFactory
partitioner Partitioner

@@ -746,14 +747,17 @@ func (s *bucketSeriesSet) Err() error {
return s.err
}

// blockSeries returns series matching given matchers, that have some data in given time range.
func blockSeries(
extLset labels.Labels,
indexr *bucketIndexReader,
chunkr *bucketChunkReader,
matchers []*labels.Matcher,
req *storepb.SeriesRequest,
chunksLimiter ChunksLimiter,
seriesLimiter SeriesLimiter,
extLset labels.Labels, // External labels added to the returned series labels.
indexr *bucketIndexReader, // Index reader for block.
chunkr *bucketChunkReader, // Chunk reader for block.
matchers []*labels.Matcher, // Series matchers.
chunksLimiter ChunksLimiter, // Rate limiter for loading chunks.
seriesLimiter SeriesLimiter, // Rate limiter for loading series.
skipChunks bool, // If true, chunks are not loaded.
minTime, maxTime int64, // Series must have data in this time range to be returned.
loadAggregates []storepb.Aggr, // List of aggregates to load when loading chunks.
) (storepb.SeriesSet, *queryStats, error) {
ps, err := indexr.ExpandedPostings(matchers)
if err != nil {
@@ -785,7 +789,7 @@ func blockSeries(
chks []chunks.Meta
)
for _, id := range ps {
ok, err := indexr.LoadSeriesForTime(id, &symbolizedLset, &chks, req.SkipChunks, req.MinTime, req.MaxTime)
ok, err := indexr.LoadSeriesForTime(id, &symbolizedLset, &chks, skipChunks, minTime, maxTime)
if err != nil {
return nil, nil, errors.Wrap(err, "read series")
}
@@ -795,7 +799,7 @@ func blockSeries(
}

s := seriesEntry{}
if !req.SkipChunks {
if !skipChunks {
// Schedule loading chunks.
s.refs = make([]uint64, 0, len(chks))
s.chks = make([]storepb.AggrChunk, 0, len(chks))
@@ -825,11 +829,11 @@ func blockSeries(
res = append(res, s)
}

if req.SkipChunks {
if skipChunks {
return newBucketSeriesSet(res), indexr.stats, nil
}

if err := chunkr.load(res, req.Aggregates); err != nil {
if err := chunkr.load(res, loadAggregates); err != nil {
return nil, nil, errors.Wrap(err, "load chunks")
}

@@ -1026,9 +1030,11 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie
indexr,
chunkr,
blockMatchers,
req,
chunksLimiter,
seriesLimiter,
req.SkipChunks,
req.MinTime, req.MaxTime,
req.Aggregates,
)
if err != nil {
return errors.Wrapf(err, "fetch series for block %s", b.meta.ULID)
@@ -1154,16 +1160,14 @@ func chunksSize(chks []storepb.AggrChunk) (size int) {

// LabelNames implements the storepb.StoreServer interface.
func (s *BucketStore) LabelNames(ctx context.Context, req *storepb.LabelNamesRequest) (*storepb.LabelNamesResponse, error) {
resHints := &hintspb.LabelNamesResponseHints{}

g, gctx := errgroup.WithContext(ctx)
reqSeriesMatchers, err := storepb.MatchersToPromMatchers(req.Matchers...)
if err != nil {
return nil, status.Error(codes.InvalidArgument, errors.Wrap(err, "translate request labels matchers").Error())
}

s.mtx.RLock()
resHints := &hintspb.LabelNamesResponseHints{}

var mtx sync.Mutex
var sets [][]string
var reqBlockMatchers []*labels.Matcher

if req.Hints != nil {
reqHints := &hintspb.LabelNamesRequestHints{}
err := types.UnmarshalAny(req.Hints, reqHints)
@@ -1177,7 +1181,16 @@ func (s *BucketStore) LabelNames(ctx context.Context, req *storepb.LabelNamesReq
}
}

g, gctx := errgroup.WithContext(ctx)

s.mtx.RLock()

var mtx sync.Mutex
var sets [][]string
var seriesLimiter = s.seriesLimiterFactory(s.metrics.queriesDropped.WithLabelValues("series"))

for _, b := range s.blocks {
b := b
if !b.overlapsClosedInterval(req.Start, req.End) {
continue
}
@@ -1188,32 +1201,60 @@ func (s *BucketStore) LabelNames(ctx context.Context, req *storepb.LabelNamesReq
resHints.AddQueriedBlock(b.meta.ULID)

indexr := b.indexReader(gctx)
extLabels := b.meta.Thanos.Labels

g.Go(func() error {
defer runutil.CloseWithLogOnErr(s.logger, indexr, "label names")

// Do it via index reader to have pending reader registered correctly.
res, err := indexr.block.indexHeaderReader.LabelNames()
if err != nil {
return errors.Wrap(err, "label names")
}
var result []string
if len(reqSeriesMatchers) == 0 {
// Do it via index reader to have pending reader registered correctly.
// LabelNames are already sorted.
res, err := indexr.block.indexHeaderReader.LabelNames()
if err != nil {
return errors.Wrapf(err, "label names for block %s", b.meta.ULID)
}

// Add a set for the external labels as well.
// We're not adding them directly to res because there could be duplicates.
extRes := make([]string, 0, len(extLabels))
for lName := range extLabels {
extRes = append(extRes, lName)
}
// Add a set for the external labels as well.
// We're not adding them directly to res because there could be duplicates.
// b.extLset is already sorted by label name, no need to sort it again.
extRes := make([]string, 0, len(b.extLset))
for _, l := range b.extLset {
extRes = append(extRes, l.Name)
}

result = strutil.MergeSlices(res, extRes)
} else {
seriesSet, _, err := blockSeries(b.extLset, indexr, nil, reqSeriesMatchers, nil, seriesLimiter, true, req.Start, req.End, nil)
if err != nil {
return errors.Wrapf(err, "fetch series for block %s", b.meta.ULID)
}

sort.Strings(res)
sort.Strings(extRes)
// Extract label names from all series. Many label names will be the same, so we need to deduplicate them.
// Note that label names will already include external labels (passed to blockSeries), so we don't need
// to add them again.
labelNames := map[string]struct{}{}
for seriesSet.Next() {
ls, _ := seriesSet.At()
for _, l := range ls {
labelNames[l.Name] = struct{}{}
}
}
if seriesSet.Err() != nil {
return errors.Wrapf(seriesSet.Err(), "iterate series for block %s", b.meta.ULID)
}

res = strutil.MergeSlices(res, extRes)
result = make([]string, 0, len(labelNames))
for n := range labelNames {
result = append(result, n)
}
sort.Strings(result)
}

mtx.Lock()
sets = append(sets, res)
mtx.Unlock()
if len(result) > 0 {
mtx.Lock()
sets = append(sets, result)
mtx.Unlock()
}

return nil
})
@@ -1238,16 +1279,16 @@ func (s *BucketStore) LabelNames(ctx context.Context, req *storepb.LabelNamesReq

// LabelValues implements the storepb.StoreServer interface.
func (s *BucketStore) LabelValues(ctx context.Context, req *storepb.LabelValuesRequest) (*storepb.LabelValuesResponse, error) {
reqSeriesMatchers, err := storepb.MatchersToPromMatchers(req.Matchers...)
if err != nil {
return nil, status.Error(codes.InvalidArgument, errors.Wrap(err, "translate request labels matchers").Error())
}

resHints := &hintspb.LabelValuesResponseHints{}

g, gctx := errgroup.WithContext(ctx)

s.mtx.RLock()

var mtx sync.Mutex
var sets [][]string
var reqBlockMatchers []*labels.Matcher

if req.Hints != nil {
reqHints := &hintspb.LabelValuesRequestHints{}
err := types.UnmarshalAny(req.Hints, reqHints)
@@ -1261,7 +1302,25 @@ func (s *BucketStore) LabelValues(ctx context.Context, req *storepb.LabelValuesR
}
}

// If we have series matchers, add <labelName> != "" matcher, to only select series that have given label name.
if len(reqSeriesMatchers) > 0 {
m, err := labels.NewMatcher(labels.MatchNotEqual, req.Label, "")
if err != nil {
return nil, status.Error(codes.InvalidArgument, err.Error())
}

reqSeriesMatchers = append(reqSeriesMatchers, m)
}

s.mtx.RLock()

var mtx sync.Mutex
var sets [][]string
var seriesLimiter = s.seriesLimiterFactory(s.metrics.queriesDropped.WithLabelValues("series"))

for _, b := range s.blocks {
b := b

if !b.overlapsClosedInterval(req.Start, req.End) {
continue
}
@@ -1272,25 +1331,55 @@ func (s *BucketStore) LabelValues(ctx context.Context, req *storepb.LabelValuesR
resHints.AddQueriedBlock(b.meta.ULID)

indexr := b.indexReader(gctx)
extLabels := b.meta.Thanos.Labels

g.Go(func() error {
defer runutil.CloseWithLogOnErr(s.logger, indexr, "label values")

// Do it via index reader to have pending reader registered correctly.
res, err := indexr.block.indexHeaderReader.LabelValues(req.Label)
if err != nil {
return errors.Wrap(err, "index header label values")
}
var result []string
if len(reqSeriesMatchers) == 0 {
// Do it via index reader to have pending reader registered correctly.
res, err := indexr.block.indexHeaderReader.LabelValues(req.Label)
if err != nil {
return errors.Wrapf(err, "index header label values for block %s", b.meta.ULID)
}

// Add the external label value as well.
if extLabelValue, ok := extLabels[req.Label]; ok {
res = strutil.MergeSlices(res, []string{extLabelValue})
// Add the external label value as well.
if extLabelValue := b.extLset.Get(req.Label); extLabelValue != "" {
res = strutil.MergeSlices(res, []string{extLabelValue})
}
result = res
} else {
seriesSet, _, err := blockSeries(b.extLset, indexr, nil, reqSeriesMatchers, nil, seriesLimiter, true, req.Start, req.End, nil)
if err != nil {
return errors.Wrapf(err, "fetch series for block %s", b.meta.ULID)
}

// Extract given label's value from all series and deduplicate them.
// We don't need to deal with external labels, since they are already added by blockSeries.
values := map[string]struct{}{}
for seriesSet.Next() {
ls, _ := seriesSet.At()
val := ls.Get(req.Label)
if val != "" { // Should never be empty since we added labelName!="" matcher to the list of matchers.
values[val] = struct{}{}
}
}
if seriesSet.Err() != nil {
return errors.Wrapf(seriesSet.Err(), "iterate series for block %s", b.meta.ULID)
}

result = make([]string, 0, len(values))
for n := range values {
result = append(result, n)
}
sort.Strings(result)
}

mtx.Lock()
sets = append(sets, res)
mtx.Unlock()
if len(result) > 0 {
mtx.Lock()
sets = append(sets, result)
mtx.Unlock()
}

return nil
})
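Both new code paths above funnel matcher-based requests through `blockSeries` with chunk loading skipped; for `LabelValues` the key preparatory step is appending a `<labelName> != ""` matcher so that only series actually carrying the requested label are expanded. A standalone sketch of that preparation, using the same `storepb` and Prometheus `labels` helpers that appear in the diff (the concrete matcher and label name are illustrative, and import paths follow the Prometheus module layout Thanos used at the time of this commit):

```go
package main

import (
	"fmt"

	"github.com/prometheus/prometheus/pkg/labels"

	"github.com/thanos-io/thanos/pkg/store/storepb"
)

func main() {
	// Matchers as they might arrive in a LabelValuesRequest for label "instance".
	reqMatchers := []storepb.LabelMatcher{
		{Type: storepb.LabelMatcher_EQ, Name: "job", Value: "api"},
	}

	// Translate protobuf matchers into Prometheus matchers, as LabelValues does.
	promMatchers, err := storepb.MatchersToPromMatchers(reqMatchers...)
	if err != nil {
		panic(err)
	}

	// Restrict the selection to series that actually have the requested label,
	// so every returned series contributes a non-empty value.
	notEmpty, err := labels.NewMatcher(labels.MatchNotEqual, "instance", "")
	if err != nil {
		panic(err)
	}
	promMatchers = append(promMatchers, notEmpty)

	for _, m := range promMatchers {
		fmt.Println(m.String()) // job="api", then instance!=""
	}
}
```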
