Skip to content

Commit

Permalink
Merge pull request #793 from Cysharp/feature/ReduceAllocationStreamin…
Browse files Browse the repository at this point in the history
…gHubClient2

Reduce allocation on StreamingHubClient method calls
  • Loading branch information
mayuki authored Jun 21, 2024
2 parents 1192202 + f732e56 commit 1a04afd
Show file tree
Hide file tree
Showing 61 changed files with 768 additions and 569 deletions.
2 changes: 2 additions & 0 deletions perf/BenchmarkApp/PerformanceTest.Client/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,9 @@ async Task<PerformanceResult> RunScenarioAsync(ScenarioType scenario, ScenarioCo
ScenarioType.UnaryLargePayload32K => () => new UnaryLargePayload32KScenario(),
ScenarioType.UnaryLargePayload64K => () => new UnaryLargePayload64KScenario(),
ScenarioType.StreamingHub => () => new StreamingHubScenario(),
ScenarioType.StreamingHubValueTask => () => new StreamingHubValueTaskScenario(),
ScenarioType.StreamingHubComplex => () => new StreamingHubComplexScenario(),
ScenarioType.StreamingHubComplexValueTask => () => new StreamingHubComplexValueTaskScenario(),
ScenarioType.StreamingHubLargePayload1K => () => new StreamingHubLargePayload1KScenario(),
ScenarioType.StreamingHubLargePayload2K => () => new StreamingHubLargePayload2KScenario(),
ScenarioType.StreamingHubLargePayload4K => () => new StreamingHubLargePayload4KScenario(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"profiles": {
"PerformanceTest.Client": {
"commandName": "Project",
"commandLineArgs": "-u http://localhost:5000 -s streaminghub --channels 1 --streams 1"
"commandLineArgs": "-u http://localhost:5000 -s StreamingHubValueTask --channels 1 --streams 1"
}
}
}
}
2 changes: 2 additions & 0 deletions perf/BenchmarkApp/PerformanceTest.Client/ScenarioType.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@ public enum ScenarioType
UnaryLargePayload64K,

StreamingHub,
StreamingHubValueTask,
StreamingHubComplex,
StreamingHubComplexValueTask,
StreamingHubLargePayload1K,
StreamingHubLargePayload2K,
StreamingHubLargePayload4K,
Expand Down
38 changes: 38 additions & 0 deletions perf/BenchmarkApp/PerformanceTest.Client/StreamingHubScenario.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,25 @@ public async ValueTask RunAsync(PerformanceTestRunningContext ctx, CancellationT
}
}

public class StreamingHubValueTaskScenario : IScenario, IPerfTestHubReceiver
{
IPerfTestHub client = default!;

public async ValueTask PrepareAsync(GrpcChannel channel)
{
this.client = await StreamingHubClient.ConnectAsync<IPerfTestHub, IPerfTestHubReceiver>(channel, this);
}

public async ValueTask RunAsync(PerformanceTestRunningContext ctx, CancellationToken cancellationToken)
{
while (!cancellationToken.IsCancellationRequested)
{
await client.CallMethodValueTaskAsync("FooBarBaz🚀こんにちは世界", 123, 4567, 891011);
ctx.Increment();
}
}
}

public class StreamingHubComplexScenario : IScenario, IPerfTestHubReceiver
{
IPerfTestHub client = default!;
Expand All @@ -39,3 +58,22 @@ public async ValueTask RunAsync(PerformanceTestRunningContext ctx, CancellationT
}
}
}

public class StreamingHubComplexValueTaskScenario : IScenario, IPerfTestHubReceiver
{
IPerfTestHub client = default!;

public async ValueTask PrepareAsync(GrpcChannel channel)
{
this.client = await StreamingHubClient.ConnectAsync<IPerfTestHub, IPerfTestHubReceiver>(channel, this);
}

public async ValueTask RunAsync(PerformanceTestRunningContext ctx, CancellationToken cancellationToken)
{
while (!cancellationToken.IsCancellationRequested)
{
await client.CallMethodComplexValueTaskAsync("FooBarBaz🚀こんにちは世界", 123, 4567, 891011);
ctx.Increment();
}
}
}
10 changes: 10 additions & 0 deletions perf/BenchmarkApp/PerformanceTest.Server/PerfTestHub.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,21 @@ public Task<int> CallMethodAsync(string arg1, int arg2, int arg3, int arg4)
return Task.FromResult(0);
}

public ValueTask<int> CallMethodValueTaskAsync(string arg1, int arg2, int arg3, int arg4)
{
return ValueTask.FromResult(0);
}

public Task<ComplexResponse> CallMethodComplexAsync(string arg1, int arg2, int arg3, int arg4)
{
return Task.FromResult(ComplexResponse.Cached);
}

public ValueTask<ComplexResponse> CallMethodComplexValueTaskAsync(string arg1, int arg2, int arg3, int arg4)
{
return ValueTask.FromResult(ComplexResponse.Cached);
}

public Task<(int StatusCode, byte[] Data)> CallMethodLargePayloadAsync(string arg1, int arg2, int arg3, int arg4, byte[] arg5)
{
return Task.FromResult((123, arg5));
Expand Down
2 changes: 2 additions & 0 deletions perf/BenchmarkApp/PerformanceTest.Shared/IPerfTestHub.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@ namespace PerformanceTest.Shared
public interface IPerfTestHub : IStreamingHub<IPerfTestHub, IPerfTestHubReceiver>
{
Task<int> CallMethodAsync(string arg1, int arg2, int arg3, int arg4);
ValueTask<int> CallMethodValueTaskAsync(string arg1, int arg2, int arg3, int arg4);
Task<ComplexResponse> CallMethodComplexAsync(string arg1, int arg2, int arg3, int arg4);
ValueTask<ComplexResponse> CallMethodComplexValueTaskAsync(string arg1, int arg2, int arg3, int arg4);
Task<(int StatusCode, byte[] Data)> CallMethodLargePayloadAsync(string arg1, int arg2, int arg3, int arg4, byte[] arg5);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,24 +120,66 @@ static void EmitHelperMethods(StreamingHubClientBuildContext ctx)
if (ctx.EnableStreamingHubDiagnosticHandler)
{
ctx.Writer.AppendLineWithFormat($$"""
global::System.Threading.Tasks.Task<TResponse> WriteMessageWithResponseDiagnosticAsync<TRequest, TResponse>(int methodId, TRequest message, [global::System.Runtime.CompilerServices.CallerMemberName] string callerMemberName = default!)
global::System.Threading.Tasks.Task<TResponse> WriteMessageWithResponseDiagnosticTaskAsync<TRequest, TResponse>(int methodId, TRequest message, [global::System.Runtime.CompilerServices.CallerMemberName] string callerMemberName = default!)
{
if (diagnosticHandler is null)
{
return base.WriteMessageWithResponseAsync<TRequest, TResponse>(methodId, message);
return base.WriteMessageWithResponseTaskAsync<TRequest, TResponse>(methodId, message);
}
return diagnosticHandler.OnMethodInvoke(this, methodId, callerMemberName, message, isFireAndForget: true, base.WriteMessageWithResponseAsync<TRequest, TResponse>);
return diagnosticHandler.OnMethodInvoke(this, methodId, callerMemberName, message, isFireAndForget: true, base.WriteMessageWithResponseValueTaskOfTAsync<TRequest, TResponse>).AsTask();
}
global::System.Threading.Tasks.Task<TResponse> WriteMessageFireAndForgetDiagnosticAsync<TRequest, TResponse>(int methodId, TRequest message, [global::System.Runtime.CompilerServices.CallerMemberName] string callerMemberName = default!)
async global::System.Threading.Tasks.ValueTask WriteMessageWithResponseDiagnosticValueTaskAsync<TRequest, TResponse>(int methodId, TRequest message, [global::System.Runtime.CompilerServices.CallerMemberName] string callerMemberName = default!)
{
if (diagnosticHandler is null)
{
return base.WriteMessageFireAndForgetAsync<TRequest, TResponse>(methodId, message);
await base.WriteMessageWithResponseValueTaskAsync<TRequest, TResponse>(methodId, message);
return;
}
return diagnosticHandler.OnMethodInvoke(this, methodId, callerMemberName, message, isFireAndForget: true, base.WriteMessageFireAndForgetAsync<TRequest, TResponse>);
await diagnosticHandler.OnMethodInvoke(this, methodId, callerMemberName, message, isFireAndForget: true, base.WriteMessageWithResponseValueTaskOfTAsync<TRequest, TResponse>);
}
global::System.Threading.Tasks.ValueTask<TResponse> WriteMessageWithResponseDiagnosticValueTaskOfTAsync<TRequest, TResponse>(int methodId, TRequest message, [global::System.Runtime.CompilerServices.CallerMemberName] string callerMemberName = default!)
{
if (diagnosticHandler is null)
{
return base.WriteMessageWithResponseValueTaskOfTAsync<TRequest, TResponse>(methodId, message);
}
return diagnosticHandler.OnMethodInvoke(this, methodId, callerMemberName, message, isFireAndForget: true, base.WriteMessageWithResponseValueTaskOfTAsync<TRequest, TResponse>);
}
global::System.Threading.Tasks.Task<TResponse> WriteMessageFireAndForgetDiagnosticTaskAsync<TRequest, TResponse>(int methodId, TRequest message, [global::System.Runtime.CompilerServices.CallerMemberName] string callerMemberName = default!)
{
if (diagnosticHandler is null)
{
return base.WriteMessageFireAndForgetTaskAsync<TRequest, TResponse>(methodId, message);
}
return diagnosticHandler.OnMethodInvoke(this, methodId, callerMemberName, message, isFireAndForget: true, base.WriteMessageFireAndForgetValueTaskOfTAsync<TRequest, TResponse>).AsTask();
}
async global::System.Threading.Tasks.ValueTask WriteMessageFireAndForgetDiagnosticValueTaskAsync<TRequest, TResponse>(int methodId, TRequest message, [global::System.Runtime.CompilerServices.CallerMemberName] string callerMemberName = default!)
{
if (diagnosticHandler is null)
{
await base.WriteMessageFireAndForgetTaskAsync<TRequest, TResponse>(methodId, message);
return;
}
await diagnosticHandler.OnMethodInvoke(this, methodId, callerMemberName, message, isFireAndForget: true, base.WriteMessageFireAndForgetValueTaskOfTAsync<TRequest, TResponse>);
}
global::System.Threading.Tasks.ValueTask<TResponse> WriteMessageFireAndForgetDiagnosticValueTaskOfTAsync<TRequest, TResponse>(int methodId, TRequest message, [global::System.Runtime.CompilerServices.CallerMemberName] string callerMemberName = default!)
{
if (diagnosticHandler is null)
{
return base.WriteMessageFireAndForgetValueTaskOfTAsync<TRequest, TResponse>(methodId, message);
}
return diagnosticHandler.OnMethodInvoke(this, methodId, callerMemberName, message, isFireAndForget: true, base.WriteMessageFireAndForgetValueTaskOfTAsync<TRequest, TResponse>);
}
""");
ctx.Writer.AppendLine();
Expand Down Expand Up @@ -217,39 +259,39 @@ static void EmitHubMethods(StreamingHubClientBuildContext ctx, bool isFireAndFor
};
var isReturnTypeVoid = method.MethodReturnType == MagicOnionTypeInfo.KnownTypes.System_Void;
var writeMessageTarget = isFireAndForget ? "parent" : "this";
var writeMessageAsync = ctx.EnableStreamingHubDiagnosticHandler
var writeMessageAsyncPrefix = ctx.EnableStreamingHubDiagnosticHandler
? isFireAndForget || isReturnTypeVoid
? $"{writeMessageTarget}.WriteMessageFireAndForgetDiagnosticAsync"
: $"{writeMessageTarget}.WriteMessageWithResponseDiagnosticAsync"
? $"{writeMessageTarget}.WriteMessageFireAndForgetDiagnostic"
: $"{writeMessageTarget}.WriteMessageWithResponseDiagnostic"
: isFireAndForget || isReturnTypeVoid
? $"{writeMessageTarget}.WriteMessageFireAndForgetAsync"
: $"{writeMessageTarget}.WriteMessageWithResponseAsync";
? $"{writeMessageTarget}.WriteMessageFireAndForget"
: $"{writeMessageTarget}.WriteMessageWithResponse";

if (isFireAndForget) ctx.Writer.Append(" ");
ctx.Writer.AppendLineWithFormat($"""
public {(isReturnTypeVoid ? "void" : method.MethodReturnType.FullName)} {method.MethodName}({method.Parameters.ToMethodSignaturize()})
""");

if (isFireAndForget) ctx.Writer.Append(" ");
if (method.MethodReturnType == MagicOnionTypeInfo.KnownTypes.System_Threading_Tasks_ValueTask)
if (method.MethodReturnType == MagicOnionTypeInfo.KnownTypes.System_Threading_Tasks_ValueTask || isReturnTypeVoid)
{
// ValueTask
// ValueTask, void
ctx.Writer.AppendLineWithFormat($"""
=> new global::System.Threading.Tasks.ValueTask({writeMessageAsync}<{method.RequestType.FullName}, {method.ResponseType.FullName}>({method.HubId}{writeMessageParameters}));
=> {writeMessageAsyncPrefix}ValueTaskAsync<{method.RequestType.FullName}, {method.ResponseType.FullName}>({method.HubId}{writeMessageParameters});
""");
}
else if (method.MethodReturnType.HasGenericArguments && method.MethodReturnType.GetGenericTypeDefinition() == MagicOnionTypeInfo.KnownTypes.System_Threading_Tasks_ValueTask)
{
// ValueTask<T>
ctx.Writer.AppendLineWithFormat($"""
=> new global::System.Threading.Tasks.ValueTask<{method.ResponseType.FullName}>({writeMessageAsync}<{method.RequestType.FullName}, {method.ResponseType.FullName}>({method.HubId}{writeMessageParameters}));
=> {writeMessageAsyncPrefix}ValueTaskOfTAsync<{method.RequestType.FullName}, {method.ResponseType.FullName}>({method.HubId}{writeMessageParameters});
""");
}
else
{
// Task, Task<T>
ctx.Writer.AppendLineWithFormat($"""
=> {writeMessageAsync}<{method.RequestType.FullName}, {method.ResponseType.FullName}>({method.HubId}{writeMessageParameters});
=> {writeMessageAsyncPrefix}TaskAsync<{method.RequestType.FullName}, {method.ResponseType.FullName}>({method.HubId}{writeMessageParameters});
""");
}
}
Expand Down
Loading

0 comments on commit 1a04afd

Please sign in to comment.