Skip to content

Commit

Permalink
Update Kafka Module: Add Support for Configuring Consumer and Produce…
Browse files Browse the repository at this point in the history
…r Factories (#6139)

* Enable Kafka Worker Factory and Refactor Worker Implementation

Introduce a flexible worker factory mechanism allowing custom worker creation with DefaultWorkerFactory as the initial implementation. Enhance Worker class to be generic, remove manual consumer configuration, and streamline message processing logic, improving code maintainability and extensibility.

* Refactor Kafka configuration properties

Renamed configuration properties in Consumer and Producer entities. Updated references in the codebase to use the new `Config` property instead of `ConsumerConfig` and `BootstrapServers`. Adjusted appsettings.json to match the new configuration schema.

* Add Kafka producer and consumer implementation

Implemented classes and interfaces to handle Kafka producers and consumers, including `ProducerProxy`, `ConsumerProxy`, and related context classes and factories. Refactored existing code to utilize these new implementations, replacing worker terminology with consumer and addressing context-specific fields.

* Remove redundant code in DefaultConsumerFactory and SendMessage

Removed commented-out unused return statement in DefaultConsumerFactory. Also eliminated explicit producer.Dispose() call in SendMessage, as the 'using' statement already handles resource cleanup.

* Add ExpandoObject producer and consumer factories

Replaced DefaultSerializers with new JsonSerializer and JsonDeserializer classes. Introduced ExpandoObjectProducerFactory and ExpandoObjectConsumerFactory to handle dynamic types. Updated workflow and configuration to use the new factories.

* Refactor bookmark processing and manage worker subscriptions

Refactored bookmark processing logic to utilize extension methods. Optimized worker subscriptions by centralizing topic subscription management and added logging for subscribed topics. This improves maintainability and clarity of the codebase.

* Refactor worker creation to use ActivatorUtilities

Updated WorkerManager to instantiate workers using ActivatorUtilities for better dependency injection support. This enhances code readability and maintains consistency with the service provider approach used throughout the codebase.
  • Loading branch information
sfmskywalker authored Nov 22, 2024
1 parent 8b4ff07 commit b534b42
Show file tree
Hide file tree
Showing 42 changed files with 433 additions and 315 deletions.
2 changes: 1 addition & 1 deletion src/apps/Elsa.Server.Web/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@
const bool useMemoryStores = false;
const bool useCaching = true;
const bool useAzureServiceBus = false;
const bool useKafka = false;
const bool useKafka = true;
const bool useReadOnlyMode = false;
const bool useSignalR = false; // Disabled until Elsa Studio sends authenticated requests.
const WorkflowRuntime workflowRuntime = WorkflowRuntime.ProtoActor;
Expand Down
3 changes: 1 addition & 2 deletions src/apps/Elsa.Server.Web/Workflows/ConsumerWorkflow.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,10 @@ protected override void Build(IWorkflowBuilder builder)
new MessageReceived
{
ConsumerDefinitionId = new("consumer-1"),
MessageType = new(typeof(ExpandoObject)),
Topics = new(["topic-1"]),
Predicate = new(JavaScriptExpression.Create("message => message.OrderId == '1'")),
Result = new(message),
CanStartWorkflow = true
CanStartWorkflow = false
},
new WriteLine(c => JsonSerializer.Serialize(message.Get(c)))
}
Expand Down
2 changes: 1 addition & 1 deletion src/apps/Elsa.Server.Web/Workflows/ProducerWorkflow.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ public class ProducerWorkflow : WorkflowBase
protected override void Build(IWorkflowBuilder builder)
{
builder.Name = "Producer Workflow";
builder.Root = new SendMessage
builder.Root = new ProduceMessage
{
Topic = new("topic-2"),
ProducerDefinitionId = new("producer-1"),
Expand Down
34 changes: 11 additions & 23 deletions src/apps/Elsa.Server.Web/appsettings.json
Original file line number Diff line number Diff line change
Expand Up @@ -220,35 +220,23 @@
{
"Id": "producer-1",
"Name": "Producer 1",
"BootstrapServers": [
"localhost:9092"
]
"FactoryType": "Elsa.Kafka.Factories.ExpandoObjectProducerFactory, Elsa.Kafka",
"Config": {
"BootstrapServers": "localhost:9092"
}
}
],
"Consumers": [
{
"Id": "consumer-1",
"Name": "Consumer 1",
"BootstrapServers": [
"localhost:9092"
],
"GroupId": "group-1",
"AutoOffsetReset": "earliest",
"EnableAutoCommit": "true",
"CorrelatingFields": [
"orderId",
"customerId"
]
},
{
"Id": "consumer-2",
"Name": "Consumer 2",
"BootstrapServers": [
"localhost:9092"
],
"GroupId": "group-1",
"AutoOffsetReset": "earliest",
"EnableAutoCommit": "true"
"FactoryType": "Elsa.Kafka.Factories.ExpandoObjectConsumerFactory, Elsa.Kafka",
"Config": {
"BootstrapServers": "localhost:9092",
"GroupId": "group-1",
"AutoOffsetReset": "earliest",
"EnableAutoCommit": "true"
}
}
]
},
Expand Down
3 changes: 3 additions & 0 deletions src/modules/Elsa.Common/Features/DefaultFormattersFeature.cs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
using System.ComponentModel;
using Elsa.Common.Serialization;
using Elsa.Common.Services;
using Elsa.Features.Abstractions;
using Elsa.Features.Services;
Expand All @@ -9,6 +11,7 @@ public class DefaultFormattersFeature(IModule module) : FeatureBase(module)
{
public override void Configure()
{
TypeDescriptor.AddAttributes(typeof(Type), new TypeConverterAttribute(typeof(TypeTypeConverter)));
Module.Services.AddSingleton<IFormatter, JsonFormatter>();
}
}
33 changes: 33 additions & 0 deletions src/modules/Elsa.Common/Serialization/TypeTypeConverter.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
using System.ComponentModel;
using System.Globalization;
using JetBrains.Annotations;

namespace Elsa.Common.Serialization;

[PublicAPI]
public class TypeTypeConverter : TypeConverter
{
public override bool CanConvertFrom(ITypeDescriptorContext? context, Type sourceType)
{
return sourceType == typeof(string) || base.CanConvertFrom(context, sourceType);
}

public override object? ConvertFrom(ITypeDescriptorContext? context, CultureInfo? culture, object value)
{
if (value is string stringValue)
return Type.GetType(stringValue);
return base.ConvertFrom(context, culture, value);
}

public override bool CanConvertTo(ITypeDescriptorContext? context, Type? destinationType)
{
return destinationType == typeof(string) || base.CanConvertTo(context, destinationType);
}

public override object? ConvertTo(ITypeDescriptorContext? context, CultureInfo? culture, object? value, Type destinationType)
{
if (destinationType == typeof(string) && value is Type type)
return type.AssemblyQualifiedName;
return base.ConvertTo(context, culture, value, destinationType);
}
}
16 changes: 1 addition & 15 deletions src/modules/Elsa.Kafka/Activities/MessageReceived.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
using Elsa.Workflows.Attributes;
using Elsa.Workflows.Models;
using Elsa.Workflows.UIHints;
using Microsoft.Extensions.Options;

namespace Elsa.Kafka.Activities;

Expand Down Expand Up @@ -48,12 +47,6 @@ public MessageReceived(Input<string> consumerDefinitionId, [CallerFilePath] stri
)]
public Input<ICollection<string>> Topics { get; set; } = default!;

/// <summary>
/// Optional. The .NET type to deserialize the message into. Defaults to <see cref="string"/>.
/// </summary>
[Input(Description = "Optional. The .NET type to deserialize the message into.")]
public Input<Type?> MessageType { get; set; } = default!;

[Input(
Description = "Optional. A predicate to filter messages.",
AutoEvaluate = false
Expand Down Expand Up @@ -95,12 +88,7 @@ private async ValueTask Resume(ActivityExecutionContext context)

private void SetResult(KafkaTransportMessage receivedMessage, ActivityExecutionContext context)
{
var bodyAsString = receivedMessage.Value;
var targetType = MessageType.GetOrDefault(context);
var deserializer = context.GetRequiredService<IOptions<KafkaOptions>>().Value.Deserializer;
var serviceProvider = context.WorkflowExecutionContext.ServiceProvider;
var body = targetType == null ? bodyAsString : deserializer(serviceProvider, bodyAsString, targetType);

var body = receivedMessage.Value;
context.Set(TransportMessage, receivedMessage);
context.SetResult(body);
}
Expand All @@ -109,7 +97,6 @@ private object GetStimulus(ExpressionExecutionContext context)
{
var consumerDefinitionId = ConsumerDefinitionId.Get(context);
var topics = Topics.GetOrDefault(context) ?? [];
var messageType = MessageType.GetOrDefault(context) ?? typeof(string);
var isLocal = IsLocal.GetOrDefault(context);
var activity = context.GetActivity();
var activityRegistry = context.GetRequiredService<IActivityRegistry>();
Expand All @@ -122,7 +109,6 @@ private object GetStimulus(ExpressionExecutionContext context)
{
ConsumerDefinitionId = consumerDefinitionId,
Topics = topics.Distinct().ToList(),
MessageType = messageType,
IsLocal = isLocal,
Predicate = predicateExpression,
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,14 @@

namespace Elsa.Kafka.Activities;

[Activity("Elsa.Kafka", "Kafka", "Sends a message to a given topic")]
public class SendMessage : CodeActivity
[Activity("Elsa.Kafka", "Kafka", "Produces a message and delivers it to a given topic.")]
public class ProduceMessage : CodeActivity
{
/// <summary>
/// The topic to which the message will be sent.
/// </summary>
[Input(
Description = "The topic to which the message will be sent.",
Description = "The topic to which the message will be delivered.",
UIHint = InputUIHints.DropDown,
UIHandler = typeof(TopicDefinitionsDropdownOptionsProvider)
)]
Expand All @@ -29,7 +29,7 @@ public class SendMessage : CodeActivity
/// </summary>
[Input(
DisplayName = "Producer",
Description = "The producer to use when sending the message.",
Description = "The producer to use to produce the message.",
UIHint = InputUIHints.DropDown,
UIHandler = typeof(ProducerDefinitionsDropdownOptionsProvider)
)]
Expand All @@ -50,7 +50,7 @@ public class SendMessage : CodeActivity
/// <summary>
/// The content of the message to send.
/// </summary>
[Input(Description = "The content of the message to send.")]
[Input(Description = "The content of the message to produce.")]
public Input<object> Content { get; set; } = default!;

protected override async ValueTask ExecuteAsync(ActivityExecutionContext context)
Expand All @@ -60,24 +60,12 @@ protected override async ValueTask ExecuteAsync(ActivityExecutionContext context
var producerDefinitionEnumerator = context.GetRequiredService<IProducerDefinitionEnumerator>();
var producerDefinition = await producerDefinitionEnumerator.GetByIdAsync(producerDefinitionId);
var content = Content.Get(context);
var serializer = context.GetRequiredService<IOptions<KafkaOptions>>().Value.Serializer;
var serviceProvider = context.WorkflowExecutionContext.ServiceProvider;
var serializedContent = content as string ?? serializer(serviceProvider, content);
var config = new ProducerConfig
{
BootstrapServers = string.Join(",", producerDefinition.BootstrapServers),
};

context.DeferTask(async () =>
{
using var producer = new ProducerBuilder<Null, string>(config).Build();
using var producer = CreateProducer(context, producerDefinition);
var headers = CreateHeaders(context);
var message = new Message<Null, string>
{
Value = serializedContent,
Headers = headers
};
await producer.ProduceAsync(topic, message);
await producer.ProduceAsync(topic, content, headers);
});
}

Expand All @@ -87,9 +75,6 @@ private Headers CreateHeaders(ActivityExecutionContext context)
var headers = new Headers();
var correlationId = CorrelationId.GetOrDefault(context);
var isLocal = IsLocal.Get(context);
var topic = Topic.Get(context);

headers.Add(options.TopicHeaderKey, Encoding.UTF8.GetBytes(topic));

if (!string.IsNullOrWhiteSpace(correlationId))
headers.Add(options.CorrelationHeaderKey, Encoding.UTF8.GetBytes(correlationId));
Expand All @@ -99,4 +84,15 @@ private Headers CreateHeaders(ActivityExecutionContext context)

return headers;
}

private IProducer CreateProducer(ActivityExecutionContext context, ProducerDefinition producerDefinition)
{
var factory = context.GetRequiredService(producerDefinition.FactoryType) as IProducerFactory;

if (factory == null)
throw new InvalidOperationException($"Producer factory of type '{producerDefinition.FactoryType}' not found.");

var createProducerContext = new CreateProducerContext(producerDefinition);
return factory.CreateProducer(createProducerContext);
}
}
6 changes: 6 additions & 0 deletions src/modules/Elsa.Kafka/Contracts/IConsumer.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
namespace Elsa.Kafka;

public interface IConsumer
{
object Consumer { get; }
}
6 changes: 6 additions & 0 deletions src/modules/Elsa.Kafka/Contracts/IConsumerFactory.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
namespace Elsa.Kafka;

public interface IConsumerFactory
{
IConsumer CreateConsumer(CreateConsumerContext workerContext);
}
8 changes: 8 additions & 0 deletions src/modules/Elsa.Kafka/Contracts/IProducer.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
using Confluent.Kafka;

namespace Elsa.Kafka;

public interface IProducer : IDisposable
{
Task ProduceAsync(string topic, object value, Headers? headers = null, CancellationToken cancellationToken = default);
}
6 changes: 6 additions & 0 deletions src/modules/Elsa.Kafka/Contracts/IProducerFactory.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
namespace Elsa.Kafka;

public interface IProducerFactory
{
IProducer CreateProducer(CreateProducerContext workerContext);
}
3 changes: 3 additions & 0 deletions src/modules/Elsa.Kafka/Contracts/IWorker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@ namespace Elsa.Kafka;

public interface IWorker : IDisposable
{
ConsumerDefinition ConsumerDefinition { get; }
IDictionary<string, BookmarkBinding> BookmarkBindings { get; }
IDictionary<string, TriggerBinding> TriggerBindings { get; }
void Start(CancellationToken cancellationToken);
void Stop();
void BindTrigger(TriggerBinding binding);
Expand Down
2 changes: 2 additions & 0 deletions src/modules/Elsa.Kafka/Contracts/IWorkerManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ public interface IWorkerManager
Task UpdateWorkersAsync(CancellationToken cancellationToken = default);
void StopWorkers();
Task BindTriggersAsync(IEnumerable<StoredTrigger> triggers, CancellationToken cancellationToken = default);
Task UnbindTriggersAsync(IEnumerable<StoredTrigger> triggers, CancellationToken cancellationToken = default);
Task BindBookmarksAsync(IEnumerable<StoredBookmark> bookmarks, CancellationToken cancellationToken = default);
Task UnbindBookmarksAsync(IEnumerable<StoredBookmark> bookmarks, CancellationToken cancellationToken = default);
IWorker? GetWorker(string consumerDefinitionId);
}
13 changes: 0 additions & 13 deletions src/modules/Elsa.Kafka/Entities/ConsumerConfigDefinition.cs

This file was deleted.

12 changes: 12 additions & 0 deletions src/modules/Elsa.Kafka/Entities/ConsumerDefinition.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
using Confluent.Kafka;
using Elsa.Common.Entities;
using Elsa.Kafka.Factories;

namespace Elsa.Kafka;

public class ConsumerDefinition : Entity
{
public string Name { get; set; } = default!;
public Type FactoryType { get; set; } = typeof(DefaultConsumerFactory);
public ConsumerConfig Config { get; set; } = new();
}
5 changes: 4 additions & 1 deletion src/modules/Elsa.Kafka/Entities/ProducerDefinition.cs
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
using Confluent.Kafka;
using Elsa.Common.Entities;
using Elsa.Kafka.Factories;

namespace Elsa.Kafka;

public class ProducerDefinition : Entity
{
public string Name { get; set; } = default!;
public ICollection<string> BootstrapServers { get; set; } = [];
public Type FactoryType { get; set; } = typeof(DefaultProducerFactory);
public ProducerConfig Config { get; set; } = new();
}
18 changes: 18 additions & 0 deletions src/modules/Elsa.Kafka/Extensions/ServiceCollectionExtensions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
using Microsoft.Extensions.DependencyInjection;

namespace Elsa.Kafka;

public static class ServiceCollectionExtensions
{
public static IServiceCollection AddConsumerFactory<T>(this IServiceCollection services) where T : class, IConsumerFactory
{
services.AddScoped<T>();
return services;
}

public static IServiceCollection AddProducerFactory<T>(this IServiceCollection services) where T : class, IProducerFactory
{
services.AddScoped<T>();
return services;
}
}
Loading

0 comments on commit b534b42

Please sign in to comment.