Skip to content

Commit

Permalink
Add binding support to predicate evaluation in Kafka handler
Browse files Browse the repository at this point in the history
Included binding parameter in EvaluatePredicateAsync method to incorporate bookmark bindings for evaluating predicates. Added new methods and dependencies to manage workflow instances and contexts, ensuring accurate predicate evaluation within given bookmarks.
  • Loading branch information
sfmskywalker committed Nov 23, 2024
1 parent 237e476 commit 10146cd
Showing 1 changed file with 58 additions and 11 deletions.
69 changes: 58 additions & 11 deletions src/modules/Elsa.Kafka/Handlers/TriggerWorkflows.cs
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
using System.Text;
using Elsa.Expressions.Contracts;
using Elsa.Expressions.Models;
using Elsa.Extensions;
using Elsa.Kafka.Activities;
using Elsa.Kafka.Notifications;
using Elsa.Kafka.Stimuli;
using Elsa.Mediator.Contracts;
using Elsa.Workflows;
using Elsa.Workflows.Helpers;
using Elsa.Workflows.Management;
using Elsa.Workflows.Management.Entities;
using Elsa.Workflows.Memory;
using Elsa.Workflows.Runtime;
using Elsa.Workflows.Runtime.Options;
Expand All @@ -20,6 +24,9 @@ public class TriggerWorkflows(
ITriggerInvoker triggerInvoker,
IBookmarkQueue bookmarkQueue,
ICorrelationStrategy correlationStrategy,
IWorkflowInstanceStore workflowInstanceStore,
IWorkflowDefinitionService workflowDefinitionService,
IVariablePersistenceManager variablePersistenceManager,
IExpressionEvaluator expressionEvaluator,
IOptions<KafkaOptions> options,
IServiceProvider serviceProvider,
Expand Down Expand Up @@ -116,7 +123,7 @@ private async Task<IEnumerable<TriggerBinding>> GetMatchingTriggerBindingsAsync(
if (stimulus.Topics.All(x => x != topic))
continue;

var isMatch = await EvaluatePredicateAsync(transportMessage, stimulus, cancellationToken);
var isMatch = await EvaluatePredicateAsync(transportMessage, stimulus, null, cancellationToken);

if (isMatch)
matchingTriggers.Add(binding);
Expand Down Expand Up @@ -160,8 +167,8 @@ private async Task<IEnumerable<BookmarkBinding>> GetMatchingBookmarkBindingsAsyn
continue;
}
}

var isMatch = await EvaluatePredicateAsync(transportMessage, stimulus, cancellationToken);
var isMatch = await EvaluatePredicateAsync(transportMessage, stimulus, binding, cancellationToken);

if (isMatch)
matchingBookmarks.Add(binding);
Expand All @@ -170,20 +177,14 @@ private async Task<IEnumerable<BookmarkBinding>> GetMatchingBookmarkBindingsAsyn
return matchingBookmarks;
}

private async Task<bool> EvaluatePredicateAsync(KafkaTransportMessage transportMessage, MessageReceivedStimulus stimulus, CancellationToken cancellationToken)
private async Task<bool> EvaluatePredicateAsync(KafkaTransportMessage transportMessage, MessageReceivedStimulus stimulus, BookmarkBinding? binding, CancellationToken cancellationToken)
{
var predicate = stimulus.Predicate;

if (predicate == null)
return true;

var memory = new MemoryRegister();
var transportMessageVariable = new Variable("transportMessage", transportMessage);
var messageVariable = new Variable("message", transportMessage.Value);
var expressionExecutionContext = new ExpressionExecutionContext(serviceProvider, memory, cancellationToken: cancellationToken);

transportMessageVariable.Set(expressionExecutionContext, transportMessage);
messageVariable.Set(expressionExecutionContext, transportMessage.Value);
var expressionExecutionContext = await GetExpressionExecutionContextAsync(transportMessage, binding, cancellationToken);

try
{
Expand All @@ -195,6 +196,41 @@ private async Task<bool> EvaluatePredicateAsync(KafkaTransportMessage transportM
return false;
}
}

private async Task<ExpressionExecutionContext> GetExpressionExecutionContextAsync(KafkaTransportMessage transportMessage, BookmarkBinding? binding, CancellationToken cancellationToken)
{
var memory = new MemoryRegister();
var expressionExecutionContext = new ExpressionExecutionContext(serviceProvider, memory, cancellationToken: cancellationToken);
var transportMessageVariable = new Variable("transportMessage", transportMessage);
var messageVariable = new Variable("message", transportMessage.Value);

transportMessageVariable.Set(expressionExecutionContext, transportMessage);
messageVariable.Set(expressionExecutionContext, transportMessage.Value);

if(binding == null)
{
return new ExpressionExecutionContext(serviceProvider, memory, cancellationToken: cancellationToken);
}

var boundWorkflowInstanceId = binding.WorkflowInstanceId;
var boundWorkflowInstance = await workflowInstanceStore.FindAsync(boundWorkflowInstanceId, cancellationToken);

if (boundWorkflowInstance == null)
{
logger.LogWarning("Could not find workflow instance with ID {WorkflowInstanceId}", boundWorkflowInstanceId);
throw new InvalidOperationException($"Could not find workflow instance with ID {boundWorkflowInstanceId}");
}

var workflowExecutionContext = await CreateWorkflowExecutionContextAsync(boundWorkflowInstance, cancellationToken);
var bookmark = workflowExecutionContext.Bookmarks.FirstOrDefault(x => x.Id == binding.BookmarkId) ?? throw new InvalidOperationException($"Could not find bookmark with ID {binding.BookmarkId}");
var activityInstanceId = bookmark.ActivityInstanceId!;
var activityExecutionContext = workflowExecutionContext.ActivityExecutionContexts.FirstOrDefault(x => x.Id == activityInstanceId) ?? throw new InvalidOperationException($"Could not find activity execution context with ID {activityInstanceId}");
var parentExecutionContext = activityExecutionContext.ExpressionExecutionContext;
expressionExecutionContext.ParentContext = parentExecutionContext;
await variablePersistenceManager.LoadVariablesAsync(workflowExecutionContext);

return expressionExecutionContext;
}

private string? GetWorkflowInstanceId(KafkaTransportMessage transportMessage)
{
Expand All @@ -206,4 +242,15 @@ private async Task<bool> EvaluatePredicateAsync(KafkaTransportMessage transportM
{
return correlationStrategy.GetCorrelationId(transportMessage);
}

private async Task<WorkflowExecutionContext> CreateWorkflowExecutionContextAsync(WorkflowInstance workflowInstance, CancellationToken cancellationToken)
{
var workflowDefinition = await workflowDefinitionService.FindWorkflowGraphAsync(workflowInstance.DefinitionVersionId, cancellationToken);

if (workflowDefinition == null)
throw new InvalidOperationException($"Could not find workflow definition with ID {workflowInstance.DefinitionVersionId}");

var workflowState = workflowInstance.WorkflowState;
return await WorkflowExecutionContext.CreateAsync(serviceProvider, workflowDefinition, workflowState, cancellationToken: cancellationToken);
}
}

0 comments on commit 10146cd

Please sign in to comment.