Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Query Frontend Refactor: Metrics Query Range #3584

Merged
merged 19 commits into from
Apr 19, 2024
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -74,3 +74,8 @@ issues:
- path: integration/e2e
linters:
- unused
# it's common to have multiple instances of a string in a test file.
# don't flag it with the linter
- path: _test\.go
linters:
- goconst
8 changes: 6 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
## main / unreleased

* [ENHANCEMENT] Add querier metrics for requests executed [#3524](https://github.com/grafana/tempo/pull/3524) (@electron0zero)
* [FEATURE] Added gRPC streaming endpoints for all tag queries. [#3460](https://github.com/grafana/tempo/pull/3460) (@joe-elliott)
* [FEATURE] Added gRPC streaming endpoints for Tempo APIs.
* Added gRPC streaming endpoints for all tag queries. [#3460](https://github.com/grafana/tempo/pull/3460) (@joe-elliott)
* Added gRPC streaming endpoints for metrics. [#3584](https://github.com/grafana/tempo/pull/3584) (@joe-elliott)
* Reduced memory consumption in the frontend for large traces. [#3522](https://github.com/grafana/tempo/pull/3522) (@joe-elliott)
* **Breaking Change** Remove trace by id hedging from the frontend. [#3522](https://github.com/grafana/tempo/pull/3522) (@joe-elliott)
* **Breaking Change** Dropped meta-tag for tenant from trace by id multitenant. [#3522](https://github.com/grafana/tempo/pull/3522) (@joe-elliott)
* [CHANGE] Align metrics query time ranges to the step parameter [#3490](https://github.com/grafana/tempo/pull/3490) (@mdisibio)
* [CHANGE] **Breaking Change** Remove trace by id hedging from the frontend [#3522](https://github.com/grafana/tempo/pull/3522) (@joe-elliott)
* [CHANGE] Change the UID and GID of the `tempo` user to avoid root [#2265](https://github.com/grafana/tempo/pull/2265) (@zalegrala)
**BREAKING CHANGE** Ownership of /var/tempo is changing. Historically this
has been owned by root:root, and with this change it will now be owned by
Expand Down
135 changes: 135 additions & 0 deletions cmd/tempo-cli/cmd-query-metrics-query-range.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
package main

import (
"bytes"
"context"
"errors"
"io"
"net/http"
"path"
"time"

"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"

"github.com/gogo/protobuf/jsonpb"
"github.com/grafana/dskit/user"
"github.com/grafana/tempo/pkg/api"
"github.com/grafana/tempo/pkg/tempopb"
)

// metricsQueryRangeCmd holds the positional arguments and flags of the
// metrics query range CLI command. The struct tags carry the argument/flag
// metadata and the help text shown to the user.
// NOTE(review): positional-argument order presumably follows field order in
// the CLI parser — confirm before reordering fields.
type metricsQueryRangeCmd struct {
// Positional arguments. Start and End are parsed with time.RFC3339 in Run.
HostPort string `arg:"" help:"tempo host and port. scheme and path will be provided based on query type. e.g. localhost:3200"`
TraceQL string `arg:"" optional:"" help:"traceql query"`
Start string `arg:"" optional:"" help:"start time in ISO8601 format"`
End string `arg:"" optional:"" help:"end time in ISO8601 format"`

// Flags. UseGRPC selects searchGRPC over searchHTTP in Run; PathPrefix is
// only read by searchHTTP.
OrgID string `help:"optional orgID"`
UseGRPC bool `help:"stream search results over GRPC"`
PathPrefix string `help:"string to prefix all http paths with"`
}

// Run parses the Start and End arguments as RFC 3339 timestamps, builds a
// QueryRangeRequest from them and the TraceQL query, and dispatches it over
// gRPC when UseGRPC is set or over HTTP otherwise.
//
// It returns the parse error if either timestamp is not valid RFC 3339, or
// whatever error the selected transport reports.
//
// NOTE(review): Start/End are sent as Unix seconds while Step is the
// nanosecond count of a 5s time.Duration — confirm both match the units the
// server expects.
func (cmd *metricsQueryRangeCmd) Run(_ *globalOptions) error {
	from, err := time.Parse(time.RFC3339, cmd.Start)
	if err != nil {
		return err
	}

	to, err := time.Parse(time.RFC3339, cmd.End)
	if err != nil {
		return err
	}

	query := &tempopb.QueryRangeRequest{
		Query: cmd.TraceQL,
		Start: uint64(from.Unix()),
		End:   uint64(to.Unix()),
		Step:  uint64(5 * time.Second),
	}

	if !cmd.UseGRPC {
		return cmd.searchHTTP(query)
	}
	return cmd.searchGRPC(query)
}

// searchGRPC sends req to the streaming querier at cmd.HostPort over an
// insecure (plaintext) gRPC connection and prints every streamed response as
// JSON until the server closes the stream.
//
// The org ID from cmd.OrgID is injected into the outgoing gRPC context. It
// returns nil once the stream ends with io.EOF, and otherwise the first error
// from dialing, opening the stream, receiving, or printing.
func (cmd *metricsQueryRangeCmd) searchGRPC(req *tempopb.QueryRangeRequest) error {
	ctx := user.InjectOrgID(context.Background(), cmd.OrgID)
	ctx, err := user.InjectIntoGRPCRequest(ctx)
	if err != nil {
		return err
	}

	clientConn, err := grpc.DialContext(ctx, cmd.HostPort, grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		return err
	}
	// release the connection on every return path; previously it was leaked
	defer clientConn.Close()

	client := tempopb.NewStreamingQuerierClient(clientConn)

	stream, err := client.MetricsQueryRange(ctx, req)
	if err != nil {
		return err
	}

	for {
		// keep the receive error in its own variable so a successful print
		// cannot overwrite it
		msg, recvErr := stream.Recv()
		if msg != nil {
			if err := printAsJSON(msg); err != nil {
				return err
			}
		}
		if errors.Is(recvErr, io.EOF) {
			return nil
		}
		if recvErr != nil {
			return recvErr
		}
	}
}

// searchHTTP sends req as an HTTP GET to
// http://<HostPort>/<PathPrefix>/<api.PathMetricsQueryRange> and prints the
// decoded response as JSON.
//
// The request headers are reset and the org ID from cmd.OrgID is injected
// before sending. It returns an error if the request cannot be built or sent,
// if the body cannot be read, if the status is not 200 OK, or if the body
// cannot be decoded into a tempopb.QueryRangeResponse.
//
// nolint: goconst // goconst wants us to make http:// a const
func (cmd *metricsQueryRangeCmd) searchHTTP(req *tempopb.QueryRangeRequest) error {
	httpReq, err := http.NewRequest("GET", "http://"+path.Join(cmd.HostPort, cmd.PathPrefix, api.PathMetricsQueryRange), nil)
	if err != nil {
		return err
	}

	httpReq = api.BuildQueryRangeRequest(httpReq, req)
	httpReq.Header = http.Header{}
	err = user.InjectOrgIDIntoHTTPRequest(user.InjectOrgID(context.Background(), cmd.OrgID), httpReq)
	if err != nil {
		return err
	}

	httpResp, err := http.DefaultClient.Do(httpReq)
	if err != nil {
		return err
	}
	defer httpResp.Body.Close()

	body, err := io.ReadAll(httpResp.Body)
	if err != nil {
		return err
	}

	if httpResp.StatusCode != http.StatusOK {
		return errors.New("failed to query. body: " + string(body) + " status: " + httpResp.Status)
	}

	resp := &tempopb.QueryRangeResponse{}
	if err := jsonpb.Unmarshal(bytes.NewReader(body), resp); err != nil {
		// report a malformed body like every other failure here instead of
		// crashing the CLI with a panic
		return errors.New("failed to parse resp: " + err.Error())
	}

	return printAsJSON(resp)
}
1 change: 1 addition & 0 deletions cmd/tempo-cli/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ var cli struct {
SearchTags querySearchTagsCmd `cmd:"" help:"query Tempo search tags"`
SearchTagValues querySearchTagValuesCmd `cmd:"" help:"query Tempo search tag values"`
Search querySearchCmd `cmd:"" help:"query Tempo search"`
Metrics metricsQueryRangeCmd `cmd:"" help:"query Tempo metrics query range"`
} `cmd:""`
Blocks queryBlocksCmd `cmd:"" help:"query for a traceid directly from backend blocks"`
TraceSummary queryTraceSummaryCmd `cmd:"" help:"query summary for a traceid directly from backend blocks"`
Expand Down
5 changes: 2 additions & 3 deletions cmd/tempo/app/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -399,9 +399,8 @@ func (t *App) initQueryFrontend() (services.Service, error) {
t.Server.HTTPRouter().Handle(addHTTPAPIPrefix(&t.cfg, api.PathSearchTagValuesV2), base.Wrap(queryFrontend.SearchTagsValuesV2Handler))

// http metrics endpoints
t.Server.HTTPRouter().Handle(addHTTPAPIPrefix(&t.cfg, api.PathSpanMetricsSummary), base.Wrap(queryFrontend.SpanMetricsSummaryHandler))
t.Server.HTTPRouter().Handle(addHTTPAPIPrefix(&t.cfg, api.PathMetricsQueryRange), base.Wrap(queryFrontend.QueryRangeHandler))
t.Server.HTTPRouter().Handle(addHTTPAPIPrefix(&t.cfg, api.PathPromQueryRange), base.Wrap(queryFrontend.QueryRangeHandler))
t.Server.HTTPRouter().Handle(addHTTPAPIPrefix(&t.cfg, api.PathSpanMetricsSummary), base.Wrap(queryFrontend.MetricsSummaryHandler))
t.Server.HTTPRouter().Handle(addHTTPAPIPrefix(&t.cfg, api.PathMetricsQueryRange), base.Wrap(queryFrontend.MetricsQueryRangeHandler))

// the query frontend needs to have knowledge of the blocks so it can shard search jobs
if t.cfg.Target == QueryFrontend {
Expand Down
1 change: 1 addition & 0 deletions docs/sources/tempo/api_docs/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -640,5 +640,6 @@ service StreamingQuerier {
rpc SearchTagsV2(SearchTagsRequest) returns (stream SearchTagsV2Response) {}
rpc SearchTagValues(SearchTagValuesRequest) returns (stream SearchTagValuesResponse) {}
rpc SearchTagValuesV2(SearchTagValuesRequest) returns (stream SearchTagValuesV2Response) {}
rpc MetricsQueryRange(QueryRangeRequest) returns (stream QueryRangeResponse) {}
}
```
21 changes: 21 additions & 0 deletions docs/sources/tempo/operations/tempo_cli.md
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,27 @@ Options:
Streaming over HTTP requires the `stream_over_http_enabled` flag to be set. For more information, refer to [Tempo GRPC API documentation]({{< relref "../api_docs" >}}).
{{% /admonition %}}

### Metrics
Call the Tempo API and generate metrics from traces using TraceQL.

```bash
tempo-cli query api metrics <host-port> <trace-ql metrics query> [<start> <end>]
```
Arguments:
- `host-port` A host/port combination for Tempo. The scheme will be inferred based on the options provided.
- `trace-ql metrics query` TraceQL metrics query.
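- `start` Start of the time range to search, in RFC 3339 format including a time zone (for example, `2024-01-01T00:00:00Z`)
- `end` End of the time range to search, in RFC 3339 format including a time zone (for example, `2024-01-01T01:00:00Z`)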

Options:
- `--org-id <value>` Organization ID (for use in multi-tenant setup).
- `--use-grpc` Use GRPC streaming
- `--path-prefix <value>` String to prefix search paths with

{{% admonition type="note" %}}
Streaming over HTTP requires the `stream_over_http_enabled` flag to be set. For more information, refer to [Tempo GRPC API documentation]({{< relref "../api_docs" >}}).
{{% /admonition %}}

## Query blocks command

Iterate over all backend blocks and dump all data found for a given trace id.
Expand Down
2 changes: 1 addition & 1 deletion example/docker-compose/local/docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ services:
- "9090:9090"

grafana:
image: grafana/grafana:10.2.2
image: grafana/grafana:10.4.2
volumes:
- ../shared/grafana-datasources.yaml:/etc/grafana/provisioning/datasources/datasources.yaml
environment:
Expand Down
4 changes: 3 additions & 1 deletion example/docker-compose/shared/tempo.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ metrics_generator:
remote_write:
- url: http://prometheus:9090/api/v1/write
send_exemplars: true
traces_storage:
path: /tmp/tempo/generator/traces

storage:
trace:
Expand All @@ -54,4 +56,4 @@ storage:
overrides:
defaults:
metrics_generator:
processors: [service-graphs, span-metrics] # enables metrics generator
processors: [service-graphs, span-metrics, local-blocks] # enables metrics generator
12 changes: 9 additions & 3 deletions modules/frontend/combiner/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,18 @@ type TResponse interface {
proto.Message
}

// PipelineResponse is what a combiner receives for each completed request: the
// raw HTTP response plus an optional, untyped value attached alongside it.
type PipelineResponse interface {
	// AdditionalData returns the extra value carried with the response, or
	// nil when there is none. Consumers must know the concrete type they
	// expect.
	AdditionalData() any
	// HTTPResponse returns the underlying HTTP response; it may be nil.
	HTTPResponse() *http.Response
}

type genericCombiner[T TResponse] struct {
mu sync.Mutex

current T // todo: state mgmt is mixed between the combiner and the various implementations. put it in one spot.

new func() T
combine func(partial T, final T) error
combine func(partial T, final T, resp PipelineResponse) error
finalize func(T) (T, error)
diff func(T) (T, error) // currently only implemented by the search combiner. required for streaming
quit func(T) bool
Expand All @@ -35,10 +40,11 @@ type genericCombiner[T TResponse] struct {
}

// AddResponse is used to add a http response to the combiner.
func (c *genericCombiner[T]) AddResponse(res *http.Response) error {
func (c *genericCombiner[T]) AddResponse(r PipelineResponse) error {
c.mu.Lock()
defer c.mu.Unlock()

res := r.HTTPResponse()
if res == nil {
return nil
}
Expand Down Expand Up @@ -68,7 +74,7 @@ func (c *genericCombiner[T]) AddResponse(res *http.Response) error {
return fmt.Errorf("error unmarshalling response body: %w", err)
}

if err := c.combine(partial, c.current); err != nil {
if err := c.combine(partial, c.current, r); err != nil {
c.httpRespBody = internalErrorMsg
return fmt.Errorf("error combining in combiner: %w", err)
}
Expand Down
2 changes: 1 addition & 1 deletion modules/frontend/combiner/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
// Implementations must be thread-safe.
// TODO: StatusCode() is only used for multi-tenant support. Can we remove it?
type Combiner interface {
AddResponse(r *http.Response) error
AddResponse(r PipelineResponse) error
StatusCode() int
ShouldQuit() bool

Expand Down
85 changes: 85 additions & 0 deletions modules/frontend/combiner/metrics_query_range.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package combiner

import (
"sort"
"strings"

"github.com/grafana/tempo/pkg/tempopb"
"github.com/grafana/tempo/pkg/traceql"
)

var _ GRPCCombiner[*tempopb.QueryRangeResponse] = (*genericCombiner[*tempopb.QueryRangeResponse])(nil)

// NewQueryRange returns a query range combiner.
//
// The returned genericCombiner starts with HTTP status 200 and delegates the
// actual merging to a single traceql.QueryRangeCombiner that is captured by the
// combine, finalize and diff closures below.
func NewQueryRange() Combiner {
// shared accumulator for all three closures
combiner := traceql.QueryRangeCombiner{}

return &genericCombiner[*tempopb.QueryRangeResponse]{
httpStatusCode: 200,
new: func() *tempopb.QueryRangeResponse { return &tempopb.QueryRangeResponse{} },
current: &tempopb.QueryRangeResponse{Metrics: &tempopb.SearchMetrics{}},
// combine ignores its second argument; all state lives in the captured combiner.
combine: func(partial *tempopb.QueryRangeResponse, _ *tempopb.QueryRangeResponse, resp PipelineResponse) error {
if partial.Metrics != nil {
// this is a coordination between the sharder and combiner. the sharder returns one response with summary metrics
// only. the combiner correctly takes and accumulates that job. however, if the response has no jobs this is
// an indicator this is a "real" response so we set CompletedJobs to 1 to increment in the combiner.
if partial.Metrics.TotalJobs == 0 {
partial.Metrics.CompletedJobs = 1
}
}

// the additional data, when present, is read as the sampling rate for this response
samplingRate := resp.AdditionalData()
if samplingRate != nil {
// unchecked assertion: a non-nil value that is not a float64 panics here
fRate := samplingRate.(float64)

if fRate != 0.0 {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: can skip this when rate == 1.0. Maybe switch to fRate < 1.0 ?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the 0.0 check is just a sanity check to prevent panics. fRate < 1.0 && fRate > 0.0. would there ever be a time we deflated metrics with the rate?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, the sampling rate will always be between 0 and 1 due to the check in the sharder: https://github.com/grafana/tempo/pull/3584/files#diff-85974350e62b77bee6b5d88d537e3946839ccb1bba5f6dc89450a1c5489b5d4fR364

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good catch. updated to the suggested condition

// Set final sampling rate after integer rounding
// Multiply up the sampling rate
for _, series := range partial.Series {
for i, sample := range series.Samples {
// scale the value by 1/fRate and store the result back into the slice
sample.Value *= 1.0 / fRate
series.Samples[i] = sample
}
}
}
}

combiner.Combine(partial)

return nil
},
// finalize ignores its argument and returns the accumulated response, sorted;
// an empty response is substituted when nothing has been accumulated.
finalize: func(_ *tempopb.QueryRangeResponse) (*tempopb.QueryRangeResponse, error) {
resp := combiner.Response()
if resp == nil {
resp = &tempopb.QueryRangeResponse{}
}
sortResponse(resp)
return resp, nil
},
// todo: the diff method still returns the full response every time. find a way to diff
diff: func(_ *tempopb.QueryRangeResponse) (*tempopb.QueryRangeResponse, error) {
resp := combiner.Response()
if resp == nil {
resp = &tempopb.QueryRangeResponse{}
}
sortResponse(resp)
return resp, nil
},
}
}

// NewTypedQueryRange builds the same combiner as NewQueryRange but exposes it
// through the typed GRPCCombiner interface. The type assertion would panic if
// NewQueryRange ever returned a value that does not implement that interface.
func NewTypedQueryRange() GRPCCombiner[*tempopb.QueryRangeResponse] {
	untyped := NewQueryRange()
	return untyped.(GRPCCombiner[*tempopb.QueryRangeResponse])
}

// sortResponse orders res in place: series are stably sorted by their
// PromLabels string, and the samples of each series are sorted by ascending
// TimestampMs.
func sortResponse(res *tempopb.QueryRangeResponse) {
	all := res.Series
	sort.SliceStable(all, func(a, b int) bool {
		return strings.Compare(all[a].PromLabels, all[b].PromLabels) < 0
	})

	for idx := range all {
		samples := all[idx].Samples
		sort.Slice(samples, func(a, b int) bool {
			return samples[a].TimestampMs < samples[b].TimestampMs
		})
	}
}
Loading
Loading