-
Notifications
You must be signed in to change notification settings - Fork 785
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Do not Merge] [Otlp Exporter] Remove Google.Protobuf and Grpc.Net.Client dependency #5731
Changes from all commits
ed56da2
14414a8
59871e5
4e3f137
4025fe2
cf28621
034cc77
8e7b9dd
96efcea
e0ff831
76f7257
2f64916
64218f8
2f7a95a
c889b49
01b4686
c410d90
4133155
4e29ef0
c7784b7
99e3721
5364244
0794fa8
59251ac
f19e1e5
5b3f8a0
7fa55b6
2b753a2
940b19d
f1e9aab
b484e75
04a90ec
2b4756c
62f30f0
3b10436
5e78598
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Large diffs are not rendered by default.
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,37 @@ | ||
// Copyright The OpenTelemetry Authors | ||
// SPDX-License-Identifier: Apache-2.0 | ||
|
||
#if NETFRAMEWORK | ||
using System.Net.Http; | ||
#endif | ||
using OpenTelemetry.Exporter.OpenTelemetryProtocol.Implementation.ExportClient; | ||
|
||
namespace OpenTelemetry.Exporter.OpenTelemetryProtocol.Implementation.Custom.ExportClient; | ||
|
||
/// <summary>Export client interface.</summary> | ||
internal interface IExportClient | ||
{ | ||
/// <summary> | ||
/// Method for sending export request to the server. | ||
/// </summary> | ||
/// <param name="request">The request to send to the server.</param> | ||
/// <param name="contentLength">length of the content.</param> | ||
/// <param name="deadlineUtc">The deadline time in utc for export request to finish.</param> | ||
/// <param name="cancellationToken">An optional token for canceling the call.</param> | ||
/// <returns><see cref="ExportClientResponse"/>.</returns> | ||
ExportClientResponse SendExportRequest(byte[] request, int contentLength, DateTime deadlineUtc, CancellationToken cancellationToken = default); | ||
|
||
HttpRequestMessage CreateHttpRequest(byte[] request, int contentLength); | ||
|
||
/// <summary> | ||
/// Method for shutting down the export client. | ||
/// </summary> | ||
/// <param name="timeoutMilliseconds"> | ||
/// The number of milliseconds to wait, or <c>Timeout.Infinite</c> to | ||
/// wait indefinitely. | ||
/// </param> | ||
/// <returns> | ||
/// Returns <c>true</c> if shutdown succeeded; otherwise, <c>false</c>. | ||
/// </returns> | ||
bool Shutdown(int timeoutMilliseconds); | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,145 @@ | ||
// Copyright The OpenTelemetry Authors | ||
// SPDX-License-Identifier: Apache-2.0 | ||
|
||
#if NETFRAMEWORK | ||
using System.Net.Http; | ||
#endif | ||
using System.Buffers.Binary; | ||
using System.Net.Http.Headers; | ||
using Grpc.Core; | ||
using OpenTelemetry.Exporter.OpenTelemetryProtocol.Implementation.ExportClient; | ||
using OpenTelemetry.Internal; | ||
|
||
namespace OpenTelemetry.Exporter.OpenTelemetryProtocol.Implementation.Custom.ExportClient; | ||
|
||
/// <summary>Base class for sending OTLP export request over Grpc.</summary> | ||
internal class OtlpGrpcExportClient : IExportClient | ||
{ | ||
internal const string ErrorStartingCallMessage = "Error starting gRPC call."; | ||
private static readonly MediaTypeHeaderValue MediaHeaderValue = new MediaTypeHeaderValue("application/grpc"); | ||
private static readonly Version Http2RequestVersion = new Version(2, 0); | ||
private static readonly ExportClientHttpResponse SuccessExportResponse = new ExportClientHttpResponse(success: true, deadlineUtc: default, response: null, exception: null); | ||
|
||
internal OtlpGrpcExportClient(OtlpExporterOptions options, HttpClient httpClient, string signalPath) | ||
{ | ||
Guard.ThrowIfNull(options); | ||
Guard.ThrowIfNull(httpClient); | ||
Guard.ThrowIfNull(signalPath); | ||
Guard.ThrowIfInvalidTimeout(options.TimeoutMilliseconds); | ||
|
||
Uri exporterEndpoint = options.Endpoint.AppendPathIfNotPresent(signalPath); | ||
this.Endpoint = new UriBuilder(exporterEndpoint).Uri; | ||
this.Headers = options.GetHeaders<Dictionary<string, string>>((d, k, v) => d.Add(k, v)); | ||
this.HttpClient = httpClient; | ||
} | ||
|
||
internal HttpClient HttpClient { get; } | ||
|
||
internal Uri Endpoint { get; set; } | ||
|
||
internal IReadOnlyDictionary<string, string> Headers { get; } | ||
|
||
public ExportClientResponse SendExportRequest(byte[] exportRequest, int contentLength, DateTime deadlineUtc, CancellationToken cancellationToken = default) | ||
{ | ||
try | ||
{ | ||
using var httpRequest = this.CreateHttpRequest(exportRequest, contentLength); | ||
|
||
using var httpResponse = this.SendHttpRequest(httpRequest, cancellationToken); | ||
|
||
GrpcProtocolHelper.ProcessHttpResponse(httpResponse, out var rpcException); | ||
|
||
if (rpcException != null) | ||
{ | ||
OpenTelemetryProtocolExporterEventSource.Log.FailedToReachCollector(this.Endpoint, rpcException); | ||
|
||
return new ExportClientGrpcResponse(success: false, deadlineUtc: deadlineUtc, exception: rpcException); | ||
} | ||
|
||
// We do not need to return back response and deadline for successful response so using cached value. | ||
return SuccessExportResponse; | ||
} | ||
catch (Exception ex) | ||
{ | ||
// https://learn.microsoft.com/en-us/dotnet/api/system.net.http.httpclient.sendasync?view=net-8.0#remarks | ||
RpcException? rpcException = null; | ||
if (ex is HttpRequestException) | ||
{ | ||
var status = new Status(StatusCode.Unavailable, ErrorStartingCallMessage + " " + ex.Message, ex); | ||
|
||
rpcException = new RpcException(status); | ||
|
||
OpenTelemetryProtocolExporterEventSource.Log.FailedToReachCollector(this.Endpoint, rpcException); | ||
|
||
return new ExportClientGrpcResponse(success: false, deadlineUtc: deadlineUtc, exception: rpcException); | ||
} | ||
else if (ex is TaskCanceledException) | ||
{ | ||
// grpc-dotnet sets the timer for tracking deadline. | ||
// https://github.com/grpc/grpc-dotnet/blob/1416340c85bb5925b5fed0c101e7e6de71e367e0/src/Grpc.Net.Client/Internal/GrpcCall.cs#L799-L803 | ||
// Utilizing the inner exception here to determine deadline exceeded related failures. | ||
// https://learn.microsoft.com/en-us/dotnet/api/system.net.http.httpclient.sendasync?view=net-8.0#remarks | ||
if (ex.InnerException is TimeoutException) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Using HttpClient.Timeout for deadline is probably ok. I assume you have global deadline duration that is applied to all requests so timetout works. For Grpc.Net.Client, the deadline can differ per call. |
||
{ | ||
var status = new Status(StatusCode.DeadlineExceeded, string.Empty); | ||
|
||
// TODO: pre-allocate | ||
rpcException = new RpcException(status); | ||
|
||
OpenTelemetryProtocolExporterEventSource.Log.FailedToReachCollector(this.Endpoint, rpcException); | ||
|
||
return new ExportClientGrpcResponse(success: false, deadlineUtc: deadlineUtc, exception: rpcException); | ||
} | ||
} | ||
|
||
return new ExportClientGrpcResponse(success: false, deadlineUtc: deadlineUtc, exception: ex); | ||
|
||
// TODO: Handle additional exception types (OperationCancelledException) | ||
} | ||
} | ||
|
||
public bool Shutdown(int timeoutMilliseconds) | ||
{ | ||
this.HttpClient.CancelPendingRequests(); | ||
return true; | ||
} | ||
|
||
public HttpRequestMessage CreateHttpRequest(byte[] exportRequest, int contentLength) | ||
{ | ||
var request = new HttpRequestMessage(HttpMethod.Post, this.Endpoint); | ||
request.Version = Http2RequestVersion; | ||
|
||
#if NET6_0_OR_GREATER | ||
request.VersionPolicy = HttpVersionPolicy.RequestVersionExact; | ||
#endif | ||
|
||
foreach (var header in this.Headers) | ||
{ | ||
request.Headers.Add(header.Key, header.Value); | ||
} | ||
|
||
// Grpc payload consists of 3 parts | ||
// byte 0 - Specifying if the payload is compressed. | ||
// 1-4 byte - Specifies the length of payload in big endian format. | ||
// 5 and above - Protobuf serialized data. | ||
Span<byte> data = new Span<byte>(exportRequest, 1, 4); | ||
var dataLength = contentLength - 5; | ||
BinaryPrimitives.WriteUInt32BigEndian(data, (uint)dataLength); | ||
|
||
// TODO: Support compression. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do you ever send compressed gRPC request data today? If you don't, then it's not needed. You probably need to handle compressed responses. There might be an OTLP gRPC server that sends a compressed response for some reason. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. My understanding is users can specify |
||
|
||
request.Content = new ByteArrayContent(exportRequest, 0, contentLength); | ||
request.Content.Headers.ContentType = MediaHeaderValue; | ||
|
||
return request; | ||
} | ||
|
||
protected HttpResponseMessage SendHttpRequest(HttpRequestMessage request, CancellationToken cancellationToken) | ||
{ | ||
// grpc-dotnet calls specifies the HttpCompletion.ResponseHeadersRead. | ||
// However, it is useful specifically for streaming calls? | ||
// https://github.com/grpc/grpc-dotnet/blob/1416340c85bb5925b5fed0c101e7e6de71e367e0/src/Grpc.Net.Client/Internal/GrpcCall.cs#L485-L486 | ||
return this.HttpClient.SendAsync(request, cancellationToken).GetAwaiter().GetResult(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is blocking so you need to worry about thread pool starvation. Are OTLP gRPC calls made on a dedicated thread? If so, that means a maximum of one thread is blocked and this isn't a bad. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Yes. (but there will be 3 such threads created by traces,logs,metrics) |
||
} | ||
} | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,96 @@ | ||
// Copyright The OpenTelemetry Authors | ||
// SPDX-License-Identifier: Apache-2.0 | ||
|
||
#if NETFRAMEWORK | ||
using System.Net.Http; | ||
#endif | ||
using System.Net.Http.Headers; | ||
using OpenTelemetry.Exporter.OpenTelemetryProtocol.Implementation.ExportClient; | ||
using OpenTelemetry.Internal; | ||
|
||
namespace OpenTelemetry.Exporter.OpenTelemetryProtocol.Implementation.Custom.ExportClient; | ||
|
||
/// <summary>Base class for sending OTLP export request over HTTP.</summary> | ||
internal class OtlpHttpExportClient : IExportClient | ||
{ | ||
private static readonly ExportClientHttpResponse SuccessExportResponse = new ExportClientHttpResponse(success: true, deadlineUtc: default, response: null, exception: null); | ||
private static readonly MediaTypeHeaderValue MediaHeaderValue = new MediaTypeHeaderValue("application/x-protobuf"); | ||
|
||
internal OtlpHttpExportClient(OtlpExporterOptions options, HttpClient httpClient, string signalPath) | ||
{ | ||
Guard.ThrowIfNull(options); | ||
Guard.ThrowIfNull(httpClient); | ||
Guard.ThrowIfNull(signalPath); | ||
Guard.ThrowIfInvalidTimeout(options.TimeoutMilliseconds); | ||
|
||
Uri exporterEndpoint = (options.AppendSignalPathToEndpoint || options.Protocol == OtlpExportProtocol.Grpc) | ||
? options.Endpoint.AppendPathIfNotPresent(signalPath) | ||
: options.Endpoint; | ||
this.Endpoint = new UriBuilder(exporterEndpoint).Uri; | ||
this.Headers = options.GetHeaders<Dictionary<string, string>>((d, k, v) => d.Add(k, v)); | ||
this.HttpClient = httpClient; | ||
} | ||
|
||
internal HttpClient HttpClient { get; } | ||
|
||
internal Uri Endpoint { get; set; } | ||
|
||
internal IReadOnlyDictionary<string, string> Headers { get; } | ||
|
||
public ExportClientResponse SendExportRequest(byte[] exportRequest, int contentLength, DateTime deadlineUtc, CancellationToken cancellationToken = default) | ||
{ | ||
try | ||
{ | ||
using var httpRequest = this.CreateHttpRequest(exportRequest, contentLength); | ||
|
||
using var httpResponse = this.SendHttpRequest(httpRequest, cancellationToken); | ||
|
||
try | ||
{ | ||
httpResponse.EnsureSuccessStatusCode(); | ||
} | ||
catch (HttpRequestException ex) | ||
{ | ||
return new ExportClientHttpResponse(success: false, deadlineUtc: deadlineUtc, response: httpResponse, ex); | ||
} | ||
|
||
// We do not need to return back response and deadline for successful response so using cached value. | ||
return SuccessExportResponse; | ||
} | ||
catch (HttpRequestException ex) | ||
{ | ||
OpenTelemetryProtocolExporterEventSource.Log.FailedToReachCollector(this.Endpoint, ex); | ||
|
||
return new ExportClientHttpResponse(success: false, deadlineUtc: deadlineUtc, response: null, exception: ex); | ||
} | ||
} | ||
|
||
public bool Shutdown(int timeoutMilliseconds) | ||
{ | ||
this.HttpClient.CancelPendingRequests(); | ||
return true; | ||
} | ||
|
||
public HttpRequestMessage CreateHttpRequest(byte[] exportRequest, int contentLength) | ||
{ | ||
var request = new HttpRequestMessage(HttpMethod.Post, this.Endpoint); | ||
foreach (var header in this.Headers) | ||
{ | ||
request.Headers.Add(header.Key, header.Value); | ||
} | ||
|
||
request.Content = new ByteArrayContent(exportRequest, 0, contentLength); | ||
request.Content.Headers.ContentType = MediaHeaderValue; | ||
|
||
return request; | ||
} | ||
|
||
protected HttpResponseMessage SendHttpRequest(HttpRequestMessage request, CancellationToken cancellationToken) | ||
{ | ||
#if NET6_0_OR_GREATER | ||
return this.HttpClient.Send(request, cancellationToken); | ||
#else | ||
return this.HttpClient.SendAsync(request, cancellationToken).GetAwaiter().GetResult(); | ||
#endif | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,64 @@ | ||
// Copyright The OpenTelemetry Authors | ||
// SPDX-License-Identifier: Apache-2.0 | ||
|
||
// Includes work from: | ||
/* | ||
* Copyright 2019 The gRPC Authors | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
#if NETFRAMEWORK | ||
using System.Net.Http; | ||
#endif | ||
using System.Net.Http.Headers; | ||
|
||
namespace OpenTelemetry.Exporter.OpenTelemetryProtocol.Implementation.Custom.ExportClient; | ||
|
||
internal static class TrailingHeadersHelpers | ||
{ | ||
public static readonly string ResponseTrailersKey = "__ResponseTrailers"; | ||
|
||
public static HttpHeaders TrailingHeaders(this HttpResponseMessage responseMessage) | ||
{ | ||
#if !NETSTANDARD2_0 && !NET462 | ||
return responseMessage.TrailingHeaders; | ||
#else | ||
if (responseMessage.RequestMessage.Properties.TryGetValue(ResponseTrailersKey, out var headers) && | ||
headers is HttpHeaders httpHeaders) | ||
{ | ||
return httpHeaders; | ||
} | ||
|
||
// App targets .NET Standard 2.0 and the handler hasn't set trailers | ||
// in RequestMessage.Properties with known key. Return empty collection. | ||
// Client call will likely fail because it is unable to get a grpc-status. | ||
return ResponseTrailers.Empty; | ||
#endif | ||
} | ||
Comment on lines
+30
to
+48
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do you plan to support .NET Framework with WinHttpHandler? If not, then this can be simplified to always return TrailingHeaders. |
||
|
||
#if NETSTANDARD2_0 || NET462 | ||
public static void EnsureTrailingHeaders(this HttpResponseMessage responseMessage) | ||
{ | ||
if (!responseMessage.RequestMessage.Properties.ContainsKey(ResponseTrailersKey)) | ||
{ | ||
responseMessage.RequestMessage.Properties[ResponseTrailersKey] = new ResponseTrailers(); | ||
} | ||
} | ||
|
||
private class ResponseTrailers : HttpHeaders | ||
{ | ||
public static readonly ResponseTrailers Empty = new ResponseTrailers(); | ||
} | ||
#endif | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This whole catch block is a bit of a mess. Why not separate catch statements?