Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable supply of pre-provisoned AWS clients #1163

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions src/providers/WorkflowCore.Providers.AWS/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,18 @@ services.AddWorkflow(cfg =>
If any AWS resources do not exists, they will be automatcially created. By default, all DynamoDB tables and indexes will be provisioned with a throughput of 1, you can modify these values from the AWS console.
You may also specify a prefix for the dynamo table names.

If you have a preconfigured dynamoClient, you can pass this in instead of the credentials and config
```C#
var client = new AmazonDynamoDBClient();
var sqsClient = new AmazonSQSClient();
services.AddWorkflow(cfg =>
{
cfg.UseAwsDynamoPersistenceWithProvisionedClient(client, "table-prefix");
cfg.UseAwsDynamoLockingWithProvisionedClient(client, "workflow-core-locks");
cfg.UseAwsSimpleQueueServiceWithProvisionedClient(sqsClient, "queues-prefix");
});
```


## Usage (Kinesis)

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using System;
using Amazon;
using Amazon.DynamoDBv2;
using Amazon.Kinesis;
using Amazon.Runtime;
using Amazon.SQS;
using Microsoft.Extensions.Logging;
Expand All @@ -15,28 +16,55 @@ public static class ServiceCollectionExtensions
{
public static WorkflowOptions UseAwsSimpleQueueService(this WorkflowOptions options, AWSCredentials credentials, AmazonSQSConfig config, string queuesPrefix = "workflowcore")
{
options.UseQueueProvider(sp => new SQSQueueProvider(credentials, config, sp.GetService<ILoggerFactory>(), queuesPrefix));
var sqsClient = new AmazonSQSClient(credentials, config);
return options.UseAwsSimpleQueueServiceWithProvisionedClient(sqsClient, queuesPrefix);
}

public static WorkflowOptions UseAwsSimpleQueueServiceWithProvisionedClient(this WorkflowOptions options, AmazonSQSClient sqsClient, string queuesPrefix = "workflowcore")
{
options.UseQueueProvider(sp => new SQSQueueProvider(sqsClient, sp.GetService<ILoggerFactory>(), queuesPrefix));
return options;
}

public static WorkflowOptions UseAwsDynamoLocking(this WorkflowOptions options, AWSCredentials credentials, AmazonDynamoDBConfig config, string tableName)
{
options.UseDistributedLockManager(sp => new DynamoLockProvider(credentials, config, tableName, sp.GetService<ILoggerFactory>(), sp.GetService<IDateTimeProvider>()));
var dbClient = new AmazonDynamoDBClient(credentials, config);
return options.UseAwsDynamoLockingWithProvisionedClient(dbClient, tableName);
}

public static WorkflowOptions UseAwsDynamoLockingWithProvisionedClient (this WorkflowOptions options, AmazonDynamoDBClient dynamoClient, string tableName)
{
options.UseDistributedLockManager(sp => new DynamoLockProvider(dynamoClient, tableName, sp.GetService<ILoggerFactory>(), sp.GetService<IDateTimeProvider>()));
return options;
}

public static WorkflowOptions UseAwsDynamoPersistence(this WorkflowOptions options, AWSCredentials credentials, AmazonDynamoDBConfig config, string tablePrefix)
{
options.Services.AddTransient<IDynamoDbProvisioner>(sp => new DynamoDbProvisioner(credentials, config, tablePrefix, sp.GetService<ILoggerFactory>()));
options.UsePersistence(sp => new DynamoPersistenceProvider(credentials, config, sp.GetService<IDynamoDbProvisioner>(), tablePrefix, sp.GetService<ILoggerFactory>()));
var dbClient = new AmazonDynamoDBClient(credentials, config);
return options.UseAwsDynamoPersistenceWithProvisionedClient(dbClient, tablePrefix);
}

public static WorkflowOptions UseAwsDynamoPersistenceWithProvisionedClient(this WorkflowOptions options, AmazonDynamoDBClient dynamoClient, string tablePrefix)
{
options.Services.AddTransient<IDynamoDbProvisioner>(sp => new DynamoDbProvisioner(dynamoClient, tablePrefix, sp.GetService<ILoggerFactory>()));
options.UsePersistence(sp => new DynamoPersistenceProvider(dynamoClient, sp.GetService<IDynamoDbProvisioner>(), tablePrefix, sp.GetService<ILoggerFactory>()));
return options;
}

public static WorkflowOptions UseAwsKinesis(this WorkflowOptions options, AWSCredentials credentials, RegionEndpoint region, string appName, string streamName)
{
options.Services.AddTransient<IKinesisTracker>(sp => new KinesisTracker(credentials, region, "workflowcore_kinesis", sp.GetService<ILoggerFactory>()));
options.Services.AddTransient<IKinesisStreamConsumer>(sp => new KinesisStreamConsumer(credentials, region, sp.GetService<IKinesisTracker>(), sp.GetService<IDistributedLockProvider>(), sp.GetService<ILoggerFactory>(), sp.GetService<IDateTimeProvider>()));
options.UseEventHub(sp => new KinesisProvider(credentials, region, appName, streamName, sp.GetService<IKinesisStreamConsumer>(), sp.GetService<ILoggerFactory>()));
var kinesisClient = new AmazonKinesisClient(credentials, region);
var dynamoClient = new AmazonDynamoDBClient(credentials, region);

return options.UseAwsKinesisWithProvisionedClients(kinesisClient, dynamoClient,appName, streamName);

}

public static WorkflowOptions UseAwsKinesisWithProvisionedClients(this WorkflowOptions options, AmazonKinesisClient kinesisClient, AmazonDynamoDBClient dynamoDbClient, string appName, string streamName)
{
options.Services.AddTransient<IKinesisTracker>(sp => new KinesisTracker(dynamoDbClient, "workflowcore_kinesis", sp.GetService<ILoggerFactory>()));
options.Services.AddTransient<IKinesisStreamConsumer>(sp => new KinesisStreamConsumer(kinesisClient, sp.GetService<IKinesisTracker>(), sp.GetService<IDistributedLockProvider>(), sp.GetService<ILoggerFactory>(), sp.GetService<IDateTimeProvider>()));
options.UseEventHub(sp => new KinesisProvider(kinesisClient, appName, streamName, sp.GetService<IKinesisStreamConsumer>(), sp.GetService<ILoggerFactory>()));
return options;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@ public class DynamoDbProvisioner : IDynamoDbProvisioner
private readonly IAmazonDynamoDB _client;
private readonly string _tablePrefix;

public DynamoDbProvisioner(AWSCredentials credentials, AmazonDynamoDBConfig config, string tablePrefix, ILoggerFactory logFactory)
public DynamoDbProvisioner(AmazonDynamoDBClient dynamoDBClient, string tablePrefix, ILoggerFactory logFactory)
{
_logger = logFactory.CreateLogger<DynamoDbProvisioner>();
_client = new AmazonDynamoDBClient(credentials, config);
_client = dynamoDBClient;
_tablePrefix = tablePrefix;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,10 @@ public class DynamoLockProvider : IDistributedLockProvider
private readonly AutoResetEvent _mutex = new AutoResetEvent(true);
private readonly IDateTimeProvider _dateTimeProvider;

public DynamoLockProvider(AWSCredentials credentials, AmazonDynamoDBConfig config, string tableName, ILoggerFactory logFactory, IDateTimeProvider dateTimeProvider)
public DynamoLockProvider(AmazonDynamoDBClient dynamoDBClient, string tableName, ILoggerFactory logFactory, IDateTimeProvider dateTimeProvider)
{
_logger = logFactory.CreateLogger<DynamoLockProvider>();
_client = new AmazonDynamoDBClient(credentials, config);
_client = dynamoDBClient;
_localLocks = new List<string>();
_tableName = tableName;
_nodeId = Guid.NewGuid().ToString();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,10 @@ public class DynamoPersistenceProvider : IPersistenceProvider

public bool SupportsScheduledCommands => false;

public DynamoPersistenceProvider(AWSCredentials credentials, AmazonDynamoDBConfig config, IDynamoDbProvisioner provisioner, string tablePrefix, ILoggerFactory logFactory)
public DynamoPersistenceProvider(AmazonDynamoDBClient dynamoDBClient, IDynamoDbProvisioner provisioner, string tablePrefix, ILoggerFactory logFactory)
{
_logger = logFactory.CreateLogger<DynamoPersistenceProvider>();
_client = new AmazonDynamoDBClient(credentials, config);
_client = dynamoDBClient;
_tablePrefix = tablePrefix;
_provisioner = provisioner;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,15 @@ public class KinesisProvider : ILifeCycleEventHub
private readonly int _defaultShardCount = 1;
private bool _started = false;

public KinesisProvider(AWSCredentials credentials, RegionEndpoint region, string appName, string streamName, IKinesisStreamConsumer consumer, ILoggerFactory logFactory)
public KinesisProvider(AmazonKinesisClient kinesisClient, string appName, string streamName, IKinesisStreamConsumer consumer, ILoggerFactory logFactory)
{
_logger = logFactory.CreateLogger(GetType());
_appName = appName;
_streamName = streamName;
_consumer = consumer;
_serializer = new JsonSerializer();
_serializer.TypeNameHandling = TypeNameHandling.All;
_client = new AmazonKinesisClient(credentials, region);
_client = kinesisClient;
}

public async Task PublishNotification(LifeCycleEvent evt)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,12 @@ public class KinesisStreamConsumer : IKinesisStreamConsumer, IDisposable
private ICollection<ShardSubscription> _subscribers = new HashSet<ShardSubscription>();
private readonly IDateTimeProvider _dateTimeProvider;

public KinesisStreamConsumer(AWSCredentials credentials, RegionEndpoint region, IKinesisTracker tracker, IDistributedLockProvider lockManager, ILoggerFactory logFactory, IDateTimeProvider dateTimeProvider)
public KinesisStreamConsumer(AmazonKinesisClient kinesisClient, IKinesisTracker tracker, IDistributedLockProvider lockManager, ILoggerFactory logFactory, IDateTimeProvider dateTimeProvider)
{
_logger = logFactory.CreateLogger(GetType());
_tracker = tracker;
_lockManager = lockManager;
_client = new AmazonKinesisClient(credentials, region);
_client = kinesisClient;
_processTask = new Task(Process);
_processTask.Start();
_dateTimeProvider = dateTimeProvider;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@ public class KinesisTracker : IKinesisTracker
private readonly string _tableName;
private bool _tableConfirmed = false;

public KinesisTracker(AWSCredentials credentials, RegionEndpoint region, string tableName, ILoggerFactory logFactory)
public KinesisTracker(AmazonDynamoDBClient client, string tableName, ILoggerFactory logFactory)
{
_logger = logFactory.CreateLogger(GetType());
_client = new AmazonDynamoDBClient(credentials, region);
_client = client;
_tableName = tableName;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@ public class SQSQueueProvider : IQueueProvider

public bool IsDequeueBlocking => true;

public SQSQueueProvider(AWSCredentials credentials, AmazonSQSConfig config, ILoggerFactory logFactory, string queuesPrefix)
public SQSQueueProvider(AmazonSQSClient sqsClient, ILoggerFactory logFactory, string queuesPrefix)
{
_logger = logFactory.CreateLogger<SQSQueueProvider>();
_client = new AmazonSQSClient(credentials, config);
_client = sqsClient;
_queuesPrefix = queuesPrefix;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,9 @@ protected override IPersistenceProvider Subject
if (_subject == null)
{
var cfg = new AmazonDynamoDBConfig { ServiceURL = DynamoDbDockerSetup.ConnectionString };
var provisioner = new DynamoDbProvisioner(DynamoDbDockerSetup.Credentials, cfg, "unittests", new LoggerFactory());
var client = new DynamoPersistenceProvider(DynamoDbDockerSetup.Credentials, cfg, provisioner, "unittests", new LoggerFactory());
var dbClient = new AmazonDynamoDBClient(DynamoDbDockerSetup.Credentials, cfg);
var provisioner = new DynamoDbProvisioner(dbClient, "unittests", new LoggerFactory());
var client = new DynamoPersistenceProvider(dbClient, provisioner, "unittests", new LoggerFactory());
client.EnsureStoreExists();
_subject = client;
}
Expand Down