From 10146cd661117a933eeac1a38cb8c879dea21313 Mon Sep 17 00:00:00 2001 From: Sipke Schoorstra Date: Sat, 23 Nov 2024 14:41:38 +0100 Subject: [PATCH] Add binding support to predicate evaluation in Kafka handler Included binding parameter in EvaluatePredicateAsync method to incorporate bookmark bindings for evaluating predicates. Added new methods and dependencies to manage workflow instances and contexts, ensuring accurate predicate evaluation within given bookmarks. --- .../Elsa.Kafka/Handlers/TriggerWorkflows.cs | 69 ++++++++++++++++--- 1 file changed, 58 insertions(+), 11 deletions(-) diff --git a/src/modules/Elsa.Kafka/Handlers/TriggerWorkflows.cs b/src/modules/Elsa.Kafka/Handlers/TriggerWorkflows.cs index c11e45d7f4..c856599479 100644 --- a/src/modules/Elsa.Kafka/Handlers/TriggerWorkflows.cs +++ b/src/modules/Elsa.Kafka/Handlers/TriggerWorkflows.cs @@ -1,11 +1,15 @@ using System.Text; using Elsa.Expressions.Contracts; using Elsa.Expressions.Models; +using Elsa.Extensions; using Elsa.Kafka.Activities; using Elsa.Kafka.Notifications; using Elsa.Kafka.Stimuli; using Elsa.Mediator.Contracts; +using Elsa.Workflows; using Elsa.Workflows.Helpers; +using Elsa.Workflows.Management; +using Elsa.Workflows.Management.Entities; using Elsa.Workflows.Memory; using Elsa.Workflows.Runtime; using Elsa.Workflows.Runtime.Options; @@ -20,6 +24,9 @@ public class TriggerWorkflows( ITriggerInvoker triggerInvoker, IBookmarkQueue bookmarkQueue, ICorrelationStrategy correlationStrategy, + IWorkflowInstanceStore workflowInstanceStore, + IWorkflowDefinitionService workflowDefinitionService, + IVariablePersistenceManager variablePersistenceManager, IExpressionEvaluator expressionEvaluator, IOptions options, IServiceProvider serviceProvider, @@ -116,7 +123,7 @@ private async Task> GetMatchingTriggerBindingsAsync( if (stimulus.Topics.All(x => x != topic)) continue; - var isMatch = await EvaluatePredicateAsync(transportMessage, stimulus, cancellationToken); + var isMatch = await EvaluatePredicateAsync(transportMessage, stimulus, null, cancellationToken); if (isMatch) matchingTriggers.Add(binding); @@ -160,8 +167,8 @@ private async Task> GetMatchingBookmarkBindingsAsyn continue; } } - - var isMatch = await EvaluatePredicateAsync(transportMessage, stimulus, cancellationToken); + + var isMatch = await EvaluatePredicateAsync(transportMessage, stimulus, binding, cancellationToken); if (isMatch) matchingBookmarks.Add(binding); @@ -170,20 +177,14 @@ private async Task> GetMatchingBookmarkBindingsAsyn return matchingBookmarks; } - private async Task EvaluatePredicateAsync(KafkaTransportMessage transportMessage, MessageReceivedStimulus stimulus, CancellationToken cancellationToken) + private async Task EvaluatePredicateAsync(KafkaTransportMessage transportMessage, MessageReceivedStimulus stimulus, BookmarkBinding? binding, CancellationToken cancellationToken) { var predicate = stimulus.Predicate; if (predicate == null) return true; - var memory = new MemoryRegister(); - var transportMessageVariable = new Variable("transportMessage", transportMessage); - var messageVariable = new Variable("message", transportMessage.Value); - var expressionExecutionContext = new ExpressionExecutionContext(serviceProvider, memory, cancellationToken: cancellationToken); - - transportMessageVariable.Set(expressionExecutionContext, transportMessage); - messageVariable.Set(expressionExecutionContext, transportMessage.Value); + var expressionExecutionContext = await GetExpressionExecutionContextAsync(transportMessage, binding, cancellationToken); try { @@ -195,6 +196,41 @@ private async Task EvaluatePredicateAsync(KafkaTransportMessage transportM return false; } } + + private async Task GetExpressionExecutionContextAsync(KafkaTransportMessage transportMessage, BookmarkBinding? binding, CancellationToken cancellationToken) + { + var memory = new MemoryRegister(); + var expressionExecutionContext = new ExpressionExecutionContext(serviceProvider, memory, cancellationToken: cancellationToken); + var transportMessageVariable = new Variable("transportMessage", transportMessage); + var messageVariable = new Variable("message", transportMessage.Value); + + transportMessageVariable.Set(expressionExecutionContext, transportMessage); + messageVariable.Set(expressionExecutionContext, transportMessage.Value); + + if(binding == null) + { + return new ExpressionExecutionContext(serviceProvider, memory, cancellationToken: cancellationToken); + } + + var boundWorkflowInstanceId = binding.WorkflowInstanceId; + var boundWorkflowInstance = await workflowInstanceStore.FindAsync(boundWorkflowInstanceId, cancellationToken); + + if (boundWorkflowInstance == null) + { + logger.LogWarning("Could not find workflow instance with ID {WorkflowInstanceId}", boundWorkflowInstanceId); + throw new InvalidOperationException($"Could not find workflow instance with ID {boundWorkflowInstanceId}"); + } + + var workflowExecutionContext = await CreateWorkflowExecutionContextAsync(boundWorkflowInstance, cancellationToken); + var bookmark = workflowExecutionContext.Bookmarks.FirstOrDefault(x => x.Id == binding.BookmarkId) ?? throw new InvalidOperationException($"Could not find bookmark with ID {binding.BookmarkId}"); + var activityInstanceId = bookmark.ActivityInstanceId!; + var activityExecutionContext = workflowExecutionContext.ActivityExecutionContexts.FirstOrDefault(x => x.Id == activityInstanceId) ?? throw new InvalidOperationException($"Could not find activity execution context with ID {activityInstanceId}"); + var parentExecutionContext = activityExecutionContext.ExpressionExecutionContext; + expressionExecutionContext.ParentContext = parentExecutionContext; + await variablePersistenceManager.LoadVariablesAsync(workflowExecutionContext); + + return expressionExecutionContext; + } private string? GetWorkflowInstanceId(KafkaTransportMessage transportMessage) { @@ -206,4 +242,15 @@ private async Task EvaluatePredicateAsync(KafkaTransportMessage transportM { return correlationStrategy.GetCorrelationId(transportMessage); } + + private async Task CreateWorkflowExecutionContextAsync(WorkflowInstance workflowInstance, CancellationToken cancellationToken) + { + var workflowDefinition = await workflowDefinitionService.FindWorkflowGraphAsync(workflowInstance.DefinitionVersionId, cancellationToken); + + if (workflowDefinition == null) + throw new InvalidOperationException($"Could not find workflow definition with ID {workflowInstance.DefinitionVersionId}"); + + var workflowState = workflowInstance.WorkflowState; + return await WorkflowExecutionContext.CreateAsync(serviceProvider, workflowDefinition, workflowState, cancellationToken: cancellationToken); + } } \ No newline at end of file