From e96a3038a43c7eee63005e8fb6524b172d29fcc8 Mon Sep 17 00:00:00 2001 From: Jack Berg Date: Wed, 3 Jan 2024 11:19:21 -0600 Subject: [PATCH 1/3] Use minimal fallback managed channel when none is specified --- .../otlp/trace/OltpExporterBenchmark.java | 1 + .../AbstractGrpcTelemetryExporterTest.java | 21 ++++++++++++ ...anagedChannelTelemetryExporterBuilder.java | 21 +++++++----- .../internal/UpstreamGrpcSender.java | 10 +++++- .../internal/UpstreamGrpcSenderProvider.java | 32 ++++++++++++++++++- 5 files changed, 75 insertions(+), 10 deletions(-) diff --git a/exporters/otlp/all/src/jmh/java/io/opentelemetry/exporter/otlp/trace/OltpExporterBenchmark.java b/exporters/otlp/all/src/jmh/java/io/opentelemetry/exporter/otlp/trace/OltpExporterBenchmark.java index 1ad124271a3..ef620333474 100644 --- a/exporters/otlp/all/src/jmh/java/io/opentelemetry/exporter/otlp/trace/OltpExporterBenchmark.java +++ b/exporters/otlp/all/src/jmh/java/io/opentelemetry/exporter/otlp/trace/OltpExporterBenchmark.java @@ -85,6 +85,7 @@ public void setUp() { "span", new UpstreamGrpcSender<>( MarshalerTraceServiceGrpc.newFutureStub(defaultGrpcChannel, null), + /* shutdownChannel= */ false, 10, Collections::emptyMap), MeterProvider::noop); diff --git a/exporters/otlp/testing-internal/src/main/java/io/opentelemetry/exporter/otlp/testing/internal/AbstractGrpcTelemetryExporterTest.java b/exporters/otlp/testing-internal/src/main/java/io/opentelemetry/exporter/otlp/testing/internal/AbstractGrpcTelemetryExporterTest.java index c2490029aac..0b3d79a2690 100644 --- a/exporters/otlp/testing-internal/src/main/java/io/opentelemetry/exporter/otlp/testing/internal/AbstractGrpcTelemetryExporterTest.java +++ b/exporters/otlp/testing-internal/src/main/java/io/opentelemetry/exporter/otlp/testing/internal/AbstractGrpcTelemetryExporterTest.java @@ -5,6 +5,7 @@ package io.opentelemetry.exporter.otlp.testing.internal; +import static org.assertj.core.api.Assertions.as; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatCode; import static org.assertj.core.api.Assertions.assertThatThrownBy; @@ -23,8 +24,10 @@ import com.linecorp.armeria.testing.junit5.server.SelfSignedCertificateExtension; import com.linecorp.armeria.testing.junit5.server.ServerExtension; import io.github.netmikey.logunit.api.LogCapturer; +import io.grpc.ManagedChannel; import io.opentelemetry.exporter.internal.TlsUtil; import io.opentelemetry.exporter.internal.grpc.GrpcExporter; +import io.opentelemetry.exporter.internal.grpc.MarshalerServiceStub; import io.opentelemetry.exporter.internal.marshal.Marshaler; import io.opentelemetry.internal.testing.slf4j.SuppressLogger; import io.opentelemetry.proto.collector.logs.v1.ExportLogsServiceRequest; @@ -61,6 +64,7 @@ import javax.net.ssl.TrustManager; import javax.net.ssl.X509KeyManager; import javax.net.ssl.X509TrustManager; +import org.assertj.core.api.InstanceOfAssertFactories; import org.assertj.core.api.iterable.ThrowingExtractor; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.AfterEach; @@ -215,6 +219,23 @@ void reset() { httpRequests.clear(); } + @Test + void minimalChannel() { + // Test that UpstreamGrpcSender uses minimal fallback managed channel, so skip for + // OkHttpGrpcSender + assumeThat(exporter.unwrap()) + .extracting("delegate.grpcSender") + .matches(sender -> sender.getClass().getSimpleName().equals("UpstreamGrpcSender")); + // When no channel is explicitly set, should fall back to a minimally configured managed channel + TelemetryExporter exporter = exporterBuilder().build(); + assertThat(exporter.shutdown().join(10, TimeUnit.SECONDS).isSuccess()).isTrue(); + assertThat(exporter.unwrap()) + .extracting( + "delegate.grpcSender.stub", + as(InstanceOfAssertFactories.type(MarshalerServiceStub.class))) + .satisfies(stub -> assertThat(((ManagedChannel) stub.getChannel()).isShutdown()).isTrue()); + } + @Test void export() { List telemetry = Collections.singletonList(generateFakeTelemetry()); diff --git a/exporters/otlp/testing-internal/src/main/java/io/opentelemetry/exporter/otlp/testing/internal/ManagedChannelTelemetryExporterBuilder.java b/exporters/otlp/testing-internal/src/main/java/io/opentelemetry/exporter/otlp/testing/internal/ManagedChannelTelemetryExporterBuilder.java index be7d97b9678..bc0ecef22c3 100644 --- a/exporters/otlp/testing-internal/src/main/java/io/opentelemetry/exporter/otlp/testing/internal/ManagedChannelTelemetryExporterBuilder.java +++ b/exporters/otlp/testing-internal/src/main/java/io/opentelemetry/exporter/otlp/testing/internal/ManagedChannelTelemetryExporterBuilder.java @@ -22,7 +22,9 @@ import java.time.Duration; import java.util.Collection; import java.util.Map; +import java.util.Optional; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Supplier; import javax.annotation.Nullable; import javax.net.ssl.SSLContext; @@ -158,16 +160,19 @@ public TelemetryExporterBuilder setChannel(Object channel) { @Override public TelemetryExporter build() { - requireNonNull(channelBuilder, "channel"); + AtomicReference shutdownCallbackRef = new AtomicReference<>(() -> {}); + if (channelBuilder != null) { + try { + setSslContext(channelBuilder, tlsConfigHelper); + } catch (SSLException e) { + throw new IllegalStateException(e); + } - try { - setSslContext(channelBuilder, tlsConfigHelper); - } catch (SSLException e) { - throw new IllegalStateException(e); + ManagedChannel channel = channelBuilder.build(); + delegate.setChannel(channel); + shutdownCallbackRef.set(channel::shutdownNow); } - ManagedChannel channel = channelBuilder.build(); - delegate.setChannel(channel); TelemetryExporter delegateExporter = delegate.build(); return new TelemetryExporter() { @Override @@ -182,7 +187,7 @@ public CompletableResultCode export(Collection items) { @Override public CompletableResultCode shutdown() { - channel.shutdownNow(); + Optional.ofNullable(shutdownCallbackRef.get()).ifPresent(Runnable::run); return delegateExporter.shutdown(); } }; diff --git a/exporters/sender/grpc-managed-channel/src/main/java/io/opentelemetry/exporter/sender/grpc/managedchannel/internal/UpstreamGrpcSender.java b/exporters/sender/grpc-managed-channel/src/main/java/io/opentelemetry/exporter/sender/grpc/managedchannel/internal/UpstreamGrpcSender.java index 28f813accc7..f8fa234e4e6 100644 --- a/exporters/sender/grpc-managed-channel/src/main/java/io/opentelemetry/exporter/sender/grpc/managedchannel/internal/UpstreamGrpcSender.java +++ b/exporters/sender/grpc-managed-channel/src/main/java/io/opentelemetry/exporter/sender/grpc/managedchannel/internal/UpstreamGrpcSender.java @@ -8,6 +8,7 @@ import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.MoreExecutors; +import io.grpc.ManagedChannel; import io.grpc.Metadata; import io.grpc.Status; import io.grpc.stub.MetadataUtils; @@ -32,16 +33,19 @@ public final class UpstreamGrpcSender implements GrpcSender { private final MarshalerServiceStub stub; + private final boolean shutdownChannel; private final long timeoutNanos; private final Supplier>> headersSupplier; /** Creates a new {@link UpstreamGrpcSender}. */ public UpstreamGrpcSender( MarshalerServiceStub stub, + boolean shutdownChannel, long timeoutNanos, Supplier>> headersSupplier) { - this.timeoutNanos = timeoutNanos; this.stub = stub; + this.shutdownChannel = shutdownChannel; + this.timeoutNanos = timeoutNanos; this.headersSupplier = headersSupplier; } @@ -82,6 +86,10 @@ public void onFailure(Throwable t) { @Override public CompletableResultCode shutdown() { + if (shutdownChannel) { + ManagedChannel channel = (ManagedChannel) stub.getChannel(); + channel.shutdownNow(); + } return CompletableResultCode.ofSuccess(); } } diff --git a/exporters/sender/grpc-managed-channel/src/main/java/io/opentelemetry/exporter/sender/grpc/managedchannel/internal/UpstreamGrpcSenderProvider.java b/exporters/sender/grpc-managed-channel/src/main/java/io/opentelemetry/exporter/sender/grpc/managedchannel/internal/UpstreamGrpcSenderProvider.java index 34200681980..a4effdbd09c 100644 --- a/exporters/sender/grpc-managed-channel/src/main/java/io/opentelemetry/exporter/sender/grpc/managedchannel/internal/UpstreamGrpcSenderProvider.java +++ b/exporters/sender/grpc-managed-channel/src/main/java/io/opentelemetry/exporter/sender/grpc/managedchannel/internal/UpstreamGrpcSenderProvider.java @@ -7,6 +7,8 @@ import io.grpc.Channel; import io.grpc.Codec; +import io.grpc.ManagedChannel; +import io.grpc.ManagedChannelBuilder; import io.opentelemetry.exporter.internal.grpc.GrpcSender; import io.opentelemetry.exporter.internal.grpc.GrpcSenderProvider; import io.opentelemetry.exporter.internal.grpc.MarshalerServiceStub; @@ -41,6 +43,13 @@ public GrpcSender createSender( @Nullable RetryPolicy retryPolicy, @Nullable SSLContext sslContext, @Nullable X509TrustManager trustManager) { + boolean shutdownChannel = false; + if (managedChannel == null) { + // Shutdown the channel as part of the exporter shutdown sequence if + shutdownChannel = true; + managedChannel = minimalFallbackManagedChannel(endpoint); + } + String authorityOverride = null; Map> headers = headersSupplier.get(); if (headers != null) { @@ -58,6 +67,27 @@ public GrpcSender createSender( .apply((Channel) managedChannel, authorityOverride) .withCompression(codec.getMessageEncoding()); - return new UpstreamGrpcSender<>(stub, timeoutNanos, headersSupplier); + return new UpstreamGrpcSender<>(stub, shutdownChannel, timeoutNanos, headersSupplier); + } + + /** + * If {@link ManagedChannel} is not explicitly set, provide a minimally configured fallback + * channel to avoid failing initialization. + * + *

This is required to accommodate autoconfigure with {@code + * opentelemetry-exporter-sender-grpc-managed-channel} which will always fail to initialize + * without a fallback channel since there isn't an opportunity to explicitly set the channel. + * + *

This only incorporates the target address, port, and whether to use plain text. All + * additional settings are intentionally ignored and must be configured with an explicitly set + * {@link ManagedChannel}. + */ + private static ManagedChannel minimalFallbackManagedChannel(URI endpoint) { + ManagedChannelBuilder channelBuilder = + ManagedChannelBuilder.forAddress(endpoint.getHost(), endpoint.getPort()); + if (!endpoint.getScheme().equals("https")) { + channelBuilder.usePlaintext(); + } + return channelBuilder.build(); } } From ad241d74c8e8430d512b81f1a62f29908b687e3c Mon Sep 17 00:00:00 2001 From: Jack Berg Date: Wed, 3 Jan 2024 15:33:26 -0600 Subject: [PATCH 2/3] Remove atomic ref --- .../ManagedChannelTelemetryExporterBuilder.java | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/exporters/otlp/testing-internal/src/main/java/io/opentelemetry/exporter/otlp/testing/internal/ManagedChannelTelemetryExporterBuilder.java b/exporters/otlp/testing-internal/src/main/java/io/opentelemetry/exporter/otlp/testing/internal/ManagedChannelTelemetryExporterBuilder.java index bc0ecef22c3..4d2bbd28ce5 100644 --- a/exporters/otlp/testing-internal/src/main/java/io/opentelemetry/exporter/otlp/testing/internal/ManagedChannelTelemetryExporterBuilder.java +++ b/exporters/otlp/testing-internal/src/main/java/io/opentelemetry/exporter/otlp/testing/internal/ManagedChannelTelemetryExporterBuilder.java @@ -22,9 +22,7 @@ import java.time.Duration; import java.util.Collection; import java.util.Map; -import java.util.Optional; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; import java.util.function.Supplier; import javax.annotation.Nullable; import javax.net.ssl.SSLContext; @@ -160,7 +158,7 @@ public TelemetryExporterBuilder setChannel(Object channel) { @Override public TelemetryExporter build() { - AtomicReference shutdownCallbackRef = new AtomicReference<>(() -> {}); + Runnable callbackRef; if (channelBuilder != null) { try { setSslContext(channelBuilder, tlsConfigHelper); @@ -170,7 +168,9 @@ public TelemetryExporter build() { ManagedChannel channel = channelBuilder.build(); delegate.setChannel(channel); - shutdownCallbackRef.set(channel::shutdownNow); + callbackRef = channel::shutdownNow; + } else { + callbackRef = () -> {}; } TelemetryExporter delegateExporter = delegate.build(); @@ -187,7 +187,7 @@ public CompletableResultCode export(Collection items) { @Override public CompletableResultCode shutdown() { - Optional.ofNullable(shutdownCallbackRef.get()).ifPresent(Runnable::run); + callbackRef.run(); return delegateExporter.shutdown(); } }; From 42cd12e8f694547a7544e0187935ff35bf6be2e3 Mon Sep 17 00:00:00 2001 From: Jack Berg Date: Wed, 3 Jan 2024 17:08:25 -0600 Subject: [PATCH 3/3] Rename to shutdownCallback --- .../internal/ManagedChannelTelemetryExporterBuilder.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/exporters/otlp/testing-internal/src/main/java/io/opentelemetry/exporter/otlp/testing/internal/ManagedChannelTelemetryExporterBuilder.java b/exporters/otlp/testing-internal/src/main/java/io/opentelemetry/exporter/otlp/testing/internal/ManagedChannelTelemetryExporterBuilder.java index 4d2bbd28ce5..e5545aa41bd 100644 --- a/exporters/otlp/testing-internal/src/main/java/io/opentelemetry/exporter/otlp/testing/internal/ManagedChannelTelemetryExporterBuilder.java +++ b/exporters/otlp/testing-internal/src/main/java/io/opentelemetry/exporter/otlp/testing/internal/ManagedChannelTelemetryExporterBuilder.java @@ -158,7 +158,7 @@ public TelemetryExporterBuilder setChannel(Object channel) { @Override public TelemetryExporter build() { - Runnable callbackRef; + Runnable shutdownCallback; if (channelBuilder != null) { try { setSslContext(channelBuilder, tlsConfigHelper); @@ -168,9 +168,9 @@ public TelemetryExporter build() { ManagedChannel channel = channelBuilder.build(); delegate.setChannel(channel); - callbackRef = channel::shutdownNow; + shutdownCallback = channel::shutdownNow; } else { - callbackRef = () -> {}; + shutdownCallback = () -> {}; } TelemetryExporter delegateExporter = delegate.build(); @@ -187,7 +187,7 @@ public CompletableResultCode export(Collection items) { @Override public CompletableResultCode shutdown() { - callbackRef.run(); + shutdownCallback.run(); return delegateExporter.shutdown(); } };