Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ingester lock tweaks to minimize headblock locking during search #3328

Merged
merged 9 commits into from
Jan 31, 2024
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
* [BUGFIX] Change exit code if config is successfully verified [#3174](https://github.com/grafana/tempo/pull/3174) (@am3o @agrib-01)
* [BUGFIX] The tempo-cli analyse blocks command no longer fails on compacted blocks [#3183](https://github.com/grafana/tempo/pull/3183) (@stoewer)
* [BUGFIX] Move waitgroup handling for poller error condition [#3224](https://github.com/grafana/tempo/pull/3224) (@zalegrala)
* [BUGFIX] Fix head block excessive locking in ingester search [#3328](https://github.com/grafana/tempo/pull/3328) (@mdisibio)
* [ENHANCEMENT] Introduced `AttributePolicyMatch` & `IntrinsicPolicyMatch` structures to match span attributes based on strongly typed values & precompiled regexp [#3025](https://github.com/grafana/tempo/pull/3025) (@andriusluk)
* [CHANGE] TraceQL/Structural operators performance improvement. [#3088](https://github.com/grafana/tempo/pull/3088) (@joe-elliott)
* [CHANGE] Merge the processors overrides set through runtime overrides and user-configurable overrides [#3125](https://github.com/grafana/tempo/pull/3125) (@kvrhdn)
Expand Down
55 changes: 43 additions & 12 deletions modules/ingester/instance_search.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func (i *instance) Search(ctx context.Context, req *tempopb.SearchRequest) (*tem

span.LogFields(ot_log.String("SearchRequest", req.String()))

sr := search.NewResults()
sr := search.NewResults(maxResults)
defer sr.Close() // signal all running workers to quit

// Lock headblock separately from other blocks and release it as soon as this
Expand Down Expand Up @@ -107,6 +107,13 @@ func (i *instance) searchBlock(ctx context.Context, req *tempopb.SearchRequest,
return
}

if sr.Quit() {
if cleanup != nil {
cleanup()
}
return
}

blockID := meta.BlockID

sr.StartWorker()
Expand Down Expand Up @@ -147,7 +154,9 @@ func (i *instance) searchBlock(ctx context.Context, req *tempopb.SearchRequest,
}

for _, t := range resp.Traces {
sr.AddResult(ctx, t)
if sr.AddResult(ctx, t) {
break
}
}
sr.AddBlockInspected()

Expand All @@ -157,6 +166,9 @@ func (i *instance) searchBlock(ctx context.Context, req *tempopb.SearchRequest,
}

func (i *instance) SearchTags(ctx context.Context, scope string) (*tempopb.SearchTagsResponse, error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "instance.SearchTags")
defer span.Finish()

userID, err := user.ExtractOrgID(ctx)
if err != nil {
return nil, err
Expand All @@ -178,7 +190,10 @@ func (i *instance) SearchTags(ctx context.Context, scope string) (*tempopb.Searc
limit := i.limiter.limits.MaxBytesPerTagValuesQuery(userID)
distinctValues := util.NewDistinctStringCollector(limit)

search := func(s common.Searcher, dv *util.DistinctStringCollector) error {
search := func(ctx context.Context, s common.Searcher, dv *util.DistinctStringCollector, spanName string) error {
span, ctx := opentracing.StartSpanFromContext(ctx, "instance.SearchTags."+spanName)
defer span.Finish()

if s == nil {
return nil
}
Expand All @@ -194,22 +209,24 @@ func (i *instance) SearchTags(ctx context.Context, scope string) (*tempopb.Searc
}

i.headBlockMtx.RLock()
err = search(i.headBlock, distinctValues)
span.LogFields(ot_log.String("msg", "acquired headblock mtx"))
err = search(ctx, i.headBlock, distinctValues, "headBlock")
i.headBlockMtx.RUnlock()
if err != nil {
return nil, fmt.Errorf("unexpected error searching head block (%s): %w", i.headBlock.BlockMeta().BlockID, err)
}

i.blocksMtx.RLock()
defer i.blocksMtx.RUnlock()
span.LogFields(ot_log.String("msg", "acquired blocks mtx"))

for _, b := range i.completingBlocks {
if err = search(b, distinctValues); err != nil {
if err = search(ctx, b, distinctValues, "completingBlock"); err != nil {
return nil, fmt.Errorf("unexpected error searching completing block (%s): %w", b.BlockMeta().BlockID, err)
}
}
for _, b := range i.completeBlocks {
if err = search(b, distinctValues); err != nil {
if err = search(ctx, b, distinctValues, "completeBlock"); err != nil {
return nil, fmt.Errorf("unexpected error searching complete block (%s): %w", b.BlockMeta().BlockID, err)
}
}
Expand All @@ -225,6 +242,9 @@ func (i *instance) SearchTags(ctx context.Context, scope string) (*tempopb.Searc

// SearchTagsV2 calls SearchTags for each scope and returns the results.
func (i *instance) SearchTagsV2(ctx context.Context, scope string) (*tempopb.SearchTagsV2Response, error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "instance.SearchTagsV2")
defer span.Finish()

scopes := []string{scope}
if scope == "" {
// start with intrinsic scope and all traceql attribute scopes
Expand Down Expand Up @@ -346,6 +366,9 @@ func (i *instance) SearchTagValuesV2(ctx context.Context, req *tempopb.SearchTag
return nil, err
}

span, ctx := opentracing.StartSpanFromContext(ctx, "instance.SearchTagValuesV2")
defer span.Finish()

limit := i.limiter.limits.MaxBytesPerTagValuesQuery(userID)
valueCollector := util.NewDistinctValueCollector[tempopb.TagValue](limit, func(v tempopb.TagValue) int { return len(v.Type) + len(v.Value) })

Expand All @@ -366,11 +389,11 @@ func (i *instance) SearchTagValuesV2(ctx context.Context, req *tempopb.SearchTag

query := extractMatchers(req.Query)

var searchBlock func(common.Searcher) error
var searchBlock func(context.Context, common.Searcher) error
if !i.autocompleteFilteringEnabled || isEmptyQuery(query) {
// If filtering is disabled or query is empty,
// we can use the more efficient SearchTagValuesV2 method.
searchBlock = func(s common.Searcher) error {
searchBlock = func(ctx context.Context, s common.Searcher) error {
if anyErr.Load() != nil {
return nil // Early exit if any error has occurred
}
Expand All @@ -382,7 +405,7 @@ func (i *instance) SearchTagValuesV2(ctx context.Context, req *tempopb.SearchTag
return s.SearchTagValuesV2(ctx, tag, traceql.MakeCollectTagValueFunc(valueCollector.Collect), common.DefaultSearchOptions())
}
} else {
searchBlock = func(s common.Searcher) error {
searchBlock = func(ctx context.Context, s common.Searcher) error {
if anyErr.Load() != nil {
return nil // Early exit if any error has occurred
}
Expand All @@ -406,26 +429,32 @@ func (i *instance) SearchTagValuesV2(ctx context.Context, req *tempopb.SearchTag
// then headblockMtx. Even if the likelihood is low it is a statistical certainly
// that eventually a deadlock will occur.
i.headBlockMtx.RLock()
span.LogFields(ot_log.String("msg", "acquired headblock mtx"))
if i.headBlock != nil {
wg.Add(1)
go func() {
span, ctx := opentracing.StartSpanFromContext(ctx, "instance.SearchTagValuesV2.headBlock")
defer span.Finish()
defer i.headBlockMtx.RUnlock()
defer wg.Done()
if err := searchBlock(i.headBlock); err != nil {
if err := searchBlock(ctx, i.headBlock); err != nil {
anyErr.Store(fmt.Errorf("unexpected error searching head block (%s): %w", i.headBlock.BlockMeta().BlockID, err))
}
}()
}

i.blocksMtx.RLock()
defer i.blocksMtx.RUnlock()
span.LogFields(ot_log.String("msg", "acquired blocks mtx"))

// completed blocks
for _, b := range i.completeBlocks {
wg.Add(1)
go func(b *localBlock) {
span, ctx := opentracing.StartSpanFromContext(ctx, "instance.SearchTagValuesV2.completedBlock")
defer span.Finish()
defer wg.Done()
if err := searchBlock(b); err != nil {
if err := searchBlock(ctx, b); err != nil {
anyErr.Store(fmt.Errorf("unexpected error searching complete block (%s): %w", b.BlockMeta().BlockID, err))
}
}(b)
Expand All @@ -435,8 +464,10 @@ func (i *instance) SearchTagValuesV2(ctx context.Context, req *tempopb.SearchTag
for _, b := range i.completingBlocks {
wg.Add(1)
go func(b common.WALBlock) {
span, ctx := opentracing.StartSpanFromContext(ctx, "instance.SearchTagValuesV2.headBlock")
defer span.Finish()
defer wg.Done()
if err := searchBlock(b); err != nil {
if err := searchBlock(ctx, b); err != nil {
anyErr.Store(fmt.Errorf("unexpected error searching completing block (%s): %w", b.BlockMeta().BlockID, err))
}
}(b)
Expand Down
4 changes: 2 additions & 2 deletions pkg/search/results.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,9 @@ type Results struct {
blocksInspected atomic.Uint32
}

func NewResults() *Results {
func NewResults(maxResults int) *Results {
return &Results{
resultsCh: make(chan *tempopb.TraceSearchMetadata),
resultsCh: make(chan *tempopb.TraceSearchMetadata, maxResults),
doneCh: make(chan struct{}),
}
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/search/results_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ func TestResultsDoesNotRace(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

sr := NewResults()
sr := NewResults(1)
defer sr.Close()

workers := 10
Expand Down
2 changes: 1 addition & 1 deletion tempodb/encoding/vparquet3/wal_block.go
Original file line number Diff line number Diff line change
Expand Up @@ -699,6 +699,7 @@ func (b *walBlock) FetchTagValues(ctx context.Context, req traceql.AutocompleteR
if err != nil {
return fmt.Errorf("error opening file %s: %w", page.path, err)
}
defer file.Close()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the defer inside the loop enough, or do we want to close it out each iteration through blockFlushes?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's correct, it's inside the loop and all of them get closed at the end, which is not optimal. This func and several others are all following the same pattern, and I was aiming to have minimum changes necessary. But happy to make larger changes and defer as expected if we want.


pf := file.parquetFile

Expand All @@ -715,7 +716,6 @@ func (b *walBlock) FetchTagValues(ctx context.Context, req traceql.AutocompleteR
return fmt.Errorf("iterating spans in walBlock: %w", err)
}
if res == nil {
iter.Close()
break
}

Expand Down
Loading