-
Notifications
You must be signed in to change notification settings - Fork 202
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
#213 Postgres: Add support for transaction-scoped advisory locks with external transactions #222
Changes from 3 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -46,11 +46,12 @@ private PostgresAdvisoryLock(bool isShared) | |
|
||
// Our acquire command will use SET LOCAL to set up statement timeouts. This lasts until the end | ||
// of the current transaction instead of just the current batch if we're in a transaction. To make sure | ||
// we don't leak those settings, in the case of a transaction we first set up a save point which we can | ||
// we don't leak those settings, in the case of a transaction, we first set up a save point which we can | ||
// later roll back (taking the settings changes with it but NOT the lock). Because we can't confidently | ||
// roll back a save point without knowing that it has been set up, we start the save point in its own | ||
// query before we try-catch | ||
var needsSavePoint = await HasTransactionAsync(connection).ConfigureAwait(false); | ||
// query before we try-catch. | ||
var needsSavePoint = await ShouldDefineSavePoint(connection).ConfigureAwait(false); | ||
|
||
if (needsSavePoint) | ||
{ | ||
using var setSavePointCommand = connection.CreateCommand(); | ||
|
@@ -124,9 +125,7 @@ private PostgresAdvisoryLock(bool isShared) | |
async ValueTask RollBackTransactionTimeoutVariablesIfNeededAsync(bool acquired) | ||
{ | ||
if (needsSavePoint | ||
// For transaction scoped locks, we can't roll back the save point on success because that will roll | ||
// back our hold on the lock. It's ok to "leak" the savepoint in that case because it's an internally-owned | ||
// transaction/connection and the savepoint will be cleaned up with the disposal of the transaction. | ||
// For transaction scoped locks, we can't roll back the save point on success because that will roll back our hold on the lock. | ||
&& !(acquired && UseTransactionScopedLock(connection))) | ||
{ | ||
// attempt to clear the timeout variables we set | ||
|
@@ -182,13 +181,17 @@ private DatabaseCommand CreateAcquireCommand(DatabaseConnection connection, Post | |
return command; | ||
} | ||
|
||
private static async ValueTask<bool> HasTransactionAsync(DatabaseConnection connection) | ||
private static async ValueTask<bool> ShouldDefineSavePoint(DatabaseConnection connection) | ||
{ | ||
if (connection.HasTransaction) { return true; } | ||
if (!connection.IsExernallyOwned) { return false; } | ||
// If the connection is internally-owned, we only define a save point if a transaction has been opened. | ||
if (!connection.IsExernallyOwned) { return connection.HasTransaction; } | ||
|
||
// If the connection is externally-owned with an established transaction, we don't want to pollute it with a save point | ||
// which we won't be able to release in case the lock will be acquired. | ||
if (connection.HasTransaction) { return false; } | ||
|
||
// If the connection is externally owned, then it might be part of a transaction that we can't | ||
// see. In that case, the only real way to detect it is to begin a new one | ||
// The externally-owned connection might still be part of a transaction that we can't see. | ||
// In that case, the only real way to detect it is to begin a new one. | ||
try | ||
{ | ||
await connection.BeginTransactionAsync().ConfigureAwait(false); | ||
|
@@ -199,6 +202,7 @@ private static async ValueTask<bool> HasTransactionAsync(DatabaseConnection conn | |
} | ||
|
||
await connection.DisposeTransactionAsync().ConfigureAwait(false); | ||
|
||
return false; | ||
} | ||
|
||
|
@@ -235,10 +239,9 @@ private static string AddKeyParametersAndGetKeyArguments(DatabaseCommand command | |
} | ||
|
||
private static bool UseTransactionScopedLock(DatabaseConnection connection) => | ||
// This implementation (similar to what we do for SQL Server) is based on the fact that we only create transactions on | ||
// internally-owned connections when doing transaction-scoped locking, and we only support transaction-scoped locking on | ||
// internally-owned connections (since there's no explicit release). | ||
!connection.IsExernallyOwned && connection.HasTransaction; | ||
// Transaction-scoped locking is supported on both externally-owned and internally-owned connections, | ||
// as long as the connection has a transaction. | ||
connection.HasTransaction; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think this is right but the comment is somewhat misleading. My understanding is that there is no path to get here with an external connection that explicitly has a transaction (vs. implicitly which is tested for above) except in the case where the caller deliberately went through one of the transactional locking APIs. Do you concur? If so, let's be clear about that. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I concur and changed the comment in the new PR, I hope it's clearer |
||
|
||
private static string AddPGLocksFilterParametersAndGetFilterExpression(DatabaseCommand command, PostgresAdvisoryLockKey key) | ||
{ | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,137 @@ | ||
using Medallion.Threading.Internal; | ||
using System.Data; | ||
|
||
namespace Medallion.Threading.Postgres; | ||
|
||
public partial class PostgresDistributedLock | ||
{ | ||
/// <summary> | ||
/// Attempts to acquire a transaction-scoped advisory lock synchronously with an externally owned transaction. Usage: | ||
/// <code> | ||
/// var transaction = /* create a DB transaction */ | ||
/// | ||
/// var isLockAcquired = myLock.TryAcquireWithTransaction(..., transaction, ...) | ||
/// | ||
/// if (isLockAcquired != null) | ||
/// { | ||
/// /* we have the lock! */ | ||
/// | ||
/// // Commit or Rollback the transaction, which in turn will release the lock | ||
/// } | ||
/// </code> | ||
/// | ||
/// NOTE: The owner of the transaction is the responsible party for it - the owner must commit or rollback the transaction in order to release the acquired lock. | ||
/// </summary> | ||
/// <param name="key">The postgres advisory lock key which will be used to acquire the lock.</param> | ||
/// <param name="transaction">The externally owned transaction which will be used to acquire the lock. The owner of the transaction must commit or rollback it for the lock to be released.</param> | ||
/// <param name="timeout">How long to wait before giving up on the acquisition attempt. Defaults to 0.</param> | ||
/// <param name="cancellationToken">Specifies a token by which the wait can be canceled</param> | ||
/// <returns>Whether the lock has been acquired</returns> | ||
public static bool TryAcquireWithTransaction(PostgresAdvisoryLockKey key, IDbTransaction transaction, TimeSpan timeout = default, CancellationToken cancellationToken = default) => | ||
SyncViaAsync.Run(state => TryAcquireWithTransactionAsyncInternal(state.key, state.transaction, state.timeout, state.cancellationToken), (key, transaction, timeout, cancellationToken)); | ||
|
||
/// <summary> | ||
/// Acquires a transaction-scoped advisory lock synchronously, failing with <see cref="TimeoutException"/> if the attempt times out. Usage: | ||
/// <code> | ||
/// var transaction = /* create a DB transaction */ | ||
/// | ||
/// myLock.AcquireWithTransaction(..., transaction, ...) | ||
/// | ||
/// /* we have the lock! */ | ||
/// | ||
/// // Commit or Rollback the transaction, which in turn will release the lock | ||
/// </code> | ||
/// | ||
/// NOTE: The owner of the transaction is the responsible party for it - the owner must commit or rollback the transaction in order to release the acquired lock. | ||
/// </summary> | ||
/// <param name="key">The postgres advisory lock key which will be used to acquire the lock.</param> | ||
/// <param name="transaction">The externally owned transaction which will be used to acquire the lock. The owner of the transaction must commit or rollback it for the lock to be released.</param> | ||
/// <param name="timeout">How long to wait before giving up on the acquisition attempt. Defaults to <see cref="Timeout.InfiniteTimeSpan"/></param> | ||
/// <param name="cancellationToken">Specifies a token by which the wait can be canceled</param> | ||
public static void AcquireWithTransaction(PostgresAdvisoryLockKey key, IDbTransaction transaction, TimeSpan? timeout = null, CancellationToken cancellationToken = default) => | ||
SyncViaAsync.Run(state => AcquireWithTransactionAsyncInternal(state.key, state.transaction, state.timeout, state.cancellationToken), (key, transaction, timeout, cancellationToken)); | ||
|
||
/// <summary> | ||
/// Attempts to acquire a transaction-scoped advisory lock asynchronously with an externally owned transaction. Usage: | ||
/// <code> | ||
/// var transaction = /* create a DB transaction */ | ||
/// | ||
/// var isLockAcquired = await myLock.TryAcquireWithTransactionAsync(..., transaction, ...) | ||
/// | ||
/// if (isLockAcquired != null) | ||
/// { | ||
/// /* we have the lock! */ | ||
/// | ||
/// // Commit or Rollback the transaction, which in turn will release the lock | ||
/// } | ||
/// </code> | ||
/// | ||
/// NOTE: The owner of the transaction is the responsible party for it - the owner must commit or rollback the transaction in order to release the acquired lock. | ||
/// </summary> | ||
/// <param name="key">The postgres advisory lock key which will be used to acquire the lock.</param> | ||
/// <param name="transaction">The externally owned transaction which will be used to acquire the lock. The owner of the transaction must commit or rollback it for the lock to be released.</param> | ||
/// <param name="timeout">How long to wait before giving up on the acquisition attempt. Defaults to 0.</param> | ||
/// <param name="cancellationToken">Specifies a token by which the wait can be canceled</param> | ||
/// <returns>Whether the lock has been acquired</returns> | ||
public static ValueTask<bool> TryAcquireWithTransactionAsync(PostgresAdvisoryLockKey key, IDbTransaction transaction, TimeSpan timeout = default, CancellationToken cancellationToken = default) => | ||
TryAcquireWithTransactionAsyncInternal(key, transaction, timeout, cancellationToken); | ||
|
||
/// <summary> | ||
/// Acquires a transaction-scoped advisory lock asynchronously, failing with <see cref="TimeoutException"/> if the attempt times out. Usage: | ||
/// <code> | ||
/// var transaction = /* create a DB transaction */ | ||
/// | ||
/// await myLock.AcquireWithTransaction(..., transaction, ...) | ||
/// | ||
/// /* we have the lock! */ | ||
/// | ||
/// // Commit or Rollback the transaction, which in turn will release the lock | ||
/// </code> | ||
/// | ||
/// NOTE: The owner of the transaction is the responsible party for it - the owner must commit or rollback the transaction in order to release the acquired lock. | ||
/// </summary> | ||
/// <param name="key">The postgres advisory lock key which will be used to acquire the lock.</param> | ||
/// <param name="transaction">The externally owned transaction which will be used to acquire the lock. The owner of the transaction must commit or rollback it for the lock to be released.</param> | ||
/// <param name="timeout">How long to wait before giving up on the acquisition attempt. Defaults to <see cref="Timeout.InfiniteTimeSpan"/></param> | ||
/// <param name="cancellationToken">Specifies a token by which the wait can be canceled</param> | ||
public static ValueTask AcquireWithTransactionAsync(PostgresAdvisoryLockKey key, IDbTransaction transaction, TimeSpan? timeout = null, CancellationToken cancellationToken = default) => | ||
AcquireWithTransactionAsyncInternal(key, transaction, timeout, cancellationToken); | ||
|
||
internal static ValueTask<bool> TryAcquireWithTransactionAsyncInternal(PostgresAdvisoryLockKey key, IDbTransaction transaction, TimeSpan timeout = default, CancellationToken cancellationToken = default) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This internal method shouldn't have default parameter values There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done |
||
{ | ||
if (key == null) { throw new ArgumentNullException(nameof(key)); } | ||
if (transaction == null) { throw new ArgumentNullException(nameof(transaction)); } | ||
|
||
return TryAcquireAsync(); | ||
|
||
async ValueTask<bool> TryAcquireAsync() | ||
{ | ||
var connection = new PostgresDatabaseConnection(transaction); | ||
|
||
await using (connection.ConfigureAwait(false)) | ||
{ | ||
var handle = await PostgresAdvisoryLock.ExclusiveLock.TryAcquireAsync(connection, key.ToString(), timeout, cancellationToken).ConfigureAwait(false); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If I'm reading the code correctly, this isn't actually a handle but merely a "Cookie" object which for Postgres is just a sentinel value. Let's call this There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. That's correct, I changed the name |
||
|
||
return handle != null; | ||
} | ||
} | ||
} | ||
|
||
internal static ValueTask AcquireWithTransactionAsyncInternal(PostgresAdvisoryLockKey key, IDbTransaction transaction, TimeSpan? timeout = null, CancellationToken cancellationToken = default) | ||
{ | ||
if (key == null) { throw new ArgumentNullException(nameof(key)); } | ||
if (transaction == null) { throw new ArgumentNullException(nameof(transaction)); } | ||
|
||
return AcquireAsync(); | ||
|
||
async ValueTask AcquireAsync() | ||
{ | ||
var connection = new PostgresDatabaseConnection(transaction); | ||
|
||
await using (connection.ConfigureAwait(false)) | ||
{ | ||
await PostgresAdvisoryLock.ExclusiveLock.TryAcquireAsync(connection, key.ToString(), timeout, cancellationToken).ThrowTimeoutIfNull().ConfigureAwait(false); | ||
} | ||
} | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,5 @@ | ||
#nullable enable | ||
static Medallion.Threading.Postgres.PostgresDistributedLock.AcquireWithTransaction(Medallion.Threading.Postgres.PostgresAdvisoryLockKey key, System.Data.IDbTransaction! transaction, System.TimeSpan? timeout = null, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> void | ||
static Medallion.Threading.Postgres.PostgresDistributedLock.AcquireWithTransactionAsync(Medallion.Threading.Postgres.PostgresAdvisoryLockKey key, System.Data.IDbTransaction! transaction, System.TimeSpan? timeout = null, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.ValueTask | ||
static Medallion.Threading.Postgres.PostgresDistributedLock.TryAcquireWithTransaction(Medallion.Threading.Postgres.PostgresAdvisoryLockKey key, System.Data.IDbTransaction! transaction, System.TimeSpan timeout = default(System.TimeSpan), System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> bool | ||
static Medallion.Threading.Postgres.PostgresDistributedLock.TryAcquireWithTransactionAsync(Medallion.Threading.Postgres.PostgresAdvisoryLockKey key, System.Data.IDbTransaction! transaction, System.TimeSpan timeout = default(System.TimeSpan), System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.ValueTask<bool> |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,48 @@ | ||
using Medallion.Threading.Postgres; | ||
using Npgsql; | ||
using NUnit.Framework; | ||
|
||
namespace Medallion.Threading.Tests.Postgres; | ||
|
||
internal class PostgresDistributedLockExtensionsTest | ||
{ | ||
[Test] | ||
public void TestValidatesConstructorArguments() | ||
{ | ||
Assert.Throws<ArgumentNullException>(() => PostgresDistributedLock.TryAcquireWithTransaction(default, null!)); | ||
Assert.ThrowsAsync<ArgumentNullException>(async () => await PostgresDistributedLock.TryAcquireWithTransactionAsync(default, null!).ConfigureAwait(false)); | ||
Assert.Throws<ArgumentNullException>(() => PostgresDistributedLock.AcquireWithTransaction(default, null!)); | ||
Assert.ThrowsAsync<ArgumentNullException>(async () => await PostgresDistributedLock.AcquireWithTransactionAsync(default, null!).ConfigureAwait(false)); | ||
} | ||
|
||
[Test] | ||
public async Task TestWorksWithExternalTransaction() | ||
{ | ||
bool isLockAcquired; | ||
|
||
var key = new PostgresAdvisoryLockKey(0); | ||
|
||
using var connection = new NpgsqlConnection(TestingPostgresDb.DefaultConnectionString); | ||
await connection.OpenAsync(); | ||
|
||
using (var transaction = connection.BeginTransaction()) | ||
{ | ||
PostgresDistributedLock.AcquireWithTransaction(key, transaction); | ||
|
||
isLockAcquired = PostgresDistributedLock.TryAcquireWithTransaction(key, transaction); | ||
Assert.That(isLockAcquired, Is.False); | ||
|
||
transaction.Rollback(); | ||
} | ||
|
||
using (var transaction = connection.BeginTransaction()) | ||
{ | ||
isLockAcquired = await PostgresDistributedLock.TryAcquireWithTransactionAsync(key, transaction).ConfigureAwait(false); | ||
Assert.That(isLockAcquired, Is.True); | ||
|
||
Assert.ThrowsAsync<TimeoutException>(async () => await PostgresDistributedLock.AcquireWithTransactionAsync(key, transaction, TimeSpan.FromMilliseconds(10)).ConfigureAwait(false)); | ||
|
||
transaction.Commit(); | ||
} | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thinking about this more, I'm not sure I agree with the logic for two reasons.
RollBackTransactionTimeoutVariablesIfNeededAsync
The problem with not having a save point is that we end up polluting the transaction by setting the statement_timeout and lock_timeout values; the point of the savepoint is to clean those up before we return to the caller.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I added more details to the comments in this method in the new PR, I hope it's clearer.
RollBackTransactionTimeoutVariablesIfNeededAsync
that checks if the lock has been acquired and if the lock is transactional - if that's the case, then we can't rollback a savepoint, since it will release the lock (it's true, I checked), therefore there is no point in creating a save point. Although, you are right when it comes to polluting the transaction by setting the statement_timeout and lock_timeout values in this scenario - how do you think we should handle it, if at all? A warning in the public comments of the API will suffice? Or should we write code that try to restore the previous values?if (connection.HasTransaction) { return false; }
.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You are correct. Unfortunately this creates another problem which is that we are now polluting the external connection's
statement_timeout
andlock_timeout
state.So for the case of an externally-owned transaction, I think we need some code that does this:
RollBackTransactionTimeoutVariablesIfNeededAsync
, we can pass in these values. In that function it would do something like this:Thinking about it, I wonder if we should just drop the SAVEPOINT logic altogether in favor of this. If so, we could replace
ShouldDefineSavePoint
andRollBackTransactionTimeoutVariablesIfNeededAsync
with something like this:I think it would be good to add a unit test case for this in the case of the new API. Something like:
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
All right, I will start looking into replacing save point logic