Skip to content

Commit

Permalink
Add low allocation support to OTLP log exporters
Browse files Browse the repository at this point in the history
  • Loading branch information
jack-berg committed May 9, 2024
1 parent cb5f4ac commit 039bdfa
Show file tree
Hide file tree
Showing 16 changed files with 192 additions and 57 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,17 @@

import io.opentelemetry.exporter.internal.http.HttpExporter;
import io.opentelemetry.exporter.internal.http.HttpExporterBuilder;
import io.opentelemetry.exporter.internal.marshal.Marshaler;
import io.opentelemetry.exporter.internal.otlp.logs.LogsRequestMarshaler;
import io.opentelemetry.exporter.internal.otlp.logs.LowAllocationLogsRequestMarshaler;
import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.common.export.MemoryMode;
import io.opentelemetry.sdk.logs.data.LogRecordData;
import io.opentelemetry.sdk.logs.export.LogRecordExporter;
import java.util.ArrayDeque;
import java.util.Collection;
import java.util.Deque;
import java.util.StringJoiner;
import javax.annotation.concurrent.ThreadSafe;

/**
Expand All @@ -22,14 +28,18 @@
@ThreadSafe
public final class OtlpHttpLogRecordExporter implements LogRecordExporter {

private final HttpExporterBuilder<LogsRequestMarshaler> builder;
private final HttpExporter<LogsRequestMarshaler> delegate;
private final Deque<LowAllocationLogsRequestMarshaler> marshalerPool = new ArrayDeque<>();
private final HttpExporterBuilder<Marshaler> builder;
private final HttpExporter<Marshaler> delegate;
private final MemoryMode memoryMode;

OtlpHttpLogRecordExporter(
HttpExporterBuilder<LogsRequestMarshaler> builder,
HttpExporter<LogsRequestMarshaler> delegate) {
HttpExporterBuilder<Marshaler> builder,
HttpExporter<Marshaler> delegate,
MemoryMode memoryMode) {
this.builder = builder;
this.delegate = delegate;
this.memoryMode = memoryMode;
}

/**
Expand Down Expand Up @@ -61,7 +71,7 @@ public static OtlpHttpLogRecordExporterBuilder builder() {
* @since 1.29.0
*/
public OtlpHttpLogRecordExporterBuilder toBuilder() {
return new OtlpHttpLogRecordExporterBuilder(builder.copy());
return new OtlpHttpLogRecordExporterBuilder(builder.copy(), memoryMode);
}

/**
Expand All @@ -72,8 +82,24 @@ public OtlpHttpLogRecordExporterBuilder toBuilder() {
*/
@Override
public CompletableResultCode export(Collection<LogRecordData> logs) {
LogsRequestMarshaler exportRequest = LogsRequestMarshaler.create(logs);
return delegate.export(exportRequest, logs.size());
if (memoryMode == MemoryMode.REUSABLE_DATA) {
LowAllocationLogsRequestMarshaler marshaler = marshalerPool.poll();
if (marshaler == null) {
marshaler = new LowAllocationLogsRequestMarshaler();
}
LowAllocationLogsRequestMarshaler exportMarshaler = marshaler;
exportMarshaler.initialize(logs);
return delegate
.export(exportMarshaler, logs.size())
.whenComplete(
() -> {
exportMarshaler.reset();
marshalerPool.add(exportMarshaler);
});
}
// MemoryMode == MemoryMode.IMMUTABLE_DATA
LogsRequestMarshaler request = LogsRequestMarshaler.create(logs);
return delegate.export(request, logs.size());
}

@Override
Expand All @@ -89,6 +115,9 @@ public CompletableResultCode shutdown() {

@Override
public String toString() {
return "OtlpHttpLogRecordExporter{" + builder.toString(false) + "}";
StringJoiner joiner = new StringJoiner(", ", "OtlpHttpLogRecordExporter{", "}");
joiner.add(builder.toString(false));
joiner.add("memoryMode=" + memoryMode);
return joiner.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,9 @@
import io.opentelemetry.exporter.internal.compression.CompressorProvider;
import io.opentelemetry.exporter.internal.compression.CompressorUtil;
import io.opentelemetry.exporter.internal.http.HttpExporterBuilder;
import io.opentelemetry.exporter.internal.otlp.logs.LogsRequestMarshaler;
import io.opentelemetry.exporter.internal.marshal.Marshaler;
import io.opentelemetry.exporter.otlp.internal.OtlpUserAgent;
import io.opentelemetry.sdk.common.export.MemoryMode;
import io.opentelemetry.sdk.common.export.ProxyOptions;
import io.opentelemetry.sdk.common.export.RetryPolicy;
import java.time.Duration;
Expand All @@ -33,16 +34,19 @@
public final class OtlpHttpLogRecordExporterBuilder {

private static final String DEFAULT_ENDPOINT = "http://localhost:4318/v1/logs";
private static final MemoryMode DEFAULT_MEMORY_MODE = MemoryMode.IMMUTABLE_DATA;

private final HttpExporterBuilder<LogsRequestMarshaler> delegate;
private final HttpExporterBuilder<Marshaler> delegate;
private MemoryMode memoryMode;

OtlpHttpLogRecordExporterBuilder(HttpExporterBuilder<LogsRequestMarshaler> delegate) {
OtlpHttpLogRecordExporterBuilder(HttpExporterBuilder<Marshaler> delegate, MemoryMode memoryMode) {
this.delegate = delegate;
this.memoryMode = memoryMode;
OtlpUserAgent.addUserAgentHeader(delegate::addConstantHeaders);
}

OtlpHttpLogRecordExporterBuilder() {
this(new HttpExporterBuilder<>("otlp", "log", DEFAULT_ENDPOINT));
this(new HttpExporterBuilder<>("otlp", "log", DEFAULT_ENDPOINT), DEFAULT_MEMORY_MODE);
}

/**
Expand Down Expand Up @@ -206,12 +210,19 @@ public OtlpHttpLogRecordExporterBuilder setMeterProvider(
return this;
}

/** Set the {@link MemoryMode}. */
OtlpHttpLogRecordExporterBuilder setMemoryMode(MemoryMode memoryMode) {
requireNonNull(memoryMode, "memoryMode");
this.memoryMode = memoryMode;
return this;
}

/**
* Constructs a new instance of the exporter based on the builder's values.
*
* @return a new exporter's instance
*/
public OtlpHttpLogRecordExporter build() {
return new OtlpHttpLogRecordExporter(delegate, delegate.build());
return new OtlpHttpLogRecordExporter(delegate, delegate.build(), memoryMode);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,10 @@

import static io.opentelemetry.sdk.metrics.Aggregation.explicitBucketHistogram;

import io.opentelemetry.exporter.otlp.http.logs.OtlpHttpLogRecordExporterBuilder;
import io.opentelemetry.exporter.otlp.http.metrics.OtlpHttpMetricExporterBuilder;
import io.opentelemetry.exporter.otlp.http.trace.OtlpHttpSpanExporterBuilder;
import io.opentelemetry.exporter.otlp.logs.OtlpGrpcLogRecordExporterBuilder;
import io.opentelemetry.exporter.otlp.metrics.OtlpGrpcMetricExporterBuilder;
import io.opentelemetry.exporter.otlp.trace.OtlpGrpcSpanExporterBuilder;
import io.opentelemetry.sdk.autoconfigure.spi.ConfigProperties;
Expand Down Expand Up @@ -257,6 +259,26 @@ public static void setMemoryModeOnOtlpExporterBuilder(Object builder, MemoryMode
OtlpHttpSpanExporterBuilder.class.getDeclaredMethod("setMemoryMode", MemoryMode.class);
method.setAccessible(true);
method.invoke(builder, memoryMode);
} else if (builder instanceof OtlpGrpcLogRecordExporterBuilder) {
// Calling getDeclaredMethod causes all private methods to be read, which causes a
// ClassNotFoundException when running with the OkHttHttpProvider as the private
// setManagedChanel(io.grpc.ManagedChannel) is reached and io.grpc.ManagedChannel is not on
// the classpath. io.opentelemetry.exporter.otlp.logs.OtlpGrpcLogUtil provides a layer
// of indirection which avoids scanning the OtlpGrpcLogRecordExporterBuilder private
// methods.
Class<?> otlpGrpcMetricUtil =
Class.forName("io.opentelemetry.exporter.otlp.logs.OtlpGrpcLogUtil");
Method method =
otlpGrpcMetricUtil.getDeclaredMethod(
"setMemoryMode", OtlpGrpcLogRecordExporterBuilder.class, MemoryMode.class);
method.setAccessible(true);
method.invoke(null, builder, memoryMode);
} else if (builder instanceof OtlpHttpLogRecordExporterBuilder) {
Method method =
OtlpHttpLogRecordExporterBuilder.class.getDeclaredMethod(
"setMemoryMode", MemoryMode.class);
method.setAccessible(true);
method.invoke(builder, memoryMode);
} else {
throw new IllegalArgumentException(
"Cannot set memory mode. Unrecognized OTLP exporter builder");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import static io.opentelemetry.exporter.otlp.internal.OtlpConfigUtil.PROTOCOL_HTTP_PROTOBUF;

import io.opentelemetry.api.metrics.MeterProvider;
import io.opentelemetry.exporter.internal.ExporterBuilderUtil;
import io.opentelemetry.exporter.otlp.http.logs.OtlpHttpLogRecordExporter;
import io.opentelemetry.exporter.otlp.http.logs.OtlpHttpLogRecordExporterBuilder;
import io.opentelemetry.exporter.otlp.logs.OtlpGrpcLogRecordExporter;
Expand Down Expand Up @@ -53,6 +54,9 @@ public LogRecordExporter createExporter(ConfigProperties config) {
builder::setClientTls,
builder::setRetryPolicy);
builder.setMeterProvider(meterProviderRef::get);
ExporterBuilderUtil.configureExporterMemoryMode(
config,
memoryMode -> OtlpConfigUtil.setMemoryModeOnOtlpExporterBuilder(builder, memoryMode));

return builder.build();
} else if (protocol.equals(PROTOCOL_GRPC)) {
Expand All @@ -69,6 +73,9 @@ public LogRecordExporter createExporter(ConfigProperties config) {
builder::setClientTls,
builder::setRetryPolicy);
builder.setMeterProvider(meterProviderRef::get);
ExporterBuilderUtil.configureExporterMemoryMode(
config,
memoryMode -> OtlpConfigUtil.setMemoryModeOnOtlpExporterBuilder(builder, memoryMode));

return builder.build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
import io.grpc.stub.ClientCalls;
import io.opentelemetry.exporter.internal.grpc.MarshalerInputStream;
import io.opentelemetry.exporter.internal.grpc.MarshalerServiceStub;
import io.opentelemetry.exporter.internal.otlp.logs.LogsRequestMarshaler;
import io.opentelemetry.exporter.internal.marshal.Marshaler;
import java.io.InputStream;
import javax.annotation.Nullable;

Expand All @@ -23,15 +23,15 @@ final class MarshalerLogsServiceGrpc {

private static final String SERVICE_NAME = "opentelemetry.proto.collector.logs.v1.LogsService";

private static final MethodDescriptor.Marshaller<LogsRequestMarshaler> REQUEST_MARSHALLER =
new MethodDescriptor.Marshaller<LogsRequestMarshaler>() {
private static final MethodDescriptor.Marshaller<Marshaler> REQUEST_MARSHALLER =
new MethodDescriptor.Marshaller<Marshaler>() {
@Override
public InputStream stream(LogsRequestMarshaler value) {
public InputStream stream(Marshaler value) {
return new MarshalerInputStream(value);
}

@Override
public LogsRequestMarshaler parse(InputStream stream) {
public Marshaler parse(InputStream stream) {
throw new UnsupportedOperationException("Only for serializing");
}
};
Expand All @@ -49,14 +49,13 @@ public ExportLogsServiceResponse parse(InputStream stream) {
}
};

private static final MethodDescriptor<LogsRequestMarshaler, ExportLogsServiceResponse>
getExportMethod =
MethodDescriptor.<LogsRequestMarshaler, ExportLogsServiceResponse>newBuilder()
.setType(MethodDescriptor.MethodType.UNARY)
.setFullMethodName(generateFullMethodName(SERVICE_NAME, "Export"))
.setRequestMarshaller(REQUEST_MARSHALLER)
.setResponseMarshaller(RESPONSE_MARSHALER)
.build();
private static final MethodDescriptor<Marshaler, ExportLogsServiceResponse> getExportMethod =
MethodDescriptor.<Marshaler, ExportLogsServiceResponse>newBuilder()
.setType(MethodDescriptor.MethodType.UNARY)
.setFullMethodName(generateFullMethodName(SERVICE_NAME, "Export"))
.setRequestMarshaller(REQUEST_MARSHALLER)
.setResponseMarshaller(RESPONSE_MARSHALER)
.build();

static LogsServiceFutureStub newFutureStub(Channel channel, @Nullable String authorityOverride) {
return LogsServiceFutureStub.newStub(
Expand All @@ -65,8 +64,7 @@ static LogsServiceFutureStub newFutureStub(Channel channel, @Nullable String aut
}

static final class LogsServiceFutureStub
extends MarshalerServiceStub<
LogsRequestMarshaler, ExportLogsServiceResponse, LogsServiceFutureStub> {
extends MarshalerServiceStub<Marshaler, ExportLogsServiceResponse, LogsServiceFutureStub> {
private LogsServiceFutureStub(Channel channel, CallOptions callOptions) {
super(channel, callOptions);
}
Expand All @@ -78,7 +76,7 @@ protected MarshalerLogsServiceGrpc.LogsServiceFutureStub build(
}

@Override
public ListenableFuture<ExportLogsServiceResponse> export(LogsRequestMarshaler request) {
public ListenableFuture<ExportLogsServiceResponse> export(Marshaler request) {
return ClientCalls.futureUnaryCall(
getChannel().newCall(getExportMethod, getCallOptions()), request);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,17 @@

import io.opentelemetry.exporter.internal.grpc.GrpcExporter;
import io.opentelemetry.exporter.internal.grpc.GrpcExporterBuilder;
import io.opentelemetry.exporter.internal.marshal.Marshaler;
import io.opentelemetry.exporter.internal.otlp.logs.LogsRequestMarshaler;
import io.opentelemetry.exporter.internal.otlp.logs.LowAllocationLogsRequestMarshaler;
import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.common.export.MemoryMode;
import io.opentelemetry.sdk.logs.data.LogRecordData;
import io.opentelemetry.sdk.logs.export.LogRecordExporter;
import java.util.ArrayDeque;
import java.util.Collection;
import java.util.Deque;
import java.util.StringJoiner;
import javax.annotation.concurrent.ThreadSafe;

/**
Expand All @@ -22,8 +28,10 @@
@ThreadSafe
public final class OtlpGrpcLogRecordExporter implements LogRecordExporter {

private final GrpcExporterBuilder<LogsRequestMarshaler> builder;
private final GrpcExporter<LogsRequestMarshaler> delegate;
private final Deque<LowAllocationLogsRequestMarshaler> marshalerPool = new ArrayDeque<>();
private final GrpcExporterBuilder<Marshaler> builder;
private final GrpcExporter<Marshaler> delegate;
private final MemoryMode memoryMode;

/**
* Returns a new {@link OtlpGrpcLogRecordExporter} using the default values.
Expand All @@ -47,10 +55,12 @@ public static OtlpGrpcLogRecordExporterBuilder builder() {
}

OtlpGrpcLogRecordExporter(
GrpcExporterBuilder<LogsRequestMarshaler> builder,
GrpcExporter<LogsRequestMarshaler> delegate) {
GrpcExporterBuilder<Marshaler> builder,
GrpcExporter<Marshaler> delegate,
MemoryMode memoryMode) {
this.builder = builder;
this.delegate = delegate;
this.memoryMode = memoryMode;
}

/**
Expand All @@ -61,7 +71,7 @@ public static OtlpGrpcLogRecordExporterBuilder builder() {
* @since 1.29.0
*/
public OtlpGrpcLogRecordExporterBuilder toBuilder() {
return new OtlpGrpcLogRecordExporterBuilder(builder.copy());
return new OtlpGrpcLogRecordExporterBuilder(builder.copy(), memoryMode);
}

/**
Expand All @@ -72,6 +82,22 @@ public OtlpGrpcLogRecordExporterBuilder toBuilder() {
*/
@Override
public CompletableResultCode export(Collection<LogRecordData> logs) {
if (memoryMode == MemoryMode.REUSABLE_DATA) {
LowAllocationLogsRequestMarshaler marshaler = marshalerPool.poll();
if (marshaler == null) {
marshaler = new LowAllocationLogsRequestMarshaler();
}
LowAllocationLogsRequestMarshaler exportMarshaler = marshaler;
exportMarshaler.initialize(logs);
return delegate
.export(exportMarshaler, logs.size())
.whenComplete(
() -> {
exportMarshaler.reset();
marshalerPool.add(exportMarshaler);
});
}
// MemoryMode == MemoryMode.IMMUTABLE_DATA
LogsRequestMarshaler request = LogsRequestMarshaler.create(logs);
return delegate.export(request, logs.size());
}
Expand All @@ -92,6 +118,9 @@ public CompletableResultCode shutdown() {

@Override
public String toString() {
return "OtlpGrpcLogRecordExporter{" + builder.toString(false) + "}";
StringJoiner joiner = new StringJoiner(", ", "OtlpGrpcLogRecordExporter{", "}");
joiner.add(builder.toString(false));
joiner.add("memoryMode=" + memoryMode);
return joiner.toString();
}
}
Loading

0 comments on commit 039bdfa

Please sign in to comment.