Skip to content

Commit

Permalink
Expose IConfigChangeListener callback service
Browse files Browse the repository at this point in the history
Fix #1619
  • Loading branch information
nulltoken committed Aug 19, 2022
1 parent 2f4f9e7 commit 7791617
Show file tree
Hide file tree
Showing 6 changed files with 510 additions and 10 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
Expand Down Expand Up @@ -36,12 +37,19 @@ private class MessageConfig : IProxyConfig
private readonly CancellationTokenSource _cts = new CancellationTokenSource();

public MessageConfig(IReadOnlyList<RouteConfig> routes, IReadOnlyList<ClusterConfig> clusters)
: this(routes, clusters, Guid.NewGuid().ToString())
{ }

public MessageConfig(IReadOnlyList<RouteConfig> routes, IReadOnlyList<ClusterConfig> clusters, string revisionId)
{
RevisionId = revisionId ?? throw new ArgumentNullException(nameof(revisionId));
Routes = routes;
Clusters = clusters;
ChangeToken = new CancellationChangeToken(_cts.Token);
}

public string RevisionId { get; }

public IReadOnlyList<RouteConfig> Routes { get; }

public IReadOnlyList<ClusterConfig> Clusters { get; }
Expand Down
39 changes: 39 additions & 0 deletions src/ReverseProxy/Configuration/IConfigChangeListener.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

using System;
using System.Collections.Generic;

namespace Yarp.ReverseProxy.Configuration;

/// <summary>
/// Allows subscribing to events notifying you when the configuration is loaded and applied, or when those actions fail.
/// </summary>
public interface IConfigChangeListener
{
/// <summary>
/// Invoked when an error occurs while loading the configuration.
/// </summary>
/// <param name="configProvider">The instance of the configuration provider that failed to provide the configuration.</param>
/// <param name="exception">The thrown exception.</param>
void ConfigurationLoadingFailed(IProxyConfigProvider configProvider, Exception exception);

/// <summary>
/// Invoked once the configuration have been successfully loaded.
/// </summary>
/// <param name="proxyConfigs">The list of instances that have been loaded.</param>
void ConfigurationLoaded(IReadOnlyList<IProxyConfig> proxyConfigs);

/// <summary>
/// Invoked when an error occurs while applying the configuration.
/// </summary>
/// <param name="proxyConfigs">The list of instances that were being processed.</param>
/// <param name="exception">The thrown exception.</param>
void ConfigurationApplyingFailed(IReadOnlyList<IProxyConfig> proxyConfigs, Exception exception);

/// <summary>
/// Invoked once the configuration has been successfully applied.
/// </summary>
/// <param name="proxyConfigs">The list of instances that have been applied.</param>
void ConfigurationApplied(IReadOnlyList<IProxyConfig> proxyConfigs);
}
9 changes: 9 additions & 0 deletions src/ReverseProxy/Configuration/IProxyConfig.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using Microsoft.Extensions.Primitives;

namespace Yarp.ReverseProxy.Configuration;
Expand All @@ -11,6 +13,13 @@ namespace Yarp.ReverseProxy.Configuration;
/// </summary>
public interface IProxyConfig
{
private static readonly ConditionalWeakTable<IProxyConfig, string> _revisionIdsTable = new();

/// <summary>
/// A unique identifier for this revision of the configuration.
/// </summary>
string RevisionId => _revisionIdsTable.GetValue(this, static _ => Guid.NewGuid().ToString());

/// <summary>
/// Routes matching requests to clusters.
/// </summary>
Expand Down
32 changes: 31 additions & 1 deletion src/ReverseProxy/Configuration/InMemoryConfigProvider.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

using System;
using System.Collections.Generic;
using System.Threading;
using Microsoft.Extensions.Primitives;
Expand All @@ -19,8 +20,15 @@ public sealed class InMemoryConfigProvider : IProxyConfigProvider
/// Creates a new instance.
/// </summary>
public InMemoryConfigProvider(IReadOnlyList<RouteConfig> routes, IReadOnlyList<ClusterConfig> clusters)
: this(routes, clusters, Guid.NewGuid().ToString())
{ }

/// <summary>
/// Creates a new instance, specifying a revision id of the configuration.
/// </summary>
public InMemoryConfigProvider(IReadOnlyList<RouteConfig> routes, IReadOnlyList<ClusterConfig> clusters, string revisionId)
{
_config = new InMemoryConfig(routes, clusters);
_config = new InMemoryConfig(routes, clusters, revisionId);
}

/// <summary>
Expand All @@ -35,6 +43,20 @@ public InMemoryConfigProvider(IReadOnlyList<RouteConfig> routes, IReadOnlyList<C
public void Update(IReadOnlyList<RouteConfig> routes, IReadOnlyList<ClusterConfig> clusters)
{
var newConfig = new InMemoryConfig(routes, clusters);
UpdateInternal(newConfig);
}

/// <summary>
/// Swaps the config state with a new snapshot of the configuration, then signals that the old one is outdated.
/// </summary>
public void Update(IReadOnlyList<RouteConfig> routes, IReadOnlyList<ClusterConfig> clusters, string revisionId)
{
var newConfig = new InMemoryConfig(routes, clusters, revisionId);
UpdateInternal(newConfig);
}

private void UpdateInternal(InMemoryConfig newConfig)
{
var oldConfig = Interlocked.Exchange(ref _config, newConfig);
oldConfig.SignalChange();
}
Expand All @@ -48,12 +70,20 @@ private class InMemoryConfig : IProxyConfig
private readonly CancellationTokenSource _cts = new CancellationTokenSource();

public InMemoryConfig(IReadOnlyList<RouteConfig> routes, IReadOnlyList<ClusterConfig> clusters)
: this(routes, clusters, Guid.NewGuid().ToString())
{ }

public InMemoryConfig(IReadOnlyList<RouteConfig> routes, IReadOnlyList<ClusterConfig> clusters, string revisionId)
{
RevisionId = revisionId ?? throw new ArgumentNullException(nameof(revisionId));
Routes = routes;
Clusters = clusters;
ChangeToken = new CancellationChangeToken(_cts.Token);
}

/// <inheritdoc/>
public string RevisionId { get; }

/// <summary>
/// A snapshot of the list of routes for the proxy
/// </summary>
Expand Down
55 changes: 48 additions & 7 deletions src/ReverseProxy/Management/ProxyConfigManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ internal sealed class ProxyConfigManager : EndpointDataSource, IProxyStateLookup
private readonly List<Action<EndpointBuilder>> _conventions;
private readonly IActiveHealthCheckMonitor _activeHealthCheckMonitor;
private readonly IClusterDestinationsUpdater _clusterDestinationsUpdater;

private readonly IConfigChangeListener[] _configChangeListeners;
private List<Endpoint>? _endpoints;
private CancellationTokenSource _endpointsChangeSource = new();
private IChangeToken _endpointsChangeToken;
Expand All @@ -66,7 +66,8 @@ public ProxyConfigManager(
ITransformBuilder transformBuilder,
IForwarderHttpClientFactory httpClientFactory,
IActiveHealthCheckMonitor activeHealthCheckMonitor,
IClusterDestinationsUpdater clusterDestinationsUpdater)
IClusterDestinationsUpdater clusterDestinationsUpdater,
IEnumerable<IConfigChangeListener> configChangeListeners)
{
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
_providers = providers?.ToArray() ?? throw new ArgumentNullException(nameof(providers));
Expand All @@ -80,6 +81,8 @@ public ProxyConfigManager(
_activeHealthCheckMonitor = activeHealthCheckMonitor ?? throw new ArgumentNullException(nameof(activeHealthCheckMonitor));
_clusterDestinationsUpdater = clusterDestinationsUpdater ?? throw new ArgumentNullException(nameof(clusterDestinationsUpdater));

_configChangeListeners = configChangeListeners?.ToArray() ?? Array.Empty<IConfigChangeListener>();

if (_providers.Length == 0)
{
throw new ArgumentException($"At least one {nameof(IProxyConfigProvider)} is required.", nameof(providers));
Expand Down Expand Up @@ -141,6 +144,11 @@ private void CreateEndpoints()
/// <inheritdoc/>
public override IChangeToken GetChangeToken() => Volatile.Read(ref _endpointsChangeToken);

private static IReadOnlyList<IProxyConfig> ExtractListOfProxyConfigs(IEnumerable<ConfigState> configStates)
{
return configStates.Select(state => state.LatestConfig).ToList().AsReadOnly();
}

internal async Task<EndpointDataSource> InitialLoadAsync()
{
// Trigger the first load immediately and throw if it fails.
Expand All @@ -160,8 +168,20 @@ internal async Task<EndpointDataSource> InitialLoadAsync()
clusters.AddRange(config.Clusters ?? Array.Empty<ClusterConfig>());
}

var proxyConfigs = ExtractListOfProxyConfigs(_configs);

foreach (var configChangeListener in _configChangeListeners)
{
configChangeListener.ConfigurationLoaded(proxyConfigs);
}

await ApplyConfigAsync(routes, clusters);

foreach (var configChangeListener in _configChangeListeners)
{
configChangeListener.ConfigurationApplied(proxyConfigs);
}

ListenForConfigChanges();
}
catch (Exception ex)
Expand Down Expand Up @@ -199,17 +219,28 @@ private async Task ReloadConfigAsync()
{
instance.LoadFailed = true;
Log.ErrorReloadingConfig(_logger, ex);

foreach (var configChangeListener in _configChangeListeners)
{
configChangeListener.ConfigurationLoadingFailed(instance.Provider, ex);
}
}

// If we didn't/couldn't get a new config then re-use the last one.
routes.AddRange(instance.LatestConfig.Routes ?? Array.Empty<RouteConfig>());
clusters.AddRange(instance.LatestConfig.Clusters ?? Array.Empty<ClusterConfig>());
}

// Only reload if at least one provider changed.
if (sourcesChanged)
var proxyConfigs = ExtractListOfProxyConfigs(_configs);
foreach (var configChangeListener in _configChangeListeners)
{
try
configChangeListener.ConfigurationLoaded(proxyConfigs);
}

try
{
// Only reload if at least one provider changed.
if (sourcesChanged)
{
var hasChanged = await ApplyConfigAsync(routes, clusters);
lock (_syncRoot)
Expand All @@ -222,9 +253,19 @@ private async Task ReloadConfigAsync()
}
}
}
catch (Exception ex)

foreach (var configChangeListener in _configChangeListeners)
{
configChangeListener.ConfigurationApplied(proxyConfigs);
}
}
catch (Exception ex)
{
Log.ErrorApplyingConfig(_logger, ex);

foreach (var configChangeListener in _configChangeListeners)
{
Log.ErrorApplyingConfig(_logger, ex);
configChangeListener.ConfigurationApplyingFailed(proxyConfigs, ex);
}
}

Expand Down
Loading

0 comments on commit 7791617

Please sign in to comment.