Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Extracted Serializer to better manage message contract types #136

Merged
merged 2 commits into from
Aug 30, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion source/Halibut.Tests/ConnectionManagerFixture.cs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ public void DisconnectDisposesActiveConnections()

public MessageExchangeProtocol GetProtocol(Stream stream, ILog log)
{
return new MessageExchangeProtocol(new MessageExchangeStream(stream, new Type[] { }, log), log);
return new MessageExchangeProtocol(new MessageExchangeStream(stream, new MessageSerializer(), log), log);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -485,6 +485,11 @@ public void SendNext() { }
public void SendProceed() { }
public Task SendProceedAsync() { }
}
public interface IMessageSerializer
{
public T ReadMessage<T>(Stream stream) { }
public void WriteMessage<T>(Stream stream, T message) { }
}
public class InMemoryDataStreamReceiver : Halibut.IDataStreamReceiver
{
public InMemoryDataStreamReceiver(Action<Stream> writer) { }
Expand All @@ -503,8 +508,7 @@ public void StopAcceptingClientRequests() { }
}
public class MessageExchangeStream : Halibut.Transport.Protocol.IMessageExchangeStream
{
public static Func<IEnumerable<Type>, Newtonsoft.Json.JsonSerializer> Serializer;
public MessageExchangeStream(Stream stream, IEnumerable<Type> registeredServiceTypes, Halibut.Diagnostics.ILog log) { }
public MessageExchangeStream(Stream stream, Halibut.Transport.Protocol.IMessageSerializer serializer, Halibut.Diagnostics.ILog log) { }
public bool ExpectNextOrEnd() { }
public Task<bool> ExpectNextOrEndAsync() { }
public void ExpectProceeed() { }
Expand All @@ -519,15 +523,23 @@ public void SendNext() { }
public void SendProceed() { }
public Task SendProceedAsync() { }
}
public class MessageSerializer : Halibut.Transport.Protocol.IMessageSerializer
{
public MessageSerializer() { }
public void AddToMessageContract(Type[] types) { }
public T ReadMessage<T>(Stream stream) { }
public void WriteMessage<T>(Stream stream, T message) { }
}
public class ProtocolException : Exception, ISerializable
{
public ProtocolException(string message) { }
}
public class RegisteredSerializationBinder : Newtonsoft.Json.Serialization.ISerializationBinder
{
public RegisteredSerializationBinder(IEnumerable<Type> registeredServiceTypes) { }
public RegisteredSerializationBinder() { }
public void BindToName(Type serializedType, String& assemblyName, String& typeName) { }
public Type BindToType(string assemblyName, string typeName) { }
public void Register(Type[] registeredServiceTypes) { }
}
public class RemoteIdentity
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -492,6 +492,11 @@ public void SendNext() { }
public void SendProceed() { }
public Task SendProceedAsync() { }
}
public interface IMessageSerializer
{
public T ReadMessage<T>(Stream stream) { }
public void WriteMessage<T>(Stream stream, T message) { }
}
public class InMemoryDataStreamReceiver : Halibut.IDataStreamReceiver
{
public InMemoryDataStreamReceiver(Action<Stream> writer) { }
Expand All @@ -510,8 +515,7 @@ public void StopAcceptingClientRequests() { }
}
public class MessageExchangeStream : Halibut.Transport.Protocol.IMessageExchangeStream
{
public static Func<IEnumerable<Type>, Newtonsoft.Json.JsonSerializer> Serializer;
public MessageExchangeStream(Stream stream, IEnumerable<Type> registeredServiceTypes, Halibut.Diagnostics.ILog log) { }
public MessageExchangeStream(Stream stream, Halibut.Transport.Protocol.IMessageSerializer serializer, Halibut.Diagnostics.ILog log) { }
public bool ExpectNextOrEnd() { }
public Task<bool> ExpectNextOrEndAsync() { }
public void ExpectProceeed() { }
Expand All @@ -526,15 +530,23 @@ public void SendNext() { }
public void SendProceed() { }
public Task SendProceedAsync() { }
}
public class MessageSerializer : Halibut.Transport.Protocol.IMessageSerializer
{
public MessageSerializer() { }
public void AddToMessageContract(Type[] types) { }
public T ReadMessage<T>(Stream stream) { }
public void WriteMessage<T>(Stream stream, T message) { }
}
public class ProtocolException : Exception, ISerializable, _Exception
{
public ProtocolException(string message) { }
}
public class RegisteredSerializationBinder : Newtonsoft.Json.Serialization.ISerializationBinder
{
public RegisteredSerializationBinder(IEnumerable<Type> registeredServiceTypes) { }
public RegisteredSerializationBinder() { }
public void BindToName(Type serializedType, String& assemblyName, String& typeName) { }
public Type BindToType(string assemblyName, string typeName) { }
public void Register(Type[] registeredServiceTypes) { }
}
public class RemoteIdentity
{
Expand Down
11 changes: 7 additions & 4 deletions source/Halibut.Tests/RegisteredSerializationBinderFixture.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ public class RegisteredSerializationBinderFixture
[Test]
public void BindMethods_WithValidClass_FindsAllMethodTypes()
{
var binder = new RegisteredSerializationBinder(new[] { typeof(IExampleService) });
var binder = new RegisteredSerializationBinder();
binder.Register(typeof(IExampleService));
binder.BindToName(typeof(ExampleProperties), out var assemblyName, out var typeName);
var t = binder.BindToType(assemblyName, typeName);
t.Should().Be(typeof(ExampleProperties));
Expand Down Expand Up @@ -67,9 +68,10 @@ public interface IExampleService : IHaveBase
[TestCase(typeof(IObjectResultService))]
[TestCase(typeof(IObjectPropertyService))]
[TestCase(typeof(IObjectExampleService))]
public void Constructor_WithInvalidTypes_WillThrow(params Type[] types)
public void Register_WithInvalidTypes_WillThrow(params Type[] types)
{
Assert.Throws<TypeNotAllowedException>(() => { _ = new RegisteredSerializationBinder(types); });
var binder = new RegisteredSerializationBinder();
Assert.Throws<TypeNotAllowedException>(() => { binder.Register(types); });
}

public interface IAsyncService
Expand All @@ -95,7 +97,8 @@ public interface IObjectExampleService
[Test]
public void Circular_Types_CanBeResolved()
{
var binder = new RegisteredSerializationBinder(new[] { typeof(IMCircular) });
var binder = new RegisteredSerializationBinder();
binder.Register(typeof(IMCircular));
binder.BindToName(typeof(CircularPart1), out var assemblyName1, out var typeName1);
var t1 = binder.BindToType(assemblyName1, typeName1);
t1.Should().Be(typeof(CircularPart1));
Expand Down
4 changes: 2 additions & 2 deletions source/Halibut.Tests/SecureClientFixture.cs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public void SecureClientClearsPoolWhenAllConnectionsCorrupt()
{
var connectionManager = new ConnectionManager();
var stream = Substitute.For<IMessageExchangeStream>();
stream.When(x => x.IdentifyAsClient()).Do(x => { throw new ConnectionInitializationFailedException(""); });
stream.When(x => x.IdentifyAsClient()).Do(x => throw new ConnectionInitializationFailedException(""));
for (int i = 0; i < HalibutLimits.RetryCountLimit; i++)
{
var connection = Substitute.For<IConnection>();
Expand Down Expand Up @@ -70,7 +70,7 @@ public void SecureClientClearsPoolWhenAllConnectionsCorrupt()

public MessageExchangeProtocol GetProtocol(Stream stream, ILog logger)
{
return new MessageExchangeProtocol(new MessageExchangeStream(stream, new Type[] { }, logger), logger);
return new MessageExchangeProtocol(new MessageExchangeStream(stream, new MessageSerializer(), logger), logger);
}
}
}
20 changes: 15 additions & 5 deletions source/Halibut.Tests/UsageFixture.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
using System.Security.Cryptography.X509Certificates;
using System.Threading.Tasks;
using FluentAssertions;
using FluentAssertions.Common;
using Halibut.ServiceModel;
using Halibut.Tests.TestServices;
using NUnit.Framework;
Expand Down Expand Up @@ -65,6 +64,7 @@ public void OctopusCanSendMessagesToListeningTentacle()
public void OctopusCanSendMessagesToPollingTentacle()
{
var services = GetDelegateServiceFactory();
services.Register<ISupportedServices>(() => new SupportedServices());
using (var octopus = new HalibutRuntime(Certificates.Octopus))
using (var tentaclePolling = new HalibutRuntime(services, Certificates.TentaclePolling))
{
Expand All @@ -73,10 +73,16 @@ public void OctopusCanSendMessagesToPollingTentacle()

tentaclePolling.Poll(new Uri("poll://SQ-TENTAPOLL"), new ServiceEndPoint(new Uri("https://localhost:" + octopusPort), Certificates.OctopusPublicThumbprint));

// This is here to exercise the path there the Listener's (web socket) handle loop has the protocol (with type serializer) built before the type is registered

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there ➡ where

var echo = octopus.CreateClient<IEchoService>("poll://SQ-TENTAPOLL", Certificates.TentaclePollingPublicThumbprint);
for (var i = 0; i < 2000; i++)
echo.SayHello("Deploy package A").Should().Be("Deploy package A" + "..."); // This must come before CreateClient<ISupportedServices> for the situation to occur

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not a easy call either way, but I wonder if it's worth moving this into its own test?


//Thread.Sleep(TimeSpan.FromSeconds(12));

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This line can be removed

var svc = octopus.CreateClient<ISupportedServices>("poll://SQ-TENTAPOLL", Certificates.TentaclePollingPublicThumbprint);
for (var i = 1; i < 100; i++)
{
echo.SayHello("Deploy package A" + i).Should().Be("Deploy package A" + i + "...");
var i1 = i;
svc.GetLocation(new MapLocation { Latitude = -i, Longitude = i }).Should().Match<MapLocation>(x => x.Latitude == i1 && x.Longitude == -i1);
}
}
}
Expand All @@ -100,11 +106,15 @@ public void OctopusCanSendMessagesToWebSocketPollingTentacle()

tentaclePolling.Poll(new Uri("poll://SQ-TENTAPOLL"), new ServiceEndPoint(new Uri($"wss://localhost:{octopusPort}/Halibut"), Certificates.SslThumbprint));

var echo = octopus.CreateClient<ISupportedServices>("poll://SQ-TENTAPOLL", Certificates.TentaclePollingPublicThumbprint);
// This is here to exercise the path there the Listener's (web socket) handle loop has the protocol (with type serializer) built before the type is registered

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there ➡ where

var echo = octopus.CreateClient<IEchoService>("poll://SQ-TENTAPOLL", Certificates.TentaclePollingPublicThumbprint);
echo.SayHello("Deploy package A").Should().Be("Deploy package A" + "...");

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As above regarding possibly making another test for this.


var svc = octopus.CreateClient<ISupportedServices>("poll://SQ-TENTAPOLL", Certificates.TentaclePollingPublicThumbprint);
for (var i = 1; i < 100; i++)
{
var i1 = i;
echo.GetLocation(new MapLocation { Latitude = -i, Longitude = i }).Should().Match<MapLocation>(x => x.Latitude == i1 && x.Longitude == -i1);
svc.GetLocation(new MapLocation { Latitude = -i, Longitude = i }).Should().Match<MapLocation>(x => x.Latitude == i1 && x.Longitude == -i1);
}
}
}
Expand Down
49 changes: 25 additions & 24 deletions source/Halibut/HalibutRuntime.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,7 @@ public class HalibutRuntime : IHalibutRuntime
readonly PollingClientCollection pollingClients = new PollingClientCollection();
string friendlyHtmlPageContent = DefaultFriendlyHtmlPageContent;
Dictionary<string, string> friendlyHtmlPageHeaders = new Dictionary<string, string>();
readonly IServiceFactory serviceFactory;
readonly HashSet<Type> clientTypes = new HashSet<Type>();
readonly MessageSerializer messageSerializer = new MessageSerializer();

public HalibutRuntime(X509Certificate2 serverCertificate) : this(new NullServiceFactory(), serverCertificate, new DefaultTrustProvider())
{
Expand All @@ -49,7 +48,7 @@ public HalibutRuntime(IServiceFactory serviceFactory, X509Certificate2 serverCer
{
this.serverCertificate = serverCertificate;
this.trustProvider = trustProvider;
this.serviceFactory = serviceFactory;
messageSerializer.AddToMessageContract(serviceFactory.RegisteredServiceTypes.ToArray());
invoker = new ServiceInvoker(serviceFactory);
}

Expand All @@ -75,36 +74,31 @@ public int Listen(int port)

return Listen(new IPEndPoint(ipAddress, port));
}

IEnumerable<Type> AllProtocolTypes()
{
foreach (var clientType in clientTypes)
{
yield return clientType;
}

foreach (var serviceType in serviceFactory.RegisteredServiceTypes)
{
yield return serviceType;
}
}

ExchangeProtocolBuilder ExchangeProtocolBuilder()
{
return (stream, log) => new MessageExchangeProtocol(new MessageExchangeStream(stream, AllProtocolTypes(), log), log);
return (stream, log) => new MessageExchangeProtocol(new MessageExchangeStream(stream, messageSerializer, log), log);
}

public int Listen(IPEndPoint endpoint)
{
var listener = new SecureListener(endpoint, serverCertificate, ExchangeProtocolBuilder(), HandleMessage, IsTrusted, logs, () => friendlyHtmlPageContent, () => friendlyHtmlPageHeaders, HandleUnauthorizedClientConnect);
listeners.Add(listener);
lock (listeners)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Given that any calls to listeners need to be locked (as well as serviceTypes, below), is it worth creating a ConcurrentHashSet class that wraps a HashSet and makes the calls thread-safe? You could just expose the methods you need here. Just spitballing here.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Given the time frame needed for this fix though, it might be worth just adding a comment saying why the locks are needed?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree in principle, and yes it shouldn't be too hard... but for now, will skip

{
listeners.Add(listener);
}

return listener.Start();
}

public void ListenWebSocket(string endpoint)
{
var listener = new SecureWebSocketListener(endpoint, serverCertificate, ExchangeProtocolBuilder(), HandleMessage, IsTrusted, logs, () => friendlyHtmlPageContent, () => friendlyHtmlPageHeaders, HandleUnauthorizedClientConnect);
listeners.Add(listener);
lock (listeners)
{
listeners.Add(listener);
}

listener.Start();
}

Expand Down Expand Up @@ -177,7 +171,8 @@ public TService CreateClient<TService>(ServiceEndPoint endpoint)

public TService CreateClient<TService>(ServiceEndPoint endpoint, CancellationToken cancellationToken)
{
clientTypes.Add(typeof(TService));
messageSerializer.AddToMessageContract(typeof(TService));

#if HAS_REAL_PROXY
return (TService)new HalibutProxy(SendOutgoingRequest, typeof(TService), endpoint, cancellationToken).GetTransparentProxy();
#else
Expand Down Expand Up @@ -254,9 +249,12 @@ void DisconnectFromAllListeners(IReadOnlyCollection<string> thumbprints)

void DisconnectFromAllListeners(string thumbprint)
{
foreach (var secureListener in listeners.OfType<SecureListener>())
lock (listeners)
{
secureListener.Disconnect(thumbprint);
foreach (var secureListener in listeners.OfType<SecureListener>())
{
secureListener.Disconnect(thumbprint);
}
}
}

Expand Down Expand Up @@ -290,9 +288,12 @@ public void Dispose()
{
pollingClients.Dispose();
connectionManager.Dispose();
foreach (var listener in listeners)
lock (listeners)
{
listener.Dispose();
foreach (var listener in listeners)
{
listener?.Dispose();
}
}
}

Expand Down
28 changes: 18 additions & 10 deletions source/Halibut/ServiceModel/DelegateServiceFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,10 @@ public void Register<TContract>(Func<TContract> implementation)
{
var serviceType = typeof(TContract);
services.Add(serviceType.Name, () => implementation());
serviceTypes.Add(serviceType);
lock (serviceTypes)
{
serviceTypes.Add(serviceType);
}
}

public IServiceLease CreateService(string serviceName)
Expand All @@ -24,8 +27,7 @@ public IServiceLease CreateService(string serviceName)

Func<object> GetService(string name)
{
Func<object> result;
if (!services.TryGetValue(name, out result))
if (!services.TryGetValue(name, out var result))

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So much cleaner... 👍

{
throw new Exception("Service not found: " + name);
}
Expand All @@ -39,7 +41,16 @@ static IServiceLease CreateService(Func<object> serviceBuilder)
return new Lease(service);
}

public IReadOnlyList<Type> RegisteredServiceTypes => serviceTypes.ToList();
public IReadOnlyList<Type> RegisteredServiceTypes
{
get
{
lock (serviceTypes)
{
return serviceTypes.ToList();
}
}
}

#region Nested type: Lease

Expand All @@ -52,16 +63,13 @@ public Lease(object service)
this.service = service;
}

public object Service
{
get { return service; }
}
public object Service => service;

public void Dispose()
{
if (service is IDisposable)
if (service is IDisposable disposable)
{
((IDisposable)service).Dispose();
disposable.Dispose();
}
}
}
Expand Down
11 changes: 11 additions & 0 deletions source/Halibut/Transport/Protocol/IMessageSerializer.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
using System.IO;

namespace Halibut.Transport.Protocol
{
public interface IMessageSerializer
{
void WriteMessage<T>(Stream stream, T message);

T ReadMessage<T>(Stream stream);
}
}
Loading