-
Notifications
You must be signed in to change notification settings - Fork 305
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge branch 'main' into 1997-multiple-connection-support
- Loading branch information
Showing
6 changed files
with
250 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,48 @@ | ||
// Copyright The OpenTelemetry Authors | ||
// SPDX-License-Identifier: Apache-2.0 | ||
using System.Diagnostics; | ||
|
||
namespace OpenTelemetry.Extensions.Internal; | ||
|
||
internal sealed class RateLimiter | ||
{ | ||
private readonly Stopwatch stopwatch = Stopwatch.StartNew(); | ||
private readonly double creditsPerTick; | ||
private readonly long maxBalance; // max balance in ticks | ||
private long currentBalance; // last op ticks less remaining balance, using long directly with Interlocked for thread safety | ||
|
||
public RateLimiter(double creditsPerSecond, double maxBalance) | ||
{ | ||
this.creditsPerTick = creditsPerSecond / Stopwatch.Frequency; | ||
this.maxBalance = (long)(maxBalance / this.creditsPerTick); | ||
this.currentBalance = this.stopwatch.ElapsedTicks - this.maxBalance; | ||
} | ||
|
||
public bool TrySpend(double itemCost) | ||
{ | ||
long cost = (long)(itemCost / this.creditsPerTick); | ||
long currentTicks; | ||
long currentBalanceTicks; | ||
long availableBalanceAfterWithdrawal; | ||
do | ||
{ | ||
currentBalanceTicks = Interlocked.Read(ref this.currentBalance); | ||
currentTicks = this.stopwatch.ElapsedTicks; | ||
long currentAvailableBalance = currentTicks - currentBalanceTicks; | ||
if (currentAvailableBalance > this.maxBalance) | ||
{ | ||
currentAvailableBalance = this.maxBalance; | ||
} | ||
|
||
availableBalanceAfterWithdrawal = currentAvailableBalance - cost; | ||
if (availableBalanceAfterWithdrawal < 0) | ||
{ | ||
return false; | ||
} | ||
} | ||
|
||
// CompareExchange will fail if currentBalance has changed since the last read, implying another thread has updated the balance | ||
while (Interlocked.CompareExchange(ref this.currentBalance, currentTicks - availableBalanceAfterWithdrawal, currentBalanceTicks) != currentBalanceTicks); | ||
return true; | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,68 @@ | ||
// Copyright The OpenTelemetry Authors | ||
// SPDX-License-Identifier: Apache-2.0 | ||
|
||
// Based on the jaeger remote sampler for Java from https://github.com/open-telemetry/opentelemetry-java/blob/main/sdk-extensions/jaeger-remote-sampler/src/main/java/io/opentelemetry/sdk/extension/trace/jaeger/sampler/RateLimitingSampler.java | ||
|
||
using System.Globalization; | ||
using OpenTelemetry.Extensions.Internal; | ||
using OpenTelemetry.Trace; | ||
|
||
namespace OpenTelemetry; | ||
|
||
/// <summary> | ||
/// Rate limiting sampler that can be used to sample traces at a constant rate. | ||
/// </summary> | ||
public class RateLimitingSampler : Sampler | ||
{ | ||
private const string SAMPLERTYPE = "ratelimiting"; | ||
private const string SAMPLERTYPEKEY = "sampler.type"; | ||
private const string SAMPLERPARAMKEY = "sampler.param"; | ||
|
||
private readonly RateLimiter rateLimiter; | ||
private readonly SamplingResult onSamplingResult; | ||
private readonly SamplingResult offSamplingResult; | ||
|
||
/// <summary> | ||
/// Initializes a new instance of the <see cref="RateLimitingSampler"/> class. | ||
/// </summary> | ||
/// <param name="maxTracesPerSecond">The maximum number of traces that will be emitted each second.</param> | ||
public RateLimitingSampler(int maxTracesPerSecond) | ||
{ | ||
double maxBalance = maxTracesPerSecond < 1.0 ? 1.0 : maxTracesPerSecond; | ||
this.rateLimiter = new RateLimiter(maxTracesPerSecond, maxBalance); | ||
var attributes = new Dictionary<string, object>() | ||
{ | ||
{ SAMPLERTYPEKEY, SAMPLERTYPE }, | ||
{ SAMPLERPARAMKEY, (double)maxTracesPerSecond }, | ||
}; | ||
this.onSamplingResult = new SamplingResult(SamplingDecision.RecordAndSample, attributes); | ||
this.offSamplingResult = new SamplingResult(SamplingDecision.Drop, attributes); | ||
this.Description = $"RateLimitingSampler{{{DecimalFormat(maxTracesPerSecond)}}}"; | ||
} | ||
|
||
/// <summary> | ||
/// Checks whether activity needs to be created and tracked. | ||
/// </summary> | ||
/// <param name="samplingParameters"> | ||
/// The OpenTelemetry.Trace.SamplingParameters used by the OpenTelemetry.Trace.Sampler | ||
/// to decide if the System.Diagnostics.Activity to be created is going to be sampled | ||
/// or not. | ||
/// </param> | ||
/// <returns> | ||
/// Sampling decision on whether activity needs to be sampled or not. | ||
/// </returns> | ||
public override SamplingResult ShouldSample(in SamplingParameters samplingParameters) | ||
{ | ||
return this.rateLimiter.TrySpend(1.0) ? this.onSamplingResult : this.offSamplingResult; | ||
} | ||
|
||
private static string DecimalFormat(double value) | ||
{ | ||
NumberFormatInfo numberFormatInfo = new NumberFormatInfo | ||
{ | ||
NumberDecimalSeparator = ".", | ||
}; | ||
|
||
return value.ToString("0.00", numberFormatInfo); | ||
} | ||
} |
102 changes: 102 additions & 0 deletions
102
test/OpenTelemetry.Extensions.Tests/Trace/RateLimitingSamplerTests.cs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,102 @@ | ||
// Copyright The OpenTelemetry Authors | ||
// SPDX-License-Identifier: Apache-2.0 | ||
|
||
using OpenTelemetry.Trace; | ||
using Xunit; | ||
|
||
namespace OpenTelemetry.Extensions.Tests.Trace; | ||
|
||
public class RateLimitingSamplerTests | ||
{ | ||
[Fact] | ||
public void ShouldSample_ReturnsRecordAndSample_WhenWithinRateLimit() | ||
{ | ||
// Arrange | ||
var samplingParameters = new SamplingParameters( | ||
parentContext: default, | ||
traceId: default, | ||
name: "TestOperation", | ||
kind: default, | ||
tags: null, | ||
links: null); | ||
|
||
var sampler = new RateLimitingSampler(5); // 5 trace per second | ||
int sampleIn = 0, sampleOut = 0; | ||
|
||
// Fire in 3 traces with a second, should all be sampled in | ||
|
||
for (var i = 0; i < 3; i++) | ||
{ | ||
// Act | ||
var result = sampler.ShouldSample(in samplingParameters); | ||
switch (result.Decision) | ||
{ | ||
case SamplingDecision.RecordAndSample: | ||
sampleIn++; | ||
break; | ||
case SamplingDecision.RecordOnly: | ||
Assert.Fail("Unexpected decision"); | ||
break; | ||
case SamplingDecision.Drop: | ||
sampleOut++; | ||
break; | ||
} | ||
|
||
Thread.Sleep(333); | ||
} | ||
|
||
// Assert | ||
Assert.Equal(3, sampleIn); | ||
Assert.Equal(0, sampleOut); | ||
} | ||
|
||
[Fact] | ||
public async Task ShouldFilter_WhenAboveRateLimit() | ||
{ | ||
const int SAMPLE_RATE = 5; // 5 trace per second | ||
const int CYCLES = 500; | ||
|
||
var samplingParameters = new SamplingParameters( | ||
parentContext: default, | ||
traceId: default, | ||
name: "TestOperation", | ||
kind: default, | ||
tags: null, | ||
links: null); | ||
var sampler = new RateLimitingSampler(SAMPLE_RATE); | ||
int sampleIn = 0, sampleOut = 0; | ||
|
||
var startTime = DateTime.UtcNow; | ||
|
||
for (var i = 0; i < CYCLES; i++) | ||
{ | ||
var result = sampler.ShouldSample(in samplingParameters); | ||
switch (result.Decision) | ||
{ | ||
case SamplingDecision.RecordAndSample: | ||
sampleIn++; | ||
break; | ||
case SamplingDecision.RecordOnly: | ||
Assert.Fail("Unexpected decision"); | ||
break; | ||
case SamplingDecision.Drop: | ||
sampleOut++; | ||
break; | ||
} | ||
|
||
// Task.Delay is limited by the OS Scheduler, so we can't guarantee the exact time | ||
await Task.Delay(5); | ||
} | ||
|
||
var timeTakenSeconds = (DateTime.UtcNow - startTime).TotalSeconds; | ||
|
||
// Approximate the number of samples we should have taken | ||
// Account for the fact that the initial balance is the SampleRate, so they will all be sampled in | ||
var approxSamples = Math.Floor(timeTakenSeconds * SAMPLE_RATE) + SAMPLE_RATE; | ||
|
||
// Assert - We should have sampled in 5 traces per second over duration | ||
// Adding in a fudge factor | ||
Assert.True(sampleIn > (approxSamples * 0.9) && sampleIn < (approxSamples * 1.1)); | ||
Assert.True(sampleOut == (CYCLES - sampleIn)); | ||
} | ||
} |