Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CLAUDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ The library is split into a provider-agnostic core and three database provider p

### Distributed advisory locks flow

1. `DbContextDistributedLockExtensions.AcquireDistributedLockAsync()` resolves `IAdvisoryLockProvider` from `ILockingProvider.AdvisoryLockProvider`, opens the connection if needed, calls `DistributedLockRegistry.RegisterOrThrow` to prevent double-acquisition on the same context+connection, then delegates to the provider.
1. `DatabaseFacadeDistributedLockExtensions.AcquireDistributedLockAsync()` (called as `ctx.Database.AcquireDistributedLockAsync(...)`) resolves `IAdvisoryLockProvider` from `ILockingProvider.AdvisoryLockProvider`, opens the connection if needed, calls `DistributedLockRegistry.RegisterOrThrow` to prevent double-acquisition on the same context+connection, then delegates to the provider.
2. Each provider sends native advisory lock SQL (`pg_advisory_lock`, MySQL `GET_LOCK`, SQL Server `sp_getapplock`).
3. The returned `IDistributedLockHandle` releases the lock on dispose via a captured callback; `DistributedLockCleanupInterceptor` performs best-effort cleanup if handles are not disposed before connection close.

Expand Down
16 changes: 8 additions & 8 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -135,30 +135,30 @@ No transaction is required.

```csharp
// Acquire — blocks until available (optional timeout)
await using var handle = await ctx.AcquireDistributedLockAsync("invoice:generate");
await using var handle = await ctx.Database.AcquireDistributedLockAsync("invoice:generate");
// ... critical section ...
// lock released automatically on dispose

// With a timeout — throws LockTimeoutException if not acquired within 5 s
await using var handle = await ctx.AcquireDistributedLockAsync(
await using var handle = await ctx.Database.AcquireDistributedLockAsync(
"report:daily", TimeSpan.FromSeconds(5));

// With cancellation token
await using var handle = await ctx.AcquireDistributedLockAsync(
await using var handle = await ctx.Database.AcquireDistributedLockAsync(
"report:daily", timeout: null, cancellationToken: ct);

// TryAcquire — returns null immediately if already held
var handle = await ctx.TryAcquireDistributedLockAsync("invoice:generate");
var handle = await ctx.Database.TryAcquireDistributedLockAsync("invoice:generate");
if (handle is null)
return Results.Conflict("Another process is generating the invoice.");
await using (handle) { /* critical section */ }

// Synchronous variants are also available
using var handle = ctx.AcquireDistributedLock("report:daily");
var handle = ctx.TryAcquireDistributedLock("report:daily");
using var handle = ctx.Database.AcquireDistributedLock("report:daily");
var handle = ctx.Database.TryAcquireDistributedLock("report:daily");

// Check support at runtime
if (ctx.SupportsDistributedLocks()) { ... }
if (ctx.Database.SupportsDistributedLocks()) { ... }
```

### Lock keys
Expand Down Expand Up @@ -186,7 +186,7 @@ Keys are plain strings, up to **255 characters**. The library handles provider-s
```csharp
try
{
await using var handle = await ctx.AcquireDistributedLockAsync(
await using var handle = await ctx.Database.AcquireDistributedLockAsync(
"report:daily", TimeSpan.FromSeconds(5));
}
catch (LockTimeoutException)
Expand Down
4 changes: 2 additions & 2 deletions samples/InventoryApi/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@
"/inventory/snapshot",
async (InventoryDbContext db) =>
{
await using var handle = await db.TryAcquireDistributedLockAsync("inventory:snapshot");
await using var handle = await db.Database.TryAcquireDistributedLockAsync("inventory:snapshot");
if (handle is null)
return Results.Conflict(new { error = "A snapshot is already in progress." });

Expand Down Expand Up @@ -234,7 +234,7 @@
{
try
{
await using var handle = await db.AcquireDistributedLockAsync(
await using var handle = await db.Database.AcquireDistributedLockAsync(
"products:price-sync",
timeout: TimeSpan.FromSeconds(3)
);
Expand Down
2 changes: 1 addition & 1 deletion samples/QueueProcessor/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
Console.WriteLine("\nRunning maintenance sweep...");
await using (var db = new JobDbContext(optionsBuilder.Options))
{
await using var sweepHandle = await db.TryAcquireDistributedLockAsync("jobs:maintenance-sweep");
await using var sweepHandle = await db.Database.TryAcquireDistributedLockAsync("jobs:maintenance-sweep");
if (sweepHandle is null)
{
Console.WriteLine("Another process is already running the sweep — skipping.");
Expand Down
2 changes: 1 addition & 1 deletion samples/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ var job = await db.Jobs
After all workers finish, a maintenance sweep requeues any jobs stuck in `Processing` for more than 5 minutes:

```csharp
await using var sweepHandle = await db.TryAcquireDistributedLockAsync("jobs:maintenance-sweep");
await using var sweepHandle = await db.Database.TryAcquireDistributedLockAsync("jobs:maintenance-sweep");
if (sweepHandle is null)
{
Console.WriteLine("Another process is already running the sweep — skipping.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,31 +5,32 @@
using EntityFrameworkCore.Locking.Internal;
using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Infrastructure;
using Microsoft.EntityFrameworkCore.Storage;
using Microsoft.Extensions.DependencyInjection;

namespace EntityFrameworkCore.Locking;

/// <summary>
/// Extension methods on <see cref="DbContext"/> for acquiring distributed (advisory) locks.
/// No active transaction is required — locks are session-scoped.
/// Extension methods on <see cref="DatabaseFacade"/> (accessed via <c>DbContext.Database</c>) for
/// acquiring distributed (advisory) locks. No active transaction is required — locks are session-scoped.
/// </summary>
public static class DbContextDistributedLockExtensions
public static class DatabaseFacadeDistributedLockExtensions
{
/// <summary>
/// Acquires a distributed lock with the given key, blocking until it is available.
/// </summary>
/// <param name="ctx">The DbContext whose connection will hold the lock.</param>
/// <param name="database">The <see cref="DatabaseFacade"/> whose connection will hold the lock.</param>
/// <param name="key">Lock key (1–255 characters).</param>
/// <param name="timeout">Maximum time to wait. Throws <see cref="LockTimeoutException"/> if exceeded. Null = wait indefinitely.</param>
/// <param name="ct">Cancellation token. Cancellation is best-effort (driver-dependent).</param>
public static async Task<IDistributedLockHandle> AcquireDistributedLockAsync(
this DbContext ctx,
this DatabaseFacade database,
string key,
TimeSpan? timeout = null,
CancellationToken ct = default
)
{
var (provider, connection, openedByMe) = await PrepareAsync(ctx, key, ct).ConfigureAwait(false);
var (ctx, provider, connection, openedByMe) = await PrepareAsync(database, key, ct).ConfigureAwait(false);
try
{
DistributedLockRegistry.RegisterOrThrow(ctx, connection, key);
Expand All @@ -56,12 +57,12 @@ public static async Task<IDistributedLockHandle> AcquireDistributedLockAsync(
/// Returns null immediately if the lock is held by another connection.
/// </summary>
public static async Task<IDistributedLockHandle?> TryAcquireDistributedLockAsync(
this DbContext ctx,
this DatabaseFacade database,
string key,
CancellationToken ct = default
)
{
var (provider, connection, openedByMe) = await PrepareAsync(ctx, key, ct).ConfigureAwait(false);
var (ctx, provider, connection, openedByMe) = await PrepareAsync(database, key, ct).ConfigureAwait(false);
try
{
DistributedLockRegistry.RegisterOrThrow(ctx, connection, key);
Expand Down Expand Up @@ -93,12 +94,12 @@ public static async Task<IDistributedLockHandle> AcquireDistributedLockAsync(

/// <summary>Acquires a distributed lock synchronously.</summary>
public static IDistributedLockHandle AcquireDistributedLock(
this DbContext ctx,
this DatabaseFacade database,
string key,
TimeSpan? timeout = null
)
{
var (provider, connection, openedByMe) = PrepareSync(ctx, key);
var (ctx, provider, connection, openedByMe) = PrepareSync(database, key);
try
{
DistributedLockRegistry.RegisterOrThrow(ctx, connection, key);
Expand All @@ -121,9 +122,9 @@ public static IDistributedLockHandle AcquireDistributedLock(
}

/// <summary>Attempts to acquire a distributed lock synchronously. Returns null if contested.</summary>
public static IDistributedLockHandle? TryAcquireDistributedLock(this DbContext ctx, string key)
public static IDistributedLockHandle? TryAcquireDistributedLock(this DatabaseFacade database, string key)
{
var (provider, connection, openedByMe) = PrepareSync(ctx, key);
var (ctx, provider, connection, openedByMe) = PrepareSync(database, key);
try
{
DistributedLockRegistry.RegisterOrThrow(ctx, connection, key);
Expand Down Expand Up @@ -153,46 +154,51 @@ public static IDistributedLockHandle AcquireDistributedLock(
}
}

/// <summary>Returns true if the current DbContext's provider supports distributed locks.</summary>
public static bool SupportsDistributedLocks(this DbContext ctx)
/// <summary>Returns true if the configured EF Core provider supports distributed locks.</summary>
public static bool SupportsDistributedLocks(this DatabaseFacade database)
{
var lp = ctx.GetInfrastructure().GetService<ILockingProvider>();
var lp = ((IInfrastructure<IServiceProvider>)database).Instance.GetService<ILockingProvider>();
return lp?.AdvisoryLockProvider is not null;
}

private static async Task<(IAdvisoryLockProvider provider, DbConnection connection, bool openedByMe)> PrepareAsync(
private static async Task<(
DbContext ctx,
string key,
CancellationToken ct
)
IAdvisoryLockProvider provider,
DbConnection connection,
bool openedByMe
)> PrepareAsync(DatabaseFacade database, string key, CancellationToken ct)
{
ValidateKey(key);
var provider = ResolveProvider(ctx);
var connection = ctx.Database.GetDbConnection();
var ctx = GetContext(database);
var provider = ResolveProvider(database);
var connection = database.GetDbConnection();
bool openedByMe = false;
if (connection.State != ConnectionState.Open)
{
await connection.OpenAsync(ct).ConfigureAwait(false);
openedByMe = true;
}
return (provider, connection, openedByMe);
return (ctx, provider, connection, openedByMe);
}

private static (IAdvisoryLockProvider provider, DbConnection connection, bool openedByMe) PrepareSync(
private static (
DbContext ctx,
string key
)
IAdvisoryLockProvider provider,
DbConnection connection,
bool openedByMe
) PrepareSync(DatabaseFacade database, string key)
{
ValidateKey(key);
var provider = ResolveProvider(ctx);
var connection = ctx.Database.GetDbConnection();
var ctx = GetContext(database);
var provider = ResolveProvider(database);
var connection = database.GetDbConnection();
bool openedByMe = false;
if (connection.State != ConnectionState.Open)
{
connection.Open();
openedByMe = true;
}
return (provider, connection, openedByMe);
return (ctx, provider, connection, openedByMe);
}

private static void ValidateKey(string key)
Expand All @@ -203,9 +209,12 @@ private static void ValidateKey(string key)
throw new ArgumentException("Lock key must not exceed 255 characters.", nameof(key));
}

private static IAdvisoryLockProvider ResolveProvider(DbContext ctx)
private static DbContext GetContext(DatabaseFacade database) =>
((IDatabaseFacadeDependenciesAccessor)database).Context;

private static IAdvisoryLockProvider ResolveProvider(DatabaseFacade database)
{
var lp = ctx.GetInfrastructure().GetService<ILockingProvider>();
var lp = ((IInfrastructure<IServiceProvider>)database).Instance.GetService<ILockingProvider>();
if (lp is null)
throw new LockingConfigurationException(
"No ILockingProvider is registered. Call UseLocking() when configuring the DbContext."
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
using AwesomeAssertions;
using EntityFrameworkCore.Locking;
using EntityFrameworkCore.Locking.MySql.Tests.Fixtures;
using EntityFrameworkCore.Locking.Tests.Infrastructure;
using Microsoft.EntityFrameworkCore;
Expand Down Expand Up @@ -29,7 +28,7 @@ public async Task LongKey_ExceededMysqlLimit_HashesCorrectly()
// Key > 64 chars is hashed to lock:<hex58> (64 chars total)
var longKey = new string('x', 100);
await using var ctx = CreateContext();
await using var handle = await ctx.AcquireDistributedLockAsync(longKey);
await using var handle = await ctx.Database.AcquireDistributedLockAsync(longKey);
handle.Should().NotBeNull();
handle.Key.Should().Be(longKey); // public Key is the original, not encoded
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
using AwesomeAssertions;
using EntityFrameworkCore.Locking;
using EntityFrameworkCore.Locking.PostgreSQL.Tests.Fixtures;
using EntityFrameworkCore.Locking.Tests.Infrastructure;
using Microsoft.EntityFrameworkCore;
Expand All @@ -21,12 +20,12 @@ public async Task Acquire_Contested_BlocksUntilReleased()
const string key = "pg-block-key";

await using var ctxA = CreateContext();
var handleA = await ctxA.AcquireDistributedLockAsync(key);
var handleA = await ctxA.Database.AcquireDistributedLockAsync(key);

var acquireTask = Task.Run(async () =>
{
await using var ctxB = CreateContext();
await using var h = await ctxB.AcquireDistributedLockAsync(key);
await using var h = await ctxB.Database.AcquireDistributedLockAsync(key);
});

var completed = await Task.WhenAny(acquireTask, Task.Delay(300));
Expand All @@ -42,12 +41,12 @@ public async Task TwoContexts_DifferentConnections_CanBothHoldSameKey()
const string key = "pg-registry-scope";

await using var ctxA = CreateContext();
await using var hA = await ctxA.AcquireDistributedLockAsync(key);
await using var hA = await ctxA.Database.AcquireDistributedLockAsync(key);

await using var ctxB = CreateContext();
var hB = await ctxB.TryAcquireDistributedLockAsync(key);
var hB = await ctxB.Database.TryAcquireDistributedLockAsync(key);
await hA.DisposeAsync();
hB = await ctxB.TryAcquireDistributedLockAsync(key);
hB = await ctxB.Database.TryAcquireDistributedLockAsync(key);
hB.Should().NotBeNull("after ctxA releases, ctxB should acquire");
await hB!.DisposeAsync();
}
Expand All @@ -58,14 +57,14 @@ public async Task Acquire_Cancelled_WithTimeout_ThrowsOperationCanceled()
const string key = "pg-cancel-with-timeout";

await using var ctxA = CreateContext();
await using var handleA = await ctxA.AcquireDistributedLockAsync(key);
await using var handleA = await ctxA.Database.AcquireDistributedLockAsync(key);

await using var ctxB = CreateContext();
using var cts = new CancellationTokenSource();
cts.CancelAfter(TimeSpan.FromMilliseconds(200));

var sw = System.Diagnostics.Stopwatch.StartNew();
Func<Task> act = () => ctxB.AcquireDistributedLockAsync(key, TimeSpan.FromSeconds(10), cts.Token);
Func<Task> act = () => ctxB.Database.AcquireDistributedLockAsync(key, TimeSpan.FromSeconds(10), cts.Token);
await act.Should().ThrowAsync<Exception>();
sw.Stop();
sw.Elapsed.Should().BeLessThan(TimeSpan.FromSeconds(15));
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
using AwesomeAssertions;
using EntityFrameworkCore.Locking;
using EntityFrameworkCore.Locking.SqlServer.Tests.Fixtures;
using EntityFrameworkCore.Locking.Tests.Infrastructure;
using Microsoft.EntityFrameworkCore;
Expand All @@ -20,14 +19,14 @@ public async Task Acquire_Cancelled_WithTimeout_Throws()
{
const string key = "ss-cancel-timeout";
await using var ctxA = CreateContext();
await using var handleA = await ctxA.AcquireDistributedLockAsync(key);
await using var handleA = await ctxA.Database.AcquireDistributedLockAsync(key);

await using var ctxB = CreateContext();
using var cts = new CancellationTokenSource();
cts.CancelAfter(200);

var sw = System.Diagnostics.Stopwatch.StartNew();
Func<Task> act = () => ctxB.AcquireDistributedLockAsync(key, TimeSpan.FromSeconds(10), cts.Token);
Func<Task> act = () => ctxB.Database.AcquireDistributedLockAsync(key, TimeSpan.FromSeconds(10), cts.Token);
await act.Should().ThrowAsync<Exception>();
sw.Stop();
sw.Elapsed.Should().BeLessThan(TimeSpan.FromSeconds(15));
Expand Down
Loading
Loading