Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ingester lock tweaks to minimize headblock locking during search #3328

Merged
merged 9 commits into from
Jan 31, 2024
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
* [BUGFIX] Change exit code if config is successfully verified [#3174](https://github.com/grafana/tempo/pull/3174) (@am3o @agrib-01)
* [BUGFIX] The tempo-cli analyse blocks command no longer fails on compacted blocks [#3183](https://github.com/grafana/tempo/pull/3183) (@stoewer)
* [BUGFIX] Move waitgroup handling for poller error condition [#3224](https://github.com/grafana/tempo/pull/3224) (@zalegrala)
* [BUGFIX] Fix head block excessive locking in ingester search [#3328](https://github.com/grafana/tempo/pull/3328) (@mdisibio)
* [ENHANCEMENT] Introduced `AttributePolicyMatch` & `IntrinsicPolicyMatch` structures to match span attributes based on strongly typed values & precompiled regexp [#3025](https://github.com/grafana/tempo/pull/3025) (@andriusluk)
* [CHANGE] TraceQL/Structural operators performance improvement. [#3088](https://github.com/grafana/tempo/pull/3088) (@joe-elliott)
* [CHANGE] Merge the processors overrides set through runtime overrides and user-configurable overrides [#3125](https://github.com/grafana/tempo/pull/3125) (@kvrhdn)
Expand Down
222 changes: 126 additions & 96 deletions modules/ingester/instance_search.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"strings"
"sync"

"github.com/google/uuid"
ot_log "github.com/opentracing/opentracing-go/log"
"go.uber.org/atomic"

Expand Down Expand Up @@ -40,83 +41,16 @@ func (i *instance) Search(ctx context.Context, req *tempopb.SearchRequest) (*tem

span.LogFields(ot_log.String("SearchRequest", req.String()))

sr := search.NewResults()
defer sr.Close() // signal all running workers to quit
var (
resultsMtx = sync.Mutex{}
combiner = traceql.NewMetadataCombiner()
metrics = &tempopb.SearchMetrics{}
opts = common.DefaultSearchOptions()
anyErr atomic.Error
)

// Lock headblock separately from other blocks and release it as soon as this
// subtask is finished.
// A warning about deadlocks!! This area does a hard-acquire of both mutexes.
// To avoid deadlocks this function and all others must acquire them in
// the ** same_order ** or else!!! i.e. another function can't acquire blocksMtx
then headblockMtx. Even if the likelihood is low it is a statistical certainty
// that eventually a deadlock will occur.
i.headBlockMtx.RLock()
i.searchBlock(ctx, req, sr, i.headBlock.BlockMeta(), i.headBlock, i.headBlockMtx.RUnlock)

// Lock blocks mutex until all search tasks are finished and this function exits. This avoids
// deadlocking with other activity (ingest, flushing), caused by releasing
// and then attempting to retake the lock.
i.blocksMtx.RLock()
defer i.blocksMtx.RUnlock()

for _, b := range i.completingBlocks {
i.searchBlock(ctx, req, sr, b.BlockMeta(), b, nil)
}

for _, b := range i.completeBlocks {
i.searchBlock(ctx, req, sr, b.BlockMeta(), b, nil)
}

sr.AllWorkersStarted()

// read and combine search results
combiner := traceql.NewMetadataCombiner()

// collect results from all the goroutines via sr.Results channel.
// range loop will exit when sr.Results channel is closed.
for result := range sr.Results() {
if combiner.Count() >= maxResults {
sr.Close() // signal pending workers to exit
continue
}

combiner.AddMetadata(result)
}

if sr.Error() != nil {
return nil, sr.Error()
}

return &tempopb.SearchResponse{
Traces: combiner.Metadata(),
Metrics: &tempopb.SearchMetrics{
InspectedTraces: sr.TracesInspected(),
InspectedBytes: sr.BytesInspected(),
},
}, nil
}

// searchBlock starts a search task for the given block. The block must already be under lock,
// and this method calls cleanup to unlock the block when done.
func (i *instance) searchBlock(ctx context.Context, req *tempopb.SearchRequest, sr *search.Results, meta *backend.BlockMeta, block common.Searcher, cleanup func()) {
// confirm block should be included in search
if !includeBlock(meta, req) {
if cleanup != nil {
cleanup()
}
return
}

blockID := meta.BlockID

sr.StartWorker()
go func(e common.Searcher, cleanup func()) {
if cleanup != nil {
defer cleanup()
}
defer sr.FinishWorker()

span, ctx := opentracing.StartSpanFromContext(ctx, "instance.searchBlock")
search := func(blockID uuid.UUID, block common.Searcher, spanName string) {
span, ctx := opentracing.StartSpanFromContext(ctx, "instance.searchBlock."+spanName)
defer span.Finish()

span.LogFields(ot_log.Event("block entry mtx acquired"))
Expand All @@ -125,15 +59,14 @@ func (i *instance) searchBlock(ctx context.Context, req *tempopb.SearchRequest,
var resp *tempopb.SearchResponse
var err error

opts := common.DefaultSearchOptions()
if api.IsTraceQLQuery(req) {
// note: we are creating new engine for each wal block,
// and engine.ExecuteSearch is parsing the query for each block
resp, err = traceql.NewEngine().ExecuteSearch(ctx, req, traceql.NewSpansetFetcherWrapper(func(ctx context.Context, req traceql.FetchSpansRequest) (traceql.FetchSpansResponse, error) {
return e.Fetch(ctx, req, opts)
return block.Fetch(ctx, req, opts)
}))
} else {
resp, err = e.Search(ctx, req, opts)
resp, err = block.Search(ctx, req, opts)
}

if errors.Is(err, common.ErrUnsupported) {
Expand All @@ -142,21 +75,99 @@ func (i *instance) searchBlock(ctx context.Context, req *tempopb.SearchRequest,
}
if err != nil {
level.Error(log.Logger).Log("msg", "error searching block", "blockID", blockID, "err", err)
sr.SetError(err)
anyErr.Store(err)
return
}

for _, t := range resp.Traces {
sr.AddResult(ctx, t)
if resp == nil {
return
}
sr.AddBlockInspected()

sr.AddBytesInspected(resp.Metrics.InspectedBytes)
sr.AddTraceInspected(resp.Metrics.InspectedTraces)
}(block, cleanup)
resultsMtx.Lock()
defer resultsMtx.Unlock()

if resp.Metrics != nil {
metrics.InspectedTraces += resp.Metrics.InspectedTraces
metrics.InspectedBytes += resp.Metrics.InspectedBytes
}

for _, tr := range resp.Traces {
combiner.AddMetadata(tr)
if combiner.Count() >= maxResults {
return
joe-elliott marked this conversation as resolved.
Show resolved Hide resolved
}
}
}

// Search headblock (synchronously)
// Lock headblock separately from other blocks and release it as quickly as possible.
joe-elliott marked this conversation as resolved.
Show resolved Hide resolved
// A warning about deadlocks!! This area does a hard-acquire of both mutexes.
// To avoid deadlocks this function and all others must acquire them in
// the ** same_order ** or else!!! i.e. another function can't acquire blocksMtx
then headblockMtx. Even if the likelihood is low it is a statistical certainty
// that eventually a deadlock will occur.
i.headBlockMtx.RLock()
if includeBlock(i.headBlock.BlockMeta(), req) {
search(i.headBlock.BlockMeta().BlockID, i.headBlock, "headBlock")
}
i.headBlockMtx.RUnlock()
if err := anyErr.Load(); err != nil {
return nil, err
}
if combiner.Count() >= maxResults {
return &tempopb.SearchResponse{
Traces: combiner.Metadata(),
Metrics: metrics,
}, nil
}

// Search all other blocks (concurrently)
// Lock blocks mutex until all search tasks are finished and this function exits. This avoids
joe-elliott marked this conversation as resolved.
Show resolved Hide resolved
// deadlocking with other activity (ingest, flushing), caused by releasing
// and then attempting to retake the lock.
i.blocksMtx.RLock()
defer i.blocksMtx.RUnlock()

wg := sync.WaitGroup{}

for _, b := range i.completingBlocks {
if !includeBlock(b.BlockMeta(), req) {
continue
}

wg.Add(1)
go func(b common.WALBlock) {
defer wg.Done()
search(b.BlockMeta().BlockID, b, "completingBlock")
}(b)
}

for _, b := range i.completeBlocks {
if !includeBlock(b.BlockMeta(), req) {
continue
}
wg.Add(1)
go func(b *localBlock) {
defer wg.Done()
search(b.BlockMeta().BlockID, b, "completeBlock")
}(b)
}

wg.Wait()

if err := anyErr.Load(); err != nil {
return nil, err
}
return &tempopb.SearchResponse{
Traces: combiner.Metadata(),
Metrics: metrics,
}, nil
}

func (i *instance) SearchTags(ctx context.Context, scope string) (*tempopb.SearchTagsResponse, error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "instance.SearchTags")
defer span.Finish()

userID, err := user.ExtractOrgID(ctx)
if err != nil {
return nil, err
Expand All @@ -178,7 +189,10 @@ func (i *instance) SearchTags(ctx context.Context, scope string) (*tempopb.Searc
limit := i.limiter.limits.MaxBytesPerTagValuesQuery(userID)
distinctValues := util.NewDistinctStringCollector(limit)

search := func(s common.Searcher, dv *util.DistinctStringCollector) error {
search := func(ctx context.Context, s common.Searcher, dv *util.DistinctStringCollector, spanName string) error {
span, ctx := opentracing.StartSpanFromContext(ctx, "instance.SearchTags."+spanName)
defer span.Finish()

if s == nil {
return nil
}
Expand All @@ -194,22 +208,24 @@ func (i *instance) SearchTags(ctx context.Context, scope string) (*tempopb.Searc
}

i.headBlockMtx.RLock()
err = search(i.headBlock, distinctValues)
span.LogFields(ot_log.String("msg", "acquired headblock mtx"))
err = search(ctx, i.headBlock, distinctValues, "headBlock")
i.headBlockMtx.RUnlock()
if err != nil {
return nil, fmt.Errorf("unexpected error searching head block (%s): %w", i.headBlock.BlockMeta().BlockID, err)
}

i.blocksMtx.RLock()
defer i.blocksMtx.RUnlock()
span.LogFields(ot_log.String("msg", "acquired blocks mtx"))

for _, b := range i.completingBlocks {
if err = search(b, distinctValues); err != nil {
if err = search(ctx, b, distinctValues, "completingBlock"); err != nil {
return nil, fmt.Errorf("unexpected error searching completing block (%s): %w", b.BlockMeta().BlockID, err)
}
}
for _, b := range i.completeBlocks {
if err = search(b, distinctValues); err != nil {
if err = search(ctx, b, distinctValues, "completeBlock"); err != nil {
return nil, fmt.Errorf("unexpected error searching complete block (%s): %w", b.BlockMeta().BlockID, err)
}
}
Expand All @@ -225,6 +241,9 @@ func (i *instance) SearchTags(ctx context.Context, scope string) (*tempopb.Searc

// SearchTagsV2 calls SearchTags for each scope and returns the results.
func (i *instance) SearchTagsV2(ctx context.Context, scope string) (*tempopb.SearchTagsV2Response, error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "instance.SearchTagsV2")
defer span.Finish()

scopes := []string{scope}
if scope == "" {
// start with intrinsic scope and all traceql attribute scopes
Expand Down Expand Up @@ -346,6 +365,9 @@ func (i *instance) SearchTagValuesV2(ctx context.Context, req *tempopb.SearchTag
return nil, err
}

span, ctx := opentracing.StartSpanFromContext(ctx, "instance.SearchTagValuesV2")
defer span.Finish()

limit := i.limiter.limits.MaxBytesPerTagValuesQuery(userID)
valueCollector := util.NewDistinctValueCollector[tempopb.TagValue](limit, func(v tempopb.TagValue) int { return len(v.Type) + len(v.Value) })

Expand All @@ -366,11 +388,11 @@ func (i *instance) SearchTagValuesV2(ctx context.Context, req *tempopb.SearchTag

query := extractMatchers(req.Query)

var searchBlock func(common.Searcher) error
var searchBlock func(context.Context, common.Searcher) error
if !i.autocompleteFilteringEnabled || isEmptyQuery(query) {
// If filtering is disabled or query is empty,
// we can use the more efficient SearchTagValuesV2 method.
searchBlock = func(s common.Searcher) error {
searchBlock = func(ctx context.Context, s common.Searcher) error {
if anyErr.Load() != nil {
return nil // Early exit if any error has occurred
}
Expand All @@ -382,7 +404,7 @@ func (i *instance) SearchTagValuesV2(ctx context.Context, req *tempopb.SearchTag
return s.SearchTagValuesV2(ctx, tag, traceql.MakeCollectTagValueFunc(valueCollector.Collect), common.DefaultSearchOptions())
}
} else {
searchBlock = func(s common.Searcher) error {
searchBlock = func(ctx context.Context, s common.Searcher) error {
if anyErr.Load() != nil {
return nil // Early exit if any error has occurred
}
Expand All @@ -406,26 +428,32 @@ func (i *instance) SearchTagValuesV2(ctx context.Context, req *tempopb.SearchTag
// then headblockMtx. Even if the likelihood is low it is a statistical certainty
// that eventually a deadlock will occur.
i.headBlockMtx.RLock()
span.LogFields(ot_log.String("msg", "acquired headblock mtx"))
if i.headBlock != nil {
wg.Add(1)
go func() {
span, ctx := opentracing.StartSpanFromContext(ctx, "instance.SearchTagValuesV2.headBlock")
defer span.Finish()
defer i.headBlockMtx.RUnlock()
defer wg.Done()
if err := searchBlock(i.headBlock); err != nil {
if err := searchBlock(ctx, i.headBlock); err != nil {
anyErr.Store(fmt.Errorf("unexpected error searching head block (%s): %w", i.headBlock.BlockMeta().BlockID, err))
}
}()
}

i.blocksMtx.RLock()
defer i.blocksMtx.RUnlock()
span.LogFields(ot_log.String("msg", "acquired blocks mtx"))

// completed blocks
for _, b := range i.completeBlocks {
wg.Add(1)
go func(b *localBlock) {
span, ctx := opentracing.StartSpanFromContext(ctx, "instance.SearchTagValuesV2.completedBlock")
defer span.Finish()
defer wg.Done()
if err := searchBlock(b); err != nil {
if err := searchBlock(ctx, b); err != nil {
anyErr.Store(fmt.Errorf("unexpected error searching complete block (%s): %w", b.BlockMeta().BlockID, err))
}
}(b)
Expand All @@ -435,8 +463,10 @@ func (i *instance) SearchTagValuesV2(ctx context.Context, req *tempopb.SearchTag
for _, b := range i.completingBlocks {
wg.Add(1)
go func(b common.WALBlock) {
span, ctx := opentracing.StartSpanFromContext(ctx, "instance.SearchTagValuesV2.headBlock")
defer span.Finish()
defer wg.Done()
if err := searchBlock(b); err != nil {
if err := searchBlock(ctx, b); err != nil {
anyErr.Store(fmt.Errorf("unexpected error searching completing block (%s): %w", b.BlockMeta().BlockID, err))
}
}(b)
Expand Down
Loading