Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,4 @@ public enum DatabaseProvider

/// <summary>Microsoft SQL Server.</summary>
SqlServer,

/// <summary>Oracle Database.</summary>
Oracle,

/// <summary>SQLite (unsafe fallback; not recommended for production).</summary>
Sqlite,

/// <summary>An unrecognized or unsupported provider.</summary>
Unknown,
}
2 changes: 0 additions & 2 deletions src/Atomizer.EntityFrameworkCore/Providers/EntityMap.cs
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,6 @@ public static EntityMap Build(IModel model, Type clrType, DatabaseProvider provi
DatabaseProvider.SqlServer => name => $"[{name}]",
DatabaseProvider.PostgreSql => name => $"\"{name}\"",
DatabaseProvider.MySql => name => $"`{name}`",
DatabaseProvider.Oracle => name => $"\"{name}\"",
DatabaseProvider.Unknown => name => name,
_ => throw new NotSupportedException($"Database provider {provider} is not supported."),
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,23 @@ namespace Atomizer.EntityFrameworkCore.Providers;

internal sealed class RelationalProviderCache
{
public bool IsSupportedProvider => DetermineSupportedProvider(DatabaseProvider);
public bool IsSupportedProvider => _databaseProvider is not null;
public string ProviderName { get; }
public ISqlDialect? Dialect { get; }

private DatabaseProvider DatabaseProvider { get; }
private readonly DatabaseProvider? _databaseProvider;
private readonly EntityMap? _jobs;
private readonly EntityMap? _schedules;

private RelationalProviderCache(DatabaseProvider databaseProvider, EntityMap? jobs, EntityMap? schedules)
private RelationalProviderCache(
string providerName,
DatabaseProvider? databaseProvider,
EntityMap? jobs,
EntityMap? schedules
)
{
DatabaseProvider = databaseProvider;
ProviderName = providerName;
_databaseProvider = databaseProvider;
_jobs = jobs;
_schedules = schedules;

Expand All @@ -26,30 +33,29 @@ private RelationalProviderCache(DatabaseProvider databaseProvider, EntityMap? jo
}
}

private static readonly ConcurrentDictionary<DatabaseProvider, RelationalProviderCache> Instances = new();
private static readonly ConcurrentDictionary<string, RelationalProviderCache> Instances = new();

public static RelationalProviderCache Create<TDbContext>(TDbContext dbContext)
where TDbContext : DbContext
{
// Determine provider for this DbContext
var provider = DetectProvider(dbContext.Database.ProviderName ?? string.Empty);
var providerName = dbContext.Database.ProviderName ?? string.Empty;
var provider = DetectProvider(providerName);

// Freeze Unknown per-provider as well (same semantics as before, but keyed).
return Instances.GetOrAdd(
provider,
providerName,
_ =>
{
EntityMap? jobs = null,
schedules = null;

if (DetermineSupportedProvider(provider))
if (provider is not null)
{
var model = dbContext.Model; // capture once
jobs = EntityMap.Build(model, typeof(AtomizerJobEntity), provider);
schedules = EntityMap.Build(model, typeof(AtomizerScheduleEntity), provider);
jobs = EntityMap.Build(model, typeof(AtomizerJobEntity), provider.Value);
schedules = EntityMap.Build(model, typeof(AtomizerScheduleEntity), provider.Value);
}

return new RelationalProviderCache(provider, jobs, schedules);
return new RelationalProviderCache(providerName, provider, jobs, schedules);
}
);
}
Expand All @@ -61,34 +67,27 @@ private ISqlDialect CreateDialect()
throw new InvalidOperationException("Database provider is not supported or entity mappings are missing.");
}

return DatabaseProvider switch
return _databaseProvider switch
{
DatabaseProvider.PostgreSql => new PostgreSqlDialect(_jobs, _schedules),
DatabaseProvider.MySql => new MySqlDialect(_jobs, _schedules),
DatabaseProvider.SqlServer => new SqlServerDialect(_jobs, _schedules),
_ => throw new NotSupportedException($"Database provider {DatabaseProvider} is not supported."),
_ => throw new NotSupportedException($"Database provider {_databaseProvider} is not supported."),
};
}

private static DatabaseProvider DetectProvider(string name) =>
private static DatabaseProvider? DetectProvider(string name) =>
name switch
{
"Microsoft.EntityFrameworkCore.SqlServer" => DatabaseProvider.SqlServer,
"Npgsql.EntityFrameworkCore.PostgreSQL" => DatabaseProvider.PostgreSql,
"Pomelo.EntityFrameworkCore.MySql" or "MySql.EntityFrameworkCore" => DatabaseProvider.MySql,
"Oracle.EntityFrameworkCore" => DatabaseProvider.Oracle,
"Microsoft.EntityFrameworkCore.Sqlite" => DatabaseProvider.Sqlite,
_ => DatabaseProvider.Unknown,
_ => null,
};

private static bool DetermineSupportedProvider(DatabaseProvider provider)
{
return provider is DatabaseProvider.PostgreSql or DatabaseProvider.MySql or DatabaseProvider.SqlServer;
}

// Testing helpers
internal static bool TryGet(DatabaseProvider provider, out RelationalProviderCache? cache) =>
Instances.TryGetValue(provider, out cache);
Instances.TryGetValue(GetProviderName(provider), out cache);

internal static void ResetInstanceForTests(DatabaseProvider? provider = null)
{
Expand All @@ -98,6 +97,15 @@ internal static void ResetInstanceForTests(DatabaseProvider? provider = null)
return;
}

Instances.TryRemove(provider.Value, out _);
Instances.TryRemove(GetProviderName(provider.Value), out _);
}

private static string GetProviderName(DatabaseProvider provider) =>
provider switch
{
DatabaseProvider.SqlServer => "Microsoft.EntityFrameworkCore.SqlServer",
DatabaseProvider.PostgreSql => "Npgsql.EntityFrameworkCore.PostgreSQL",
DatabaseProvider.MySql => "Pomelo.EntityFrameworkCore.MySql",
_ => throw new NotSupportedException($"Database provider {provider} is not supported."),
};
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,6 @@
/// </summary>
public class EntityFrameworkCoreJobStorageOptions
{
/// <summary>
/// If true, allows falling back to providers that may not be
/// fully supported, tested or work in distributed environments (e.g. SQLite).
/// </summary>
/// <remarks>Default is false. See documentation for details and implications.</remarks>
public bool AllowUnsafeProviderFallback { get; set; } = false;

/// <summary>
/// Maximum time to wait when acquiring a database transaction lock before giving up.
/// </summary>
Expand Down
156 changes: 16 additions & 140 deletions src/Atomizer.EntityFrameworkCore/Storage/EntityFrameworkCoreStorage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@ IAtomizerClock clock
_logger = logger;
_clock = clock;
_providerCache = RelationalProviderCache.Create(dbContext);
if (!_providerCache.IsSupportedProvider)
{
throw UnsupportedProviderException(_providerCache.ProviderName);
}
}

public async Task UpsertHeartbeatAsync(AtomizerActiveServer server, CancellationToken cancellationToken)
Expand Down Expand Up @@ -163,7 +167,7 @@ public async Task<Guid> InsertAsync(AtomizerJob job, CancellationToken cancellat
}
}

if (job.PartitionKey != null && _providerCache is { IsSupportedProvider: true, Dialect: not null })
if (job.PartitionKey != null && _providerCache.Dialect is not null)
{
var sql = _providerCache.Dialect.InsertJobWithSequence(job);
await _dbContext.Database.ExecuteSqlInterpolatedAsync(sql, cancellationToken);
Expand All @@ -176,19 +180,6 @@ public async Task<Guid> InsertAsync(AtomizerJob job, CancellationToken cancellat
return job.Id;
}

if (job.PartitionKey != null && !_providerCache.IsSupportedProvider && _options.AllowUnsafeProviderFallback)
{
// LINQ fallback sequence assignment: not atomic under concurrency but safe for single-process use.
var partitionKeyStr = job.PartitionKey.ToString();
var queueKeyStr = job.QueueKey.Key;
var maxSeq = await JobEntities
.AsNoTracking()
.Where(j => j.QueueKey == queueKeyStr && j.PartitionKey == partitionKeyStr)
.MaxAsync(j => j.SequenceNumber, cancellationToken);
entity.SequenceNumber = (maxSeq ?? 0L) + 1L;
job.SequenceNumber = entity.SequenceNumber;
}

JobEntities.Add(entity);
await _dbContext.SaveChangesAsync(cancellationToken);
return entity.Id;
Expand Down Expand Up @@ -221,7 +212,7 @@ CancellationToken cancellationToken
{
cancellationToken.ThrowIfCancellationRequested();

if (_providerCache is { IsSupportedProvider: true, Dialect: not null })
if (_providerCache.Dialect is not null)
{
var sql = _providerCache.Dialect.GetDueJobs(queueKey, now, batchSize);

Expand All @@ -230,62 +221,7 @@ CancellationToken cancellationToken
return entities.Select(job => job.ToAtomizerJob()).ToList();
}

if (!_providerCache.IsSupportedProvider && _options.AllowUnsafeProviderFallback)
{
// WARNING: AsNoTracking() with no row lock means two concurrent QueuePumps
// on the same process (or any second node) will both receive the same jobs.
// AllowUnsafeProviderFallback is only safe with DegreeOfParallelism=1 and
// a single process instance. It is not safe for production use.
var allForQueue = await JobEntities
.AsNoTracking()
.Where(j => j.QueueKey == queueKey.Key)
.ToListAsync(cancellationToken);

// 1) Collect blocked partitions: any partition key with a Processing job
// or a Pending job with prior attempts (retrying).
var blockedPartitions = allForQueue
.Where(j =>
j.PartitionKey != null
&& (
j.Status == AtomizerEntityJobStatus.Processing
|| (j.Status == AtomizerEntityJobStatus.Pending && j.Attempts > 0)
)
)
.Select(j => j.PartitionKey!)
.ToHashSet();

// 2) Find the lowest sequence number per unblocked partition (partition heads).
// Only consider Pending jobs that are due — Completed and Failed jobs must not
// block the next job from becoming the partition head.
var partitionHeads = allForQueue
.Where(j =>
j.PartitionKey != null
&& !blockedPartitions.Contains(j.PartitionKey)
&& j.Status == AtomizerEntityJobStatus.Pending
&& (j.VisibleAt == null || j.VisibleAt <= now)
&& j.ScheduledAt <= now
)
.GroupBy(j => j.PartitionKey!)
.Select(g => g.OrderBy(j => j.SequenceNumber).First().Id)
.ToHashSet();

// 3) Apply eligibility filter, FIFO partition-head filter, and batch size limit.
return allForQueue
.Where(j =>
(
j.Status == AtomizerEntityJobStatus.Pending
&& (j.VisibleAt == null || j.VisibleAt <= now)
&& j.ScheduledAt <= now
|| (j.Status == AtomizerEntityJobStatus.Processing && j.VisibleAt <= now) // lease expired
) && (j.PartitionKey == null || partitionHeads.Contains(j.Id))
)
.OrderBy(j => j.ScheduledAt)
.Take(batchSize)
.Select(j => j.ToAtomizerJob())
.ToList();
}

throw UnsupportedProviderException();
throw UnsupportedProviderException(_providerCache.ProviderName);
}

public async Task<int> ReleaseLeasedAsync(
Expand All @@ -294,41 +230,21 @@ public async Task<int> ReleaseLeasedAsync(
CancellationToken cancellationToken
)
{
if (_providerCache is { IsSupportedProvider: true, Dialect: not null })
if (_providerCache.Dialect is not null)
{
var sql = _providerCache.Dialect.ReleaseLeasedJobs(leaseToken, now);
var result = await _dbContext.Database.ExecuteSqlInterpolatedAsync(sql, cancellationToken);
return result;
}

var entities = await JobEntities
.Where(j => j.LeaseToken == leaseToken.Token && j.Status == AtomizerEntityJobStatus.Processing)
.ToListAsync(cancellationToken);

foreach (var entity in entities)
{
entity.Status = AtomizerEntityJobStatus.Pending;
entity.VisibleAt = null;
entity.LeaseToken = null;
entity.UpdatedAt = now;
}

try
{
return await _dbContext.SaveChangesAsync(cancellationToken);
}
catch (DbUpdateException ex)
{
_logger.LogError(ex, "Failed to release leased jobs for lease token {LeaseToken}", leaseToken.Token);
return 0;
}
throw UnsupportedProviderException(_providerCache.ProviderName);
}

public async Task<Guid> UpsertScheduleAsync(AtomizerSchedule schedule, CancellationToken cancellationToken)
{
var entity = schedule.ToEntity();

if (_providerCache is { IsSupportedProvider: true, Dialect: not null })
if (_providerCache.Dialect is not null)
{
var now = _clock.UtcNow;
var sql = _providerCache.Dialect.UpsertSchedule(schedule, now);
Expand All @@ -339,36 +255,7 @@ public async Task<Guid> UpsertScheduleAsync(AtomizerSchedule schedule, Cancellat
.FirstAsync(cancellationToken);
}

if (!_providerCache.IsSupportedProvider && _options.AllowUnsafeProviderFallback)
{
var existing = await ScheduleEntities
.AsNoTracking()
.FirstOrDefaultAsync(s => s.JobKey == entity.JobKey, cancellationToken);

if (existing is not null)
{
entity.Id = existing.Id;
ScheduleEntities.Update(entity);
}
else
{
ScheduleEntities.Add(entity);
}

try
{
await _dbContext.SaveChangesAsync(cancellationToken);
}
catch (DbUpdateException ex)
{
_logger.LogError(ex, "Failed to upsert schedule for job {JobKey}", schedule.JobKey);
throw;
}

return entity.Id;
}

throw UnsupportedProviderException();
throw UnsupportedProviderException(_providerCache.ProviderName);
}

public async Task UpdateSchedulesAsync(IEnumerable<AtomizerSchedule> schedules, CancellationToken cancellationToken)
Expand Down Expand Up @@ -396,7 +283,7 @@ CancellationToken cancellationToken
{
cancellationToken.ThrowIfCancellationRequested();

if (_providerCache is { IsSupportedProvider: true, Dialect: not null })
if (_providerCache.Dialect is not null)
{
var sql = _providerCache.Dialect.GetDueSchedules(now);

Expand All @@ -408,17 +295,7 @@ CancellationToken cancellationToken
return entities.Select(s => s.ToAtomizerSchedule()).ToList();
}

if (!_providerCache.IsSupportedProvider && _options.AllowUnsafeProviderFallback)
{
return await ScheduleEntities
.AsNoTracking()
.Where(s => s.Enabled && s.NextRunAt <= now)
.OrderBy(s => s.NextRunAt)
.Select(s => s.ToAtomizerSchedule())
.ToListAsync(cancellationToken);
}

throw UnsupportedProviderException();
throw UnsupportedProviderException(_providerCache.ProviderName);
}

public async Task<TResult> ExecuteInLeaseAsync<TResult>(
Expand Down Expand Up @@ -476,10 +353,9 @@ CancellationToken cancellationToken
}
}

private static NotSupportedException UnsupportedProviderException() =>
private static NotSupportedException UnsupportedProviderException(string providerName) =>
new(
"The current database provider is not supported. "
+ "To bypass this check, set AllowUnsafeProviderFallback to true in EntityFrameworkCoreJobStorageOptions. "
+ "Note that this may lead to unexpected behavior."
$"The current database provider '{providerName}' is not supported. "
+ "Atomizer.EntityFrameworkCore supports SQL Server, PostgreSQL and MySQL."
);
}
Loading
Loading