diff --git a/cmd/tempo-vulture/main.go b/cmd/tempo-vulture/main.go index 2001263bdd8..ca051a481d2 100644 --- a/cmd/tempo-vulture/main.go +++ b/cmd/tempo-vulture/main.go @@ -2,6 +2,7 @@ package main import ( "bytes" + "context" "crypto/tls" "errors" "flag" @@ -12,10 +13,12 @@ import ( "net/url" "os" "reflect" + "strconv" "time" "github.com/go-test/deep" jaeger_grpc "github.com/jaegertracing/jaeger/cmd/agent/app/reporter/grpc" + thrift "github.com/jaegertracing/jaeger/thrift-gen/jaeger" zaplogfmt "github.com/jsternberg/zap-logfmt" "github.com/prometheus/client_golang/prometheus/promhttp" "go.uber.org/zap" @@ -24,6 +27,9 @@ import ( "google.golang.org/grpc/credentials" "google.golang.org/grpc/credentials/insecure" + "github.com/grafana/dskit/user" + + testUtil "github.com/grafana/tempo/integration/util" "github.com/grafana/tempo/pkg/httpclient" "github.com/grafana/tempo/pkg/model/trace" "github.com/grafana/tempo/pkg/tempopb" @@ -60,6 +66,10 @@ type traceMetrics struct { const ( defaultJaegerGRPCEndpoint = 14250 + specialSpanName = "specialVultureSpanName" + specialServiceName = "specialVultureServiceName" + specialAttributeKey = "specialVultureAttributeKey" + specialAttributeValue = "specialVultureAttributeValue" ) type vultureConfiguration struct { @@ -119,17 +129,18 @@ func main() { } httpClient := httpclient.New(vultureConfig.tempoQueryURL, vultureConfig.tempoOrgID) - tickerWrite, tickerRead, tickerSearch, err := initTickers(vultureConfig.tempoWriteBackoffDuration, vultureConfig.tempoReadBackoffDuration, vultureConfig.tempoSearchBackoffDuration) - if err != nil { - panic(err) - } + //tickerWrite, tickerRead, tickerSearch, err := initTickers(vultureConfig.tempoWriteBackoffDuration, vultureConfig.tempoReadBackoffDuration, vultureConfig.tempoSearchBackoffDuration) + // if err != nil { + // panic(err) + // } startTime := time.Now() r := rand.New(rand.NewSource(startTime.Unix())) - interval := vultureConfig.tempoWriteBackoffDuration + //interval := 
vultureConfig.tempoWriteBackoffDuration - doWrite(jaegerClient, tickerWrite, interval, vultureConfig, logger) - doRead(httpClient, tickerRead, startTime, interval, r, vultureConfig, logger) - doSearch(httpClient, tickerSearch, startTime, interval, r, vultureConfig, logger) + //doWrite(jaegerClient, tickerWrite, interval, vultureConfig, logger) + //doRead(httpClient, tickerRead, startTime, interval, r, vultureConfig, logger) + //doSearch(httpClient, tickerSearch, startTime, interval, r, vultureConfig, logger) + doLongTests(jaegerClient, httpClient, vultureConfig, *r, logger) http.Handle(prometheusPath, promhttp.Handler()) log.Fatal(http.ListenAndServe(prometheusListenAddress, nil)) @@ -310,6 +321,284 @@ func doSearch(httpClient httpclient.TempoHTTPClient, tickerSearch *time.Ticker, } } +type SpanTracker struct { + count int + timeStamps []int64 + spanCountSameName []int + spanCountSameService []int + spanCountSameAttribute []int + spanName string + serviceName string + attributeKey string + attributeValue string +} + +func newSpanTracker() *SpanTracker { + timeNow := strconv.Itoa(int(time.Now().UnixMilli())) + return &SpanTracker{ + serviceName: specialServiceName + timeNow, + spanName: specialSpanName + timeNow, + attributeKey: specialAttributeKey, + attributeValue: specialAttributeValue + timeNow, + } +} + +func (st *SpanTracker) MakeBatch(r rand.Rand, l *zap.Logger) *thrift.Batch { + // make starting batch with service name + startingSpanCount := r.Intn(4) + 1 + batch := testUtil.MakeThriftBatchWithSpanCountResourceAndSpanAttr(startingSpanCount, st.serviceName, "span-name", "vulture", "vulture", "key", st.attributeKey) + + // reuse trace id & start time + traceIDHigh := batch.Spans[0].TraceIdHigh + traceIDLow := batch.Spans[0].TraceIdLow + startTime := batch.Spans[0].StartTime + + // inject more spans with special name + spanNameSpanCount := r.Intn(4) + 1 + for i := 0; i < spanNameSpanCount; i++ { + batch.Spans = append(batch.Spans, &thrift.Span{ + TraceIdLow: 
traceIDLow, + TraceIdHigh: traceIDHigh, + SpanId: rand.Int63(), + ParentSpanId: 0, + OperationName: st.spanName, + Flags: 0, + StartTime: startTime, + Duration: 1, + }) + } + + // inject more spans with special attribute + spanAttributeSpanCount := r.Intn(4) + 1 + for i := 0; i < spanAttributeSpanCount; i++ { + batch.Spans = append(batch.Spans, &thrift.Span{ + TraceIdLow: traceIDLow, + TraceIdHigh: traceIDHigh, + SpanId: rand.Int63(), + ParentSpanId: 0, + OperationName: "my operation", + Flags: 0, + StartTime: startTime, + Duration: 1, + Tags: []*thrift.Tag{ + { + Key: st.attributeKey, + VStr: &st.attributeValue, + }, + }, + }) + } + st.count++ + st.timeStamps = append(st.timeStamps, startTime) + st.spanCountSameName = append(st.spanCountSameName, spanNameSpanCount) + st.spanCountSameAttribute = append(st.spanCountSameAttribute, spanAttributeSpanCount) + st.spanCountSameService = append(st.spanCountSameService, startingSpanCount+spanNameSpanCount+spanAttributeSpanCount) + return batch +} + +func (st *SpanTracker) GetSpanCount(scenario string, start, end int) int { + tracker := st.spanCountSameName + if scenario == "service" { + tracker = st.spanCountSameService + } else if scenario == "attribute" { + tracker = st.spanCountSameAttribute + } + + spanCount := 0 + for i := start; i <= end; i++ { + spanCount += tracker[i] + } + return spanCount +} + +func (st *SpanTracker) GetSpanCounts(scenario string, start, end int) []int { + tracker := st.spanCountSameName + if scenario == "service" { + tracker = st.spanCountSameService + } else if scenario == "attribute" { + tracker = st.spanCountSameAttribute + } + + spanCounts := make([]int, end-start + 1) + for i := start; i <= end; i++ { + spanCounts[i-start] = tracker[i] + } + return spanCounts +} + +func (st *SpanTracker) GetTraceCount(start, end int) int { + count := 0 + + for i := start; i <= end; i++ { + if st.spanCountSameService[i] != 0 { + count++ + } + } + return count +} + +func (st *SpanTracker) 
GetRandomStartEndPosition(r rand.Rand) (int, int) { + // with rhythm we are ingesting spans at a higher latency than before + // so we are allowing 3 minutes slack time (by excluding the last 3 values) + // choosing a random start and end time with at least 20 data points + count := len(st.timeStamps) - 3 + start := r.Intn(count - 20) // ensuring 20 counts + end := 20 + start + return start, end // the end is inclusive +} + +func (st *SpanTracker) ValidateTraceQLSearches(tempoClient httpclient.TempoHTTPClient, startPosition, endPosition int, logger *zap.Logger) error { + serviceQuery := fmt.Sprintf(`{resource.service.name = "%s"}`, st.serviceName) + nameQuery := fmt.Sprintf(`{span:name = "%s"}`, st.spanName) + attributeQuery := fmt.Sprintf(`{span.%s = "%s"}`, st.attributeKey, st.attributeValue) + + scenarios := []string{serviceQuery, nameQuery, attributeQuery} + + for _, scenario := range scenarios { + queryType := "service" + if scenario == nameQuery { + queryType = "name" + } else if scenario == attributeQuery { + queryType = "attribute" + } + + // add some slack to start and end time in the query + start := time.UnixMicro(st.timeStamps[startPosition]).Add(-1 * time.Second).Unix() + end := time.UnixMicro(st.timeStamps[endPosition]).Add(1 * time.Second).Unix() + resp, err := tempoClient.SearchTraceQLWithRangeAndLimit(scenario, start, end, 5000, 100) + if err != nil { + logger.Error("error searching Tempo traceql query", zap.Error(err)) + return err + } + pass := true + expectedCount := st.GetTraceCount(startPosition, endPosition) + if len(resp.Traces) != expectedCount { + pass = false + logger.Error("incorrect number of traces returned", zap.String("scenarios", scenario), zap.Int("expected", expectedCount), zap.Int("actual", len(resp.Traces))) + } + actualSpanCount := 0 + for _, trace := range resp.Traces { + for _, spanset := range trace.SpanSets{ + actualSpanCount += len(spanset.Spans) + } + } + expectedSpanCount := st.GetSpanCount(queryType, startPosition, 
endPosition) + if actualSpanCount != expectedSpanCount { + pass = false + logger.Error("incorrect number of spans returned", zap.String("scenarios", scenario), zap.Int("expected", expectedSpanCount), zap.Int("actual", actualSpanCount)) + // metricTracesErrors.WithLabelValues("notfound_search_attribute").Add(float64(metrics.notFoundSearchAttribute)) + } + + if !pass { + metricTracesErrors.WithLabelValues("traceql_incorrect_result").Add(float64(1)) + } + } + return nil +} + +func (st *SpanTracker) ValidateTraceQLMetricsSearches(tempoClient httpclient.TempoHTTPClient, startPosition, endPosition int, ticketDuration time.Duration, logger *zap.Logger) error{ + serviceQuery := fmt.Sprintf(`{resource.service.name = "%s"} | rate()`, st.serviceName) + nameQuery := fmt.Sprintf(`{span:name = "%s"} | rate()`, st.spanName) + attributeQuery := fmt.Sprintf(`{span.%s = "%s"} | rate()`, st.attributeKey, st.attributeValue) + + scenarios := []string{serviceQuery, nameQuery, attributeQuery} + + for _, scenario := range scenarios { + queryType := "service" + if scenario == nameQuery { + queryType = "name" + } else if scenario == attributeQuery { + queryType = "attribute" + } + spanCounts := st.GetSpanCounts(queryType, startPosition, endPosition) + start := time.UnixMicro(st.timeStamps[startPosition]).Add(-1 * time.Second).Unix() + end := time.UnixMicro(st.timeStamps[endPosition]).Add(1 * time.Second).Unix() + if end > time.Now().Unix() { + end = time.Now().Unix() + } + stepSecond := ticketDuration.Seconds() + step := int64(ticketDuration) + + resp, err := tempoClient.SearchQueryRange(scenario, start, end, step) + if err != nil { + logger.Error("error searching Tempo query range query", zap.Error(err), zap.String("query", scenario)) + return err + } + // since we send and record count every 30 seconds and we set the step to 30 seconds + // we expect the count to be the same between span tracker count and time series + pass := true + for i, sample := range resp.Series[0].Samples { + if i 
>= len(spanCounts) { continue } // for when start/end time creates additional samples + expectedSpanCountRate := float64(spanCounts[i])/stepSecond + if (sample.Value != 0 && sample.Value != expectedSpanCountRate) || (sample.Value == 0 && spanCounts[i] != 0) { + pass = false; logger.Error("incorrect number of spans returned for query range test", zap.String("scenarios", scenario), zap.Float64("expected", expectedSpanCountRate), zap.Float64("actual", sample.Value)) + } + } + if !pass { + metricTracesErrors.WithLabelValues("metrics_query_incorrect_result").Add(float64(1)) + } + + + } + return nil +} + +func doLongTests(jaegerClient util.JaegerClient, tempoClient httpclient.TempoHTTPClient, config vultureConfiguration, r rand.Rand, l *zap.Logger) { + + // run every 30 seconds + ticketDuration := time.Duration(30) * time.Second + ticker := time.NewTicker(ticketDuration) + spanTracker := newSpanTracker() + + go func() { + for range ticker.C { + // create a new span tracker every 500 times to clear out old data + if len(spanTracker.timeStamps) >= 500 { + spanTracker = newSpanTracker() + } + + // emit traces and keep track of span counts + ctx := user.InjectOrgID(context.Background(), config.tempoOrgID) + ctx, err := user.InjectIntoGRPCRequest(ctx) + if err != nil { + logger.Error("error injecting org id", zap.Error(err)) + continue + } + + batch := spanTracker.MakeBatch(r, l) + err = jaegerClient.EmitBatch(ctx, batch) + if err != nil { + logger.Error("error pushing batch to Tempo", zap.Error(err)) + // don't record the last span count if it failed but still record the timestamps for metrics queries + spanTracker.spanCountSameName[len(spanTracker.spanCountSameName)-1] = 0 + spanTracker.spanCountSameService[len(spanTracker.spanCountSameService)-1] = 0 + spanTracker.spanCountSameAttribute[len(spanTracker.spanCountSameAttribute)-1] = 0 + spanTracker.count-- + continue + } + logger.Info("pushed batch to Tempo", zap.Int("count", spanTracker.count)) + + // only search after at least 30 pushes 
+ if spanTracker.count < 30 { + logger.Info("pushed only", zap.Int("count", spanTracker.count)) + continue + } + + // choose random start/end for searches (the end position is inclusive) + startPosition, endPosition := spanTracker.GetRandomStartEndPosition(r) + logger.Info("random positions", zap.Int("start", startPosition), zap.Int("end", endPosition)) + + // traceql + spanTracker.ValidateTraceQLSearches(tempoClient, startPosition, endPosition, l) + + // metrics searches + spanTracker.ValidateTraceQLMetricsSearches(tempoClient, startPosition, endPosition, ticketDuration, l) + + } + }() + +} + func pushMetrics(metrics traceMetrics) { metricTracesInspected.Add(float64(metrics.requested)) metricTracesErrors.WithLabelValues("incorrectresult").Add(float64(metrics.incorrectResult)) diff --git a/cmd/tempo-vulture/mocks.go b/cmd/tempo-vulture/mocks.go index 955d771d27b..eb801f574d2 100644 --- a/cmd/tempo-vulture/mocks.go +++ b/cmd/tempo-vulture/mocks.go @@ -198,3 +198,8 @@ func (m *MockHTTPClient) SetOverrides(limits *userconfigurableoverrides.Limits, func (m *MockHTTPClient) WithTransport(t http.RoundTripper) { panic("unimplemented") } + +//nolint:all +func (m *MockHTTPClient) SearchQueryRange(query string, start int64, end int64, step int64) (*tempopb.QueryRangeResponse, error) { + panic("unimplemented") +} \ No newline at end of file diff --git a/integration/util/util.go b/integration/util/util.go index cd3248b21ec..a80bb0e58e7 100644 --- a/integration/util/util.go +++ b/integration/util/util.go @@ -505,8 +505,7 @@ func MakeThriftBatch() *thrift.Batch { func MakeThriftBatchWithSpanCount(n int) *thrift.Batch { return MakeThriftBatchWithSpanCountAttributeAndName(n, "my operation", "", "y", "xx", "x") } - -func MakeThriftBatchWithSpanCountAttributeAndName(n int, name, resourceValue, spanValue, resourceTag, spanTag string) *thrift.Batch { +func MakeThriftBatchWithSpanCountResourceAndSpanAttr(n int, serviceName, spanName, resourceValue, spanValue, resourceTag, spanTag 
string) *thrift.Batch { var spans []*thrift.Span traceIDLow := rand.Int63() @@ -517,7 +516,7 @@ func MakeThriftBatchWithSpanCountAttributeAndName(n int, name, resourceValue, sp TraceIdHigh: traceIDHigh, SpanId: rand.Int63(), ParentSpanId: 0, - OperationName: name, + OperationName: spanName, References: nil, Flags: 0, StartTime: time.Now().UnixNano() / 1000, // microsecconds @@ -534,7 +533,7 @@ func MakeThriftBatchWithSpanCountAttributeAndName(n int, name, resourceValue, sp return &thrift.Batch{ Process: &thrift.Process{ - ServiceName: "my-service", + ServiceName: serviceName, Tags: []*thrift.Tag{ { Key: resourceTag, @@ -546,6 +545,9 @@ func MakeThriftBatchWithSpanCountAttributeAndName(n int, name, resourceValue, sp Spans: spans, } } +func MakeThriftBatchWithSpanCountAttributeAndName(n int, name, resourceValue, spanValue, resourceTag, spanTag string) *thrift.Batch { + return MakeThriftBatchWithSpanCountResourceAndSpanAttr(n, "my-service", name, resourceValue, spanValue, resourceTag, spanTag) +} func CallFlush(t *testing.T, ingester *e2e.HTTPService) { fmt.Printf("Calling /flush on %s\n", ingester.Name()) diff --git a/pkg/httpclient/client.go b/pkg/httpclient/client.go index 34bf40e81f1..639d654abb6 100644 --- a/pkg/httpclient/client.go +++ b/pkg/httpclient/client.go @@ -10,6 +10,7 @@ import ( "net/url" "strconv" "strings" + "time" "github.com/golang/protobuf/jsonpb" //nolint:all "github.com/golang/protobuf/proto" //nolint:all @@ -52,6 +53,7 @@ type TempoHTTPClient interface { SearchTraceQLWithRange(query string, start int64, end int64) (*tempopb.SearchResponse, error) SearchTraceQLWithRangeAndLimit(query string, start int64, end int64, limit int64, spss int64) (*tempopb.SearchResponse, error) MetricsSummary(query string, groupBy string, start int64, end int64) (*tempopb.SpanMetricsSummaryResponse, error) + SearchQueryRange(query string, start int64, end int64, step int64) (*tempopb.QueryRangeResponse, error) GetOverrides() (*userconfigurableoverrides.Limits, string, 
error) SetOverrides(limits *userconfigurableoverrides.Limits, version string) (string, error) PatchOverrides(limits *userconfigurableoverrides.Limits) (*userconfigurableoverrides.Limits, string, error) @@ -354,6 +356,16 @@ func (c *Client) MetricsSummary(query string, groupBy string, start int64, end i return m, nil } +func (c *Client) SearchQueryRange(query string, start int64, end int64, step int64) (*tempopb.QueryRangeResponse, error) { + m := &tempopb.QueryRangeResponse{} + _, err := c.getFor(c.buildQueryRangeURL("q", query, start, end, step), m) + if err != nil { + return m, err + } + + return m, nil +} + func (c *Client) buildSearchQueryURL(queryType string, query string, start int64, end int64, limit int64, spss int64) string { joinURL, _ := url.Parse(c.BaseURL + "/api/search?") q := joinURL.Query() @@ -397,6 +409,20 @@ func (c *Client) buildTagsV2QueryURL(start int64, end int64) string { return fmt.Sprint(joinURL) } +func (c *Client) buildQueryRangeURL(queryType string, query string, start int64, end int64, step int64) string { + joinURL, _ := url.Parse(c.BaseURL + api.PathMetricsQueryRange + "?") + q := joinURL.Query() + if start != 0 && end != 0 { + q.Set("start", strconv.FormatInt(start, 10)) + q.Set("end", strconv.FormatInt(end, 10)) + q.Set("step", time.Duration(step).String()) + } + q.Set(queryType, query) + joinURL.RawQuery = q.Encode() + + return fmt.Sprint(joinURL) +} + func (c *Client) buildTagValuesV2QueryURL(key string, start int64, end int64) string { urlPath := fmt.Sprintf(`/api/v2/search/tag/%s/values`, key) joinURL, _ := url.Parse(c.BaseURL + urlPath + "?")