Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use minimal fallback managed channel when none is specified #6110

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ public void setUp() {
"span",
new UpstreamGrpcSender<>(
MarshalerTraceServiceGrpc.newFutureStub(defaultGrpcChannel, null),
/* shutdownChannel= */ false,
10,
Collections::emptyMap),
MeterProvider::noop);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

package io.opentelemetry.exporter.otlp.testing.internal;

import static org.assertj.core.api.Assertions.as;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatCode;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
Expand All @@ -23,8 +24,10 @@
import com.linecorp.armeria.testing.junit5.server.SelfSignedCertificateExtension;
import com.linecorp.armeria.testing.junit5.server.ServerExtension;
import io.github.netmikey.logunit.api.LogCapturer;
import io.grpc.ManagedChannel;
import io.opentelemetry.exporter.internal.TlsUtil;
import io.opentelemetry.exporter.internal.grpc.GrpcExporter;
import io.opentelemetry.exporter.internal.grpc.MarshalerServiceStub;
import io.opentelemetry.exporter.internal.marshal.Marshaler;
import io.opentelemetry.internal.testing.slf4j.SuppressLogger;
import io.opentelemetry.proto.collector.logs.v1.ExportLogsServiceRequest;
Expand Down Expand Up @@ -61,6 +64,7 @@
import javax.net.ssl.TrustManager;
import javax.net.ssl.X509KeyManager;
import javax.net.ssl.X509TrustManager;
import org.assertj.core.api.InstanceOfAssertFactories;
import org.assertj.core.api.iterable.ThrowingExtractor;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
Expand Down Expand Up @@ -215,6 +219,23 @@ void reset() {
httpRequests.clear();
}

@Test
void minimalChannel() {
// Test that UpstreamGrpcSender uses minimal fallback managed channel, so skip for
// OkHttpGrpcSender
assumeThat(exporter.unwrap())
.extracting("delegate.grpcSender")
.matches(sender -> sender.getClass().getSimpleName().equals("UpstreamGrpcSender"));
// When no channel is explicitly set, should fall back to a minimally configured managed channel
TelemetryExporter<?> exporter = exporterBuilder().build();
assertThat(exporter.shutdown().join(10, TimeUnit.SECONDS).isSuccess()).isTrue();
assertThat(exporter.unwrap())
.extracting(
"delegate.grpcSender.stub",
as(InstanceOfAssertFactories.type(MarshalerServiceStub.class)))
.satisfies(stub -> assertThat(((ManagedChannel) stub.getChannel()).isShutdown()).isTrue());
}

@Test
void export() {
List<T> telemetry = Collections.singletonList(generateFakeTelemetry());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,16 +158,21 @@ public TelemetryExporterBuilder<T> setChannel(Object channel) {

@Override
public TelemetryExporter<T> build() {
requireNonNull(channelBuilder, "channel");
Runnable shutdownCallback;
if (channelBuilder != null) {
try {
setSslContext(channelBuilder, tlsConfigHelper);
} catch (SSLException e) {
throw new IllegalStateException(e);
}

try {
setSslContext(channelBuilder, tlsConfigHelper);
} catch (SSLException e) {
throw new IllegalStateException(e);
ManagedChannel channel = channelBuilder.build();
delegate.setChannel(channel);
shutdownCallback = channel::shutdownNow;
} else {
shutdownCallback = () -> {};
}

ManagedChannel channel = channelBuilder.build();
delegate.setChannel(channel);
TelemetryExporter<T> delegateExporter = delegate.build();
return new TelemetryExporter<T>() {
@Override
Expand All @@ -182,7 +187,7 @@ public CompletableResultCode export(Collection<T> items) {

@Override
public CompletableResultCode shutdown() {
channel.shutdownNow();
shutdownCallback.run();
return delegateExporter.shutdown();
}
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.MoreExecutors;
import io.grpc.ManagedChannel;
import io.grpc.Metadata;
import io.grpc.Status;
import io.grpc.stub.MetadataUtils;
Expand All @@ -32,16 +33,19 @@
public final class UpstreamGrpcSender<T extends Marshaler> implements GrpcSender<T> {

private final MarshalerServiceStub<T, ?, ?> stub;
private final boolean shutdownChannel;
private final long timeoutNanos;
private final Supplier<Map<String, List<String>>> headersSupplier;

/** Creates a new {@link UpstreamGrpcSender}. */
public UpstreamGrpcSender(
MarshalerServiceStub<T, ?, ?> stub,
boolean shutdownChannel,
long timeoutNanos,
Supplier<Map<String, List<String>>> headersSupplier) {
this.timeoutNanos = timeoutNanos;
this.stub = stub;
this.shutdownChannel = shutdownChannel;
this.timeoutNanos = timeoutNanos;
this.headersSupplier = headersSupplier;
}

Expand Down Expand Up @@ -82,6 +86,10 @@ public void onFailure(Throwable t) {

@Override
public CompletableResultCode shutdown() {
if (shutdownChannel) {
ManagedChannel channel = (ManagedChannel) stub.getChannel();
channel.shutdownNow();
}
return CompletableResultCode.ofSuccess();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@

import io.grpc.Channel;
import io.grpc.Codec;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.opentelemetry.exporter.internal.grpc.GrpcSender;
import io.opentelemetry.exporter.internal.grpc.GrpcSenderProvider;
import io.opentelemetry.exporter.internal.grpc.MarshalerServiceStub;
Expand Down Expand Up @@ -41,6 +43,13 @@ public <T extends Marshaler> GrpcSender<T> createSender(
@Nullable RetryPolicy retryPolicy,
@Nullable SSLContext sslContext,
@Nullable X509TrustManager trustManager) {
boolean shutdownChannel = false;
if (managedChannel == null) {
// Shutdown the channel as part of the exporter shutdown sequence if
shutdownChannel = true;
managedChannel = minimalFallbackManagedChannel(endpoint);
}

String authorityOverride = null;
Map<String, List<String>> headers = headersSupplier.get();
if (headers != null) {
Expand All @@ -58,6 +67,27 @@ public <T extends Marshaler> GrpcSender<T> createSender(
.apply((Channel) managedChannel, authorityOverride)
.withCompression(codec.getMessageEncoding());

return new UpstreamGrpcSender<>(stub, timeoutNanos, headersSupplier);
return new UpstreamGrpcSender<>(stub, shutdownChannel, timeoutNanos, headersSupplier);
}

/**
* If {@link ManagedChannel} is not explicitly set, provide a minimally configured fallback
* channel to avoid failing initialization.
*
* <p>This is required to accommodate autoconfigure with {@code
* opentelemetry-exporter-sender-grpc-managed-channel} which will always fail to initialize
* without a fallback channel since there isn't an opportunity to explicitly set the channel.
*
* <p>This only incorporates the target address, port, and whether to use plain text. All
* additional settings are intentionally ignored and must be configured with an explicitly set
* {@link ManagedChannel}.
*/
private static ManagedChannel minimalFallbackManagedChannel(URI endpoint) {
ManagedChannelBuilder<?> channelBuilder =
ManagedChannelBuilder.forAddress(endpoint.getHost(), endpoint.getPort());
if (!endpoint.getScheme().equals("https")) {
channelBuilder.usePlaintext();
}
return channelBuilder.build();
}
}
Loading