Skip to content

Commit

Permalink
cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
electron0zero committed Sep 20, 2024
1 parent 561f345 commit 708508f
Show file tree
Hide file tree
Showing 4 changed files with 12 additions and 137 deletions.
4 changes: 0 additions & 4 deletions modules/frontend/combiner/search_tag_values.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,6 @@ func NewSearchTagValuesV2(limitBytes int) Combiner {
return final, nil
},
quit: func(_ *tempopb.SearchTagValuesV2Response) bool {
// this now has changed behavior: it will return true once we have hit the limit of the distinct collector,
// rather than when a specific value is seen.
// so either we need to set a flag when we hit the limit or
// we can let it collect a little longer and then return the values
return d.Exceeded()
},
diff: func(response *tempopb.SearchTagValuesV2Response) (*tempopb.SearchTagValuesV2Response, error) {
Expand Down
1 change: 0 additions & 1 deletion modules/ingester/instance_search.go
Original file line number Diff line number Diff line change
Expand Up @@ -572,7 +572,6 @@ func (i *instance) SearchTagValuesV2(ctx context.Context, req *tempopb.SearchTag
return nil, err
}

// do we need to check this? this now has a different meaning
if valueCollector.Exceeded() {
level.Warn(log.Logger).Log("msg", "size of tag values in instance exceeded limit, reduce cardinality or size of tags", "tag", req.TagName, "userID", userID, "limit", limit, "total", valueCollector.TotalDataSize())
}
Expand Down
82 changes: 12 additions & 70 deletions modules/querier/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,22 +83,11 @@ type Querier struct {
subservicesWatcher *services.FailureWatcher
}

type responseFromIngesters struct {
addr string
response interface{}
}

type responseFromGenerators struct {
addr string
response interface{}
}

// FIXME: make this typed via golang generics?
type ResponseCollector interface {
Collect(response interface{}) error
Values() interface{}
}

// New makes a new Querier.
func New(
cfg Config,
Expand Down Expand Up @@ -379,7 +368,6 @@ func (q *Querier) forIngesterRings(ctx context.Context, userID string, getReplic
var mtx sync.Mutex
var wg sync.WaitGroup

// var responses []responseFromIngesters
var responseErr error

for i, ring := range q.ingesterRings {
Expand All @@ -401,7 +389,6 @@ func (q *Querier) forIngesterRings(ctx context.Context, userID string, getReplic
wg.Add(1)
go func() {
defer wg.Done()
// this response will be empty
err := forOneIngesterRing(ctx, replicationSet, f, pool, q.cfg.ExtraQueryDelay)

if err != nil {
Expand All @@ -411,11 +398,6 @@ func (q *Querier) forIngesterRings(ctx context.Context, userID string, getReplic
responseErr = multierr.Combine(responseErr, err)
return
}

// for _, r := range res {
// // instead of building the response here, we can add these to collector?
// responses = append(responses, r.(responseFromIngesters))
// }
}()
}

Expand Down Expand Up @@ -448,17 +430,13 @@ func forOneIngesterRing(ctx context.Context, replicationSet ring.ReplicationSet,
return nil, fmt.Errorf("failed to execute f() for %s: %w", ingester.Addr, err)
}

// FIXME: figure out the correct way to collect responses
// TBH, I think we can stop returning the resp here from the forEachFn

// we are returning the empty response here because responses are collected by the collector
// we are returning the empty response here because response is collected by
// the collector inside forEachFn
return nil, nil
}

// ignore response because we are using a collector and nil from the forEachFn
// need to return nil because doFunc returns an interface
// FIXME: does this work? i think it should? replicationSet.Do will collect a bunch of
// nil responses and return them, which is fine because we are ignoring them
// ignore response because it's nil, and we are using a collector inside forEachFn
// need to return nil because doFunc expects us to return a response
_, err := replicationSet.Do(ctx, extraQueryDelay, doFunc)

return err
Expand Down Expand Up @@ -516,8 +494,7 @@ func (q *Querier) SearchRecent(ctx context.Context, req *tempopb.SearchRequest)
return nil, fmt.Errorf("error extracting org id in Querier.Search: %w", err)
}

// FIXME: placeholder, will make a proper collector here
coll := collector.NewGenericCollector()
coll := collector.NewGenericCollector[*tempopb.SearchResponse]()

forEach := func(ctx context.Context, client tempopb.QuerierClient) error {
resp, err := client.SearchRecent(ctx, req)
Expand All @@ -532,11 +509,7 @@ func (q *Querier) SearchRecent(ctx context.Context, req *tempopb.SearchRequest)
return nil, fmt.Errorf("error querying ingesters in Querier.Search: %w", err)
}

// call collect.Values here and pass the responses to it for processing??

// maybe typecast here? or do it in the method we are using??
resp := coll.Results()
return q.postProcessIngesterSearchResults(req, resp), nil
return q.postProcessIngesterSearchResults(req, coll.Values()), nil
}

func (q *Querier) SearchTagsBlocks(ctx context.Context, req *tempopb.SearchTagsBlockRequest) (*tempopb.SearchTagsResponse, error) {
Expand Down Expand Up @@ -599,11 +572,6 @@ func (q *Querier) SearchTags(ctx context.Context, req *tempopb.SearchTagsRequest
if err != nil {
return nil, fmt.Errorf("error querying ingesters in Querier.SearchTags: %w", err)
}
// for _, resp := range lookupResults {
// for _, res := range resp.response.(*tempopb.SearchTagsResponse).TagNames {
// distinctValues.Collect(res)
// }
// }

if distinctValues.Exceeded() {
level.Warn(log.Logger).Log("msg", "size of tags in instance exceeded limit, reduce cardinality or size of tags", "userID", userID, "limit", limit, "total", distinctValues.TotalDataSize())
Expand Down Expand Up @@ -645,14 +613,6 @@ func (q *Querier) SearchTagsV2(ctx context.Context, req *tempopb.SearchTagsReque
return nil, fmt.Errorf("error querying ingesters in Querier.SearchTags: %w", err)
}

// for _, resp := range lookupResults {
// for _, res := range resp.response.(*tempopb.SearchTagsV2Response).Scopes {
// for _, tag := range res.Tags {
// distinctValues.Collect(res.Name, tag)
// }
// }
// }

if distinctValues.Exceeded() {
level.Warn(log.Logger).Log("msg", "size of tags in instance exceeded limit, reduce cardinality or size of tags", "userID", userID, "limit", limit)
}
Expand Down Expand Up @@ -701,11 +661,6 @@ func (q *Querier) SearchTagValues(ctx context.Context, req *tempopb.SearchTagVal
if err != nil {
return nil, fmt.Errorf("error querying ingesters in Querier.SearchTagValues: %w", err)
}
// for _, resp := range lookupResults {
// for _, res := range resp.response.(*tempopb.SearchTagValuesResponse).TagValues {
// distinctValues.Collect(res)
// }
// }

if distinctValues.Exceeded() {
level.Warn(log.Logger).Log("msg", "size of tag values in instance exceeded limit, reduce cardinality or size of tags", "tag", req.TagName, "userID", userID, "limit", limit, "total", distinctValues.TotalDataSize())
Expand All @@ -726,7 +681,7 @@ func (q *Querier) SearchTagValuesV2(ctx context.Context, req *tempopb.SearchTagV

limit := q.limits.MaxBytesPerTagValuesQuery(userID)
// collection without tracking diffs is fast, use that because we don't need to track diffs here
distinctValues := collector.NewDistinctValueWithoutDiff(limit, func(v tempopb.TagValue) int { return len(v.Type) + len(v.Value) })
distinctValues := collector.NewDistinctValue(limit, func(v tempopb.TagValue) int { return len(v.Type) + len(v.Value) })

// Virtual tags values. Get these first.
virtualVals := search.GetVirtualTagValuesV2(req.TagName)
Expand Down Expand Up @@ -757,19 +712,6 @@ func (q *Querier) SearchTagValuesV2(ctx context.Context, req *tempopb.SearchTagV
return nil, fmt.Errorf("error querying ingesters in Querier.SearchTagValues: %w", err)
}

// and then we add them all to the collector in a nested loop
// O(n^2) and in our case n is the number of ingesters and the number of tag values returned by each ingester
// so for a cluster with 100 ingesters and 100 tag values per ingester this is 10,000 iterations
// for _, resp := range lookupResults {
// for _, res := range resp.response.(*tempopb.SearchTagValuesV2Response).TagValues {
// // stop when we hit the limit, because we are not adding these values to the collector anyway,
// // so early stopping is fine
// if distinctValues.Collect(*res) {
// break
// }
// }
// }

if distinctValues.Exceeded() {
level.Warn(log.Logger).Log("msg", "size of tag values in instance exceeded limit, reduce cardinality or size of tags", "tag", req.TagName, "userID", userID, "limit", limit, "total", distinctValues.TotalDataSize())
}
Expand Down Expand Up @@ -801,6 +743,8 @@ func (q *Querier) SpanMetricsSummary(
if err != nil {
return nil, fmt.Errorf("error finding generators in Querier.SpanMetricsSummary: %w", err)
}

// TODO: we can pass the collection down to the forEach function here as well
lookupResults, err := q.forGivenGenerators(
ctx,
replicationSet,
Expand All @@ -812,6 +756,7 @@ func (q *Querier) SpanMetricsSummary(
return nil, fmt.Errorf("error querying generators in Querier.SpanMetricsSummary: %w", err)
}

// TODO: use a collector here as well instead of assembling the results manually
// Assemble the results from the generators in the pool
results := make([]*tempopb.SpanMetricsResponse, 0, len(lookupResults))
for _, result := range lookupResults {
Expand Down Expand Up @@ -1161,17 +1106,14 @@ func (q *Querier) internalTagValuesSearchBlockV2(ctx context.Context, req *tempo
return valuesToV2Response(valueCollector), nil
}

// FIXME: fix this to make use of collect.Results
func (q *Querier) postProcessIngesterSearchResults(req *tempopb.SearchRequest, rr []tempopb.SearchResponse) *tempopb.SearchResponse {
func (q *Querier) postProcessIngesterSearchResults(req *tempopb.SearchRequest, rr []*tempopb.SearchResponse) *tempopb.SearchResponse {
response := &tempopb.SearchResponse{
Metrics: &tempopb.SearchMetrics{},
}

traces := map[string]*tempopb.TraceSearchMetadata{}

// FIXME: make use of the collect.Results
for _, r := range rr {
sr := r.response.(*tempopb.SearchResponse)
for _, sr := range rr {
for _, t := range sr.Traces {
// Just simply take first result for each trace
if _, ok := traces[t.TraceID]; !ok {
Expand Down
62 changes: 0 additions & 62 deletions modules/querier/querier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,65 +184,3 @@ func TestVirtualTagsDoesntHitBackend(t *testing.T) {
})
require.Error(t, err)
}

// type MockQuerier struct {
// *Querier
// forIngesterRingsFunc func(ctx context.Context, userID string, getReplicationSet replicationSetFn, f forEachFn) ([]responseFromIngesters, error)
// }
//
// func (m *MockQuerier) forIngesterRings(ctx context.Context, userID string, getReplicationSet replicationSetFn, f forEachFn) ([]responseFromIngesters, error) {
// return m.forIngesterRingsFunc(ctx, userID, getReplicationSet, f)
// }
//
// func BenchmarkSearchTagValuesV2(b *testing.B) {
// ctx := context.Background()
// ctx = user.InjectOrgID(ctx, "test")
//
// o, err := overrides.NewOverrides(overrides.Config{}, nil, prometheus.DefaultRegisterer)
// require.NoError(b, err)
//
// ogq, err := New(Config{}, ingester_client.Config{}, nil, generator_client.Config{}, nil, nil, o)
// if err != nil {
// b.Fatal(err)
// }
//
// q := &MockQuerier{Querier: ogq}
//
// // Mock forIngesterRings
// q.forIngesterRingsFunc = func(ctx context.Context, userID string, getReplicationSet replicationSetFn, f forEachFn) ([]responseFromIngesters, error) {
// numIngesters := 100
// responses := make([]responseFromIngesters, numIngesters)
//
// for i := 0; i < numIngesters; i++ {
// tagValues := make([]*tempopb.TagValue, 100)
// for j := 0; j < 100; j++ {
// tagValues[j] = &tempopb.TagValue{
// Type: "type",
// Value: "value",
// }
// }
//
// resp := &tempopb.SearchTagValuesV2Response{
// TagValues: tagValues,
// }
//
// responses[i] = responseFromIngesters{
// addr: fmt.Sprintf("ingester-%d", i),
// response: resp,
// }
// }
// return responses, nil
// }
//
// req := &tempopb.SearchTagValuesRequest{
// TagName: "test-tag",
// }
//
// b.ResetTimer()
// for n := 0; n < b.N; n++ {
// _, err := q.SearchTagValuesV2(ctx, req)
// if err != nil {
// b.Fatal(err)
// }
// }
// }

0 comments on commit 708508f

Please sign in to comment.