Skip to content

Commit

Permalink
Merge pull request #136 from OctopusDeploy/andrew-w/extract-serializer
Browse files Browse the repository at this point in the history
Extracted Serializer to better manage message contract types
  • Loading branch information
andrew-at-octopus authored Aug 30, 2021
2 parents f3a5165 + 1a2a0a7 commit d4ba489
Show file tree
Hide file tree
Showing 12 changed files with 263 additions and 114 deletions.
2 changes: 1 addition & 1 deletion source/Halibut.Tests/ConnectionManagerFixture.cs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ public void DisconnectDisposesActiveConnections()

public MessageExchangeProtocol GetProtocol(Stream stream, ILog log)
{
return new MessageExchangeProtocol(new MessageExchangeStream(stream, new Type[] { }, log), log);
return new MessageExchangeProtocol(new MessageExchangeStream(stream, new MessageSerializer(), log), log);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -485,6 +485,11 @@ public void SendNext() { }
public void SendProceed() { }
public Task SendProceedAsync() { }
}
public interface IMessageSerializer
{
public T ReadMessage<T>(Stream stream) { }
public void WriteMessage<T>(Stream stream, T message) { }
}
public class InMemoryDataStreamReceiver : Halibut.IDataStreamReceiver
{
public InMemoryDataStreamReceiver(Action<Stream> writer) { }
Expand All @@ -503,8 +508,7 @@ public void StopAcceptingClientRequests() { }
}
public class MessageExchangeStream : Halibut.Transport.Protocol.IMessageExchangeStream
{
public static Func<IEnumerable<Type>, Newtonsoft.Json.JsonSerializer> Serializer;
public MessageExchangeStream(Stream stream, IEnumerable<Type> registeredServiceTypes, Halibut.Diagnostics.ILog log) { }
public MessageExchangeStream(Stream stream, Halibut.Transport.Protocol.IMessageSerializer serializer, Halibut.Diagnostics.ILog log) { }
public bool ExpectNextOrEnd() { }
public Task<bool> ExpectNextOrEndAsync() { }
public void ExpectProceeed() { }
Expand All @@ -519,15 +523,23 @@ public void SendNext() { }
public void SendProceed() { }
public Task SendProceedAsync() { }
}
public class MessageSerializer : Halibut.Transport.Protocol.IMessageSerializer
{
public MessageSerializer() { }
public void AddToMessageContract(Type[] types) { }
public T ReadMessage<T>(Stream stream) { }
public void WriteMessage<T>(Stream stream, T message) { }
}
public class ProtocolException : Exception, ISerializable
{
public ProtocolException(string message) { }
}
public class RegisteredSerializationBinder : Newtonsoft.Json.Serialization.ISerializationBinder
{
public RegisteredSerializationBinder(IEnumerable<Type> registeredServiceTypes) { }
public RegisteredSerializationBinder() { }
public void BindToName(Type serializedType, String& assemblyName, String& typeName) { }
public Type BindToType(string assemblyName, string typeName) { }
public void Register(Type[] registeredServiceTypes) { }
}
public class RemoteIdentity
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -492,6 +492,11 @@ public void SendNext() { }
public void SendProceed() { }
public Task SendProceedAsync() { }
}
public interface IMessageSerializer
{
public T ReadMessage<T>(Stream stream) { }
public void WriteMessage<T>(Stream stream, T message) { }
}
public class InMemoryDataStreamReceiver : Halibut.IDataStreamReceiver
{
public InMemoryDataStreamReceiver(Action<Stream> writer) { }
Expand All @@ -510,8 +515,7 @@ public void StopAcceptingClientRequests() { }
}
public class MessageExchangeStream : Halibut.Transport.Protocol.IMessageExchangeStream
{
public static Func<IEnumerable<Type>, Newtonsoft.Json.JsonSerializer> Serializer;
public MessageExchangeStream(Stream stream, IEnumerable<Type> registeredServiceTypes, Halibut.Diagnostics.ILog log) { }
public MessageExchangeStream(Stream stream, Halibut.Transport.Protocol.IMessageSerializer serializer, Halibut.Diagnostics.ILog log) { }
public bool ExpectNextOrEnd() { }
public Task<bool> ExpectNextOrEndAsync() { }
public void ExpectProceeed() { }
Expand All @@ -526,15 +530,23 @@ public void SendNext() { }
public void SendProceed() { }
public Task SendProceedAsync() { }
}
public class MessageSerializer : Halibut.Transport.Protocol.IMessageSerializer
{
public MessageSerializer() { }
public void AddToMessageContract(Type[] types) { }
public T ReadMessage<T>(Stream stream) { }
public void WriteMessage<T>(Stream stream, T message) { }
}
public class ProtocolException : Exception, ISerializable, _Exception
{
public ProtocolException(string message) { }
}
public class RegisteredSerializationBinder : Newtonsoft.Json.Serialization.ISerializationBinder
{
public RegisteredSerializationBinder(IEnumerable<Type> registeredServiceTypes) { }
public RegisteredSerializationBinder() { }
public void BindToName(Type serializedType, String& assemblyName, String& typeName) { }
public Type BindToType(string assemblyName, string typeName) { }
public void Register(Type[] registeredServiceTypes) { }
}
public class RemoteIdentity
{
Expand Down
11 changes: 7 additions & 4 deletions source/Halibut.Tests/RegisteredSerializationBinderFixture.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ public class RegisteredSerializationBinderFixture
[Test]
public void BindMethods_WithValidClass_FindsAllMethodTypes()
{
var binder = new RegisteredSerializationBinder(new[] { typeof(IExampleService) });
var binder = new RegisteredSerializationBinder();
binder.Register(typeof(IExampleService));
binder.BindToName(typeof(ExampleProperties), out var assemblyName, out var typeName);
var t = binder.BindToType(assemblyName, typeName);
t.Should().Be(typeof(ExampleProperties));
Expand Down Expand Up @@ -67,9 +68,10 @@ public interface IExampleService : IHaveBase
[TestCase(typeof(IObjectResultService))]
[TestCase(typeof(IObjectPropertyService))]
[TestCase(typeof(IObjectExampleService))]
public void Constructor_WithInvalidTypes_WillThrow(params Type[] types)
public void Register_WithInvalidTypes_WillThrow(params Type[] types)
{
Assert.Throws<TypeNotAllowedException>(() => { _ = new RegisteredSerializationBinder(types); });
var binder = new RegisteredSerializationBinder();
Assert.Throws<TypeNotAllowedException>(() => { binder.Register(types); });
}

public interface IAsyncService
Expand All @@ -95,7 +97,8 @@ public interface IObjectExampleService
[Test]
public void Circular_Types_CanBeResolved()
{
var binder = new RegisteredSerializationBinder(new[] { typeof(IMCircular) });
var binder = new RegisteredSerializationBinder();
binder.Register(typeof(IMCircular));
binder.BindToName(typeof(CircularPart1), out var assemblyName1, out var typeName1);
var t1 = binder.BindToType(assemblyName1, typeName1);
t1.Should().Be(typeof(CircularPart1));
Expand Down
4 changes: 2 additions & 2 deletions source/Halibut.Tests/SecureClientFixture.cs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public void SecureClientClearsPoolWhenAllConnectionsCorrupt()
{
var connectionManager = new ConnectionManager();
var stream = Substitute.For<IMessageExchangeStream>();
stream.When(x => x.IdentifyAsClient()).Do(x => { throw new ConnectionInitializationFailedException(""); });
stream.When(x => x.IdentifyAsClient()).Do(x => throw new ConnectionInitializationFailedException(""));
for (int i = 0; i < HalibutLimits.RetryCountLimit; i++)
{
var connection = Substitute.For<IConnection>();
Expand Down Expand Up @@ -70,7 +70,7 @@ public void SecureClientClearsPoolWhenAllConnectionsCorrupt()

public MessageExchangeProtocol GetProtocol(Stream stream, ILog logger)
{
return new MessageExchangeProtocol(new MessageExchangeStream(stream, new Type[] { }, logger), logger);
return new MessageExchangeProtocol(new MessageExchangeStream(stream, new MessageSerializer(), logger), logger);
}
}
}
76 changes: 70 additions & 6 deletions source/Halibut.Tests/UsageFixture.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
using System.Security.Cryptography.X509Certificates;
using System.Threading.Tasks;
using FluentAssertions;
using FluentAssertions.Common;
using Halibut.ServiceModel;
using Halibut.Tests.TestServices;
using NUnit.Framework;
Expand Down Expand Up @@ -65,6 +64,7 @@ public void OctopusCanSendMessagesToListeningTentacle()
public void OctopusCanSendMessagesToPollingTentacle()
{
var services = GetDelegateServiceFactory();
services.Register<ISupportedServices>(() => new SupportedServices());
using (var octopus = new HalibutRuntime(Certificates.Octopus))
using (var tentaclePolling = new HalibutRuntime(services, Certificates.TentaclePolling))
{
Expand All @@ -73,10 +73,11 @@ public void OctopusCanSendMessagesToPollingTentacle()

tentaclePolling.Poll(new Uri("poll://SQ-TENTAPOLL"), new ServiceEndPoint(new Uri("https://localhost:" + octopusPort), Certificates.OctopusPublicThumbprint));

var echo = octopus.CreateClient<IEchoService>("poll://SQ-TENTAPOLL", Certificates.TentaclePollingPublicThumbprint);
for (var i = 0; i < 2000; i++)
var svc = octopus.CreateClient<ISupportedServices>("poll://SQ-TENTAPOLL", Certificates.TentaclePollingPublicThumbprint);
for (var i = 1; i < 100; i++)
{
echo.SayHello("Deploy package A" + i).Should().Be("Deploy package A" + i + "...");
var i1 = i;
svc.GetLocation(new MapLocation { Latitude = -i, Longitude = i }).Should().Match<MapLocation>(x => x.Latitude == i1 && x.Longitude == -i1);
}
}
}
Expand All @@ -100,11 +101,11 @@ public void OctopusCanSendMessagesToWebSocketPollingTentacle()

tentaclePolling.Poll(new Uri("poll://SQ-TENTAPOLL"), new ServiceEndPoint(new Uri($"wss://localhost:{octopusPort}/Halibut"), Certificates.SslThumbprint));

var echo = octopus.CreateClient<ISupportedServices>("poll://SQ-TENTAPOLL", Certificates.TentaclePollingPublicThumbprint);
var svc = octopus.CreateClient<ISupportedServices>("poll://SQ-TENTAPOLL", Certificates.TentaclePollingPublicThumbprint);
for (var i = 1; i < 100; i++)
{
var i1 = i;
echo.GetLocation(new MapLocation { Latitude = -i, Longitude = i }).Should().Match<MapLocation>(x => x.Latitude == i1 && x.Longitude == -i1);
svc.GetLocation(new MapLocation { Latitude = -i, Longitude = i }).Should().Match<MapLocation>(x => x.Latitude == i1 && x.Longitude == -i1);
}
}
}
Expand All @@ -118,6 +119,69 @@ public void OctopusCanSendMessagesToWebSocketPollingTentacle()
}
}

[Test]
public void HalibutSerializerIsKeptUpToDateWithPollingTentacle()
{
var services = GetDelegateServiceFactory();
services.Register<ISupportedServices>(() => new SupportedServices());
using (var octopus = new HalibutRuntime(Certificates.Octopus))
using (var tentaclePolling = new HalibutRuntime(services, Certificates.TentaclePolling))
{
var octopusPort = octopus.Listen();
octopus.Trust(Certificates.TentaclePollingPublicThumbprint);

tentaclePolling.Poll(new Uri("poll://SQ-TENTAPOLL"), new ServiceEndPoint(new Uri("https://localhost:" + octopusPort), Certificates.OctopusPublicThumbprint));

// This is here to exercise the path where the Listener's (web socket) handle loop has the protocol (with type serializer) built before the type is registered
var echo = octopus.CreateClient<IEchoService>("poll://SQ-TENTAPOLL", Certificates.TentaclePollingPublicThumbprint);
// This must come before CreateClient<ISupportedServices> for the situation to occur
echo.SayHello("Deploy package A").Should().Be("Deploy package A" + "...");

var svc = octopus.CreateClient<ISupportedServices>("poll://SQ-TENTAPOLL", Certificates.TentaclePollingPublicThumbprint);
// This must happen before the message loop in MessageExchangeProtocol restarts (timeout, exception, or end) for the error to occur
svc.GetLocation(new MapLocation { Latitude = -27, Longitude = 153 }).Should().Match<MapLocation>(x => x.Latitude == 153 && x.Longitude == -27);
}
}

[Test]
[WindowsTestAttribute]
public void HalibutSerializerIsKeptUpToDateWithWebSocketPollingTentacle()
{
var services = GetDelegateServiceFactory();
services.Register<ISupportedServices>(() => new SupportedServices());
const int octopusPort = 8450;
AddSslCertToLocalStoreAndRegisterFor("0.0.0.0:" + octopusPort);

try
{
using (var octopus = new HalibutRuntime(Certificates.Octopus))
using (var tentaclePolling = new HalibutRuntime(services, Certificates.TentaclePolling))
{
octopus.ListenWebSocket($"https://+:{octopusPort}/Halibut");
octopus.Trust(Certificates.TentaclePollingPublicThumbprint);

tentaclePolling.Poll(new Uri("poll://SQ-TENTAPOLL"), new ServiceEndPoint(new Uri($"wss://localhost:{octopusPort}/Halibut"), Certificates.SslThumbprint));

// This is here to exercise the path where the Listener's (web socket) handle loop has the protocol (with type serializer) built before the type is registered
var echo = octopus.CreateClient<IEchoService>("poll://SQ-TENTAPOLL", Certificates.TentaclePollingPublicThumbprint);
// This must come before CreateClient<ISupportedServices> for the situation to occur
echo.SayHello("Deploy package A").Should().Be("Deploy package A" + "...");

var svc = octopus.CreateClient<ISupportedServices>("poll://SQ-TENTAPOLL", Certificates.TentaclePollingPublicThumbprint);
// This must happen before the message loop in MessageExchangeProtocol restarts (timeout, exception, or end) for the error to occur
svc.GetLocation(new MapLocation { Latitude = -27, Longitude = 153 }).Should().Match<MapLocation>(x => x.Latitude == 153 && x.Longitude == -27);
}
}
catch(NotSupportedException nse) when (nse.Message == "The netstandard build of this library cannot act as the client in a WebSocket polling setup")
{
Assert.Inconclusive("This test cannot run on the netstandard build");
}
finally
{
RemoveSslCertBindingFor("0.0.0.0:" + octopusPort);
}
}

[Test]
public void StreamsCanBeSentToListening()
{
Expand Down
49 changes: 25 additions & 24 deletions source/Halibut/HalibutRuntime.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,7 @@ public class HalibutRuntime : IHalibutRuntime
readonly PollingClientCollection pollingClients = new PollingClientCollection();
string friendlyHtmlPageContent = DefaultFriendlyHtmlPageContent;
Dictionary<string, string> friendlyHtmlPageHeaders = new Dictionary<string, string>();
readonly IServiceFactory serviceFactory;
readonly HashSet<Type> clientTypes = new HashSet<Type>();
readonly MessageSerializer messageSerializer = new MessageSerializer();

public HalibutRuntime(X509Certificate2 serverCertificate) : this(new NullServiceFactory(), serverCertificate, new DefaultTrustProvider())
{
Expand All @@ -49,7 +48,7 @@ public HalibutRuntime(IServiceFactory serviceFactory, X509Certificate2 serverCer
{
this.serverCertificate = serverCertificate;
this.trustProvider = trustProvider;
this.serviceFactory = serviceFactory;
messageSerializer.AddToMessageContract(serviceFactory.RegisteredServiceTypes.ToArray());
invoker = new ServiceInvoker(serviceFactory);
}

Expand All @@ -75,36 +74,31 @@ public int Listen(int port)

return Listen(new IPEndPoint(ipAddress, port));
}

IEnumerable<Type> AllProtocolTypes()
{
foreach (var clientType in clientTypes)
{
yield return clientType;
}

foreach (var serviceType in serviceFactory.RegisteredServiceTypes)
{
yield return serviceType;
}
}

ExchangeProtocolBuilder ExchangeProtocolBuilder()
{
return (stream, log) => new MessageExchangeProtocol(new MessageExchangeStream(stream, AllProtocolTypes(), log), log);
return (stream, log) => new MessageExchangeProtocol(new MessageExchangeStream(stream, messageSerializer, log), log);
}

public int Listen(IPEndPoint endpoint)
{
var listener = new SecureListener(endpoint, serverCertificate, ExchangeProtocolBuilder(), HandleMessage, IsTrusted, logs, () => friendlyHtmlPageContent, () => friendlyHtmlPageHeaders, HandleUnauthorizedClientConnect);
listeners.Add(listener);
lock (listeners)
{
listeners.Add(listener);
}

return listener.Start();
}

public void ListenWebSocket(string endpoint)
{
var listener = new SecureWebSocketListener(endpoint, serverCertificate, ExchangeProtocolBuilder(), HandleMessage, IsTrusted, logs, () => friendlyHtmlPageContent, () => friendlyHtmlPageHeaders, HandleUnauthorizedClientConnect);
listeners.Add(listener);
lock (listeners)
{
listeners.Add(listener);
}

listener.Start();
}

Expand Down Expand Up @@ -177,7 +171,8 @@ public TService CreateClient<TService>(ServiceEndPoint endpoint)

public TService CreateClient<TService>(ServiceEndPoint endpoint, CancellationToken cancellationToken)
{
clientTypes.Add(typeof(TService));
messageSerializer.AddToMessageContract(typeof(TService));

#if HAS_REAL_PROXY
return (TService)new HalibutProxy(SendOutgoingRequest, typeof(TService), endpoint, cancellationToken).GetTransparentProxy();
#else
Expand Down Expand Up @@ -254,9 +249,12 @@ void DisconnectFromAllListeners(IReadOnlyCollection<string> thumbprints)

void DisconnectFromAllListeners(string thumbprint)
{
foreach (var secureListener in listeners.OfType<SecureListener>())
lock (listeners)
{
secureListener.Disconnect(thumbprint);
foreach (var secureListener in listeners.OfType<SecureListener>())
{
secureListener.Disconnect(thumbprint);
}
}
}

Expand Down Expand Up @@ -290,9 +288,12 @@ public void Dispose()
{
pollingClients.Dispose();
connectionManager.Dispose();
foreach (var listener in listeners)
lock (listeners)
{
listener.Dispose();
foreach (var listener in listeners)
{
listener?.Dispose();
}
}
}

Expand Down
Loading

0 comments on commit d4ba489

Please sign in to comment.