From 2e02283676a4603ef4aa9bffe0039957ce286ee9 Mon Sep 17 00:00:00 2001 From: "Jeremy D. Miller" Date: Wed, 16 Oct 2024 08:54:44 -0500 Subject: [PATCH] Batch processing working *with* external transports. Closes GH-1076 --- .../BatchMessaging/BatchMessaging.csproj | 18 +++++++ .../Kafka/BatchMessaging/BatchMessaging.http | 6 +++ .../Kafka/BatchMessaging/Program.cs | 52 +++++++++++++++++++ .../Properties/launchSettings.json | 41 +++++++++++++++ .../appsettings.Development.json | 8 +++ .../Kafka/BatchMessaging/appsettings.json | 9 ++++ .../Wolverine.Kafka.Tests.csproj | 6 +++ .../batch_processing_with_kafka.cs | 32 ++++++++++++ .../Runtime/Handlers/HandlerGraph.cs | 1 - src/Wolverine/WolverineOptions.Batching.cs | 3 ++ wolverine.sln | 7 +++ 11 files changed, 182 insertions(+), 1 deletion(-) create mode 100644 src/Transports/Kafka/BatchMessaging/BatchMessaging.csproj create mode 100644 src/Transports/Kafka/BatchMessaging/BatchMessaging.http create mode 100644 src/Transports/Kafka/BatchMessaging/Program.cs create mode 100644 src/Transports/Kafka/BatchMessaging/Properties/launchSettings.json create mode 100644 src/Transports/Kafka/BatchMessaging/appsettings.Development.json create mode 100644 src/Transports/Kafka/BatchMessaging/appsettings.json create mode 100644 src/Transports/Kafka/Wolverine.Kafka.Tests/batch_processing_with_kafka.cs diff --git a/src/Transports/Kafka/BatchMessaging/BatchMessaging.csproj b/src/Transports/Kafka/BatchMessaging/BatchMessaging.csproj new file mode 100644 index 000000000..b2094a5df --- /dev/null +++ b/src/Transports/Kafka/BatchMessaging/BatchMessaging.csproj @@ -0,0 +1,18 @@ + + + + enable + enable + net8.0 + + + + + + + + + + + + diff --git a/src/Transports/Kafka/BatchMessaging/BatchMessaging.http b/src/Transports/Kafka/BatchMessaging/BatchMessaging.http new file mode 100644 index 000000000..b4577a97d --- /dev/null +++ b/src/Transports/Kafka/BatchMessaging/BatchMessaging.http @@ -0,0 +1,6 @@ +@BatchMessaging_HostAddress = http://localhost:5089 + +GET {{BatchMessaging_HostAddress}}/weatherforecast/ +Accept: application/json + +### diff --git a/src/Transports/Kafka/BatchMessaging/Program.cs b/src/Transports/Kafka/BatchMessaging/Program.cs new file mode 100644 index 000000000..4a27dacba --- /dev/null +++ b/src/Transports/Kafka/BatchMessaging/Program.cs @@ -0,0 +1,52 @@ +using Confluent.Kafka; +using Oakton; +using Wolverine; +using Wolverine.Kafka; + +var builder = WebApplication.CreateBuilder(args); + +builder.Services.AddEndpointsApiExplorer(); +builder.Services.AddSwaggerGen(); + +builder.Host.UseWolverine(opts => +{ + opts.UseKafka("localhost:9092").AutoProvision(); + + opts.PublishAllMessages().ToKafkaTopic("topic_0"); + + opts.BatchMessagesOf(); + opts.ListenToKafkaTopic("topic_0"); +}); + +var app = builder.Build(); + +if (app.Environment.IsDevelopment()) +{ + app.UseSwagger(); + app.UseSwaggerUI(); +} + +app.MapPost("/test", async (IMessageBus bus) => + { + var message = new TestMessage(); + await bus.PublishAsync(message); + await bus.PublishAsync(message); + // results in: + // No known handler for TestMessage#08dced0c-3834-b4c6-54d7-e075bf020000 from kafka://topic/topic_0 + }) + .WithOpenApi(); + +return await app.RunOaktonCommands(args); + +public partial class Program {} + + +public record TestMessage; + +public class TestMessagesHandler +{ + public void Handle(TestMessage[] messages) + { + Console.WriteLine("Messages received"); + } +} \ No newline at end of file diff --git a/src/Transports/Kafka/BatchMessaging/Properties/launchSettings.json b/src/Transports/Kafka/BatchMessaging/Properties/launchSettings.json new file mode 100644 index 000000000..cf9d5da4c --- /dev/null +++ b/src/Transports/Kafka/BatchMessaging/Properties/launchSettings.json @@ -0,0 +1,41 @@ +{ + "$schema": "http://json.schemastore.org/launchsettings.json", + "iisSettings": { + "windowsAuthentication": false, + "anonymousAuthentication": true, + "iisExpress": { + "applicationUrl": "http://localhost:3391", + "sslPort": 44303 + } + }, + "profiles": { + "http": { + "commandName": "Project", + "dotnetRunMessages": true, + "launchBrowser": true, + "launchUrl": "swagger", + "applicationUrl": "http://localhost:5089", + "environmentVariables": { + "ASPNETCORE_ENVIRONMENT": "Development" + } + }, + "https": { + "commandName": "Project", + "dotnetRunMessages": true, + "launchBrowser": true, + "launchUrl": "swagger", + "applicationUrl": "https://localhost:7028;http://localhost:5089", + "environmentVariables": { + "ASPNETCORE_ENVIRONMENT": "Development" + } + }, + "IIS Express": { + "commandName": "IISExpress", + "launchBrowser": true, + "launchUrl": "swagger", + "environmentVariables": { + "ASPNETCORE_ENVIRONMENT": "Development" + } + } + } +} diff --git a/src/Transports/Kafka/BatchMessaging/appsettings.Development.json b/src/Transports/Kafka/BatchMessaging/appsettings.Development.json new file mode 100644 index 000000000..0c208ae91 --- /dev/null +++ b/src/Transports/Kafka/BatchMessaging/appsettings.Development.json @@ -0,0 +1,8 @@ +{ + "Logging": { + "LogLevel": { + "Default": "Information", + "Microsoft.AspNetCore": "Warning" + } + } +} diff --git a/src/Transports/Kafka/BatchMessaging/appsettings.json b/src/Transports/Kafka/BatchMessaging/appsettings.json new file mode 100644 index 000000000..10f68b8c8 --- /dev/null +++ b/src/Transports/Kafka/BatchMessaging/appsettings.json @@ -0,0 +1,9 @@ +{ + "Logging": { + "LogLevel": { + "Default": "Information", + "Microsoft.AspNetCore": "Warning" + } + }, + "AllowedHosts": "*" +} diff --git a/src/Transports/Kafka/Wolverine.Kafka.Tests/Wolverine.Kafka.Tests.csproj b/src/Transports/Kafka/Wolverine.Kafka.Tests/Wolverine.Kafka.Tests.csproj index 1e9dd6b54..a2d930880 100644 --- a/src/Transports/Kafka/Wolverine.Kafka.Tests/Wolverine.Kafka.Tests.csproj +++ b/src/Transports/Kafka/Wolverine.Kafka.Tests/Wolverine.Kafka.Tests.csproj @@ -3,6 +3,7 @@ false true + net8.0 @@ -21,6 +22,7 @@ + @@ -30,4 +32,8 @@ + + + + diff --git a/src/Transports/Kafka/Wolverine.Kafka.Tests/batch_processing_with_kafka.cs b/src/Transports/Kafka/Wolverine.Kafka.Tests/batch_processing_with_kafka.cs new file mode 100644 index 000000000..e6b8ebe58 --- /dev/null +++ b/src/Transports/Kafka/Wolverine.Kafka.Tests/batch_processing_with_kafka.cs @@ -0,0 +1,32 @@ +using Alba; +using Oakton; +using Shouldly; +using Wolverine.Tracking; + +namespace Wolverine.Kafka.Tests; + +public class batch_processing_with_kafka +{ + [Fact] + public async Task end_to_end() + { + OaktonEnvironment.AutoStartHost = true; + + await using var host = await AlbaHost.For(_ => {}); + + IScenarioResult result = null!; + + Func execute = async _ => + { + result = await host.Scenario(x => { x.Post.Url("/test"); }); + }; + + var tracked = await host + .TrackActivity() + .WaitForMessageToBeReceivedAt(host) + .ExecuteAndWaitAsync(execute); + + tracked.FindSingleTrackedMessageOfType() + .Length.ShouldBe(2); + } +} \ No newline at end of file diff --git a/src/Wolverine/Runtime/Handlers/HandlerGraph.cs b/src/Wolverine/Runtime/Handlers/HandlerGraph.cs index 3c59f7ac6..9c4b43817 100644 --- a/src/Wolverine/Runtime/Handlers/HandlerGraph.cs +++ b/src/Wolverine/Runtime/Handlers/HandlerGraph.cs @@ -23,7 +23,6 @@ namespace Wolverine.Runtime.Handlers; public partial class HandlerGraph : ICodeFileCollectionWithServices, IWithFailurePolicies { - public static readonly string Context = "context"; private readonly List _calls = new(); private readonly object _compilingLock = new(); diff --git a/src/Wolverine/WolverineOptions.Batching.cs b/src/Wolverine/WolverineOptions.Batching.cs index ab048267c..96f6f5cdd 100644 --- a/src/Wolverine/WolverineOptions.Batching.cs +++ b/src/Wolverine/WolverineOptions.Batching.cs @@ -35,6 +35,9 @@ public LocalQueueConfiguration BatchMessagesOf(Type elementType, Action().FindQueueForMessageType(elementType); diff --git a/wolverine.sln b/wolverine.sln index ed475995c..7de67de5a 100644 --- a/wolverine.sln +++ b/wolverine.sln @@ -241,6 +241,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Wolverine.RavenDb", "src\Pe EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "RavenDbTests", "src\Persistence\RavenDbTests\RavenDbTests.csproj", "{71B152DD-7A0B-4935-B8B1-1060E674D23D}" EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "BatchMessaging", "src\Transports\Kafka\BatchMessaging\BatchMessaging.csproj", "{B035801D-E786-4AAA-858A-0770D88116D6}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -610,6 +612,10 @@ Global {71B152DD-7A0B-4935-B8B1-1060E674D23D}.Debug|Any CPU.Build.0 = Debug|Any CPU {71B152DD-7A0B-4935-B8B1-1060E674D23D}.Release|Any CPU.ActiveCfg = Release|Any CPU {71B152DD-7A0B-4935-B8B1-1060E674D23D}.Release|Any CPU.Build.0 = Release|Any CPU + {B035801D-E786-4AAA-858A-0770D88116D6}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {B035801D-E786-4AAA-858A-0770D88116D6}.Debug|Any CPU.Build.0 = Debug|Any CPU + {B035801D-E786-4AAA-858A-0770D88116D6}.Release|Any CPU.ActiveCfg = Release|Any CPU + {B035801D-E786-4AAA-858A-0770D88116D6}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(NestedProjects) = preSolution {24497E6A-D6B1-4C80-ABFB-57FFAD5070C4} = {96119B5E-B5F0-400A-9580-B342EBE26212} @@ -717,5 +723,6 @@ Global {1A34A78B-F6AB-41A9-8B90-384C6E1DBC63} = {7A9E0EAE-9ABF-40F6-9DB9-8FB1243F4210} {AAFFC067-D110-45FF-9FA0-8E02F77D9D14} = {7A9E0EAE-9ABF-40F6-9DB9-8FB1243F4210} {71B152DD-7A0B-4935-B8B1-1060E674D23D} = {7A9E0EAE-9ABF-40F6-9DB9-8FB1243F4210} + {B035801D-E786-4AAA-858A-0770D88116D6} = {63E9B289-95E8-4F2B-A064-156971A6853C} EndGlobalSection EndGlobal