Skip to content

Commit

Permalink
[CHANGE] tempo-query: switch from grpcPlugin to standalone (grafana#3840
Browse files Browse the repository at this point in the history
)

* tempo-query: switch from grpcPlugin to standalone

Jaeger 1.58 no longer supports gRPC storage plugins.

Signed-off-by: Benedikt Bongartz <[email protected]>

* add changelog entry

Signed-off-by: Benedikt Bongartz <[email protected]>

* update vendor

Signed-off-by: Benedikt Bongartz <[email protected]>

* fix: panic when requesting dependencies

Signed-off-by: Benedikt Bongartz <[email protected]>

* integration/e2e: adapt tempo-query test

Signed-off-by: Benedikt Bongartz <[email protected]>

* changelog: update

Signed-off-by: Benedikt Bongartz <[email protected]>

---------

Signed-off-by: Benedikt Bongartz <[email protected]>
  • Loading branch information
frzifus authored and pavolloffay committed Oct 10, 2024
1 parent b262af0 commit 9cfa39f
Show file tree
Hide file tree
Showing 22 changed files with 145 additions and 1,330 deletions.
3 changes: 0 additions & 3 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
## main / unreleased

* [FEATURE] TraceQL support for instrumentation scope [#3967](https://github.com/grafana/tempo/pull/3967) (@ie-pham)
* [ENHANCEMENT] Add bytes and spans received to usage stats [#3983](https://github.com/grafana/tempo/pull/3983) (@joe-elliott)

# v2.6.0-rc.0

* [CHANGE] **BREAKING CHANGE** Remove `autocomplete_filtering_enabled` feature flag [#3729](https://github.com/grafana/tempo/pull/3729) (@mapno)
Expand Down
15 changes: 9 additions & 6 deletions cmd/tempo-query/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
FROM jaegertracing/jaeger-query:1.57.0
FROM alpine:3.20 as certs
RUN apk --update add ca-certificates
ARG TARGETARCH
COPY bin/linux/tempo-query-${TARGETARCH} /tempo-query

ENV SPAN_STORAGE_TYPE=grpc-plugin \
GRPC_STORAGE_PLUGIN_BINARY=/tempo-query
RUN addgroup -g 10001 -S tempo && \
adduser -u 10001 -S tempo -G tempo

# Ensure /tmp dir exists, hashicorp plugins need a /tmp dir to exist.
RUN mkdir -p /tmp
USER 10001:10001

ARG TARGETARCH
COPY bin/linux/tempo-query-${TARGETARCH} /tempo-query

ENTRYPOINT ["/tempo-query"]
66 changes: 26 additions & 40 deletions cmd/tempo-query/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,28 +2,25 @@ package main

import (
"flag"
"io"
"net"
"strings"

"github.com/hashicorp/go-hclog"
hcplugin "github.com/hashicorp/go-plugin"
"github.com/jaegertracing/jaeger/plugin/storage/grpc"
"github.com/jaegertracing/jaeger/plugin/storage/grpc/shared"
"github.com/jaegertracing/jaeger/storage/dependencystore"
"github.com/jaegertracing/jaeger/storage/spanstore"
"github.com/jaegertracing/jaeger/proto-gen/storage_v1"
otgrpc "github.com/opentracing-contrib/go-grpc"
"github.com/opentracing/opentracing-go"
"github.com/spf13/viper"
jaeger_config "github.com/uber/jaeger-client-go/config"
google_grpc "google.golang.org/grpc"
"google.golang.org/grpc/credentials"

"github.com/grafana/tempo/cmd/tempo-query/tempo"
)

func main() {
logger := hclog.New(&hclog.LoggerOptions{
Name: "jaeger-tempo",
Level: hclog.Error, // Jaeger only captures >= Warn, so don't bother logging below Warn
Level: hclog.Error,
JSONFormat: true,
})

Expand All @@ -44,52 +41,41 @@ func main() {
}
}

closer, err := initJaeger("tempo-grpc-plugin")
if err != nil {
logger.Error("failed to init tracer", "error", err)
}
defer closer.Close()

cfg := &tempo.Config{}
cfg.InitFromViper(v)

backend, err := tempo.New(cfg)
if err != nil {
logger.Error("failed to init tracer backend", "error", err)
}
plugin := &plugin{backend: backend}
grpc.ServeWithGRPCServer(&shared.PluginServices{
Store: plugin,
}, func(options []google_grpc.ServerOption) *google_grpc.Server {
return hcplugin.DefaultGRPCServer([]google_grpc.ServerOption{
google_grpc.UnaryInterceptor(otgrpc.OpenTracingServerInterceptor(opentracing.GlobalTracer())),
google_grpc.StreamInterceptor(otgrpc.OpenTracingStreamServerInterceptor(opentracing.GlobalTracer())),
})
})
}

type plugin struct {
backend *tempo.Backend
}
grpcOpts := []google_grpc.ServerOption{
google_grpc.UnaryInterceptor(otgrpc.OpenTracingServerInterceptor(opentracing.GlobalTracer())),
google_grpc.StreamInterceptor(otgrpc.OpenTracingStreamServerInterceptor(opentracing.GlobalTracer())),
}

func (p *plugin) DependencyReader() dependencystore.Reader {
return p.backend
}
if cfg.TLSEnabled {
creds, err := credentials.NewClientTLSFromFile(cfg.TLS.CertPath, cfg.TLS.ServerName)
if err != nil {
logger.Error("failed to load TLS credentials", "error", err)
} else {
grpcOpts = append(grpcOpts, google_grpc.Creds(creds))
}
}

func (p *plugin) SpanReader() spanstore.Reader {
return p.backend
}
srv := hcplugin.DefaultGRPCServer(grpcOpts)

func (p *plugin) SpanWriter() spanstore.Writer {
return p.backend
}
storage_v1.RegisterSpanReaderPluginServer(srv, backend)
storage_v1.RegisterDependenciesReaderPluginServer(srv, backend)
storage_v1.RegisterSpanWriterPluginServer(srv, backend)

func initJaeger(service string) (io.Closer, error) {
// .FromEnv() uses standard environment variables to allow for easy configuration
cfg, err := jaeger_config.FromEnv()
lis, err := net.Listen("tcp", cfg.Address)
if err != nil {
return nil, err
logger.Error("failed to listen", "error", err)
}

return cfg.InitGlobalTracer(service)
logger.Info("Server starts serving", "address", cfg.Address)
if err := srv.Serve(lis); err != nil {
logger.Error("failed to serve", "error", err)
}
}
6 changes: 6 additions & 0 deletions cmd/tempo-query/tempo/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

// Config holds the configuration for redbull.
type Config struct {
Address string `yaml:"address"`
Backend string `yaml:"backend"`
TLSEnabled bool `yaml:"tls_enabled" category:"advanced"`
TLS tls.ClientConfig `yaml:",inline"`
Expand All @@ -17,6 +18,11 @@ type Config struct {

// InitFromViper initializes the options struct with values from Viper
func (c *Config) InitFromViper(v *viper.Viper) {
address := v.GetString("address")
if address == "" {
address = "0.0.0.0:7777"
}
c.Address = address
c.Backend = v.GetString("backend")
c.TLSEnabled = v.GetBool("tls_enabled")
c.TLS.CertPath = v.GetString("tls_cert_path")
Expand Down
99 changes: 70 additions & 29 deletions cmd/tempo-query/tempo/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
tlsCfg "github.com/grafana/dskit/crypto/tls"
"github.com/grafana/dskit/user"
"github.com/grafana/tempo/pkg/tempopb"
"github.com/jaegertracing/jaeger/proto-gen/storage_v1"
"github.com/opentracing/opentracing-go"
ot_log "github.com/opentracing/opentracing-go/log"
"go.opentelemetry.io/collector/pdata/ptrace"
Expand Down Expand Up @@ -52,6 +53,12 @@ var tlsVersions = map[string]uint16{
"VersionTLS13": tls.VersionTLS13,
}

var (
_ storage_v1.SpanReaderPluginServer = (*Backend)(nil)
_ storage_v1.DependenciesReaderPluginServer = (*Backend)(nil)
_ storage_v1.SpanWriterPluginServer = (*Backend)(nil)
)

type Backend struct {
tempoBackend string
tlsEnabled bool
Expand Down Expand Up @@ -167,8 +174,8 @@ func tlsCipherSuites() map[string]uint16 {
return cipherSuites
}

func (b *Backend) GetDependencies(context.Context, time.Time, time.Duration) ([]jaeger.DependencyLink, error) {
return nil, nil
func (b *Backend) GetDependencies(context.Context, *storage_v1.GetDependenciesRequest) (*storage_v1.GetDependenciesResponse, error) {
return &storage_v1.GetDependenciesResponse{}, nil
}

func (b *Backend) apiSchema() string {
Expand All @@ -178,7 +185,21 @@ func (b *Backend) apiSchema() string {
return "http"
}

func (b *Backend) GetTrace(ctx context.Context, traceID jaeger.TraceID) (*jaeger.Trace, error) {
func (b *Backend) GetTrace(req *storage_v1.GetTraceRequest, stream storage_v1.SpanReaderPlugin_GetTraceServer) error {
jt, err := b.getTrace(stream.Context(), req.TraceID)
if err != nil {
return err
}

spans := make([]jaeger.Span, len(jt.Spans))
for i, span := range jt.Spans {
spans[i] = *span
}

return stream.Send(&storage_v1.SpansResponseChunk{Spans: spans})
}

func (b *Backend) getTrace(ctx context.Context, traceID jaeger.TraceID) (*jaeger.Trace, error) {
url := fmt.Sprintf("%s://%s/api/traces/%s", b.apiSchema(), b.tempoBackend, traceID)

span, ctx := opentracing.StartSpanFromContext(ctx, "tempo-query.GetTrace")
Expand Down Expand Up @@ -249,14 +270,18 @@ func (b *Backend) calculateTimeRange() (int64, int64) {
return start.Unix(), now.Unix()
}

func (b *Backend) GetServices(ctx context.Context) ([]string, error) {
func (b *Backend) GetServices(ctx context.Context, _ *storage_v1.GetServicesRequest) (*storage_v1.GetServicesResponse, error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "tempo-query.GetOperations")
defer span.Finish()

return b.lookupTagValues(ctx, span, serviceSearchTag)
services, err := b.lookupTagValues(ctx, span, serviceSearchTag)
if err != nil {
return nil, err
}
return &storage_v1.GetServicesResponse{Services: services}, nil
}

func (b *Backend) GetOperations(ctx context.Context, _ jaeger_spanstore.OperationQueryParameters) ([]jaeger_spanstore.Operation, error) {
func (b *Backend) GetOperations(ctx context.Context, _ *storage_v1.GetOperationsRequest) (*storage_v1.GetOperationsResponse, error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "tempo-query.GetOperations")
defer span.Finish()

Expand All @@ -265,32 +290,35 @@ func (b *Backend) GetOperations(ctx context.Context, _ jaeger_spanstore.Operatio
return nil, err
}

var operations []jaeger_spanstore.Operation
var operations []*storage_v1.Operation
for _, value := range tagValues {
operations = append(operations, jaeger_spanstore.Operation{
operations = append(operations, &storage_v1.Operation{
Name: value,
SpanKind: "",
})
}

return operations, nil
return &storage_v1.GetOperationsResponse{
OperationNames: tagValues,
Operations: operations,
}, nil
}

func (b *Backend) FindTraces(ctx context.Context, query *jaeger_spanstore.TraceQueryParameters) ([]*jaeger.Trace, error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "tempo-query.FindTraces")
func (b *Backend) FindTraces(req *storage_v1.FindTracesRequest, stream storage_v1.SpanReaderPlugin_FindTracesServer) error {
span, ctx := opentracing.StartSpanFromContext(stream.Context(), "tempo-query.FindTraces")
defer span.Finish()

traceIDs, err := b.FindTraceIDs(ctx, query)
resp, err := b.FindTraceIDs(ctx, &storage_v1.FindTraceIDsRequest{Query: req.Query})
if err != nil {
return nil, err
return err
}

span.LogFields(ot_log.String("msg", fmt.Sprintf("Found %d trace IDs", len(traceIDs))))
span.LogFields(ot_log.String("msg", fmt.Sprintf("Found %d trace IDs", len(resp.TraceIDs))))

// for every traceID, get the full trace
var jaegerTraces []*jaeger.Trace
for _, traceID := range traceIDs {
trace, err := b.GetTrace(ctx, traceID)
for _, traceID := range resp.TraceIDs {
trace, err := b.getTrace(ctx, traceID)
if err != nil {
// TODO this seems to be an internal inconsistency error, ignore so we can still show the rest
span.LogFields(ot_log.Error(fmt.Errorf("could not get trace for traceID %v: %w", traceID, err)))
Expand All @@ -302,10 +330,19 @@ func (b *Backend) FindTraces(ctx context.Context, query *jaeger_spanstore.TraceQ

span.LogFields(ot_log.String("msg", fmt.Sprintf("Returning %d traces", len(jaegerTraces))))

return jaegerTraces, nil
for _, jt := range jaegerTraces {
spans := make([]jaeger.Span, len(jt.Spans))
for i, span := range jt.Spans {
spans[i] = *span
}
if err := stream.Send(&storage_v1.SpansResponseChunk{Spans: spans}); err != nil {
return err
}
}
return nil
}

func (b *Backend) FindTraceIDs(ctx context.Context, query *jaeger_spanstore.TraceQueryParameters) ([]jaeger.TraceID, error) {
func (b *Backend) FindTraceIDs(ctx context.Context, r *storage_v1.FindTraceIDsRequest) (*storage_v1.FindTraceIDsResponse, error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "tempo-query.FindTraceIDs")
defer span.Finish()

Expand All @@ -315,16 +352,16 @@ func (b *Backend) FindTraceIDs(ctx context.Context, query *jaeger_spanstore.Trac
Path: "api/search",
}
urlQuery := url.Query()
urlQuery.Set(minDurationSearchTag, query.DurationMin.String())
urlQuery.Set(maxDurationSearchTag, query.DurationMax.String())
urlQuery.Set(numTracesSearchTag, strconv.Itoa(query.NumTraces))
urlQuery.Set(startTimeMaxTag, fmt.Sprintf("%d", query.StartTimeMax.Unix()))
urlQuery.Set(startTimeMinTag, fmt.Sprintf("%d", query.StartTimeMin.Unix()))
urlQuery.Set(minDurationSearchTag, r.Query.DurationMin.String())
urlQuery.Set(maxDurationSearchTag, r.Query.DurationMax.String())
urlQuery.Set(numTracesSearchTag, strconv.Itoa(int(r.Query.GetNumTraces())))
urlQuery.Set(startTimeMaxTag, fmt.Sprintf("%d", r.Query.StartTimeMax.Unix()))
urlQuery.Set(startTimeMinTag, fmt.Sprintf("%d", r.Query.StartTimeMin.Unix()))

queryParam, err := createTagsQueryParam(
query.ServiceName,
query.OperationName,
query.Tags,
r.Query.ServiceName,
r.Query.OperationName,
r.Query.Tags,
)
if err != nil {
return nil, fmt.Errorf("failed to create tags query parameter: %w", err)
Expand Down Expand Up @@ -373,7 +410,7 @@ func (b *Backend) FindTraceIDs(ctx context.Context, query *jaeger_spanstore.Trac
jaegerTraceIDs[i] = jaegerTraceID
}

return jaegerTraceIDs, nil
return &storage_v1.FindTraceIDsResponse{TraceIDs: jaegerTraceIDs}, nil
}

func createTagsQueryParam(service string, operation string, tags map[string]string) (string, error) {
Expand Down Expand Up @@ -441,8 +478,12 @@ func (b *Backend) lookupTagValues(ctx context.Context, span opentracing.Span, ta
return searchLookupResponse.TagValues, nil
}

func (b *Backend) WriteSpan(context.Context, *jaeger.Span) error {
return nil
func (b *Backend) WriteSpan(context.Context, *storage_v1.WriteSpanRequest) (*storage_v1.WriteSpanResponse, error) {
return nil, nil
}

func (b *Backend) Close(context.Context, *storage_v1.CloseWriterRequest) (*storage_v1.CloseWriterResponse, error) {
return nil, nil
}

func (b *Backend) newGetRequest(ctx context.Context, url string, span opentracing.Span) (*http.Request, error) {
Expand Down
1 change: 1 addition & 0 deletions integration/e2e/config-tempo-query.yaml
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
address: "0.0.0.0:7777"
backend: tempo:3200
services_query_duration: 1h
Loading

0 comments on commit 9cfa39f

Please sign in to comment.