Skip to content

Commit

Permalink
Merge branch 'main' into 1997-multiple-connection-support
Browse files Browse the repository at this point in the history
  • Loading branch information
YayBurritos authored Aug 13, 2024
2 parents fa5fc46 + cacbfbb commit 131b673
Show file tree
Hide file tree
Showing 27 changed files with 758 additions and 130 deletions.
9 changes: 9 additions & 0 deletions src/OpenTelemetry.Exporter.OneCollector/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,19 @@

## Unreleased

## 1.9.2

Released 2024-Aug-12

* Fixed `PlatformNotSupportedException`s being thrown during export when running
on mobile platforms which caused telemetry to be dropped silently.
([#1992](https://github.com/open-telemetry/opentelemetry-dotnet-contrib/pull/1992))

* Fixed a bug which caused remaining records in a batch to be dropped silently
once the max payload size for a transmission (default 4 KiB) has been
reached.
([#1999](https://github.com/open-telemetry/opentelemetry-dotnet-contrib/pull/1999))

## 1.9.1

Released 2024-Aug-01
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,10 +87,13 @@ public void SinkDataWritten(string itemType, int numberOfRecords, string sinkDes
this.WriteEvent(3, itemType, numberOfRecords, sinkDescription);
}

[Event(4, Message = "Dropped {1} '{0}' item(s).", Level = EventLevel.Warning)]
public void DataDropped(string itemType, int numberOfRecords)
[Event(4, Message = "Dropped {1} '{0}' item(s). {2} item(s) dropped during serialization. {3} item(s) dropped due to transmission failure.", Level = EventLevel.Warning)]
#if NET
[UnconditionalSuppressMessage("Trimming", "IL2026", Justification = "Parameters passed to WriteEvent are all primitive values.")]
#endif
public void DataDropped(string itemType, int numberOfRecords, int numberOfRecordsDroppedDuringSerialization, int numberOfRecordsDroppedDuringTransmission)
{
this.WriteEvent(4, itemType, numberOfRecords);
this.WriteEvent(4, itemType, numberOfRecords, numberOfRecordsDroppedDuringSerialization, numberOfRecordsDroppedDuringTransmission);
}

[Event(5, Message = "Exception thrown by '{0}' transport: {1}", Level = EventLevel.Error)]
Expand Down
Original file line number Diff line number Diff line change
@@ -1,23 +1,21 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

// Note: StyleCop doesn't understand the C#11 "required" modifier yet. Remove
// this in the future once StyleCop is updated. See:
// https://github.com/DotNetAnalyzers/StyleCopAnalyzers/issues/3527

#pragma warning disable SA1206 // Declaration keywords should follow order

namespace OpenTelemetry.Exporter.OneCollector;

internal readonly struct BatchSerializationResult
{
#if NET7_0_OR_GREATER
public required int NumberOfItemsSerialized { get; init; }

public required int NumberOfItemsDropped { get; init; }

public required long PayloadSizeInBytes { get; init; }
#else
public int NumberOfItemsSerialized { get; init; }

public int NumberOfItemsDropped { get; init; }

public long PayloadSizeInBytes { get; init; }
#endif

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

// Note: StyleCop doesn't understand the C#11 "required" modifier yet. Remove
// this in the future once StyleCop is updated. See:
// https://github.com/DotNetAnalyzers/StyleCopAnalyzers/issues/3527

#if NETSTANDARD2_1_OR_GREATER || NET
using System.Diagnostics.CodeAnalysis;
#endif

namespace OpenTelemetry.Exporter.OneCollector;

internal ref struct BatchSerializationState<T>
where T : class
{
private Batch<T>.Enumerator enumerator;

public BatchSerializationState(in Batch<T> batch)
{
this.enumerator = batch.GetEnumerator();
}

public bool TryGetNextItem(
#if NETSTANDARD2_1_OR_GREATER || NET
[NotNullWhen(true)]
#endif
out T? item)
{
if (this.enumerator.MoveNext())
{
item = this.enumerator.Current;
return true;
}

item = null;
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,40 +34,59 @@ protected CommonSchemaJsonSerializer(

protected JsonEncodedText TenantTokenWithTenancySystemSymbol { get; }

public void SerializeBatchOfItemsToStream(Resource resource, in Batch<T> batch, Stream stream, int initialSizeOfPayloadInBytes, out BatchSerializationResult result)
public void SerializeBatchOfItemsToStream(
Resource resource,
ref BatchSerializationState<T> state,
Stream stream,
int initialSizeOfPayloadInBytes,
out BatchSerializationResult result)
{
Guard.ThrowIfNull(stream);

var numberOfSerializedItems = 0;
var numberOfDroppedItems = 0;
long payloadSizeInBytes = initialSizeOfPayloadInBytes;

var state = ThreadStorageHelper.GetCommonSchemaJsonSerializationState(this.itemType, stream);
var jsonSerializerState = ThreadStorageHelper.GetCommonSchemaJsonSerializationState(this.itemType, stream);

var writer = state.Writer;
var writer = jsonSerializerState.Writer;

foreach (var item in batch)
while (state.TryGetNextItem(out var item))
{
this.SerializeItemToJson(resource, item, state);
this.SerializeItemToJson(resource, item!, jsonSerializerState);

var currentItemSizeInBytes = writer.BytesCommitted + writer.BytesPending + 1;

payloadSizeInBytes += currentItemSizeInBytes;

writer.Flush();
writer.Reset();

stream.Write(CommonSchemaJsonSerializationHelper.NewLine, 0, 1);

if (currentItemSizeInBytes >= this.maxPayloadSizeInBytes)
{
// Note: If an individual item cannot fit inside the max size it
// is dropped.
numberOfDroppedItems++;
stream.SetLength(stream.Position - currentItemSizeInBytes);
continue;
}

payloadSizeInBytes += currentItemSizeInBytes;

if (++numberOfSerializedItems >= this.maxNumberOfItemsPerPayload)
{
break;
}

if (payloadSizeInBytes >= this.maxPayloadSizeInBytes)
{
// Note: If the last item written doesn't fit into the max size
// it is kept in the buffer and becomes the first item in the
// next transmission.
result = new BatchSerializationResult
{
NumberOfItemsSerialized = numberOfSerializedItems,
NumberOfItemsDropped = numberOfDroppedItems,
PayloadSizeInBytes = payloadSizeInBytes,
PayloadOverflowItemSizeInBytes = currentItemSizeInBytes,
};
Expand All @@ -78,6 +97,7 @@ public void SerializeBatchOfItemsToStream(Resource resource, in Batch<T> batch,
result = new BatchSerializationResult
{
NumberOfItemsSerialized = numberOfSerializedItems,
NumberOfItemsDropped = numberOfDroppedItems,
PayloadSizeInBytes = payloadSizeInBytes,
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ internal interface ISerializer<T>

void SerializeBatchOfItemsToStream(
Resource resource,
in Batch<T> batch,
ref BatchSerializationState<T> state,
Stream stream,
int initialSizeOfPayloadInBytes,
out BatchSerializationResult serializationResult);
out BatchSerializationResult result);
}
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

#if NETFRAMEWORK || NETSTANDARD2_0
using System.Buffers;
#endif
using System.Diagnostics;
using OpenTelemetry.Internal;
using OpenTelemetry.Resources;
Expand Down Expand Up @@ -40,43 +37,74 @@ public WriteDirectlyToTransportSink(

public void Dispose()
{
this.TrySendRemainingData();

(this.serializer as IDisposable)?.Dispose();
(this.Transport as IDisposable)?.Dispose();
}

public int Write(Resource resource, in Batch<T> batch)
{
Span<byte> remainingData = default;

var totalNumberOfItemsSerialized = 0;
var totalNumberOfItemsDroppedDuringSerialization = 0;
var totalNumberOfItemsDroppedDueToTransmissionFailure = 0;
var buffer = this.buffer;
ArraySegment<byte> remainingDataFromPreviousTransmission = default;
var state = new BatchSerializationState<T>(in batch);

try
while (true)
{
this.serializer.SerializeBatchOfItemsToStream(resource, in batch, buffer, (int)buffer.Length, out var serializationResult);

var numberOfItemsSerialized = serializationResult.NumberOfItemsSerialized;
int numberOfItemsToSend;

if (numberOfItemsSerialized <= 0)
if (remainingDataFromPreviousTransmission.Count > 0)
{
return 0;
buffer.Position = 0;
buffer.Write(
remainingDataFromPreviousTransmission.Array!,
remainingDataFromPreviousTransmission.Offset,
remainingDataFromPreviousTransmission.Count);
buffer.SetLength(remainingDataFromPreviousTransmission.Count);
numberOfItemsToSend = 1;
remainingDataFromPreviousTransmission = default;
}
else
{
buffer.SetLength(0);
numberOfItemsToSend = 0;
}

this.serializer.SerializeBatchOfItemsToStream(
resource,
ref state,
buffer,
(int)buffer.Length,
out var result);

OneCollectorExporterEventSource.Log.WriteSinkDataWrittenEventIfEnabled(this.typeName, numberOfItemsSerialized, this.Description);
totalNumberOfItemsDroppedDuringSerialization += result.NumberOfItemsDropped;

var numberOfItemsToSend = numberOfItemsSerialized;
int numberOfItemsSerialized = result.NumberOfItemsSerialized;

if (serializationResult.PayloadOverflowItemSizeInBytes.HasValue)
if (numberOfItemsSerialized > 0)
{
OneCollectorExporterEventSource.Log.WriteSinkDataWrittenEventIfEnabled(this.typeName, numberOfItemsSerialized, this.Description);

totalNumberOfItemsSerialized += numberOfItemsSerialized;
numberOfItemsToSend += numberOfItemsSerialized;
}
else if (numberOfItemsToSend <= 0)
{
break;
}

if (result.PayloadOverflowItemSizeInBytes.HasValue)
{
var hasUnderlyingBuffer = buffer.TryGetBuffer(out var underlyingBuffer);
Debug.Assert(hasUnderlyingBuffer, "Could not access underlying buffer");

var endPositionOfValidMessages = (int)(serializationResult.PayloadSizeInBytes - serializationResult.PayloadOverflowItemSizeInBytes);
var endPositionOfValidMessages = (int)(result.PayloadSizeInBytes - result.PayloadOverflowItemSizeInBytes);

remainingData = underlyingBuffer.AsSpan().Slice(
remainingDataFromPreviousTransmission = new ArraySegment<byte>(
underlyingBuffer.Array!,
endPositionOfValidMessages,
(int)serializationResult.PayloadOverflowItemSizeInBytes.Value);
(int)result.PayloadOverflowItemSizeInBytes.Value);

buffer.SetLength(endPositionOfValidMessages);

Expand All @@ -94,69 +122,19 @@ public int Write(Resource resource, in Batch<T> batch)
NumberOfItems = numberOfItemsToSend,
}))
{
OneCollectorExporterEventSource.Log.DataDropped(this.typeName, numberOfItemsToSend);
totalNumberOfItemsDroppedDueToTransmissionFailure += numberOfItemsToSend;
}

return numberOfItemsSerialized;
}
finally
{
if (remainingData.Length > 0)
{
buffer.Position = 0;
#if NETFRAMEWORK || NETSTANDARD2_0
var rentedBuffer = ArrayPool<byte>.Shared.Rent(remainingData.Length);
try
{
remainingData.CopyTo(rentedBuffer);
buffer.Write(rentedBuffer, 0, remainingData.Length);
}
finally
{
ArrayPool<byte>.Shared.Return(rentedBuffer);
}
#else
buffer.Write(remainingData);
#endif
buffer.SetLength(remainingData.Length);
}
else
{
buffer.SetLength(0);
}
}
}

private void TrySendRemainingData()
{
var buffer = this.buffer;
if (buffer != null && buffer.Length > 0)
if (totalNumberOfItemsDroppedDuringSerialization > 0 || totalNumberOfItemsDroppedDueToTransmissionFailure > 0)
{
buffer.Position = 0;

try
{
if (!this.Transport.Send(
new TransportSendRequest
{
ItemType = this.typeName,
ItemSerializationFormat = this.serializer.SerializationFormat,
ItemStream = buffer,
NumberOfItems = 1,
}))
{
OneCollectorExporterEventSource.Log.DataDropped(this.typeName, 1);
}
}
catch (Exception ex)
{
OneCollectorExporterEventSource.Log.DataDropped(this.typeName, 1);
OneCollectorExporterEventSource.Log.WriteExportExceptionThrownEventIfEnabled(this.typeName, ex);
}
finally
{
buffer.SetLength(0);
}
OneCollectorExporterEventSource.Log.DataDropped(
this.typeName,
totalNumberOfItemsDroppedDuringSerialization + totalNumberOfItemsDroppedDueToTransmissionFailure,
totalNumberOfItemsDroppedDuringSerialization,
totalNumberOfItemsDroppedDueToTransmissionFailure);
}

return totalNumberOfItemsSerialized;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
this at the call site but there is a bug. This could possibly be cleaned up
in the future (hopefully .NET 9) see https://github.com/dotnet/runtime/issues/92509 -->
<NoWarn>$(NoWarn);SYSLIB1100;SYSLIB1101</NoWarn>
<PackageValidationBaselineVersion>1.9.1</PackageValidationBaselineVersion>
<PackageValidationBaselineVersion>1.9.2</PackageValidationBaselineVersion>
</PropertyGroup>

<PropertyGroup>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,5 @@ OpenTelemetry.Instrumentation.AWS.AWSClientInstrumentationOptions.SuppressDownst
OpenTelemetry.Trace.TracerProviderBuilderExtensions
static OpenTelemetry.Trace.TracerProviderBuilderExtensions.AddAWSInstrumentation(this OpenTelemetry.Trace.TracerProviderBuilder! builder) -> OpenTelemetry.Trace.TracerProviderBuilder!
static OpenTelemetry.Trace.TracerProviderBuilderExtensions.AddAWSInstrumentation(this OpenTelemetry.Trace.TracerProviderBuilder! builder, System.Action<OpenTelemetry.Instrumentation.AWS.AWSClientInstrumentationOptions!>? configure) -> OpenTelemetry.Trace.TracerProviderBuilder!
OpenTelemetry.Metrics.MeterProviderBuilderExtensions
static OpenTelemetry.Metrics.MeterProviderBuilderExtensions.AddAWSInstrumentation(this OpenTelemetry.Metrics.MeterProviderBuilder! builder) -> OpenTelemetry.Metrics.MeterProviderBuilder!
2 changes: 2 additions & 0 deletions src/OpenTelemetry.Instrumentation.AWS/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

## Unreleased

* Add AWS metrics instrumentation.
([#1980](https://github.com/open-telemetry/opentelemetry-dotnet-contrib/pull/1980))
* Added `rpc.system`, `rpc.service`, and `rpc.method` to activity tags based on
[semantic convention v1.26.0](https://github.com/open-telemetry/semantic-conventions/blob/v1.26.0/docs/cloud-providers/aws-sdk.md#common-attributes).
([#1865](https://github.com/open-telemetry/opentelemetry-dotnet-contrib/pull/1865))
Expand Down
Loading

0 comments on commit 131b673

Please sign in to comment.