
Commit

cleanup comments and TODOs
electron0zero committed Sep 23, 2024
1 parent fb5e974 commit b9450d7
Showing 1 changed file with 28 additions and 48 deletions.
76 changes: 28 additions & 48 deletions modules/querier/querier.go
@@ -21,6 +21,7 @@ import (
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
oteltrace "go.opentelemetry.io/otel/trace"
"go.uber.org/atomic"
"go.uber.org/multierr"
"golang.org/x/sync/semaphore"

@@ -218,9 +219,6 @@ func (q *Querier) stopping(_ error) error {
return nil
}

// FIXME: most of the methods in this file are not tested,
// and should be tested to make sure we don't break them.

// FindTraceByID implements tempopb.Querier.
func (q *Querier) FindTraceByID(ctx context.Context, req *tempopb.TraceByIDRequest, timeStart int64, timeEnd int64) (*tempopb.TraceByIDResponse, error) {
if !validation.ValidTraceID(req.TraceID) {
@@ -240,8 +238,6 @@ func (q *Querier) FindTraceByID(ctx context.Context, req *tempopb.TraceByIDReque
maxBytes := q.limits.MaxBytesPerTrace(userID)
combiner := trace.NewCombiner(maxBytes, req.AllowPartialTrace)

// FIXME: do we need this? if yes, need to use the atomic counters here?
// var spanCount, spanCountTotal, traceCountTotal int
if req.QueryMode == QueryModeIngesters || req.QueryMode == QueryModeAll {
var getRSFn replicationSetFn
if q.cfg.QueryRelevantIngesters {
@@ -250,6 +246,8 @@ func (q *Querier) FindTraceByID(ctx context.Context, req *tempopb.TraceByIDReque
return r.Get(traceKey, ring.Read, nil, nil, nil)
}
}
var spanCountTotal, traceCountTotal atomic.Int64
var found atomic.Bool

// get responses from all ingesters in parallel
span.AddEvent("searching ingesters")
@@ -260,43 +258,25 @@ func (q *Querier) FindTraceByID(ctx context.Context, req *tempopb.TraceByIDReque
}
t := resp.Trace
if t != nil {
// spanCount is returned by Consume, maybe
_, err = combiner.Consume(t)
// we found a trace, consume and count it
spanCount, err := combiner.Consume(t)
if err != nil {
return err
}
spanCountTotal.Add(int64(spanCount))
traceCountTotal.Inc()
found.Store(true)
}

// FIXME: do we need this? counters are only used for logging
// spanCountTotal += spanCount
// traceCountTotal++
// found = true
return nil
}

err := q.forIngesterRings(ctx, userID, getRSFn, forEach)
if err != nil {
return nil, fmt.Errorf("error querying ingesters in Querier.FindTraceByID: %w", err)
}

// found := false
// for _, r := range responses {
// t := r.response.(*tempopb.TraceByIDResponse).Trace
// if t != nil {
// spanCount, err = combiner.Consume(t)
// if err != nil {
// return nil, err
// }
//
// spanCountTotal += spanCount
// traceCountTotal++
// found = true
// }
// }
// span.AddEvent("done searching ingesters", oteltrace.WithAttributes(
// attribute.Bool("found", found),
// attribute.Int("combinedSpans", spanCountTotal),
// attribute.Int("combinedTraces", traceCountTotal)))
span.AddEvent("done searching ingesters", oteltrace.WithAttributes(
attribute.Bool("found", found.Load()),
attribute.Int64("combinedSpans", spanCountTotal.Load()),
attribute.Int64("combinedTraces", traceCountTotal.Load())))
}
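Note: a minimal, self-contained sketch of the counting pattern introduced above, assuming go.uber.org/atomic is available. The span counts below are invented stand-ins for the values combiner.Consume returns inside each parallel ingester callback.

package main

import (
	"fmt"
	"sync"

	"go.uber.org/atomic"
)

func main() {
	var (
		spanCountTotal  atomic.Int64
		traceCountTotal atomic.Int64
		found           atomic.Bool
		wg              sync.WaitGroup
	)

	// stand-in for the span counts returned by combiner.Consume; 0 means the
	// ingester returned no trace
	spanCounts := []int{12, 0, 7}

	for _, spanCount := range spanCounts {
		wg.Add(1)
		go func(spanCount int) {
			defer wg.Done()
			if spanCount == 0 {
				return
			}
			// safe to update from many goroutines without a mutex
			spanCountTotal.Add(int64(spanCount))
			traceCountTotal.Inc()
			found.Store(true)
		}(spanCount)
	}
	wg.Wait()

	fmt.Println(found.Load(), spanCountTotal.Load(), traceCountTotal.Load()) // true 19 2
}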

if req.QueryMode == QueryModeBlocks || req.QueryMode == QueryModeAll {
@@ -344,7 +324,6 @@ func (q *Querier) FindTraceByID(ctx context.Context, req *tempopb.TraceByIDReque
}

type (
// this is called forEachRespons? collect in this func?
forEachFn func(ctx context.Context, client tempopb.QuerierClient) error
replicationSetFn func(r ring.ReadRing) (ring.ReplicationSet, error)
)
@@ -361,7 +340,7 @@ func (q *Querier) forIngesterRings(ctx context.Context, userID string, getReplic
return errors.New("forIngesterRings: no ingester rings configured")
}

// if a nil replicationsetfn is passed, that means to just use a standard readring
// if a nil replicationSetFn is passed, that means to just use a standard Read ring
if getReplicationSet == nil {
getReplicationSet = func(r ring.ReadRing) (ring.ReplicationSet, error) {
return r.GetReplicationSetForOperation(ring.Read)
@@ -373,17 +352,17 @@ func (q *Querier) forIngesterRings(ctx context.Context, userID string, getReplic

var responseErr error

for i, ring := range q.ingesterRings {
for i, ingesterRing := range q.ingesterRings {
if q.cfg.ShuffleShardingIngestersEnabled {
ring = ring.ShuffleShardWithLookback(
ingesterRing = ingesterRing.ShuffleShardWithLookback(
userID,
q.limits.IngestionTenantShardSize(userID),
q.cfg.ShuffleShardingIngestersLookbackPeriod,
time.Now(),
)
}

replicationSet, err := getReplicationSet(ring)
replicationSet, err := getReplicationSet(ingesterRing)
if err != nil {
return fmt.Errorf("forIngesterRings: error getting replication set for ring (%d): %w", i, err)
}
@@ -393,10 +372,10 @@ func (q *Querier) forIngesterRings(ctx context.Context, userID string, getReplic
go func() {
defer wg.Done()
err := forOneIngesterRing(ctx, replicationSet, f, pool, q.cfg.ExtraQueryDelay)
mtx.Lock()
defer mtx.Unlock()

if err != nil {
// this response is generic?
mtx.Lock()
mtx.Unlock()
responseErr = multierr.Combine(responseErr, err)
return
}
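Note: a small sketch of the error-collection pattern used in the loop above, assuming go.uber.org/multierr. The per-ring errors are invented for illustration.

package main

import (
	"errors"
	"fmt"
	"sync"

	"go.uber.org/multierr"
)

func main() {
	var (
		wg          sync.WaitGroup
		mtx         sync.Mutex
		responseErr error
	)

	// invented per-ring outcomes; nil means that ring's queries succeeded
	ringErrs := []error{nil, errors.New("ring 1 failed"), errors.New("ring 2 failed")}

	for _, err := range ringErrs {
		wg.Add(1)
		go func(err error) {
			defer wg.Done()
			if err == nil {
				return
			}
			// the mutex guards responseErr; multierr.Combine keeps every failure
			mtx.Lock()
			defer mtx.Unlock()
			responseErr = multierr.Combine(responseErr, err)
		}(err)
	}
	wg.Wait()

	fmt.Println(responseErr) // both failures, order depends on goroutine scheduling
}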
@@ -437,8 +416,9 @@ func forOneIngesterRing(ctx context.Context, replicationSet ring.ReplicationSet,
return nil, nil
}

// ignore response because it's nil, and we are using a collector inside forEachFn
// need to return nil because doFunc expects us to return a response
// ignore response because it's nil, and we are using a collector inside forEachFn to
// collect the actual response. we need to return nil here and ignore it
// because doFunc expects us to return a response
_, err := replicationSet.Do(ctx, extraQueryDelay, doFunc)

return err
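Note: the comment above explains why the per-instance function returns a nil response: the actual results are gathered by the forEach closure. A simplified, self-contained sketch of that pattern follows; doAll is an invented stand-in for replicationSet.Do, not the dskit API.

package main

import (
	"context"
	"fmt"
	"sync"
)

// doAll is an invented stand-in for replicationSet.Do: call f once per
// instance in parallel and surface the first error.
func doAll(ctx context.Context, instances []string, f func(ctx context.Context, addr string) (any, error)) ([]any, error) {
	var (
		wg       sync.WaitGroup
		mtx      sync.Mutex
		firstErr error
	)
	for _, addr := range instances {
		wg.Add(1)
		go func(addr string) {
			defer wg.Done()
			if _, err := f(ctx, addr); err != nil {
				mtx.Lock()
				defer mtx.Unlock()
				if firstErr == nil {
					firstErr = err
				}
			}
		}(addr)
	}
	wg.Wait()
	return nil, firstErr
}

func main() {
	var (
		mtx     sync.Mutex
		results []string // filled by the forEach closure, not returned by doAll
	)

	forEach := func(ctx context.Context, addr string) error {
		mtx.Lock()
		defer mtx.Unlock()
		results = append(results, "response from "+addr)
		return nil
	}

	doFunc := func(ctx context.Context, addr string) (any, error) {
		// return a nil response: the collector inside forEach already has it,
		// doAll just expects something to be returned
		return nil, forEach(ctx, addr)
	}

	_, err := doAll(context.Background(), []string{"ingester-0", "ingester-1"}, doFunc)
	fmt.Println(err, len(results)) // <nil> 2
}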
@@ -569,7 +549,8 @@ func (q *Querier) SearchTags(ctx context.Context, req *tempopb.SearchTagsRequest
return err
}
for _, tag := range resp.TagNames {
if distinctValues.Collect(tag) {
distinctValues.Collect(tag)
if distinctValues.Exceeded() {
break // stop early
}
}
@@ -719,7 +700,8 @@ func (q *Querier) SearchTagValuesV2(ctx context.Context, req *tempopb.SearchTagV
return err
}
for _, res := range resp.TagValues {
if distinctValues.Collect(*res) {
distinctValues.Collect(*res)
if distinctValues.Exceeded() {
break // stop early
}
}
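Note: both the SearchTags and SearchTagValuesV2 loops above switch from checking Collect's return value to calling Exceeded explicitly. The collector below is a hypothetical sketch with the same method shape, for illustration only; it is not the collector type Tempo actually uses.

package main

import "fmt"

// distinctCollector is a hypothetical stand-in: Collect records a value,
// Exceeded reports whether the size limit has been passed so callers can
// stop early.
type distinctCollector struct {
	values   map[string]struct{}
	maxBytes int
	curBytes int
}

func newDistinctCollector(maxBytes int) *distinctCollector {
	return &distinctCollector{values: map[string]struct{}{}, maxBytes: maxBytes}
}

func (c *distinctCollector) Collect(s string) {
	if _, ok := c.values[s]; ok {
		return
	}
	c.values[s] = struct{}{}
	c.curBytes += len(s)
}

func (c *distinctCollector) Exceeded() bool {
	return c.maxBytes > 0 && c.curBytes > c.maxBytes
}

func main() {
	c := newDistinctCollector(10)
	for _, tag := range []string{"service", "cluster", "namespace", "pod"} {
		c.Collect(tag)
		if c.Exceeded() {
			break // stop early, same shape as the querier loops above
		}
	}
	fmt.Println(len(c.values)) // 2: "service" + "cluster" pushes curBytes past 10
}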
@@ -762,7 +744,6 @@ func (q *Querier) SpanMetricsSummary(
return nil, fmt.Errorf("error finding generators in Querier.SpanMetricsSummary: %w", err)
}

// TODO: we can pass the collection down to the forEach function here as well
lookupResults, err := q.forGivenGenerators(
ctx,
replicationSet,
@@ -774,7 +755,6 @@
return nil, fmt.Errorf("error querying generators in Querier.SpanMetricsSummary: %w", err)
}

// TODO: we use a collector here as well instead of
// Assemble the results from the generators in the pool
results := make([]*tempopb.SpanMetricsResponse, 0, len(lookupResults))
for _, result := range lookupResults {
@@ -1124,14 +1104,14 @@ func (q *Querier) internalTagValuesSearchBlockV2(ctx context.Context, req *tempo
return valuesToV2Response(valueCollector), nil
}

func (q *Querier) postProcessIngesterSearchResults(req *tempopb.SearchRequest, rr []*tempopb.SearchResponse) *tempopb.SearchResponse {
func (q *Querier) postProcessIngesterSearchResults(req *tempopb.SearchRequest, collResp []*tempopb.SearchResponse) *tempopb.SearchResponse {
response := &tempopb.SearchResponse{
Metrics: &tempopb.SearchMetrics{},
}

traces := map[string]*tempopb.TraceSearchMetadata{}

for _, sr := range rr {
for _, sr := range collResp {
for _, t := range sr.Traces {
// Just simply take first result for each trace
if _, ok := traces[t.TraceID]; !ok {
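Note: a minimal sketch of the dedup step in postProcessIngesterSearchResults shown above: keep the first metadata seen for each trace ID across all ingester responses. The structs and values are simplified stand-ins for the tempopb types.

package main

import "fmt"

// simplified stand-ins for tempopb.TraceSearchMetadata / tempopb.SearchResponse
type traceSearchMetadata struct {
	TraceID  string
	RootName string
}

type searchResponse struct {
	Traces []*traceSearchMetadata
}

func main() {
	// invented responses from two ingesters; both saw trace "a1"
	responses := []*searchResponse{
		{Traces: []*traceSearchMetadata{{TraceID: "a1", RootName: "frontend"}}},
		{Traces: []*traceSearchMetadata{
			{TraceID: "a1", RootName: "frontend"},
			{TraceID: "b2", RootName: "checkout"},
		}},
	}

	traces := map[string]*traceSearchMetadata{}
	for _, sr := range responses {
		for _, t := range sr.Traces {
			// simply take the first result seen for each trace ID
			if _, ok := traces[t.TraceID]; !ok {
				traces[t.TraceID] = t
			}
		}
	}

	fmt.Println(len(traces)) // 2 distinct traces
}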
