Skip to content

Commit

Permalink
Refactor workflow scheduling with deferred task execution
Browse files Browse the repository at this point in the history
Reorder pipeline steps to ensure deferred tasks are used correctly in `WorkflowExecutionPipelineBuilderExtensions.cs`. Add `workflowInstanceManager` to `ScheduleBackgroundActivitiesMiddleware` and defer background activity scheduling to execute after state is saved for better consistency.
  • Loading branch information
sfmskywalker committed Jul 7, 2024
1 parent 2a31f59 commit 0816443
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ public static class WorkflowExecutionPipelineBuilderExtensions
public static IWorkflowExecutionPipelineBuilder UseDefaultPipeline(this IWorkflowExecutionPipelineBuilder pipelineBuilder) =>
pipelineBuilder
.Reset()
.UseBackgroundActivities()
.UseDeferredActivityTasks()
.UseBackgroundActivities()
.UseBookmarkPersistence()
.UseActivityExecutionLogPersistence()
.UseWorkflowExecutionLogPersistence()
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using Elsa.Extensions;
using Elsa.Workflows.Contracts;
using Elsa.Workflows.Management;
using Elsa.Workflows.Pipelines.WorkflowExecution;
using Elsa.Workflows.Runtime.Entities;
using Elsa.Workflows.Runtime.Middleware.Activities;
Expand All @@ -14,7 +15,8 @@ public class ScheduleBackgroundActivitiesMiddleware(
WorkflowMiddlewareDelegate next,
IBackgroundActivityScheduler backgroundActivityScheduler,
IStimulusHasher stimulusHasher,
IBookmarkStore bookmarkStore)
IBookmarkStore bookmarkStore,
IWorkflowInstanceManager workflowInstanceManager)
: WorkflowExecutionMiddleware(next)
{
/// <inheritdoc />
Expand All @@ -29,42 +31,53 @@ public override async ValueTask InvokeAsync(WorkflowExecutionContext context)
var scheduledBackgroundActivities = workflowExecutionContext
.TransientProperties
.GetOrAdd(BackgroundActivityInvokerMiddleware.BackgroundActivitySchedulesKey, () => new List<ScheduledBackgroundActivity>());

if (scheduledBackgroundActivities.Count == 0)
return;

foreach (var scheduledBackgroundActivity in scheduledBackgroundActivities)
context.DeferTask(async () =>
{
// Schedule the background activity.
var jobId = await backgroundActivityScheduler.ScheduleAsync(scheduledBackgroundActivity, cancellationToken);
// Commit state.
await workflowInstanceManager.SaveAsync(context, cancellationToken);
foreach (var scheduledBackgroundActivity in scheduledBackgroundActivities)
{
// Schedule the background activity.
var jobId = await backgroundActivityScheduler.ScheduleAsync(scheduledBackgroundActivity, cancellationToken);
// Select the bookmark associated with the background activity.
var bookmark = workflowExecutionContext.Bookmarks.First(x => x.Id == scheduledBackgroundActivity.BookmarkId);
var stimulus = bookmark.GetPayload<BackgroundActivityStimulus>();
// Select the bookmark associated with the background activity.
var bookmark = workflowExecutionContext.Bookmarks.First(x => x.Id == scheduledBackgroundActivity.BookmarkId);
var stimulus = bookmark.GetPayload<BackgroundActivityStimulus>();
// Store the created job ID.
workflowExecutionContext.Bookmarks.Remove(bookmark);
stimulus.JobId = jobId;
bookmark = bookmark with
{
Payload = bookmark.Payload,
Hash = stimulusHasher.Hash(bookmark.Name, stimulus)
};
workflowExecutionContext.Bookmarks.Add(bookmark);
// Store the created job ID.
workflowExecutionContext.Bookmarks.Remove(bookmark);
stimulus.JobId = jobId;
bookmark = bookmark with
{
Payload = bookmark.Payload,
Hash = stimulusHasher.Hash(bookmark.Name, stimulus)
};
workflowExecutionContext.Bookmarks.Add(bookmark);
// Update the bookmark.
var storedBookmark = new StoredBookmark
{
Id = bookmark.Id,
TenantId = tenantId,
ActivityInstanceId = bookmark.ActivityInstanceId,
ActivityTypeName = bookmark.Name,
Hash = bookmark.Hash,
WorkflowInstanceId = workflowExecutionContext.Id,
CreatedAt = bookmark.CreatedAt,
CorrelationId = workflowExecutionContext.CorrelationId,
Payload = bookmark.Payload,
Metadata = bookmark.Metadata,
};
// Update the bookmark.
var storedBookmark = new StoredBookmark
{
Id = bookmark.Id,
TenantId = tenantId,
ActivityInstanceId = bookmark.ActivityInstanceId,
ActivityTypeName = bookmark.Name,
Hash = bookmark.Hash,
WorkflowInstanceId = workflowExecutionContext.Id,
CreatedAt = bookmark.CreatedAt,
CorrelationId = workflowExecutionContext.CorrelationId,
Payload = bookmark.Payload,
Metadata = bookmark.Metadata,
};
await bookmarkStore.SaveAsync(storedBookmark, cancellationToken);
}
await bookmarkStore.SaveAsync(storedBookmark, cancellationToken);
}
});


}
}

0 comments on commit 0816443

Please sign in to comment.