Skip to content

Commit

Permalink
Make OTLP receiver listen on all IPs again (#5739)
Browse files Browse the repository at this point in the history
## Which problem is this PR solving?
- Resolves #5737

## Description of the changes
- Revert #5734
- Default OTLP receiver endpoints to `:port` instead of relying on OTEL
Collector defaults
- Clean up tests to use ephemeral ports when possible

## How was this change tested?
- Tested similar to #5734 by using `COLLECTOR_OTLP_HTTP_HOST_PORT=:4318`

---------

Signed-off-by: Yuri Shkuro <[email protected]>
  • Loading branch information
yurishkuro authored Jul 13, 2024
1 parent eae8e66 commit a54e2a5
Show file tree
Hide file tree
Showing 16 changed files with 156 additions and 174 deletions.
4 changes: 2 additions & 2 deletions cmd/collector/app/flags/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,9 +194,9 @@ func AddFlags(flags *flag.FlagSet) {
addGRPCFlags(flags, grpcServerFlagsCfg, ports.PortToHostPort(ports.CollectorGRPC))

flags.Bool(flagCollectorOTLPEnabled, true, "Enables OpenTelemetry OTLP receiver on dedicated HTTP and gRPC ports")
addHTTPFlags(flags, otlpServerFlagsCfg.HTTP, "")
addHTTPFlags(flags, otlpServerFlagsCfg.HTTP, ":4318")
corsOTLPFlags.AddFlags(flags)
addGRPCFlags(flags, otlpServerFlagsCfg.GRPC, "")
addGRPCFlags(flags, otlpServerFlagsCfg.GRPC, ":4317")

flags.String(flagZipkinHTTPHostPort, "", "The host:port (e.g. 127.0.0.1:9411 or :9411) of the collector's Zipkin server (disabled by default)")
flags.Bool(flagZipkinKeepAliveEnabled, true, "KeepAlive configures allow Keep-Alive for Zipkin HTTP server (enabled by default)")
Expand Down
1 change: 0 additions & 1 deletion cmd/collector/app/server/grpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,6 @@ func TestCollectorReflection(t *testing.T) {

grpctest.ReflectionServiceValidator{
HostPort: params.HostPortActual,
Server: server,
ExpectedServices: []string{
"jaeger.api_v2.CollectorService",
"jaeger.api_v2.SamplingManager",
Expand Down
12 changes: 8 additions & 4 deletions cmd/jaeger/internal/extension/jaegerquery/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package jaegerquery
import (
"github.com/asaskevich/govalidator"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configgrpc"
"go.opentelemetry.io/collector/config/confighttp"

queryApp "github.com/jaegertracing/jaeger/cmd/query/app"
Expand All @@ -18,10 +19,13 @@ var _ component.ConfigValidator = (*Config)(nil)
type Config struct {
queryApp.QueryOptionsBase `mapstructure:",squash"`

TraceStoragePrimary string `valid:"required" mapstructure:"trace_storage"`
TraceStorageArchive string `valid:"optional" mapstructure:"trace_storage_archive"`
confighttp.ServerConfig `mapstructure:",squash"`
Tenancy tenancy.Options `mapstructure:"multi_tenancy"`
TraceStoragePrimary string `valid:"required" mapstructure:"trace_storage"`
TraceStorageArchive string `valid:"optional" mapstructure:"trace_storage_archive"`

HTTP confighttp.ServerConfig `mapstructure:",squash"`
GRPC configgrpc.ServerConfig `mapstructure:",squash"`

Tenancy tenancy.Options `mapstructure:"multi_tenancy"`
}

func (cfg *Config) Validate() error {
Expand Down
10 changes: 9 additions & 1 deletion cmd/jaeger/internal/extension/jaegerquery/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ import (
"context"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configgrpc"
"go.opentelemetry.io/collector/config/confighttp"
"go.opentelemetry.io/collector/config/confignet"
"go.opentelemetry.io/collector/extension"

"github.com/jaegertracing/jaeger/ports"
Expand All @@ -25,9 +27,15 @@ func NewFactory() extension.Factory {

func createDefaultConfig() component.Config {
return &Config{
ServerConfig: confighttp.ServerConfig{
HTTP: confighttp.ServerConfig{
Endpoint: ports.PortToHostPort(ports.QueryHTTP),
},
GRPC: configgrpc.ServerConfig{
NetAddr: confignet.AddrConfig{
Endpoint: ports.PortToHostPort(ports.QueryGRPC),
Transport: confignet.TransportTypeTCP,
},
},
}
}

Expand Down
8 changes: 4 additions & 4 deletions cmd/jaeger/internal/extension/jaegerquery/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"github.com/jaegertracing/jaeger/pkg/telemetery"
"github.com/jaegertracing/jaeger/pkg/tenancy"
"github.com/jaegertracing/jaeger/plugin/metrics/disabled"
"github.com/jaegertracing/jaeger/ports"
)

var (
Expand Down Expand Up @@ -127,9 +126,10 @@ func (s *server) makeQueryOptions() *queryApp.QueryOptions {
return &queryApp.QueryOptions{
QueryOptionsBase: s.config.QueryOptionsBase,

// TODO expose via config
HTTPHostPort: ports.PortToHostPort(ports.QueryHTTP),
GRPCHostPort: ports.PortToHostPort(ports.QueryGRPC),
// TODO utilize OTEL helpers for creating HTTP/GRPC servers
HTTPHostPort: s.config.HTTP.Endpoint,
GRPCHostPort: s.config.GRPC.NetAddr.Endpoint,
// TODO handle TLS
}
}

Expand Down
33 changes: 31 additions & 2 deletions cmd/jaeger/internal/extension/jaegerquery/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@ package jaegerquery
import (
"context"
"fmt"
"net/http"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand All @@ -17,6 +19,7 @@ import (

"github.com/jaegertracing/jaeger/cmd/jaeger/internal/extension/jaegerstorage"
"github.com/jaegertracing/jaeger/cmd/query/app/querysvc"
"github.com/jaegertracing/jaeger/internal/grpctest"
"github.com/jaegertracing/jaeger/pkg/metrics"
"github.com/jaegertracing/jaeger/pkg/testutils"
"github.com/jaegertracing/jaeger/storage"
Expand Down Expand Up @@ -160,14 +163,40 @@ func TestServerStart(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
telemetrySettings := component.TelemetrySettings{
Logger: zaptest.NewLogger(t, zaptest.WrapOptions(zap.AddCaller())),
Logger: zaptest.NewLogger(t, zaptest.WrapOptions(zap.AddCaller())),
ReportStatus: func(*component.StatusEvent) {},
}
tt.config.HTTP.Endpoint = ":0"
tt.config.GRPC.NetAddr.Endpoint = ":0"
server := newServer(tt.config, telemetrySettings)
err := server.Start(context.Background(), host)

if tt.expectedErr == "" {
require.NoError(t, err)
defer server.Shutdown(context.Background())
// We need to wait for servers to become available.
// Otherwise, we could call shutdown before the servers are even started,
// which could cause flaky code coverage by going through error cases.
require.Eventually(t,
func() bool {
resp, err := http.Get(fmt.Sprintf("http://%s/", server.server.HTTPAddr()))
if err != nil {
return false
}
defer resp.Body.Close()
return resp.StatusCode == http.StatusOK
},
10*time.Second,
100*time.Millisecond,
"server not started")
grpctest.ReflectionServiceValidator{
HostPort: server.server.GRPCAddr(),
ExpectedServices: []string{
"jaeger.api_v2.QueryService",
"jaeger.api_v3.QueryService",
"jaeger.api_v2.metrics.MetricsQueryService",
"grpc.health.v1.Health",
},
}.Execute(t)
} else {
require.ErrorContains(t, err, tt.expectedErr)
}
Expand Down
14 changes: 7 additions & 7 deletions cmd/query/app/handler_archive_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func TestGetArchivedTrace_NotFound(t *testing.T) {
} {
tc := tc // capture loop var
t.Run(tc.name, func(t *testing.T) {
withTestServer(func(ts *testServer) {
withTestServer(t, func(ts *testServer) {
ts.spanReader.On("GetTrace", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("model.TraceID")).
Return(nil, spanstore.ErrTraceNotFound).Once()
var response structuredResponse
Expand All @@ -66,7 +66,7 @@ func TestGetArchivedTraceSuccess(t *testing.T) {
mockReader := &spanstoremocks.Reader{}
mockReader.On("GetTrace", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("model.TraceID")).
Return(mockTrace, nil).Once()
withTestServer(func(ts *testServer) {
withTestServer(t, func(ts *testServer) {
// make main reader return NotFound
ts.spanReader.On("GetTrace", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("model.TraceID")).
Return(nil, spanstore.ErrTraceNotFound).Once()
Expand All @@ -81,7 +81,7 @@ func TestGetArchivedTraceSuccess(t *testing.T) {

// Test failure in parsing trace ID.
func TestArchiveTrace_BadTraceID(t *testing.T) {
withTestServer(func(ts *testServer) {
withTestServer(t, func(ts *testServer) {
var response structuredResponse
err := postJSON(ts.server.URL+"/api/archive/badtraceid", []string{}, &response)
require.Error(t, err)
Expand All @@ -95,7 +95,7 @@ func TestArchiveTrace_TraceNotFound(t *testing.T) {
Return(nil, spanstore.ErrTraceNotFound).Once()
mockWriter := &spanstoremocks.Writer{}
// Not actually going to write the trace, so no need to define mockWriter action
withTestServer(func(ts *testServer) {
withTestServer(t, func(ts *testServer) {
// make main reader return NotFound
ts.spanReader.On("GetTrace", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("model.TraceID")).
Return(nil, spanstore.ErrTraceNotFound).Once()
Expand All @@ -106,7 +106,7 @@ func TestArchiveTrace_TraceNotFound(t *testing.T) {
}

func TestArchiveTrace_NoStorage(t *testing.T) {
withTestServer(func(ts *testServer) {
withTestServer(t, func(ts *testServer) {
var response structuredResponse
err := postJSON(ts.server.URL+"/api/archive/"+mockTraceID.String(), []string{}, &response)
require.EqualError(t, err, `500 error from server: {"data":null,"total":0,"limit":0,"offset":0,"errors":[{"code":500,"msg":"archive span storage was not configured"}]}`+"\n")
Expand All @@ -117,7 +117,7 @@ func TestArchiveTrace_Success(t *testing.T) {
mockWriter := &spanstoremocks.Writer{}
mockWriter.On("WriteSpan", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("*model.Span")).
Return(nil).Times(2)
withTestServer(func(ts *testServer) {
withTestServer(t, func(ts *testServer) {
ts.spanReader.On("GetTrace", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("model.TraceID")).
Return(mockTrace, nil).Once()
var response structuredResponse
Expand All @@ -130,7 +130,7 @@ func TestArchiveTrace_WriteErrors(t *testing.T) {
mockWriter := &spanstoremocks.Writer{}
mockWriter.On("WriteSpan", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("*model.Span")).
Return(errors.New("cannot save")).Times(2)
withTestServer(func(ts *testServer) {
withTestServer(t, func(ts *testServer) {
ts.spanReader.On("GetTrace", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("model.TraceID")).
Return(mockTrace, nil).Once()
var response structuredResponse
Expand Down
19 changes: 8 additions & 11 deletions cmd/query/app/handler_deps_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -310,8 +310,7 @@ func TestFilterDependencies(t *testing.T) {
}

func TestGetDependenciesSuccess(t *testing.T) {
ts := initializeTestServer()
defer ts.server.Close()
ts := initializeTestServer(t)
expectedDependencies := []model.DependencyLink{{Parent: "killer", Child: "queen", CallCount: 12}}
endTs := time.Unix(0, 1476374248550*millisToNanosMultiplier)
ts.dependencyReader.On("GetDependencies",
Expand All @@ -332,29 +331,27 @@ func TestGetDependenciesSuccess(t *testing.T) {
}

func TestGetDependenciesCassandraFailure(t *testing.T) {
ts := initializeTestServer()
defer ts.server.Close()
ts := initializeTestServer(t)
endTs := time.Unix(0, 1476374248550*millisToNanosMultiplier)
ts.dependencyReader.On("GetDependencies", endTs, defaultDependencyLookbackDuration).Return(nil, errStorage).Times(1)
ts.dependencyReader.On("GetDependencies",
mock.Anything, // context
endTs,
defaultDependencyLookbackDuration).Return(nil, errStorage).Times(1)

var response structuredResponse
err := getJSON(ts.server.URL+"/api/dependencies?endTs=1476374248550&service=testing", &response)
require.Error(t, err)
}

func TestGetDependenciesEndTimeParsingFailure(t *testing.T) {
ts := initializeTestServer()
defer ts.server.Close()

ts := initializeTestServer(t)
var response structuredResponse
err := getJSON(ts.server.URL+"/api/dependencies?endTs=shazbot&service=testing", &response)
require.Error(t, err)
}

func TestGetDependenciesLookbackParsingFailure(t *testing.T) {
ts := initializeTestServer()
defer ts.server.Close()

ts := initializeTestServer(t)
var response structuredResponse
err := getJSON(ts.server.URL+"/api/dependencies?endTs=1476374248550&service=testing&lookback=shazbot", &response)
require.Error(t, err)
Expand Down
Loading

0 comments on commit a54e2a5

Please sign in to comment.