Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding distinct activity for distributed tracing to YARP #2098

Merged
merged 10 commits into from
May 5, 2023
59 changes: 55 additions & 4 deletions docs/docfx/articles/distributed-tracing.md
Original file line number Diff line number Diff line change
@@ -1,13 +1,64 @@

# Distributed tracing

As an ASP.NET Core component, YARP can easily integrate into different tracing systems the same as any other web application.
See detailed guides for setting up your application with:
- [OpenTelemetry] or
- [Application Insights]
samsp-msft marked this conversation as resolved.
Show resolved Hide resolved
As an ASP.NET Core component, YARP can easily integrate into different tracing systems the same as any other ASP.NET Core application.

.NET has built-in configurable support for distributed tracing that YARP takes advantage of to enable such scenarios out-of-the-box.

## Using Open Telemetry

YARP supports distributed tracing using Open Telemetry (OTEL). When a request comes in, and there is a listener for Activities, then ASP.NET Core will propagate the [Trace Context](https://www.w3.org/TR/trace-context) trace-id, or create one if necessary, and create new spans/activities for the work performed.
In addition YARP can create activities for:

- Forwarding Requests
- Active health checks for clusters

These will only be created if there is a listener for the [`ActivitySource`](https://learn.microsoft.com/dotnet/core/diagnostics/distributed-tracing-instrumentation-walkthroughs#activitysource) named `Yarp.ReverseProxy`.

For example, to monitor the traces with Application Insights, the proxy application needs to use the [Open Telemetry](https://github.com/open-telemetry/opentelemetry-dotnet/blob/main/src/OpenTelemetry/README.md) and [Azure Monitor](https://github.com/Azure/azure-sdk-for-net/blob/main/sdk/monitor/Azure.Monitor.OpenTelemetry.AspNetCore/README.md) SDKs.

`application.csproj`:

``` xml
<ItemGroup>
<PackageReference Include="Azure.Monitor.OpenTelemetry.AspNetCore" Version="1.0.0-beta.3" />
</ItemGroup>
```

`Program.cs`:

``` c#
using Azure.Monitor.OpenTelemetry.AspNetCore;
using OpenTelemetry.Trace;
using System.Diagnostics;

var builder = WebApplication.CreateBuilder(args);
builder.Services.AddReverseProxy().LoadFromConfig(builder.Configuration.GetSection("ReverseProxy"));

builder.Services.AddOpenTelemetry()
// Use helper to configure Azure Monitor defaults
.UseAzureMonitor(o =>
{
o.ConnectionString = builder.Configuration["APPLICATIONINSIGHTS_CONNECTION_STRING"];
})
.WithTracing(t =>
{
// Listen to the YARP tracing activities
t.AddSource("Yarp.ReverseProxy");
});

var app = builder.Build();

app.MapReverseProxy();

app.Run();

```

Provided that the traces are being logged to the same store for the proxy and destination servers, then the tracing analysis tools can correlate the requests and provide gant charts etc covering the end-to-end processing of the requests as they transition across the servers.

The same pattern can be used with the built-in OTEL exporters for Jaeger and Zipkin, or with many of the [APM vendors](https://opentelemetry.io/ecosystem/vendors/) who are adopting OTEL.

## Using custom tracing headers

When using a propagation mechanism that is not built into .NET (e.g. [B3 propagation]), you should implement a custom [`DistributedContextPropagator`] for that scheme.
Expand Down
15 changes: 13 additions & 2 deletions src/ReverseProxy/Forwarder/ForwarderMiddleware.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@

using System;
using System.Threading.Tasks;
using System.Diagnostics;
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Http.Connections;
samsp-msft marked this conversation as resolved.
Show resolved Hide resolved
using Microsoft.Extensions.Logging;
using Yarp.ReverseProxy.Model;
using Yarp.ReverseProxy.Utilities;
Expand Down Expand Up @@ -40,11 +42,18 @@ public async Task Invoke(HttpContext context)
var route = context.GetRouteModel();
var cluster = route.Cluster!;

var activity = context.GetYarpActivity();
activity?.AddTag("RouteId", route.Config.RouteId);
activity?.AddTag("ClusterId", cluster.ClusterId);


if (destinations.Count == 0)
{
Log.NoAvailableDestinations(_logger, cluster.ClusterId);
context.Response.StatusCode = StatusCodes.Status503ServiceUnavailable;
context.Features.Set<IForwarderErrorFeature>(new ForwarderErrorFeature(ForwarderError.NoAvailableDestinations, ex: null));
activity?.SetStatus(ActivityStatusCode.Error);
activity?.AddTag("DestinationId", "No destinations available");
return;
}

Expand All @@ -57,6 +66,7 @@ public async Task Invoke(HttpContext context)
}

reverseProxyFeature.ProxiedDestination = destination;
activity?.AddTag("DestinationId", destination.DestinationId);

var destinationModel = destination.Model;
if (destinationModel is null)
Expand All @@ -68,12 +78,13 @@ public async Task Invoke(HttpContext context)
{
cluster.ConcurrencyCounter.Increment();
destination.ConcurrencyCounter.Increment();

ForwarderTelemetry.Log.ForwarderInvoke(cluster.ClusterId, route.Config.RouteId, destination.DestinationId);

var clusterConfig = reverseProxyFeature.Cluster;
await _forwarder.SendAsync(context, destinationModel.Config.Address, clusterConfig.HttpClient,
var result = await _forwarder.SendAsync(context, destinationModel.Config.Address, clusterConfig.HttpClient,
clusterConfig.Config.HttpRequest ?? ForwarderRequestConfig.Empty, route.Transformer);

activity?.SetStatus((result == ForwarderError.None) ? ActivityStatusCode.Ok : ActivityStatusCode.Error);
}
finally
{
Expand Down
17 changes: 17 additions & 0 deletions src/ReverseProxy/Health/ActiveHealthCheckMonitor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Net.Http;
using System.Threading;
using System.Threading.Tasks;
Expand Down Expand Up @@ -108,6 +109,10 @@ private async Task ProbeCluster(ClusterState cluster)
return;
}

// Creates an Activity to trace the active health checks
var activity = Observability.YarpActivitySource.StartActivity("Proxy cluster active health checks", ActivityKind.Consumer);
samsp-msft marked this conversation as resolved.
Show resolved Hide resolved
activity?.AddTag("ClusterId", cluster.ClusterId);

Log.StartingActiveHealthProbingOnCluster(_logger, cluster.ClusterId);

var allDestinations = cluster.DestinationsState.AllDestinations;
Expand All @@ -130,10 +135,12 @@ private async Task ProbeCluster(ClusterState cluster)
{
var policy = _policies.GetRequiredServiceById(config.Policy, HealthCheckConstants.ActivePolicy.ConsecutiveFailures);
policy.ProbingCompleted(cluster, probeResults);
activity?.SetStatus(ActivityStatusCode.Ok);
}
catch (Exception ex)
{
Log.ActiveHealthProbingFailedOnCluster(_logger, cluster.ClusterId, ex);
activity?.SetStatus(ActivityStatusCode.Error);
}
finally
{
Expand All @@ -150,6 +157,7 @@ private async Task ProbeCluster(ClusterState cluster)
}

Log.StoppedActiveHealthProbingOnCluster(_logger, cluster.ClusterId);
activity?.Stop();
}
}

Expand All @@ -167,19 +175,28 @@ private async Task<DestinationProbingResult> ProbeDestinationAsync(ClusterState
return new DestinationProbingResult(destination, null, ex);
}

var probeActivity = Observability.YarpActivitySource.StartActivity("Proxy destination health check", ActivityKind.Client);
samsp-msft marked this conversation as resolved.
Show resolved Hide resolved
probeActivity?.AddTag("ClusterId", cluster.ClusterId);
probeActivity?.AddTag("DestinationId", destination.DestinationId);
var cts = new CancellationTokenSource(timeout);
try
{
Log.SendingHealthProbeToEndpointOfDestination(_logger, request.RequestUri, destination.DestinationId, cluster.ClusterId);
var response = await cluster.Model.HttpClient.SendAsync(request, cts.Token);
Log.DestinationProbingCompleted(_logger, destination.DestinationId, cluster.ClusterId, (int)response.StatusCode);

probeActivity?.SetStatus(ActivityStatusCode.Ok);
probeActivity?.Stop();

return new DestinationProbingResult(destination, response, null);
}
catch (Exception ex)
{
Log.DestinationProbingFailed(_logger, destination.DestinationId, cluster.ClusterId, ex);

probeActivity?.SetStatus(ActivityStatusCode.Error);
probeActivity?.Stop();

return new DestinationProbingResult(destination, null, ex);
}
finally
Expand Down
15 changes: 11 additions & 4 deletions src/ReverseProxy/Model/ProxyPipelineInitializerMiddleware.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@
// Licensed under the MIT License.

using System;
using System.Diagnostics;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.Logging;
using Yarp.ReverseProxy.Utilities;

namespace Yarp.ReverseProxy.Model;

Expand All @@ -23,7 +25,7 @@ public ProxyPipelineInitializerMiddleware(RequestDelegate next,
_next = next ?? throw new ArgumentNullException(nameof(next));
}

public Task Invoke(HttpContext context)
public async Task Invoke(HttpContext context)
{
var endpoint = context.GetEndpoint()
?? throw new InvalidOperationException($"Routing Endpoint wasn't set for the current request.");
Expand All @@ -37,7 +39,7 @@ public Task Invoke(HttpContext context)
{
Log.NoClusterFound(_logger, route.Config.RouteId);
context.Response.StatusCode = StatusCodes.Status503ServiceUnavailable;
return Task.CompletedTask;
return;
}

var destinationsState = cluster.DestinationsState;
Expand All @@ -46,10 +48,15 @@ public Task Invoke(HttpContext context)
Route = route,
Cluster = cluster.Model,
AllDestinations = destinationsState.AllDestinations,
AvailableDestinations = destinationsState.AvailableDestinations
AvailableDestinations = destinationsState.AvailableDestinations,
});

return _next(context);
var activity = Observability.YarpActivitySource.StartActivity("Proxy Forwarder", ActivityKind.Server);
context.SetYarpActivity(activity);

await _next(context);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will now allocate an extra async state machine even if the request isn't being sampled.
Can you please change the logic to something along the lines of what I shared here: #2098 (comment)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will now allocate an extra async state machine even if the request isn't being sampled. Can you please change the logic to something along the lines of what I shared here: #2098 (comment)?

It only adds it if the activity is created, which depends on the listener. If the activity is null, nothing gets set.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The issue is that the method is now async, which means you get an extra state machine allocation even if you don't have the activity.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you want you can keep it as-is and I can clean it up in a follow up PR


activity?.Stop();
samsp-msft marked this conversation as resolved.
Show resolved Hide resolved
}

private static class Log
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,14 @@ private async Task InvokeInternal(HttpContext context, IReverseProxyFeature prox
var policy = _sessionAffinityPolicies.GetRequiredServiceById(config.Policy, SessionAffinityConstants.Policies.HashCookie);
var affinityResult = await policy.FindAffinitizedDestinationsAsync(context, cluster, config, destinations, context.RequestAborted);

// Used for Distributed Tracing as part of Open Telemetry, null if there are no listeners
var activity = context.GetYarpActivity();

switch (affinityResult.Status)
{
case AffinityStatus.OK:
proxyFeature.AvailableDestinations = affinityResult.Destinations!;
activity?.SetTag("SessionAffinitySuccess", policy.Name);
samsp-msft marked this conversation as resolved.
Show resolved Hide resolved
break;
case AffinityStatus.AffinityKeyNotSet:
// Nothing to do so just continue processing
Expand All @@ -75,10 +79,12 @@ private async Task InvokeInternal(HttpContext context, IReverseProxyFeature prox
// Policy reported the failure is unrecoverable and took the full responsibility for its handling,
// so we simply stop processing.
Log.AffinityResolutionFailedForCluster(_logger, cluster.ClusterId);
activity?.SetTag("SessionAffinityFailure", policy.Name);
return;
}

Log.AffinityResolutionFailureWasHandledProcessingWillBeContinued(_logger, cluster.ClusterId, failurePolicy.Name);
activity?.SetTag("SessionAffinityFailure", policy.Name);

break;
default:
Expand Down
34 changes: 34 additions & 0 deletions src/ReverseProxy/Utilities/Observability.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
using System;
samsp-msft marked this conversation as resolved.
Show resolved Hide resolved
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using System.Diagnostics;
using System.Runtime.CompilerServices;
using Microsoft.AspNetCore.Http;

namespace Yarp.ReverseProxy.Utilities
samsp-msft marked this conversation as resolved.
Show resolved Hide resolved
{
samsp-msft marked this conversation as resolved.
Show resolved Hide resolved
internal static class Observability
{
public static readonly ActivitySource YarpActivitySource = new ActivitySource("Yarp.ReverseProxy");

public static Activity? GetYarpActivity(this HttpContext context)
{
return context.Features.Get<ActivityWrapper>()?.YarpActivity;
samsp-msft marked this conversation as resolved.
Show resolved Hide resolved
}

public static void SetYarpActivity(this HttpContext context, Activity? activity)
{
if (activity != null)
{
context.Features.Set(new ActivityWrapper { YarpActivity = activity });
}
}

private class ActivityWrapper
{
internal Activity? YarpActivity;
}
samsp-msft marked this conversation as resolved.
Show resolved Hide resolved
}
}