Skip to content

Commit

Permalink
Backport tempo-query changes to 2.6 (#4170)
Browse files Browse the repository at this point in the history
* [CHANGE] tempo-query: switch from grpcPlugin to standalone (#3840)

* tempo-query: switch from grpcPlugin to standalone

Jaeger 1.58 no longer supports gRPC storage plugins.

Signed-off-by: Benedikt Bongartz <[email protected]>

* add changelog entry

Signed-off-by: Benedikt Bongartz <[email protected]>

* update vendor

Signed-off-by: Benedikt Bongartz <[email protected]>

* fix: panic when requesting dependencies

Signed-off-by: Benedikt Bongartz <[email protected]>

* integration/e2e: adapt tempo-query test

Signed-off-by: Benedikt Bongartz <[email protected]>

* changelog: update

Signed-off-by: Benedikt Bongartz <[email protected]>

---------

Signed-off-by: Benedikt Bongartz <[email protected]>

* Backport tempo-query: concurrent trace search and opentelemetry

Signed-off-by: Pavol Loffay <[email protected]>

* Fix

Signed-off-by: Pavol Loffay <[email protected]>

---------

Signed-off-by: Benedikt Bongartz <[email protected]>
Signed-off-by: Pavol Loffay <[email protected]>
Co-authored-by: Ben B. <[email protected]>
  • Loading branch information
pavolloffay and frzifus authored Oct 10, 2024
1 parent 314960a commit 8981e19
Show file tree
Hide file tree
Showing 21 changed files with 260 additions and 1,382 deletions.
15 changes: 9 additions & 6 deletions cmd/tempo-query/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
FROM jaegertracing/jaeger-query:1.57.0
FROM alpine:3.20 as certs
RUN apk --update add ca-certificates
ARG TARGETARCH
COPY bin/linux/tempo-query-${TARGETARCH} /tempo-query

ENV SPAN_STORAGE_TYPE=grpc-plugin \
GRPC_STORAGE_PLUGIN_BINARY=/tempo-query
RUN addgroup -g 10001 -S tempo && \
adduser -u 10001 -S tempo -G tempo

# Ensure /tmp dir exists, hashicorp plugins need a /tmp dir to exist.
RUN mkdir -p /tmp
USER 10001:10001

ARG TARGETARCH
COPY bin/linux/tempo-query-${TARGETARCH} /tempo-query

ENTRYPOINT ["/tempo-query"]
87 changes: 38 additions & 49 deletions cmd/tempo-query/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,30 +2,30 @@ package main

import (
"flag"
"io"
"net"
"os"
"strings"

"github.com/hashicorp/go-hclog"
hcplugin "github.com/hashicorp/go-plugin"
"github.com/jaegertracing/jaeger/plugin/storage/grpc"
"github.com/jaegertracing/jaeger/plugin/storage/grpc/shared"
"github.com/jaegertracing/jaeger/storage/dependencystore"
"github.com/jaegertracing/jaeger/storage/spanstore"
"github.com/jaegertracing/jaeger/proto-gen/storage_v1"
zaplogfmt "github.com/jsternberg/zap-logfmt"
otgrpc "github.com/opentracing-contrib/go-grpc"
"github.com/opentracing/opentracing-go"
"github.com/spf13/viper"
jaeger_config "github.com/uber/jaeger-client-go/config"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
google_grpc "google.golang.org/grpc"
"google.golang.org/grpc/credentials"

"github.com/grafana/tempo/cmd/tempo-query/tempo"
)

func main() {
logger := hclog.New(&hclog.LoggerOptions{
Name: "jaeger-tempo",
Level: hclog.Error, // Jaeger only captures >= Warn, so don't bother logging below Warn
JSONFormat: true,
})
config := zap.NewProductionEncoderConfig()
logger := zap.New(zapcore.NewCore(
zaplogfmt.NewEncoder(config),
os.Stdout,
zapcore.InfoLevel,
))

var configPath string
flag.StringVar(&configPath, "config", "", "A path to the plugin's configuration file")
Expand All @@ -40,56 +40,45 @@ func main() {

err := v.ReadInConfig()
if err != nil {
logger.Error("failed to parse configuration file", "error", err)
logger.Error("failed to parse configuration file", zap.Error(err))
}
}

closer, err := initJaeger("tempo-grpc-plugin")
if err != nil {
logger.Error("failed to init tracer", "error", err)
}
defer closer.Close()

cfg := &tempo.Config{}
cfg.InitFromViper(v)

backend, err := tempo.New(cfg)
backend, err := tempo.New(logger, cfg)
if err != nil {
logger.Error("failed to init tracer backend", "error", err)
logger.Error("failed to init tracer backend", zap.Error(err))
}
plugin := &plugin{backend: backend}
grpc.ServeWithGRPCServer(&shared.PluginServices{
Store: plugin,
}, func(options []google_grpc.ServerOption) *google_grpc.Server {
return hcplugin.DefaultGRPCServer([]google_grpc.ServerOption{
google_grpc.UnaryInterceptor(otgrpc.OpenTracingServerInterceptor(opentracing.GlobalTracer())),
google_grpc.StreamInterceptor(otgrpc.OpenTracingStreamServerInterceptor(opentracing.GlobalTracer())),
})
})
}

type plugin struct {
backend *tempo.Backend
}
grpcOpts := []google_grpc.ServerOption{
google_grpc.UnaryInterceptor(otgrpc.OpenTracingServerInterceptor(opentracing.GlobalTracer())),
google_grpc.StreamInterceptor(otgrpc.OpenTracingStreamServerInterceptor(opentracing.GlobalTracer())),
}

func (p *plugin) DependencyReader() dependencystore.Reader {
return p.backend
}
if cfg.TLSEnabled {
creds, err := credentials.NewClientTLSFromFile(cfg.TLS.CertPath, cfg.TLS.ServerName)
if err != nil {
logger.Error("failed to load TLS credentials", zap.Error(err))
} else {
grpcOpts = append(grpcOpts, google_grpc.Creds(creds))
}
}

func (p *plugin) SpanReader() spanstore.Reader {
return p.backend
}
srv := google_grpc.NewServer(grpcOpts...)

func (p *plugin) SpanWriter() spanstore.Writer {
return p.backend
}
storage_v1.RegisterSpanReaderPluginServer(srv, backend)
storage_v1.RegisterDependenciesReaderPluginServer(srv, backend)
storage_v1.RegisterSpanWriterPluginServer(srv, backend)

func initJaeger(service string) (io.Closer, error) {
// .FromEnv() uses standard environment variables to allow for easy configuration
cfg, err := jaeger_config.FromEnv()
lis, err := net.Listen("tcp", cfg.Address)
if err != nil {
return nil, err
logger.Error("failed to listen", zap.Error(err))
}

return cfg.InitGlobalTracer(service)
logger.Info("Server starts serving", zap.String("address", cfg.Address))
if err := srv.Serve(lis); err != nil {
logger.Error("failed to serve", zap.Error(err))
}
}
12 changes: 12 additions & 0 deletions cmd/tempo-query/tempo/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,23 @@ import (

// Config holds the configuration for redbull.
type Config struct {
Address string `yaml:"address"`
Backend string `yaml:"backend"`
TLSEnabled bool `yaml:"tls_enabled" category:"advanced"`
TLS tls.ClientConfig `yaml:",inline"`
TenantHeaderKey string `yaml:"tenant_header_key"`
QueryServicesDuration string `yaml:"services_query_duration"`
// FindTracesConcurrentRequests defines how many concurrent requests trace search submits to get a trace.
FindTracesConcurrentRequests int `yaml:"find_traces_concurrent_requests"`
}

// InitFromViper initializes the options struct with values from Viper
func (c *Config) InitFromViper(v *viper.Viper) {
address := v.GetString("address")
if address == "" {
address = "0.0.0.0:7777"
}
c.Address = address
c.Backend = v.GetString("backend")
c.TLSEnabled = v.GetBool("tls_enabled")
c.TLS.CertPath = v.GetString("tls_cert_path")
Expand All @@ -27,7 +35,11 @@ func (c *Config) InitFromViper(v *viper.Viper) {
c.TLS.CipherSuites = v.GetString("tls_cipher_suites")
c.TLS.MinVersion = v.GetString("tls_min_version")
c.QueryServicesDuration = v.GetString("services_query_duration")
c.FindTracesConcurrentRequests = v.GetInt("find_traces_concurrent_requests")

if c.FindTracesConcurrentRequests == 0 {
c.FindTracesConcurrentRequests = 1
}
tenantHeader := v.GetString("tenant_header_key")
if tenantHeader == "" {
tenantHeader = shared.BearerTokenKey
Expand Down
Loading

0 comments on commit 8981e19

Please sign in to comment.