Skip to content

Commit

Permalink
Add Channeler example (#1851)
Browse files Browse the repository at this point in the history
  • Loading branch information
JamesNK authored Aug 25, 2022
1 parent 4f06b60 commit 1d7f988
Show file tree
Hide file tree
Showing 12 changed files with 403 additions and 1 deletion.
31 changes: 31 additions & 0 deletions examples/Channeler/Channeler.sln
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@

Microsoft Visual Studio Solution File, Format Version 12.00
# Visual Studio Version 16
VisualStudioVersion = 16.0.29230.61
MinimumVisualStudioVersion = 10.0.40219.1
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Server", "Server\Server.csproj", "{534AC5F8-2DF2-40BD-87A5-B3D8310118C4}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Client", "Client\Client.csproj", "{48A1D3BC-A14B-436A-8822-6DE2BEF8B747}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Release|Any CPU = Release|Any CPU
EndGlobalSection
GlobalSection(ProjectConfigurationPlatforms) = postSolution
{534AC5F8-2DF2-40BD-87A5-B3D8310118C4}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{534AC5F8-2DF2-40BD-87A5-B3D8310118C4}.Debug|Any CPU.Build.0 = Debug|Any CPU
{534AC5F8-2DF2-40BD-87A5-B3D8310118C4}.Release|Any CPU.ActiveCfg = Release|Any CPU
{534AC5F8-2DF2-40BD-87A5-B3D8310118C4}.Release|Any CPU.Build.0 = Release|Any CPU
{48A1D3BC-A14B-436A-8822-6DE2BEF8B747}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{48A1D3BC-A14B-436A-8822-6DE2BEF8B747}.Debug|Any CPU.Build.0 = Debug|Any CPU
{48A1D3BC-A14B-436A-8822-6DE2BEF8B747}.Release|Any CPU.ActiveCfg = Release|Any CPU
{48A1D3BC-A14B-436A-8822-6DE2BEF8B747}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {D22B3129-3BFB-41FA-9FCE-E45EBEF8C2DD}
EndGlobalSection
EndGlobal
16 changes: 16 additions & 0 deletions examples/Channeler/Client/Client.csproj
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>net6.0</TargetFramework>
</PropertyGroup>

<ItemGroup>
<Protobuf Include="..\Proto\data_channel.proto" GrpcServices="Client" Link="Protos\data_channel.proto" />

<PackageReference Include="Google.Protobuf" Version="$(GoogleProtobufPackageVersion)" />
<PackageReference Include="Grpc.Net.Client" Version="$(GrpcDotNetPackageVersion)" />
<PackageReference Include="Grpc.Tools" Version="$(GrpcToolsPackageVersion)" PrivateAssets="All" />
</ItemGroup>

</Project>
72 changes: 72 additions & 0 deletions examples/Channeler/Client/Program.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
#region Copyright notice and license

// Copyright 2019 The gRPC Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#endregion

using System.Text;
using DataChannel;
using Google.Protobuf;
using Grpc.Core;
using Grpc.Net.Client;

namespace Client
{
public partial class Program
{
public static readonly byte[] TestData = Encoding.UTF8.GetBytes("The quick brown fox jumped over the lazy dog.");

static async Task Main(string[] args)
{
using var channel = GrpcChannel.ForAddress("https://localhost:5001");
var client = new DataChanneler.DataChannelerClient(channel);

await UploadDataAsync(client);

await DownloadResultsAsync(client);

Console.WriteLine("Shutting down");
Console.WriteLine("Press any key to exit...");
Console.ReadKey();
}

private static async Task UploadDataAsync(DataChanneler.DataChannelerClient client)
{
var call = client.UploadData();

var dataChunks = TestData.Chunk(5);
foreach (var chunk in dataChunks)
{
Console.WriteLine($"Uploading chunk: {chunk.Length} bytes");
await call.RequestStream.WriteAsync(new DataRequest { Value = ByteString.CopyFrom(chunk) });
}

await call.RequestStream.CompleteAsync();

var result = await call;
Console.WriteLine($"Total upload processed: {result.BytesProcessed} bytes");
}

private static async Task DownloadResultsAsync(DataChanneler.DataChannelerClient client)
{
var call = client.DownloadResults(new DataRequest { Value = ByteString.CopyFrom(TestData) });

await foreach (var result in call.ResponseStream.ReadAllAsync())
{
Console.WriteLine($"Downloaded bytes processed result: {result.BytesProcessed}");
}
}
}
}
30 changes: 30 additions & 0 deletions examples/Channeler/Proto/data_channel.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
// Copyright 2019 The gRPC Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

syntax = "proto3";

package data_channel;

service DataChanneler {
rpc UploadData (stream DataRequest) returns (DataResult);
rpc DownloadResults (DataRequest) returns (stream DataResult);
}

message DataRequest {
bytes value = 1;
}

message DataResult {
int32 bytes_processed = 1;
}
39 changes: 39 additions & 0 deletions examples/Channeler/Server/Program.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
#region Copyright notice and license

// Copyright 2019 The gRPC Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#endregion

using System.IO;
using Microsoft.AspNetCore.Hosting;
using Microsoft.Extensions.Hosting;

namespace Server
{
public class Program
{
public static void Main(string[] args)
{
CreateHostBuilder(args).Build().Run();
}

public static IHostBuilder CreateHostBuilder(string[] args) =>
Host.CreateDefaultBuilder(args)
.ConfigureWebHostDefaults(webBuilder =>
{
webBuilder.UseStartup<Startup>();
});
}
}
12 changes: 12 additions & 0 deletions examples/Channeler/Server/Properties/launchSettings.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
{
"profiles": {
"Server": {
"commandName": "Project",
"launchBrowser": false,
"applicationUrl": "https://localhost:5001",
"environmentVariables": {
"ASPNETCORE_ENVIRONMENT": "Development"
}
}
}
}
13 changes: 13 additions & 0 deletions examples/Channeler/Server/Server.csproj
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
<Project Sdk="Microsoft.NET.Sdk.Web">

<PropertyGroup>
<TargetFramework>net6.0</TargetFramework>
</PropertyGroup>

<ItemGroup>
<Protobuf Include="..\Proto\data_channel.proto" GrpcServices="Server" Link="Protos\data_channel.proto" />

<PackageReference Include="Grpc.AspNetCore" Version="$(GrpcDotNetPackageVersion)" />
</ItemGroup>

</Project>
107 changes: 107 additions & 0 deletions examples/Channeler/Server/Services/DataChannelerService.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
#region Copyright notice and license

// Copyright 2019 The gRPC Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#endregion

using System.Threading.Channels;
using DataChannel;
using Grpc.Core;

namespace Server
{
public class DataChannelerService : DataChanneler.DataChannelerBase
{
private readonly ILogger _logger;

public DataChannelerService(ILoggerFactory loggerFactory)
{
_logger = loggerFactory.CreateLogger<DataChannelerService>();
}

public override async Task<DataResult> UploadData(
IAsyncStreamReader<DataRequest> requestStream, ServerCallContext context)
{
var channel = Channel.CreateBounded<DataRequest>(new BoundedChannelOptions(capacity: 5)
{
SingleReader = false,
SingleWriter = true
});

var readTask = Task.Run(async () =>
{
await foreach (var message in requestStream.ReadAllAsync())
{
await channel.Writer.WriteAsync(message);
}

channel.Writer.Complete();
});

// Process incoming messages on three threads.
var bytesProcessedByThread = await Task.WhenAll(
ProcessMessagesAsync(channel.Reader, _logger),
ProcessMessagesAsync(channel.Reader, _logger),
ProcessMessagesAsync(channel.Reader, _logger));

await readTask;

return new DataResult { BytesProcessed = bytesProcessedByThread.Sum() };

static async Task<int> ProcessMessagesAsync(ChannelReader<DataRequest> reader, ILogger logger)
{
var total = 0;
await foreach (var message in reader.ReadAllAsync())
{
total += message.Value.Length;
}
return total;
}
}

public override async Task DownloadResults(DataRequest request,
IServerStreamWriter<DataResult> responseStream, ServerCallContext context)
{
var channel = Channel.CreateBounded<DataResult>(new BoundedChannelOptions(capacity: 5)
{
SingleReader = true,
SingleWriter = false
});

var consumerTask = Task.Run(async () =>
{
// Consume messages from channel and write to response stream.
await foreach (var message in channel.Reader.ReadAllAsync())
{
await responseStream.WriteAsync(message);
}
});

var dataChunks = request.Value.Chunk(size: 10);

// Write messages to channel from multiple threads.
await Task.WhenAll(dataChunks.Select(
async c =>
{
var message = new DataResult { BytesProcessed = c.Length };
await channel.Writer.WriteAsync(message);
}));

// Complete writing and wait for consumer to complete.
channel.Writer.Complete();
await consumerTask;
}
}
}
48 changes: 48 additions & 0 deletions examples/Channeler/Server/Startup.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
#region Copyright notice and license

// Copyright 2019 The gRPC Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#endregion

using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;

namespace Server
{
public class Startup
{
public void ConfigureServices(IServiceCollection services)
{
services.AddGrpc();
}

public void Configure(IApplicationBuilder app, IWebHostEnvironment env)
{
if (env.IsDevelopment())
{
app.UseDeveloperExceptionPage();
}

app.UseRouting();

app.UseEndpoints(endpoints =>
{
endpoints.MapGrpcService<DataChannelerService>();
});
}
}
}
10 changes: 10 additions & 0 deletions examples/Channeler/Server/appsettings.Development.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
{
"Logging": {
"LogLevel": {
"Default": "Debug",
"System": "Information",
"Grpc": "Information",
"Microsoft": "Information"
}
}
}
Loading

0 comments on commit 1d7f988

Please sign in to comment.