Skip to content

Commit

Permalink
update durability provider class for new core-entities support. (#2570)
Browse files Browse the repository at this point in the history
* update durability provider class for new core-entities support.

* add configuration setting for max entity concurrency to DurableTaskOptions

* minor fixes.
  • Loading branch information
sebastianburckhardt authored Sep 11, 2023
1 parent c36d3d9 commit 10111e0
Show file tree
Hide file tree
Showing 4 changed files with 67 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
using DurableTask.AzureStorage;
using DurableTask.AzureStorage.Tracking;
using DurableTask.Core;
using DurableTask.Core.Entities;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using Newtonsoft.Json.Converters;
Expand All @@ -18,6 +19,7 @@
using Microsoft.Azure.WebJobs.Host.Scale;
#endif
using AzureStorage = DurableTask.AzureStorage;
using DTCore = DurableTask.Core;

namespace Microsoft.Azure.WebJobs.Extensions.DurableTask
{
Expand Down Expand Up @@ -53,8 +55,6 @@ public AzureStorageDurabilityProvider(
this.logger = logger;
}

public override bool SupportsEntities => true;

public override bool CheckStatusBeforeRaiseEvent => true;

/// <summary>
Expand Down Expand Up @@ -97,6 +97,29 @@ public async override Task<IList<OrchestrationState>> GetAllOrchestrationStatesW

/// <inheritdoc/>
public async override Task<string> RetrieveSerializedEntityState(EntityId entityId, JsonSerializerSettings serializerSettings)
{
EntityBackendQueries entityBackendQueries = (this.serviceClient as IEntityOrchestrationService)?.EntityBackendQueries;

if (entityBackendQueries != null) // entity queries are natively supported
{
var entity = await entityBackendQueries.GetEntityAsync(new DTCore.Entities.EntityId(entityId.EntityName, entityId.EntityKey), cancellation: default);

if (entity == null)
{
return null;
}
else
{
return entity.Value.SerializedState;
}
}
else // fall back to old implementation
{
return await this.LegacyImplementationOfRetrieveSerializedEntityState(entityId, serializerSettings);
}
}

private async Task<string> LegacyImplementationOfRetrieveSerializedEntityState(EntityId entityId, JsonSerializerSettings serializerSettings)
{
var instanceId = EntityId.GetSchedulerIdFromEntityId(entityId);
IList<OrchestrationState> stateList = await this.serviceClient.GetOrchestrationStateAsync(instanceId, false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ internal class AzureStorageDurabilityProviderFactory : IDurabilityProviderFactor
private readonly AzureStorageOptions azureStorageOptions;
private readonly INameResolver nameResolver;
private readonly ILoggerFactory loggerFactory;
private readonly bool useSeparateQueriesForEntities;
private readonly bool useSeparateQueueForEntityWorkItems;
private readonly bool inConsumption; // If true, optimize defaults for consumption
private AzureStorageDurabilityProvider defaultStorageProvider;

Expand Down Expand Up @@ -56,6 +58,7 @@ public AzureStorageDurabilityProviderFactory(
// different defaults for key configuration values.
int maxConcurrentOrchestratorsDefault = this.inConsumption ? 5 : 10 * Environment.ProcessorCount;
int maxConcurrentActivitiesDefault = this.inConsumption ? 10 : 10 * Environment.ProcessorCount;
int maxConcurrentEntitiesDefault = this.inConsumption ? 10 : 10 * Environment.ProcessorCount;
int maxEntityOperationBatchSizeDefault = this.inConsumption ? 50 : 5000;

if (this.inConsumption)
Expand All @@ -71,9 +74,19 @@ public AzureStorageDurabilityProviderFactory(
}
}

WorkerRuntimeType runtimeType = platformInfo.GetWorkerRuntimeType();
if (runtimeType == WorkerRuntimeType.DotNetIsolated ||
runtimeType == WorkerRuntimeType.Java ||
runtimeType == WorkerRuntimeType.Custom)
{
this.useSeparateQueriesForEntities = true;
this.useSeparateQueueForEntityWorkItems = true;
}

// The following defaults are only applied if the customer did not explicitely set them on `host.json`
this.options.MaxConcurrentOrchestratorFunctions = this.options.MaxConcurrentOrchestratorFunctions ?? maxConcurrentOrchestratorsDefault;
this.options.MaxConcurrentActivityFunctions = this.options.MaxConcurrentActivityFunctions ?? maxConcurrentActivitiesDefault;
this.options.MaxConcurrentEntityFunctions = this.options.MaxConcurrentEntityFunctions ?? maxConcurrentEntitiesDefault;
this.options.MaxEntityOperationBatchSize = this.options.MaxEntityOperationBatchSize ?? maxEntityOperationBatchSizeDefault;

// Override the configuration defaults with user-provided values in host.json, if any.
Expand Down Expand Up @@ -188,6 +201,7 @@ internal AzureStorageOrchestrationServiceSettings GetAzureStorageOrchestrationSe
WorkItemQueueVisibilityTimeout = this.azureStorageOptions.WorkItemQueueVisibilityTimeout,
MaxConcurrentTaskOrchestrationWorkItems = this.options.MaxConcurrentOrchestratorFunctions ?? throw new InvalidOperationException($"{nameof(this.options.MaxConcurrentOrchestratorFunctions)} needs a default value"),
MaxConcurrentTaskActivityWorkItems = this.options.MaxConcurrentActivityFunctions ?? throw new InvalidOperationException($"{nameof(this.options.MaxConcurrentOrchestratorFunctions)} needs a default value"),
MaxConcurrentTaskEntityWorkItems = this.options.MaxConcurrentEntityFunctions ?? throw new InvalidOperationException($"{nameof(this.options.MaxConcurrentEntityFunctions)} needs a default value"),
ExtendedSessionsEnabled = this.options.ExtendedSessionsEnabled,
ExtendedSessionIdleTimeout = extendedSessionTimeout,
MaxQueuePollingInterval = this.azureStorageOptions.MaxQueuePollingInterval,
Expand All @@ -202,6 +216,8 @@ internal AzureStorageOrchestrationServiceSettings GetAzureStorageOrchestrationSe
LoggerFactory = this.loggerFactory,
UseLegacyPartitionManagement = this.azureStorageOptions.UseLegacyPartitionManagement,
UseTablePartitionManagement = this.azureStorageOptions.UseTablePartitionManagement,
UseSeparateQueriesForEntities = this.useSeparateQueriesForEntities,
UseSeparateQueueForEntityWorkItems = this.useSeparateQueueForEntityWorkItems,
};

if (this.inConsumption)
Expand Down
10 changes: 9 additions & 1 deletion src/WebJobs.Extensions.DurableTask/DurabilityProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
using System.Threading;
using System.Threading.Tasks;
using DurableTask.Core;
using DurableTask.Core.Entities;
using DurableTask.Core.History;
using DurableTask.Core.Query;
using Newtonsoft.Json;
Expand Down Expand Up @@ -36,6 +37,7 @@ public class DurabilityProvider :
private readonly string name;
private readonly IOrchestrationService innerService;
private readonly IOrchestrationServiceClient innerServiceClient;
private readonly IEntityOrchestrationService entityOrchestrationService;
private readonly string connectionName;

/// <summary>
Expand All @@ -52,6 +54,7 @@ public DurabilityProvider(string storageProviderName, IOrchestrationService serv
this.name = storageProviderName ?? throw new ArgumentNullException(nameof(storageProviderName));
this.innerService = service ?? throw new ArgumentNullException(nameof(service));
this.innerServiceClient = serviceClient ?? throw new ArgumentNullException(nameof(serviceClient));
this.entityOrchestrationService = service as IEntityOrchestrationService;
this.connectionName = connectionName ?? throw new ArgumentNullException(connectionName);
}

Expand All @@ -64,7 +67,7 @@ public DurabilityProvider(string storageProviderName, IOrchestrationService serv
/// <summary>
/// Specifies whether the durability provider supports Durable Entities.
/// </summary>
public virtual bool SupportsEntities => false;
public virtual bool SupportsEntities => this.entityOrchestrationService != null;

/// <summary>
/// Specifies whether the backend's WaitForOrchestration is implemented without polling.
Expand Down Expand Up @@ -101,6 +104,11 @@ public DurabilityProvider(string storageProviderName, IOrchestrationService serv
/// </summary>
public virtual TimeSpan LongRunningTimerIntervalLength { get; set; }

/// <summary>
/// Returns the entity orchestration service, if this provider supports entities, or null otherwise.
/// </summary>
public virtual IEntityOrchestrationService EntityOrchestrationService => this.entityOrchestrationService;

/// <summary>
/// Event source name (e.g. DurableTask-AzureStorage).
/// </summary>
Expand Down
17 changes: 17 additions & 0 deletions src/WebJobs.Extensions.DurableTask/Options/DurableTaskOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,18 @@ public string HubName
/// </value>
public int? MaxConcurrentOrchestratorFunctions { get; set; } = null;

/// <summary>
/// Gets or sets the maximum number of entity functions that can be processed concurrently on a single host instance.
/// </summary>
/// <remarks>
/// Increasing entity function concurrency can result in increased throughput but can
/// also increase the total CPU and memory usage on a single worker instance.
/// </remarks>
/// <value>
/// A positive integer configured by the host.
/// </value>
public int? MaxConcurrentEntityFunctions { get; set; } = null;

/// <summary>
/// Gets or sets a value indicating whether to enable the local RPC endpoint managed by this extension.
/// </summary>
Expand Down Expand Up @@ -308,6 +320,11 @@ internal void Validate(INameResolver environmentVariableResolver, EndToEndTraceH
throw new InvalidOperationException($"{nameof(this.MaxConcurrentOrchestratorFunctions)} must be a positive integer value.");
}

if (this.MaxConcurrentEntityFunctions <= 0)
{
throw new InvalidOperationException($"{nameof(this.MaxConcurrentEntityFunctions)} must be a positive integer value.");
}

if (this.MaxEntityOperationBatchSize <= 0)
{
throw new InvalidOperationException($"{nameof(this.MaxEntityOperationBatchSize)} must be a positive integer value.");
Expand Down

0 comments on commit 10111e0

Please sign in to comment.