diff --git a/src/DistributedLock.Postgres/PostgresAdvisoryLock.cs b/src/DistributedLock.Postgres/PostgresAdvisoryLock.cs index 2a8f9617..068f9f58 100644 --- a/src/DistributedLock.Postgres/PostgresAdvisoryLock.cs +++ b/src/DistributedLock.Postgres/PostgresAdvisoryLock.cs @@ -46,11 +46,12 @@ private PostgresAdvisoryLock(bool isShared) // Our acquire command will use SET LOCAL to set up statement timeouts. This lasts until the end // of the current transaction instead of just the current batch if we're in a transaction. To make sure - // we don't leak those settings, in the case of a transaction we first set up a save point which we can + // we don't leak those settings, in the case of a transaction, we first set up a save point which we can // later roll back (taking the settings changes with it but NOT the lock). Because we can't confidently // roll back a save point without knowing that it has been set up, we start the save point in its own - // query before we try-catch - var needsSavePoint = await HasTransactionAsync(connection).ConfigureAwait(false); + // query before we try-catch. + var needsSavePoint = await ShouldDefineSavePoint(connection).ConfigureAwait(false); + if (needsSavePoint) { using var setSavePointCommand = connection.CreateCommand(); @@ -124,9 +125,7 @@ private PostgresAdvisoryLock(bool isShared) async ValueTask RollBackTransactionTimeoutVariablesIfNeededAsync(bool acquired) { if (needsSavePoint - // For transaction scoped locks, we can't roll back the save point on success because that will roll - // back our hold on the lock. It's ok to "leak" the savepoint in that case because it's an internally-owned - // transaction/connection and the savepoint will be cleaned up with the disposal of the transaction. + // For transaction scoped locks, we can't roll back the save point on success because that will roll back our hold on the lock. && !(acquired && UseTransactionScopedLock(connection))) { // attempt to clear the timeout variables we set @@ -182,13 +181,17 @@ private DatabaseCommand CreateAcquireCommand(DatabaseConnection connection, Post return command; } - private static async ValueTask HasTransactionAsync(DatabaseConnection connection) + private static async ValueTask ShouldDefineSavePoint(DatabaseConnection connection) { - if (connection.HasTransaction) { return true; } - if (!connection.IsExernallyOwned) { return false; } + // If the connection is internally-owned, we only define a save point if a transaction has been opened. + if (!connection.IsExernallyOwned) { return connection.HasTransaction; } + + // If the connection is externally-owned with an established transaction, we don't want to pollute it with a save point + // which we won't be able to release in case the lock will be acquired. + if (connection.HasTransaction) { return false; } - // If the connection is externally owned, then it might be part of a transaction that we can't - // see. In that case, the only real way to detect it is to begin a new one + // The externally-owned connection might still be part of a transaction that we can't see. + // In that case, the only real way to detect it is to begin a new one. try { await connection.BeginTransactionAsync().ConfigureAwait(false); @@ -199,6 +202,7 @@ private static async ValueTask HasTransactionAsync(DatabaseConnection conn } await connection.DisposeTransactionAsync().ConfigureAwait(false); + return false; } @@ -235,10 +239,9 @@ private static string AddKeyParametersAndGetKeyArguments(DatabaseCommand command } private static bool UseTransactionScopedLock(DatabaseConnection connection) => - // This implementation (similar to what we do for SQL Server) is based on the fact that we only create transactions on - // internally-owned connections when doing transaction-scoped locking, and we only support transaction-scoped locking on - // internally-owned connections (since there's no explicit release). - !connection.IsExernallyOwned && connection.HasTransaction; + // Transaction-scoped locking is supported on both externally-owned and internally-owned connections, + // as long as the connection has a transaction. + connection.HasTransaction; private static string AddPGLocksFilterParametersAndGetFilterExpression(DatabaseCommand command, PostgresAdvisoryLockKey key) { diff --git a/src/DistributedLock.Postgres/PostgresDistributedLock.Extensions.cs b/src/DistributedLock.Postgres/PostgresDistributedLock.Extensions.cs new file mode 100644 index 00000000..895b2619 --- /dev/null +++ b/src/DistributedLock.Postgres/PostgresDistributedLock.Extensions.cs @@ -0,0 +1,137 @@ +using Medallion.Threading.Internal; +using System.Data; + +namespace Medallion.Threading.Postgres; + +public partial class PostgresDistributedLock +{ + /// + /// Attempts to acquire a transaction-scoped advisory lock synchronously with an externally owned transaction. Usage: + /// + /// var transaction = /* create a DB transaction */ + /// + /// var isLockAcquired = myLock.TryAcquireWithTransaction(..., transaction, ...) + /// + /// if (isLockAcquired != null) + /// { + /// /* we have the lock! */ + /// + /// // Commit or Rollback the transaction, which in turn will release the lock + /// } + /// + /// + /// NOTE: The owner of the transaction is the responsible party for it - the owner must commit or rollback the transaction in order to release the acquired lock. + /// + /// The postgres advisory lock key which will be used to acquire the lock. + /// The externally owned transaction which will be used to acquire the lock. The owner of the transaction must commit or rollback it for the lock to be released. + /// How long to wait before giving up on the acquisition attempt. Defaults to 0. + /// Specifies a token by which the wait can be canceled + /// Whether the lock has been acquired + public static bool TryAcquireWithTransaction(PostgresAdvisoryLockKey key, IDbTransaction transaction, TimeSpan timeout = default, CancellationToken cancellationToken = default) => + SyncViaAsync.Run(state => TryAcquireWithTransactionAsyncInternal(state.key, state.transaction, state.timeout, state.cancellationToken), (key, transaction, timeout, cancellationToken)); + + /// + /// Acquires a transaction-scoped advisory lock synchronously, failing with if the attempt times out. Usage: + /// + /// var transaction = /* create a DB transaction */ + /// + /// myLock.AcquireWithTransaction(..., transaction, ...) + /// + /// /* we have the lock! */ + /// + /// // Commit or Rollback the transaction, which in turn will release the lock + /// + /// + /// NOTE: The owner of the transaction is the responsible party for it - the owner must commit or rollback the transaction in order to release the acquired lock. + /// + /// The postgres advisory lock key which will be used to acquire the lock. + /// The externally owned transaction which will be used to acquire the lock. The owner of the transaction must commit or rollback it for the lock to be released. + /// How long to wait before giving up on the acquisition attempt. Defaults to + /// Specifies a token by which the wait can be canceled + public static void AcquireWithTransaction(PostgresAdvisoryLockKey key, IDbTransaction transaction, TimeSpan? timeout = null, CancellationToken cancellationToken = default) => + SyncViaAsync.Run(state => AcquireWithTransactionAsyncInternal(state.key, state.transaction, state.timeout, state.cancellationToken), (key, transaction, timeout, cancellationToken)); + + /// + /// Attempts to acquire a transaction-scoped advisory lock asynchronously with an externally owned transaction. Usage: + /// + /// var transaction = /* create a DB transaction */ + /// + /// var isLockAcquired = await myLock.TryAcquireWithTransactionAsync(..., transaction, ...) + /// + /// if (isLockAcquired != null) + /// { + /// /* we have the lock! */ + /// + /// // Commit or Rollback the transaction, which in turn will release the lock + /// } + /// + /// + /// NOTE: The owner of the transaction is the responsible party for it - the owner must commit or rollback the transaction in order to release the acquired lock. + /// + /// The postgres advisory lock key which will be used to acquire the lock. + /// The externally owned transaction which will be used to acquire the lock. The owner of the transaction must commit or rollback it for the lock to be released. + /// How long to wait before giving up on the acquisition attempt. Defaults to 0. + /// Specifies a token by which the wait can be canceled + /// Whether the lock has been acquired + public static ValueTask TryAcquireWithTransactionAsync(PostgresAdvisoryLockKey key, IDbTransaction transaction, TimeSpan timeout = default, CancellationToken cancellationToken = default) => + TryAcquireWithTransactionAsyncInternal(key, transaction, timeout, cancellationToken); + + /// + /// Acquires a transaction-scoped advisory lock asynchronously, failing with if the attempt times out. Usage: + /// + /// var transaction = /* create a DB transaction */ + /// + /// await myLock.AcquireWithTransaction(..., transaction, ...) + /// + /// /* we have the lock! */ + /// + /// // Commit or Rollback the transaction, which in turn will release the lock + /// + /// + /// NOTE: The owner of the transaction is the responsible party for it - the owner must commit or rollback the transaction in order to release the acquired lock. + /// + /// The postgres advisory lock key which will be used to acquire the lock. + /// The externally owned transaction which will be used to acquire the lock. The owner of the transaction must commit or rollback it for the lock to be released. + /// How long to wait before giving up on the acquisition attempt. Defaults to + /// Specifies a token by which the wait can be canceled + public static ValueTask AcquireWithTransactionAsync(PostgresAdvisoryLockKey key, IDbTransaction transaction, TimeSpan? timeout = null, CancellationToken cancellationToken = default) => + AcquireWithTransactionAsyncInternal(key, transaction, timeout, cancellationToken); + + internal static ValueTask TryAcquireWithTransactionAsyncInternal(PostgresAdvisoryLockKey key, IDbTransaction transaction, TimeSpan timeout, CancellationToken cancellationToken) + { + if (key == null) { throw new ArgumentNullException(nameof(key)); } + if (transaction == null) { throw new ArgumentNullException(nameof(transaction)); } + + return TryAcquireAsync(); + + async ValueTask TryAcquireAsync() + { + var connection = new PostgresDatabaseConnection(transaction); + + await using (connection.ConfigureAwait(false)) + { + var lockAcquiredCookie = await PostgresAdvisoryLock.ExclusiveLock.TryAcquireAsync(connection, key.ToString(), timeout, cancellationToken).ConfigureAwait(false); + + return lockAcquiredCookie != null; + } + } + } + + internal static ValueTask AcquireWithTransactionAsyncInternal(PostgresAdvisoryLockKey key, IDbTransaction transaction, TimeSpan? timeout, CancellationToken cancellationToken) + { + if (key == null) { throw new ArgumentNullException(nameof(key)); } + if (transaction == null) { throw new ArgumentNullException(nameof(transaction)); } + + return AcquireAsync(); + + async ValueTask AcquireAsync() + { + var connection = new PostgresDatabaseConnection(transaction); + + await using (connection.ConfigureAwait(false)) + { + await PostgresAdvisoryLock.ExclusiveLock.TryAcquireAsync(connection, key.ToString(), timeout, cancellationToken).ThrowTimeoutIfNull().ConfigureAwait(false); + } + } + } +} diff --git a/src/DistributedLock.Postgres/PublicAPI.Unshipped.txt b/src/DistributedLock.Postgres/PublicAPI.Unshipped.txt index e69de29b..e80fc01b 100644 --- a/src/DistributedLock.Postgres/PublicAPI.Unshipped.txt +++ b/src/DistributedLock.Postgres/PublicAPI.Unshipped.txt @@ -0,0 +1,5 @@ +#nullable enable +static Medallion.Threading.Postgres.PostgresDistributedLock.AcquireWithTransaction(Medallion.Threading.Postgres.PostgresAdvisoryLockKey key, System.Data.IDbTransaction! transaction, System.TimeSpan? timeout = null, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> void +static Medallion.Threading.Postgres.PostgresDistributedLock.AcquireWithTransactionAsync(Medallion.Threading.Postgres.PostgresAdvisoryLockKey key, System.Data.IDbTransaction! transaction, System.TimeSpan? timeout = null, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.ValueTask +static Medallion.Threading.Postgres.PostgresDistributedLock.TryAcquireWithTransaction(Medallion.Threading.Postgres.PostgresAdvisoryLockKey key, System.Data.IDbTransaction! transaction, System.TimeSpan timeout = default(System.TimeSpan), System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> bool +static Medallion.Threading.Postgres.PostgresDistributedLock.TryAcquireWithTransactionAsync(Medallion.Threading.Postgres.PostgresAdvisoryLockKey key, System.Data.IDbTransaction! transaction, System.TimeSpan timeout = default(System.TimeSpan), System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.ValueTask \ No newline at end of file diff --git a/src/DistributedLock.Tests/Tests/Postgres/PostgresDistributedLockExtensionsTest.cs b/src/DistributedLock.Tests/Tests/Postgres/PostgresDistributedLockExtensionsTest.cs new file mode 100644 index 00000000..ae8de36e --- /dev/null +++ b/src/DistributedLock.Tests/Tests/Postgres/PostgresDistributedLockExtensionsTest.cs @@ -0,0 +1,48 @@ +using Medallion.Threading.Postgres; +using Npgsql; +using NUnit.Framework; + +namespace Medallion.Threading.Tests.Postgres; + +internal class PostgresDistributedLockExtensionsTest +{ + [Test] + public void TestValidatesConstructorArguments() + { + Assert.Throws(() => PostgresDistributedLock.TryAcquireWithTransaction(default, null!)); + Assert.ThrowsAsync(async () => await PostgresDistributedLock.TryAcquireWithTransactionAsync(default, null!).ConfigureAwait(false)); + Assert.Throws(() => PostgresDistributedLock.AcquireWithTransaction(default, null!)); + Assert.ThrowsAsync(async () => await PostgresDistributedLock.AcquireWithTransactionAsync(default, null!).ConfigureAwait(false)); + } + + [Test] + public async Task TestWorksWithExternalTransaction() + { + bool isLockAcquired; + + var key = new PostgresAdvisoryLockKey(0); + + using var connection = new NpgsqlConnection(TestingPostgresDb.DefaultConnectionString); + await connection.OpenAsync(); + + using (var transaction = connection.BeginTransaction()) + { + PostgresDistributedLock.AcquireWithTransaction(key, transaction); + + isLockAcquired = PostgresDistributedLock.TryAcquireWithTransaction(key, transaction); + Assert.That(isLockAcquired, Is.False); + + transaction.Rollback(); + } + + using (var transaction = connection.BeginTransaction()) + { + isLockAcquired = await PostgresDistributedLock.TryAcquireWithTransactionAsync(key, transaction).ConfigureAwait(false); + Assert.That(isLockAcquired, Is.True); + + Assert.ThrowsAsync(async () => await PostgresDistributedLock.AcquireWithTransactionAsync(key, transaction, TimeSpan.FromMilliseconds(10)).ConfigureAwait(false)); + + transaction.Commit(); + } + } +}