Skip to content

Commit

Permalink
Wire up the query-frontend
Browse files Browse the repository at this point in the history
  • Loading branch information
zalegrala committed May 17, 2023
1 parent 82edc09 commit 176d2ae
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 7 deletions.
4 changes: 4 additions & 0 deletions cmd/tempo/app/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,7 @@ func (t *App) initQueryFrontend() (services.Service, error) {

traceByIDHandler := middleware.Wrap(queryFrontend.TraceByIDHandler)
searchHandler := middleware.Wrap(queryFrontend.SearchHandler)
spanMetricsSummaryHandler := middleware.Wrap(queryFrontend.SpanMetricsSummaryHandler)

// register grpc server for queriers to connect to
frontend_v1pb.RegisterFrontendServer(t.Server.GRPC, t.frontend)
Expand All @@ -318,6 +319,9 @@ func (t *App) initQueryFrontend() (services.Service, error) {
t.Server.HTTP.Handle(addHTTPAPIPrefix(&t.cfg, api.PathSearchTagValues), searchHandler)
t.Server.HTTP.Handle(addHTTPAPIPrefix(&t.cfg, api.PathSearchTagValuesV2), searchHandler)

// http metrics endpoints
t.Server.HTTP.Handle(addHTTPAPIPrefix(&t.cfg, api.PathSpanMetricsSummary), spanMetricsSummaryHandler)

// the query frontend needs to have knowledge of the blocks so it can shard search jobs
t.store.EnablePolling(nil)

Expand Down
36 changes: 29 additions & 7 deletions modules/frontend/frontend.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,15 @@ import (
const (
traceByIDOp = "traces"
searchOp = "search"
metricsOp = "metrics"
)

type streamingSearchHandler func(req *tempopb.SearchRequest, srv tempopb.StreamingQuerier_SearchServer) error

type QueryFrontend struct {
TraceByIDHandler, SearchHandler http.Handler
streamingSearch streamingSearchHandler
logger log.Logger
TraceByIDHandler, SearchHandler, SpanMetricsSummaryHandler http.Handler
streamingSearch streamingSearchHandler
logger log.Logger
}

// New returns a new QueryFrontend
Expand Down Expand Up @@ -69,17 +70,21 @@ func New(cfg Config, next http.RoundTripper, o *overrides.Overrides, reader temp
// tracebyid middleware
traceByIDMiddleware := MergeMiddlewares(newTraceByIDMiddleware(cfg, logger), retryWare)
searchMiddleware := MergeMiddlewares(newSearchMiddleware(cfg, o, reader, logger), retryWare)
spanMetricsMiddleware := MergeMiddlewares(newSpanMetricsMiddleware(cfg, o, reader, logger), retryWare)

traceByIDCounter := queriesPerTenant.MustCurryWith(prometheus.Labels{"op": traceByIDOp})
searchCounter := queriesPerTenant.MustCurryWith(prometheus.Labels{"op": searchOp})
spanMetricsCounter := queriesPerTenant.MustCurryWith(prometheus.Labels{"op": metricsOp})

traces := traceByIDMiddleware.Wrap(next)
search := searchMiddleware.Wrap(next)
metrics := spanMetricsMiddleware.Wrap(next)
return &QueryFrontend{
TraceByIDHandler: newHandler(traces, traceByIDCounter, logger),
SearchHandler: newHandler(search, searchCounter, logger),
streamingSearch: newSearchStreamingHandler(cfg, o, retryWare.Wrap(next), reader, apiPrefix, logger),
logger: logger,
TraceByIDHandler: newHandler(traces, traceByIDCounter, logger),
SearchHandler: newHandler(search, searchCounter, logger),
SpanMetricsSummaryHandler: newHandler(metrics, spanMetricsCounter, logger),
streamingSearch: newSearchStreamingHandler(cfg, o, retryWare.Wrap(next), reader, apiPrefix, logger),
logger: logger,
}, nil
}

Expand Down Expand Up @@ -199,6 +204,23 @@ func newSearchMiddleware(cfg Config, o *overrides.Overrides, reader tempodb.Read
})
}

// newSpanMetricsMiddleware creates a new frontend middleware to handle search and search tags requests.
func newSpanMetricsMiddleware(cfg Config, o *overrides.Overrides, reader tempodb.Reader, logger log.Logger) Middleware {
return MiddlewareFunc(func(next http.RoundTripper) http.RoundTripper {
generatorRT := next

return RoundTripperFunc(func(r *http.Request) (*http.Response, error) {
// ingester search queries only need to be proxied to a single querier
orgID, _ := user.ExtractOrgID(r.Context())

r.Header.Set(user.OrgIDHeaderName, orgID)
r.RequestURI = buildUpstreamRequestURI(r.RequestURI, nil)

return generatorRT.RoundTrip(r)
})
})
}

// buildUpstreamRequestURI returns a uri based on the passed parameters
// we do this because weaveworks/common uses the RequestURI field to translate from http.Request to httpgrpc.Request
// https://github.com/weaveworks/common/blob/47e357f4e1badb7da17ad74bae63e228bdd76e8f/httpgrpc/server/server.go#L48
Expand Down

0 comments on commit 176d2ae

Please sign in to comment.