From b9450d71e83e70243b13dfdeb0c170d45c929e56 Mon Sep 17 00:00:00 2001
From: Suraj Nath <9503187+electron0zero@users.noreply.github.com>
Date: Mon, 23 Sep 2024 20:45:11 +0530
Subject: [PATCH] cleanup comments and TODOs

---
 modules/querier/querier.go | 76 ++++++++++++++------------------------
 1 file changed, 28 insertions(+), 48 deletions(-)

diff --git a/modules/querier/querier.go b/modules/querier/querier.go
index dd888367b1c..5158021b5a5 100644
--- a/modules/querier/querier.go
+++ b/modules/querier/querier.go
@@ -21,6 +21,7 @@ import (
 	"go.opentelemetry.io/otel"
 	"go.opentelemetry.io/otel/attribute"
 	oteltrace "go.opentelemetry.io/otel/trace"
+	"go.uber.org/atomic"
 	"go.uber.org/multierr"
 	"golang.org/x/sync/semaphore"
 
@@ -218,9 +219,6 @@ func (q *Querier) stopping(_ error) error {
 	return nil
 }
 
-// FIXME: most of the methods in this file are not tested,
-// and should be tested to make sure we don't break them.
-
 // FindTraceByID implements tempopb.Querier.
 func (q *Querier) FindTraceByID(ctx context.Context, req *tempopb.TraceByIDRequest, timeStart int64, timeEnd int64) (*tempopb.TraceByIDResponse, error) {
 	if !validation.ValidTraceID(req.TraceID) {
@@ -240,8 +238,6 @@ func (q *Querier) FindTraceByID(ctx context.Context, req *tempopb.TraceByIDReque
 	maxBytes := q.limits.MaxBytesPerTrace(userID)
 	combiner := trace.NewCombiner(maxBytes, req.AllowPartialTrace)
 
-	// FIXME: do we need this? if yes, need to use the atomic counters here?
-	// var spanCount, spanCountTotal, traceCountTotal int
 	if req.QueryMode == QueryModeIngesters || req.QueryMode == QueryModeAll {
 		var getRSFn replicationSetFn
 		if q.cfg.QueryRelevantIngesters {
@@ -250,6 +246,8 @@ func (q *Querier) FindTraceByID(ctx context.Context, req *tempopb.TraceByIDReque
 				return r.Get(traceKey, ring.Read, nil, nil, nil)
 			}
 		}
+		var spanCountTotal, traceCountTotal atomic.Int64
+		var found atomic.Bool
 
 		// get responses from all ingesters in parallel
 		span.AddEvent("searching ingesters")
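
For context on the hunk above: the commented-out plain-int counters are replaced with go.uber.org/atomic values because the forEach callback in FindTraceByID is invoked concurrently, one goroutine per queried ingester, so the shared counters must be updated atomically. The following is a minimal, self-contained sketch of that pattern, not Tempo code; the worker count and per-worker span counts are assumptions made purely for illustration.

// Sketch only: concurrent counting with go.uber.org/atomic, assuming 4 workers
// standing in for parallel ingester calls.
package main

import (
	"fmt"
	"sync"

	"go.uber.org/atomic"
)

func main() {
	var (
		spanCountTotal  atomic.Int64 // zero values are ready to use, as in the patch
		traceCountTotal atomic.Int64
		found           atomic.Bool
	)

	const workers = 4 // assumed number of concurrent ingester calls

	var wg sync.WaitGroup
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func(spansFromIngester int) {
			defer wg.Done()
			// mirrors the concurrent forEach callback: count spans/traces as they are consumed
			spanCountTotal.Add(int64(spansFromIngester))
			traceCountTotal.Inc()
			found.Store(true)
		}(i + 1)
	}
	wg.Wait()

	// mirrors the span.AddEvent attributes recorded after the ingester fan-out
	fmt.Println("found:", found.Load(),
		"combinedSpans:", spanCountTotal.Load(),
		"combinedTraces:", traceCountTotal.Load())
}
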
@@ -260,43 +258,25 @@ func (q *Querier) FindTraceByID(ctx context.Context, req *tempopb.TraceByIDReque
 			}
 			t := resp.Trace
 			if t != nil {
-				// spanCount is returned by Consume, maybe
-				_, err = combiner.Consume(t)
+				// we found a trace, consume and count it
+				spanCount, err := combiner.Consume(t)
 				if err != nil {
 					return err
 				}
+				spanCountTotal.Add(int64(spanCount))
+				traceCountTotal.Inc()
+				found.Store(true)
 			}
-
-			// FIXME: do we need this? counters are only used for logging
-			// spanCountTotal += spanCount
-			// traceCountTotal++
-			// found = true
 			return nil
 		}
-
 		err := q.forIngesterRings(ctx, userID, getRSFn, forEach)
 		if err != nil {
 			return nil, fmt.Errorf("error querying ingesters in Querier.FindTraceByID: %w", err)
 		}
-
-		// found := false
-		// for _, r := range responses {
-		// 	t := r.response.(*tempopb.TraceByIDResponse).Trace
-		// 	if t != nil {
-		// 		spanCount, err = combiner.Consume(t)
-		// 		if err != nil {
-		// 			return nil, err
-		// 		}
-		//
-		// 		spanCountTotal += spanCount
-		// 		traceCountTotal++
-		// 		found = true
-		// 	}
-		// }
-		// span.AddEvent("done searching ingesters", oteltrace.WithAttributes(
-		// 	attribute.Bool("found", found),
-		// 	attribute.Int("combinedSpans", spanCountTotal),
-		// 	attribute.Int("combinedTraces", traceCountTotal)))
+		span.AddEvent("done searching ingesters", oteltrace.WithAttributes(
+			attribute.Bool("found", found.Load()),
+			attribute.Int64("combinedSpans", spanCountTotal.Load()),
+			attribute.Int64("combinedTraces", traceCountTotal.Load())))
 	}
 
 	if req.QueryMode == QueryModeBlocks || req.QueryMode == QueryModeAll {
@@ -344,7 +324,6 @@ func (q *Querier) FindTraceByID(ctx context.Context, req *tempopb.TraceByIDReque
 }
 
 type (
-	// this is called forEachRespons? collect in this func?
 	forEachFn        func(ctx context.Context, client tempopb.QuerierClient) error
 	replicationSetFn func(r ring.ReadRing) (ring.ReplicationSet, error)
 )
@@ -361,7 +340,7 @@ func (q *Querier) forIngesterRings(ctx context.Context, userID string, getReplic
 		return errors.New("forIngesterRings: no ingester rings configured")
 	}
 
-	// if a nil replicationsetfn is passed, that means to just use a standard readring
+	// if a nil replicationSetFn is passed, that means to just use a standard Read ring
 	if getReplicationSet == nil {
 		getReplicationSet = func(r ring.ReadRing) (ring.ReplicationSet, error) {
 			return r.GetReplicationSetForOperation(ring.Read)
@@ -373,9 +352,9 @@ func (q *Querier) forIngesterRings(ctx context.Context, userID string, getReplic
 
 	var responseErr error
 
-	for i, ring := range q.ingesterRings {
+	for i, ingesterRing := range q.ingesterRings {
 		if q.cfg.ShuffleShardingIngestersEnabled {
-			ring = ring.ShuffleShardWithLookback(
+			ingesterRing = ingesterRing.ShuffleShardWithLookback(
 				userID,
 				q.limits.IngestionTenantShardSize(userID),
 				q.cfg.ShuffleShardingIngestersLookbackPeriod,
@@ -383,7 +362,7 @@ func (q *Querier) forIngesterRings(ctx context.Context, userID string, getReplic
 			)
 		}
 
-		replicationSet, err := getReplicationSet(ring)
+		replicationSet, err := getReplicationSet(ingesterRing)
 		if err != nil {
 			return fmt.Errorf("forIngesterRings: error getting replication set for ring (%d): %w", i, err)
 		}
@@ -393,10 +372,10 @@ func (q *Querier) forIngesterRings(ctx context.Context, userID string, getReplic
 		go func() {
 			defer wg.Done()
 			err := forOneIngesterRing(ctx, replicationSet, f, pool, q.cfg.ExtraQueryDelay)
+			mtx.Lock()
+			defer mtx.Unlock()
+
 			if err != nil {
-				// this response is generic?
-				mtx.Lock()
-				mtx.Unlock()
 				responseErr = multierr.Combine(responseErr, err)
 				return
 			}
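
For context on the hunk above: moving mtx.Lock() / defer mtx.Unlock() ahead of the error check puts both the responseErr aggregation and the result handling that follows inside the critical section; the old code locked and immediately unlocked, so the write to responseErr happened outside the lock. Below is a minimal, self-contained sketch of that collect-under-mutex pattern, not Tempo code; the work slice and result types are illustrative assumptions.

// Sketch only: goroutines record either an error or a result under one mutex,
// taking the lock with defer-unlock before deciding which path they are on.
package main

import (
	"errors"
	"fmt"
	"sync"

	"go.uber.org/multierr"
)

func main() {
	var (
		wg          sync.WaitGroup
		mtx         sync.Mutex
		responseErr error
		results     []int
	)

	// assumed workload: two successes and one failure
	work := []struct {
		value int
		err   error
	}{
		{value: 1}, {value: 2}, {err: errors.New("ingester ring 2 failed")},
	}

	for _, w := range work {
		wg.Add(1)
		go func(value int, err error) {
			defer wg.Done()

			// lock before either write, as the patched code does
			mtx.Lock()
			defer mtx.Unlock()

			if err != nil {
				responseErr = multierr.Combine(responseErr, err)
				return
			}
			results = append(results, value)
		}(w.value, w.err)
	}
	wg.Wait()

	fmt.Println("results:", results, "err:", responseErr)
}
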
@@ -437,8 +416,9 @@ func forOneIngesterRing(ctx context.Context, replicationSet ring.ReplicationSet,
 		return nil, nil
 	}
 
-	// ignore response because it's nil, and we are using a collector inside forEachFn
-	// need to return nil because doFunc expects us to return a response
+	// ignore response because it's nil, and we are using a collector inside forEachFn to
+	// collect the actual response. we need to return nil here and ignore it
+	// because doFunc expects us to return a response
	_, err := replicationSet.Do(ctx, extraQueryDelay, doFunc)
 
 	return err
@@ -569,7 +549,8 @@ func (q *Querier) SearchTags(ctx context.Context, req *tempopb.SearchTagsRequest
 			return err
 		}
 		for _, tag := range resp.TagNames {
-			if distinctValues.Collect(tag) {
+			distinctValues.Collect(tag)
+			if distinctValues.Exceeded() {
 				break // stop early
 			}
 		}
@@ -719,7 +700,8 @@ func (q *Querier) SearchTagValuesV2(ctx context.Context, req *tempopb.SearchTagV
 			return err
 		}
 		for _, res := range resp.TagValues {
-			if distinctValues.Collect(*res) {
+			distinctValues.Collect(*res)
+			if distinctValues.Exceeded() {
 				break // stop early
 			}
 		}
@@ -762,7 +744,6 @@ func (q *Querier) SpanMetricsSummary(
 		return nil, fmt.Errorf("error finding generators in Querier.SpanMetricsSummary: %w", err)
 	}
 
-	// TODO: we can pass the collection down to the forEach function here as well
 	lookupResults, err := q.forGivenGenerators(
 		ctx,
 		replicationSet,
@@ -774,7 +755,6 @@ func (q *Querier) SpanMetricsSummary(
 		return nil, fmt.Errorf("error querying generators in Querier.SpanMetricsSummary: %w", err)
 	}
 
-	// TODO: we use a collector here as well instead of
 	// Assemble the results from the generators in the pool
 	results := make([]*tempopb.SpanMetricsResponse, 0, len(lookupResults))
 	for _, result := range lookupResults {
@@ -1124,14 +1104,14 @@ func (q *Querier) internalTagValuesSearchBlockV2(ctx context.Context, req *tempo
 	return valuesToV2Response(valueCollector), nil
 }
 
-func (q *Querier) postProcessIngesterSearchResults(req *tempopb.SearchRequest, rr []*tempopb.SearchResponse) *tempopb.SearchResponse {
+func (q *Querier) postProcessIngesterSearchResults(req *tempopb.SearchRequest, collResp []*tempopb.SearchResponse) *tempopb.SearchResponse {
 	response := &tempopb.SearchResponse{
 		Metrics: &tempopb.SearchMetrics{},
 	}
 
 	traces := map[string]*tempopb.TraceSearchMetadata{}
 
-	for _, sr := range rr {
+	for _, sr := range collResp {
 		for _, t := range sr.Traces {
 			// Just simply take first result for each trace
 			if _, ok := traces[t.TraceID]; !ok {
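
For context on the SearchTags / SearchTagValuesV2 hunks above: the loops now call Collect() purely to record a value and use a separate Exceeded() check to stop early once the collector's limit is hit, instead of relying on Collect's return value. The sketch below shows that control flow with a hypothetical DistinctString collector; its fields, byte-budget semantics, and constructor are assumptions for illustration, not the actual Tempo collector API.

// Sketch only: a stand-in distinct-value collector demonstrating the
// Collect-then-Exceeded loop shape used in the patched search paths.
package main

import "fmt"

// DistinctString collects unique strings until an approximate byte budget is exceeded.
type DistinctString struct {
	maxBytes int
	curBytes int
	values   map[string]struct{}
}

func NewDistinctString(maxBytes int) *DistinctString {
	return &DistinctString{maxBytes: maxBytes, values: map[string]struct{}{}}
}

// Collect records the value; duplicates and post-limit values are ignored.
func (d *DistinctString) Collect(s string) {
	if d.Exceeded() {
		return
	}
	if _, ok := d.values[s]; ok {
		return
	}
	d.values[s] = struct{}{}
	d.curBytes += len(s)
}

// Exceeded reports whether the byte budget has been reached.
func (d *DistinctString) Exceeded() bool {
	return d.maxBytes > 0 && d.curBytes >= d.maxBytes
}

func main() {
	distinctValues := NewDistinctString(10) // assumed tiny budget to force an early stop
	for _, tag := range []string{"service.name", "http.status_code", "span.kind"} {
		distinctValues.Collect(tag)
		if distinctValues.Exceeded() {
			break // stop early, as in the patched SearchTags / SearchTagValuesV2 loops
		}
	}
	fmt.Println("collected", len(distinctValues.values), "tags")
}
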