Skip to content

Commit

Permalink
Refactored k8s endpoints PR +semver: major
Browse files Browse the repository at this point in the history
  • Loading branch information
TomPallister committed Apr 11, 2020
2 parents fd7c6d7 + 84907bd commit 6e5471a
Show file tree
Hide file tree
Showing 7 changed files with 105 additions and 72 deletions.
8 changes: 5 additions & 3 deletions docs/features/kubernetes.rst
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
Kubernetes
==============

This feature was requested as part of `Issue 345 <https://github.com/ThreeMammals/Ocelot/issues/345>`_ . to add support for kubernetes's service discovery provider.
This feature was requested as part of `Issue 345 <https://github.com/ThreeMammals/Ocelot/issues/345>`_ . to add support for kubernetes's provider.

Ocelot will call the k8s endpoints API in a given namespace to get all of the endpoints for a pod and then load balance across them. Ocelot used to use the services api to send requests to the k8s service but this was changed in `PR 1134 <https://github.com/ThreeMammals/Ocelot/pull/1134>`_ because the service did not load balance as expected.

The first thing you need to do is install the NuGet package that provides kubernetes support in Ocelot.

Expand All @@ -23,7 +25,7 @@ If you have services deployed in kubernetes you will normally use the naming ser
}
You can replicate a Permissive. Using RBAC role bindings.
`Permissive RBAC Permissions <https://kubernetes.io/docs/reference/access-authn-authz/rbac/#permissive-rbac-permissions>`_, k8s api server and token will read from pod .
`Permissive RBAC Permissions <https://kubernetes.io/docs/reference/access-authn-authz/rbac/#permissive-rbac-permissions>`_, k8s api server and token will read from pod.

.. code-block::bash
kubectl create clusterrolebinding permissive-binding --clusterrole=cluster-admin --user=admin --user=kubelet --group=system:serviceaccounts
Expand Down Expand Up @@ -76,7 +78,7 @@ The polling interval is in milliseconds and tells Ocelot how often to call kuber
Please note there are tradeoffs here. If you poll kubernetes it is possible Ocelot will not know if a service is down depending on your polling interval and you might get more errors than if you get the latest services per request. This really depends on how volatile your services are. I doubt it will matter for most people and polling may give a tiny performance improvement over calling kubernetes per request.
There is no way for Ocelot to work these out for you.

If your downstream service resides in a different namespace you can override the global setting at the ReRoute level by specifying a ServiceNamespace
If your downstream service resides in a different namespace you can override the global setting at the ReRoute level by specifying a ServiceNamespace.


.. code-block:: json
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
using HTTPlease;
using KubeClient;
using KubeClient.Models;
using KubeClient.ResourceClients;
using System;
using System.Threading;
using System.Threading.Tasks;

namespace Ocelot.Provider.Kubernetes.KubeApiClientExtensions
{
public class EndPointClientV1 : KubeResourceClient
{
private readonly HttpRequest _collection = KubeRequest.Create("api/v1/namespaces/{Namespace}/endpoints/{ServiceName}");

public EndPointClientV1(IKubeApiClient client) : base(client)
{
}

public async Task<EndpointsV1> Get(string serviceName, string kubeNamespace = null, CancellationToken cancellationToken = default)
{
if (string.IsNullOrEmpty(serviceName)) throw new ArgumentNullException(nameof(serviceName));

var response = await Http.GetAsync(
_collection.WithTemplateParameters(new
{
Namespace = kubeNamespace ?? KubeClient.DefaultNamespace,
ServiceName = serviceName
}),
cancellationToken
);

if (response.IsSuccessStatusCode)
return await response.ReadContentAsAsync<EndpointsV1>();

return null;
}
}
}
53 changes: 24 additions & 29 deletions src/Ocelot.Provider.Kubernetes/KubeProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,57 +6,52 @@
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Ocelot.Provider.Kubernetes.KubeApiClientExtensions;

namespace Ocelot.Provider.Kubernetes
{
public class Kube : IServiceDiscoveryProvider
public class KubernetesServiceDiscoveryProvider : IServiceDiscoveryProvider
{
private KubeRegistryConfiguration kubeRegistryConfiguration;
private IOcelotLogger logger;
private IKubeApiClient kubeApi;
private readonly KubeRegistryConfiguration _kubeRegistryConfiguration;
private readonly IOcelotLogger _logger;
private readonly IKubeApiClient _kubeApi;

public Kube(KubeRegistryConfiguration kubeRegistryConfiguration, IOcelotLoggerFactory factory, IKubeApiClient kubeApi)
public KubernetesServiceDiscoveryProvider(KubeRegistryConfiguration kubeRegistryConfiguration, IOcelotLoggerFactory factory, IKubeApiClient kubeApi)
{
this.kubeRegistryConfiguration = kubeRegistryConfiguration;
this.logger = factory.CreateLogger<Kube>();
this.kubeApi = kubeApi;
_kubeRegistryConfiguration = kubeRegistryConfiguration;
_logger = factory.CreateLogger<KubernetesServiceDiscoveryProvider>();
_kubeApi = kubeApi;
}


public async Task<List<Service>> Get()
{
var service = await kubeApi.ServicesV1().Get(kubeRegistryConfiguration.KeyOfServiceInK8s, kubeRegistryConfiguration.KubeNamespace);
var endpoint = await _kubeApi
.ResourceClient(client => new EndPointClientV1(client))
.Get(_kubeRegistryConfiguration.KeyOfServiceInK8s, _kubeRegistryConfiguration.KubeNamespace);

var services = new List<Service>();
if (IsValid(service))
if (endpoint != null && endpoint.Subsets.Any())
{
services.Add(BuildService(service));
services.AddRange(BuildServices(endpoint));
}
else
{
logger.LogWarning($"namespace:{kubeRegistryConfiguration.KubeNamespace }service:{kubeRegistryConfiguration.KeyOfServiceInK8s} Unable to use ,it is invalid. Address must contain host only e.g. localhost and port must be greater than 0");
_logger.LogWarning($"namespace:{_kubeRegistryConfiguration.KubeNamespace }service:{_kubeRegistryConfiguration.KeyOfServiceInK8s} Unable to use ,it is invalid. Address must contain host only e.g. localhost and port must be greater than 0");
}
return services;
}

private bool IsValid(ServiceV1 service)
private List<Service> BuildServices(EndpointsV1 endpoint)
{
if (string.IsNullOrEmpty(service.Spec.ClusterIP) || service.Spec.Ports.Count <= 0)
var services = new List<Service>();

foreach (var subset in endpoint.Subsets)
{
return false;
services.AddRange(subset.Addresses.Select(address => new Service(endpoint.Metadata.Name,
new ServiceHostAndPort(address.Ip, subset.Ports.First().Port),
endpoint.Metadata.Uid, string.Empty, Enumerable.Empty<string>())));
}

return true;
}

private Service BuildService(ServiceV1 serviceEntry)
{
var servicePort = serviceEntry.Spec.Ports.FirstOrDefault();
return new Service(
serviceEntry.Metadata.Name,
new ServiceHostAndPort(serviceEntry.Spec.ClusterIP, servicePort.Port),
serviceEntry.Metadata.Uid,
string.Empty,
Enumerable.Empty<string>());
return services;
}
}
}
10 changes: 6 additions & 4 deletions src/Ocelot.Provider.Kubernetes/KubernetesProviderFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,22 +12,24 @@ public static class KubernetesProviderFactory
public static ServiceDiscoveryFinderDelegate Get = (provider, config, reRoute) =>
{
var factory = provider.GetService<IOcelotLoggerFactory>();
return GetkubeProvider(provider, config, reRoute, factory);
return GetKubeProvider(provider, config, reRoute, factory);
};

private static ServiceDiscovery.Providers.IServiceDiscoveryProvider GetkubeProvider(IServiceProvider provider, Configuration.ServiceProviderConfiguration config, DownstreamReRoute reRoute, IOcelotLoggerFactory factory)
private static ServiceDiscovery.Providers.IServiceDiscoveryProvider GetKubeProvider(IServiceProvider provider, ServiceProviderConfiguration config, DownstreamReRoute reRoute, IOcelotLoggerFactory factory)
{
var kubeClient = provider.GetService<IKubeApiClient>();

var k8sRegistryConfiguration = new KubeRegistryConfiguration()
{
KeyOfServiceInK8s = reRoute.ServiceName,
KubeNamespace = string.IsNullOrEmpty(reRoute.ServiceNamespace) ? config.Namespace : reRoute.ServiceNamespace
};

var k8sServiceDiscoveryProvider = new Kube(k8sRegistryConfiguration, factory, kubeClient);
var k8sServiceDiscoveryProvider = new KubernetesServiceDiscoveryProvider(k8sRegistryConfiguration, factory, kubeClient);

if (config.Type?.ToLower() == "pollkube")
{
return new PollKube(config.PollingInterval, factory, k8sServiceDiscoveryProvider);
return new PollKubernetes(config.PollingInterval, factory, k8sServiceDiscoveryProvider);
}
return k8sServiceDiscoveryProvider;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,17 @@

namespace Ocelot.Provider.Kubernetes
{
public class PollKube : IServiceDiscoveryProvider
public class PollKubernetes : IServiceDiscoveryProvider
{
private readonly IOcelotLogger _logger;
private readonly IServiceDiscoveryProvider _kubeServiceDiscoveryProvider;
private readonly Timer _timer;
private bool _polling;
private List<Service> _services;

public PollKube(int pollingInterval, IOcelotLoggerFactory factory, IServiceDiscoveryProvider kubeServiceDiscoveryProvider)
public PollKubernetes(int pollingInterval, IOcelotLoggerFactory factory, IServiceDiscoveryProvider kubeServiceDiscoveryProvider)
{
_logger = factory.CreateLogger<PollKube>();
_logger = factory.CreateLogger<PollKubernetes>();
_kubeServiceDiscoveryProvider = kubeServiceDiscoveryProvider;
_services = new List<Service>();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ namespace Ocelot.UnitTests.Kubernetes
public class KubeServiceDiscoveryProviderTests : IDisposable
{
private IWebHost _fakeKubeBuilder;
private ServiceV1 _serviceEntries;
private readonly Kube _provider;
private readonly KubernetesServiceDiscoveryProvider _provider;
private EndpointsV1 _endpointEntries;
private readonly string _serviceName;
private readonly string _namespaces;
private readonly int _port;
Expand All @@ -41,59 +41,55 @@ public KubeServiceDiscoveryProviderTests()
_port = 86;
_kubeHost = "localhost";
_fakekubeServiceDiscoveryUrl = $"http://{_kubeHost}:{_port}";
_serviceEntries = new ServiceV1();
_endpointEntries = new EndpointsV1();
_factory = new Mock<IOcelotLoggerFactory>();

var option = new KubeClientOptions
{
ApiEndPoint = new Uri(_fakekubeServiceDiscoveryUrl),
AccessToken = "txpc696iUhbVoudg164r93CxDTrKRVWG",
AuthStrategy = KubeClient.KubeAuthStrategy.BearerToken,
AllowInsecure = true
AllowInsecure = true,
};

_clientFactory = KubeApiClient.Create(option);
_logger = new Mock<IOcelotLogger>();
_factory.Setup(x => x.CreateLogger<Kube>()).Returns(_logger.Object);
_factory.Setup(x => x.CreateLogger<KubernetesServiceDiscoveryProvider>()).Returns(_logger.Object);
var config = new KubeRegistryConfiguration()
{
KeyOfServiceInK8s = _serviceName,
KubeNamespace = _namespaces
KubeNamespace = _namespaces,
};
_provider = new Kube(config, _factory.Object, _clientFactory);
_provider = new KubernetesServiceDiscoveryProvider(config, _factory.Object, _clientFactory);
}

[Fact]
public void should_return_service_from_k8s()
{
var token = "Bearer txpc696iUhbVoudg164r93CxDTrKRVWG";
var serviceEntryOne = new ServiceV1()
var endPointEntryOne = new EndpointsV1
{
Kind = "service",
Kind = "endpoint",
ApiVersion = "1.0",
Metadata = new ObjectMetaV1()
{
Namespace = "dev"
Namespace = "dev",
},
Spec = new ServiceSpecV1()
{
ClusterIP = "localhost"
},
Status = new ServiceStatusV1()
{
LoadBalancer = new LoadBalancerStatusV1()
}
};

serviceEntryOne.Spec.Ports.Add(
new ServicePortV1()
{
Port = 80
}
);
var endpointSubsetV1 = new EndpointSubsetV1();
endpointSubsetV1.Addresses.Add(new EndpointAddressV1()
{
Ip = "127.0.0.1",
Hostname = "localhost",
});
endpointSubsetV1.Ports.Add(new EndpointPortV1()
{
Port = 80,
});
endPointEntryOne.Subsets.Add(endpointSubsetV1);

this.Given(x => GivenThereIsAFakeKubeServiceDiscoveryProvider(_fakekubeServiceDiscoveryUrl, _serviceName, _namespaces))
.And(x => GivenTheServicesAreRegisteredWithKube(serviceEntryOne))
.And(x => GivenTheServicesAreRegisteredWithKube(endPointEntryOne))
.When(x => WhenIGetTheServices())
.Then(x => ThenTheCountIs(1))
.And(_ => _receivedToken.ShouldBe(token))
Expand All @@ -110,9 +106,9 @@ private void WhenIGetTheServices()
_services = _provider.Get().GetAwaiter().GetResult();
}

private void GivenTheServicesAreRegisteredWithKube(ServiceV1 serviceEntries)
private void GivenTheServicesAreRegisteredWithKube(EndpointsV1 endpointEntries)
{
_serviceEntries = serviceEntries;
_endpointEntries = endpointEntries;
}

private void GivenThereIsAFakeKubeServiceDiscoveryProvider(string url, string serviceName, string namespaces)
Expand All @@ -127,14 +123,14 @@ private void GivenThereIsAFakeKubeServiceDiscoveryProvider(string url, string se
{
app.Run(async context =>
{
if (context.Request.Path.Value == $"/api/v1/namespaces/{namespaces}/services/{serviceName}")
if (context.Request.Path.Value == $"/api/v1/namespaces/{namespaces}/endpoints/{serviceName}")
{
if (context.Request.Headers.TryGetValue("Authorization", out var values))
{
_receivedToken = values.First();
}

var json = JsonConvert.SerializeObject(_serviceEntries);
var json = JsonConvert.SerializeObject(_endpointEntries);
context.Response.Headers.Add("Content-Type", "application/json");
await context.Response.WriteAsync(json);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ namespace Ocelot.UnitTests.Kubernetes
public class PollingKubeServiceDiscoveryProviderTests
{
private readonly int _delay;
private PollKube _provider;
private PollKubernetes _provider;
private readonly List<Service> _services;
private readonly Mock<IOcelotLoggerFactory> _factory;
private readonly Mock<IOcelotLogger> _logger;
Expand All @@ -28,7 +28,7 @@ public PollingKubeServiceDiscoveryProviderTests()
_delay = 1;
_factory = new Mock<IOcelotLoggerFactory>();
_logger = new Mock<IOcelotLogger>();
_factory.Setup(x => x.CreateLogger<PollKube>()).Returns(_logger.Object);
_factory.Setup(x => x.CreateLogger<PollKubernetes>()).Returns(_logger.Object);
_kubeServiceDiscoveryProvider = new Mock<IServiceDiscoveryProvider>();
}

Expand Down Expand Up @@ -56,7 +56,7 @@ private void ThenTheCountIs(int count)

private void WhenIGetTheServices(int expected)
{
_provider = new PollKube(_delay, _factory.Object, _kubeServiceDiscoveryProvider.Object);
_provider = new PollKubernetes(_delay, _factory.Object, _kubeServiceDiscoveryProvider.Object);

var result = Wait.WaitFor(3000).Until(() =>
{
Expand Down

0 comments on commit 6e5471a

Please sign in to comment.