From 9195ea497fa3c31e5c4eac3de385ea18c25c67f2 Mon Sep 17 00:00:00 2001 From: nulltoken Date: Thu, 19 May 2022 17:24:49 +0200 Subject: [PATCH] Expose IConfigNotifier callback service Fix #1619 --- .../KubernetesConfigProvider.cs | 8 + .../Configuration/IConfigNotifier.cs | 45 +++ .../Configuration/IProxyConfig.cs | 9 + .../Configuration/InMemoryConfigProvider.cs | 34 ++- .../Management/ProxyConfigManager.cs | 39 ++- .../Management/ProxyConfigManagerTests.cs | 276 +++++++++++++++++- 6 files changed, 399 insertions(+), 12 deletions(-) create mode 100644 src/ReverseProxy/Configuration/IConfigNotifier.cs diff --git a/src/Kubernetes.Controller/ConfigProvider/KubernetesConfigProvider.cs b/src/Kubernetes.Controller/ConfigProvider/KubernetesConfigProvider.cs index d468f060c7..54cf795a1b 100644 --- a/src/Kubernetes.Controller/ConfigProvider/KubernetesConfigProvider.cs +++ b/src/Kubernetes.Controller/ConfigProvider/KubernetesConfigProvider.cs @@ -1,6 +1,7 @@ // Copyright (c) Microsoft Corporation. // Licensed under the MIT License. +using System; using System.Collections.Generic; using System.Threading; using System.Threading.Tasks; @@ -36,12 +37,19 @@ private class MessageConfig : IProxyConfig private readonly CancellationTokenSource _cts = new CancellationTokenSource(); public MessageConfig(IReadOnlyList routes, IReadOnlyList clusters) + : this(routes, clusters, Guid.NewGuid().ToString()) + { } + + public MessageConfig(IReadOnlyList routes, IReadOnlyList clusters, string revisionId) { + RevisionId = revisionId; Routes = routes; Clusters = clusters; ChangeToken = new CancellationChangeToken(_cts.Token); } + public string RevisionId { get; } + public IReadOnlyList Routes { get; } public IReadOnlyList Clusters { get; } diff --git a/src/ReverseProxy/Configuration/IConfigNotifier.cs b/src/ReverseProxy/Configuration/IConfigNotifier.cs new file mode 100644 index 0000000000..faf57e8afe --- /dev/null +++ b/src/ReverseProxy/Configuration/IConfigNotifier.cs @@ -0,0 +1,45 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +using System; +using System.Collections.Generic; + +namespace Yarp.ReverseProxy.Configuration; + +/// +/// Allows configuration management to optionaly notify of errors occuring during loading and applying. +/// In order to leverage this service, one has got to register a type implementing this interface in +/// the dependency injection container. +/// +public interface IConfigNotifier +{ + /// + /// Invoked when an error occurs during the loading of the configuration. + /// + /// The instance of the configuration provider that failed to provide the configuration. + /// The thrown exception. + /// + void ConfigurationLoadingFailed(IProxyConfigProvider configProvider, Exception ex); + + /// + /// Invoked once the configuration have been successfully loaded. + /// + /// The list of instances that have been loaded. + /// + void ConfigurationLoaded(IReadOnlyList proxyConfigs); + + /// + /// Invoked when an error occurs when attempting to apply the configuration. + /// + /// The list of instances that were being processed. + /// The thrown exception. + /// + void ConfigurationApplyingFailed(IReadOnlyList proxyConfigs, Exception ex); + + /// + /// Invoked once the configuration has been successfully applied. + /// + /// The list of instances that have been applied. + /// + void ConfigurationApplied(IReadOnlyList proxyConfigs); +} diff --git a/src/ReverseProxy/Configuration/IProxyConfig.cs b/src/ReverseProxy/Configuration/IProxyConfig.cs index 66d751b776..18acb694f7 100644 --- a/src/ReverseProxy/Configuration/IProxyConfig.cs +++ b/src/ReverseProxy/Configuration/IProxyConfig.cs @@ -1,7 +1,9 @@ // Copyright (c) Microsoft Corporation. // Licensed under the MIT License. +using System; using System.Collections.Generic; +using System.Runtime.CompilerServices; using Microsoft.Extensions.Primitives; namespace Yarp.ReverseProxy.Configuration; @@ -11,6 +13,13 @@ namespace Yarp.ReverseProxy.Configuration; /// public interface IProxyConfig { + private static readonly ConditionalWeakTable _revisionIdsTable = new(); + + /// + /// Configuration version identifier + /// + string RevisionId => _revisionIdsTable.GetValue(this, static _ => Guid.NewGuid().ToString()); + /// /// Routes matching requests to clusters. /// diff --git a/src/ReverseProxy/Configuration/InMemoryConfigProvider.cs b/src/ReverseProxy/Configuration/InMemoryConfigProvider.cs index d0868afea6..5040a189ef 100644 --- a/src/ReverseProxy/Configuration/InMemoryConfigProvider.cs +++ b/src/ReverseProxy/Configuration/InMemoryConfigProvider.cs @@ -1,6 +1,7 @@ // Copyright (c) Microsoft Corporation. // Licensed under the MIT License. +using System; using System.Collections.Generic; using System.Threading; using Microsoft.Extensions.Primitives; @@ -19,8 +20,15 @@ public sealed class InMemoryConfigProvider : IProxyConfigProvider /// Creates a new instance. /// public InMemoryConfigProvider(IReadOnlyList routes, IReadOnlyList clusters) + : this(routes, clusters, Guid.NewGuid().ToString()) + { } + + /// + /// Creates a new instance, specifying a version of the configuration. + /// + public InMemoryConfigProvider(IReadOnlyList routes, IReadOnlyList clusters, string revisionId) { - _config = new InMemoryConfig(routes, clusters); + _config = new InMemoryConfig(routes, clusters, revisionId); } /// @@ -35,6 +43,20 @@ public InMemoryConfigProvider(IReadOnlyList routes, IReadOnlyList routes, IReadOnlyList clusters) { var newConfig = new InMemoryConfig(routes, clusters); + UpdateInternal(newConfig); + } + + /// + /// Swaps the config state with a new versioned snapshot of the configuration, then signals that the old one is outdated. + /// + public void Update(IReadOnlyList routes, IReadOnlyList clusters, string revisionId) + { + var newConfig = new InMemoryConfig(routes, clusters, revisionId); + UpdateInternal(newConfig); + } + + private void UpdateInternal(InMemoryConfig newConfig) + { var oldConfig = Interlocked.Exchange(ref _config, newConfig); oldConfig.SignalChange(); } @@ -48,12 +70,22 @@ private class InMemoryConfig : IProxyConfig private readonly CancellationTokenSource _cts = new CancellationTokenSource(); public InMemoryConfig(IReadOnlyList routes, IReadOnlyList clusters) + : this(routes, clusters, Guid.NewGuid().ToString()) + { } + + public InMemoryConfig(IReadOnlyList routes, IReadOnlyList clusters, string revisionId) { + RevisionId = revisionId; Routes = routes; Clusters = clusters; ChangeToken = new CancellationChangeToken(_cts.Token); } + /// + /// Configuration version identifier + /// + public string RevisionId { get; } + /// /// A snapshot of the list of routes for the proxy /// diff --git a/src/ReverseProxy/Management/ProxyConfigManager.cs b/src/ReverseProxy/Management/ProxyConfigManager.cs index 7f6fea07b4..1475af1b77 100644 --- a/src/ReverseProxy/Management/ProxyConfigManager.cs +++ b/src/ReverseProxy/Management/ProxyConfigManager.cs @@ -49,7 +49,7 @@ internal sealed class ProxyConfigManager : EndpointDataSource, IProxyStateLookup private readonly List> _conventions; private readonly IActiveHealthCheckMonitor _activeHealthCheckMonitor; private readonly IClusterDestinationsUpdater _clusterDestinationsUpdater; - + private readonly IConfigNotifier? _configNotifier; private List? _endpoints; private CancellationTokenSource _endpointsChangeSource = new(); private IChangeToken _endpointsChangeToken; @@ -66,7 +66,8 @@ public ProxyConfigManager( ITransformBuilder transformBuilder, IForwarderHttpClientFactory httpClientFactory, IActiveHealthCheckMonitor activeHealthCheckMonitor, - IClusterDestinationsUpdater clusterDestinationsUpdater) + IClusterDestinationsUpdater clusterDestinationsUpdater, + IConfigNotifier? configNotifier = null) { _logger = logger ?? throw new ArgumentNullException(nameof(logger)); _providers = providers?.ToArray() ?? throw new ArgumentNullException(nameof(providers)); @@ -80,6 +81,8 @@ public ProxyConfigManager( _activeHealthCheckMonitor = activeHealthCheckMonitor ?? throw new ArgumentNullException(nameof(activeHealthCheckMonitor)); _clusterDestinationsUpdater = clusterDestinationsUpdater ?? throw new ArgumentNullException(nameof(clusterDestinationsUpdater)); + _configNotifier = configNotifier; + if (_providers.Length == 0) { throw new ArgumentException($"At least one {nameof(IProxyConfigProvider)} is required.", nameof(providers)); @@ -141,6 +144,11 @@ private void CreateEndpoints() /// public override IChangeToken GetChangeToken() => Volatile.Read(ref _endpointsChangeToken); + private static IReadOnlyList ExtractListOfProxyConfigs(IEnumerable configStates) + { + return configStates.Select(state => state.LatestConfig).ToList().AsReadOnly(); + } + internal async Task InitialLoadAsync() { // Trigger the first load immediately and throw if it fails. @@ -160,8 +168,12 @@ internal async Task InitialLoadAsync() clusters.AddRange(config.Clusters ?? Array.Empty()); } + _configNotifier?.ConfigurationLoaded(ProxyConfigManager.ExtractListOfProxyConfigs(_configs)); + await ApplyConfigAsync(routes, clusters); + _configNotifier?.ConfigurationApplied(_configs.Select(state => state.LatestConfig).ToList().AsReadOnly()); + ListenForConfigChanges(); } catch (Exception ex) @@ -199,6 +211,8 @@ private async Task ReloadConfigAsync() { instance.LoadFailed = true; Log.ErrorReloadingConfig(_logger, ex); + + _configNotifier?.ConfigurationLoadingFailed(instance.Provider, ex); } // If we didn't/couldn't get a new config then re-use the last one. @@ -206,11 +220,14 @@ private async Task ReloadConfigAsync() clusters.AddRange(instance.LatestConfig.Clusters ?? Array.Empty()); } - // Only reload if at least one provider changed. - if (sourcesChanged) + _configNotifier?.ConfigurationLoaded(ProxyConfigManager.ExtractListOfProxyConfigs(_configs)); + + try { - try + // Only reload if at least one provider changed. + if (sourcesChanged) { + var hasChanged = await ApplyConfigAsync(routes, clusters); lock (_syncRoot) { @@ -222,10 +239,14 @@ private async Task ReloadConfigAsync() } } } - catch (Exception ex) - { - Log.ErrorApplyingConfig(_logger, ex); - } + + _configNotifier?.ConfigurationApplied(ProxyConfigManager.ExtractListOfProxyConfigs(_configs)); + } + catch (Exception ex) + { + Log.ErrorApplyingConfig(_logger, ex); + + _configNotifier?.ConfigurationApplyingFailed(ProxyConfigManager.ExtractListOfProxyConfigs(_configs), ex); } ListenForConfigChanges(); diff --git a/test/ReverseProxy.Tests/Management/ProxyConfigManagerTests.cs b/test/ReverseProxy.Tests/Management/ProxyConfigManagerTests.cs index 2a6cab209f..4f1a3a0473 100644 --- a/test/ReverseProxy.Tests/Management/ProxyConfigManagerTests.cs +++ b/test/ReverseProxy.Tests/Management/ProxyConfigManagerTests.cs @@ -14,6 +14,7 @@ using Microsoft.AspNetCore.Routing; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.DependencyInjection.Extensions; +using Microsoft.Extensions.Primitives; using Moq; using Xunit; using Yarp.ReverseProxy.Configuration; @@ -29,7 +30,11 @@ namespace Yarp.ReverseProxy.Management.Tests; public class ProxyConfigManagerTests { - private static IServiceProvider CreateServices(List routes, List clusters, Action configureProxy = null) + private static IServiceProvider CreateServices( + List routes, + List clusters, + Action configureProxy = null, + IConfigNotifier configNotifier = null) { var serviceCollection = new ServiceCollection(); serviceCollection.AddLogging(); @@ -41,13 +46,20 @@ private static IServiceProvider CreateServices(List routes, List p.Name).Returns("activePolicyA"); serviceCollection.AddSingleton(activeHealthPolicy.Object); configureProxy?.Invoke(proxyBuilder); + if (configNotifier is not null) + { + serviceCollection.AddSingleton(configNotifier); + } var services = serviceCollection.BuildServiceProvider(); var routeBuilder = services.GetRequiredService(); routeBuilder.SetProxyPipeline(context => Task.CompletedTask); return services; } - private static IServiceProvider CreateServices(IEnumerable configProviders, Action configureProxy = null) + private static IServiceProvider CreateServices( + IEnumerable configProviders, + Action configureProxy = null, + IConfigNotifier configNotifier = null) { var serviceCollection = new ServiceCollection(); serviceCollection.AddLogging(); @@ -63,6 +75,10 @@ private static IServiceProvider CreateServices(IEnumerable activeHealthPolicy.SetupGet(p => p.Name).Returns("activePolicyA"); serviceCollection.AddSingleton(activeHealthPolicy.Object); configureProxy?.Invoke(proxyBuilder); + if (configNotifier is not null) + { + serviceCollection.AddSingleton(configNotifier); + } var services = serviceCollection.BuildServiceProvider(); var routeBuilder = services.GetRequiredService(); routeBuilder.SetProxyPipeline(context => Task.CompletedTask); @@ -367,6 +383,262 @@ public async Task BuildConfig_TwoOverlappingConfigs_Works() Assert.Equal(TestAddress, destination.Model.Config.Address); } + private class FakeConfigNotifier : IConfigNotifier + { + public bool? HasApplyingSucceeded { get; private set; } + public bool DidAtLeastOneErrorOccurWhileLoading { get; private set; } + public string[] EventuallyLoaded; + public string[] SuccessfullyApplied; + public string[] FailedApplied; + + public FakeConfigNotifier() + { + Reset(); + } + + public void Reset() + { + DidAtLeastOneErrorOccurWhileLoading = false; + HasApplyingSucceeded = null; + EventuallyLoaded = new string[] {}; + SuccessfullyApplied = new string[] { }; + FailedApplied = new string[] { }; + } + + public void ConfigurationLoadingFailed(IProxyConfigProvider configProvider, Exception ex) + { + DidAtLeastOneErrorOccurWhileLoading = true; + } + + public void ConfigurationLoaded(IReadOnlyList proxyConfigs) + { + EventuallyLoaded = proxyConfigs.Select(c => c.RevisionId).ToArray(); + } + + public void ConfigurationApplyingFailed(IReadOnlyList proxyConfigs, Exception ex) + { + HasApplyingSucceeded = false; + FailedApplied = proxyConfigs.Select(c => c.RevisionId).ToArray(); + } + + public void ConfigurationApplied(IReadOnlyList proxyConfigs) + { + HasApplyingSucceeded = true; + SuccessfullyApplied = proxyConfigs.Select(c => c.RevisionId).ToArray(); + } + } + + private class InMemoryConfig : IProxyConfig + { + private readonly CancellationTokenSource _cts = new CancellationTokenSource(); + + public InMemoryConfig(IReadOnlyList routes, IReadOnlyList clusters, string revisionId) + { + RevisionId = revisionId; + Routes = routes; + Clusters = clusters; + ChangeToken = new CancellationChangeToken(_cts.Token); + } + + public string RevisionId { get; } + + public IReadOnlyList Routes { get; } + + public IReadOnlyList Clusters { get; } + + public IChangeToken ChangeToken { get; } + + internal void SignalChange() + { + _cts.Cancel(); + } + } + + private class OnDemandFailingInMemoryConfigProvider : IProxyConfigProvider + { + private volatile InMemoryConfig _config; + + public OnDemandFailingInMemoryConfigProvider( + InMemoryConfig config) + { + _config = config; + } + + public OnDemandFailingInMemoryConfigProvider( + IReadOnlyList routes, + IReadOnlyList clusters, + string revisionId) : this(new InMemoryConfig(routes, clusters, revisionId)) + { + } + + public IProxyConfig GetConfig() + { + if (ShouldConfigLoadingFail) + { + return null; + } + + return _config; + } + + public void Update(IReadOnlyList routes, IReadOnlyList clusters, string revisionId) + { + Update(new InMemoryConfig(routes, clusters, revisionId)); + } + + public void Update(InMemoryConfig config) + { + var oldConfig = Interlocked.Exchange(ref _config, config); + oldConfig.SignalChange(); + } + + public bool ShouldConfigLoadingFail { get; set; } + } + + [Fact] + public async Task BuildConfig_CanBeNotifiedOfProxyConfigSuccessfulAndFailedLoading() + { + var configProviderA = new OnDemandFailingInMemoryConfigProvider(new List() { }, new List() { }, "A1"); + var configProviderB = new OnDemandFailingInMemoryConfigProvider(new List() { }, new List() { }, "B1"); + + var configNotifier = new FakeConfigNotifier(); + + configNotifier.Reset(); + var services = CreateServices(new[] { configProviderA, configProviderB }, null, configNotifier); + + var manager = services.GetRequiredService(); + await manager.InitialLoadAsync(); + + Assert.False(configNotifier.DidAtLeastOneErrorOccurWhileLoading); + Assert.Equal(new[] { "A1", "B1" }, configNotifier.EventuallyLoaded); + Assert.True(configNotifier.HasApplyingSucceeded); + Assert.Equal(new[] { "A1", "B1" }, configNotifier.SuccessfullyApplied); + Assert.Empty(configNotifier.FailedApplied); + + const string TestAddress = "https://localhost:123/"; + + var cluster1 = new ClusterConfig + { + ClusterId = "cluster1", + Destinations = new Dictionary(StringComparer.OrdinalIgnoreCase) + { + { "d1", new DestinationConfig { Address = TestAddress } } + } + }; + var cluster2 = new ClusterConfig + { + ClusterId = "cluster2", + Destinations = new Dictionary(StringComparer.OrdinalIgnoreCase) + { + { "d2", new DestinationConfig { Address = TestAddress } } + } + }; + + var route1 = new RouteConfig + { + RouteId = "route1", + ClusterId = "cluster1", + Match = new RouteMatch { Path = "/" } + }; + var route2 = new RouteConfig + { + RouteId = "route2", + ClusterId = "cluster2", + Match = new RouteMatch { Path = "/" } + }; + + configNotifier.Reset(); + configProviderA.Update(new List() { route1 }, new List() { cluster1 }, "A2"); + + Assert.False(configNotifier.DidAtLeastOneErrorOccurWhileLoading); + Assert.Equal(new[] { "A2", "B1" }, configNotifier.EventuallyLoaded); + Assert.True(configNotifier.HasApplyingSucceeded); + Assert.Equal(new[] { "A2", "B1" }, configNotifier.SuccessfullyApplied); + Assert.Empty(configNotifier.FailedApplied); + + configProviderB.ShouldConfigLoadingFail = true; + + configNotifier.Reset(); + configProviderB.Update(new List() { route2 }, new List() { cluster2 }, "B2"); + + Assert.True(configNotifier.DidAtLeastOneErrorOccurWhileLoading); + Assert.Equal(new[] { "A2", "B1" }, configNotifier.EventuallyLoaded); + Assert.True(configNotifier.HasApplyingSucceeded); + Assert.Equal(new[] { "A2", "B1" }, configNotifier.SuccessfullyApplied); + Assert.Empty(configNotifier.FailedApplied); + } + + [Fact] + public async Task BuildConfig_CanBeNotifiedOfProxyConfigSuccessfulAndFailedUpdating() + { + var configProviderA = new InMemoryConfigProvider(new List() { }, new List() { }, "A1"); + var configProviderB = new InMemoryConfigProvider(new List() { }, new List() { }, "B1"); + + var configNotifier = new FakeConfigNotifier(); + + configNotifier.Reset(); + var services = CreateServices(new[] { configProviderA, configProviderB }, null, configNotifier); + + var manager = services.GetRequiredService(); + var dataSource = await manager.InitialLoadAsync(); + + Assert.False(configNotifier.DidAtLeastOneErrorOccurWhileLoading); + Assert.Equal(new[] { "A1", "B1" }, configNotifier.EventuallyLoaded); + Assert.True(configNotifier.HasApplyingSucceeded); + Assert.Equal(new[] { "A1", "B1" }, configNotifier.SuccessfullyApplied); + Assert.Empty(configNotifier.FailedApplied); + + const string TestAddress = "https://localhost:123/"; + + var cluster1 = new ClusterConfig + { + ClusterId = "cluster1", + Destinations = new Dictionary(StringComparer.OrdinalIgnoreCase) + { + { "d1", new DestinationConfig { Address = TestAddress } } + } + }; + var cluster2 = new ClusterConfig + { + ClusterId = "cluster2", + Destinations = new Dictionary(StringComparer.OrdinalIgnoreCase) + { + { "d2", new DestinationConfig { Address = TestAddress } } + } + }; + + var route1 = new RouteConfig + { + RouteId = "route1", + ClusterId = "cluster1", + Match = new RouteMatch { Path = "/" } + }; + var route2 = new RouteConfig + { + RouteId = "route2", + ClusterId = "cluster2", + // Missing Match here will be caught by the analysis + }; + + configNotifier.Reset(); + configProviderA.Update(new List() { route1 }, new List() { cluster1 }, "A2"); + + Assert.False(configNotifier.DidAtLeastOneErrorOccurWhileLoading); + Assert.Equal(new[] { "A2", "B1" }, configNotifier.EventuallyLoaded); + Assert.True(configNotifier.HasApplyingSucceeded); + Assert.Equal(new[] { "A2", "B1" }, configNotifier.SuccessfullyApplied); + Assert.Empty(configNotifier.FailedApplied); + + configNotifier.Reset(); + configProviderB.Update(new List() { route2 }, new List() { cluster2 }, "B2"); + + Assert.False(configNotifier.DidAtLeastOneErrorOccurWhileLoading); + Assert.Equal(new[] { "A2", "B2" }, configNotifier.EventuallyLoaded); + Assert.False(configNotifier.HasApplyingSucceeded); + Assert.Empty(configNotifier.SuccessfullyApplied); + Assert.Equal(new[] { "A2", "B2" }, configNotifier.FailedApplied); + } + [Fact] public async Task InitialLoadAsync_ProxyHttpClientOptionsSet_CreateAndSetHttpClient() {