Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/publish-packages.yml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ jobs:
dotnet pack Mythetech.Framework/Mythetech.Framework.csproj -c Release --no-restore -o ./nupkgs
dotnet pack Mythetech.Framework.WebAssembly/Mythetech.Framework.WebAssembly.csproj -c Release --no-restore -o ./nupkgs
dotnet pack Mythetech.Framework.Desktop/Mythetech.Framework.Desktop.csproj -c Release --no-restore -o ./nupkgs
dotnet pack Mythetech.Framework.Observability/Mythetech.Framework.Observability.csproj -c Release --no-restore -o ./nupkgs

- name: Publish to NuGet.org
run: dotnet nuget push ./nupkgs/*.nupkg --source https://api.nuget.org/v3/index.json --api-key ${{ secrets.NUGET_ORG_API_KEY }} --skip-duplicate
Expand Down
40 changes: 40 additions & 0 deletions .github/workflows/publish-preview.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
name: Publish Preview NuGet Packages

on:
pull_request:
branches: [ "main" ]
types: [opened, synchronize, reopened]

jobs:
publish-preview:
runs-on: ubuntu-latest
permissions:
contents: read

steps:
- name: Checkout
uses: actions/checkout@v4

- name: Setup .NET SDK
uses: actions/setup-dotnet@v4
with:
dotnet-version: '10.0'

- name: Install .NET WASM workload
run: dotnet workload install wasm-tools

- name: Add GitHub Packages source
run: dotnet nuget add source --username ${{ github.actor }} --password ${{ secrets.GITHUB_TOKEN }} --store-password-in-clear-text --name github-gtksharp https://nuget.pkg.github.com/GtkSharp/index.json

- name: Restore
run: dotnet restore

- name: Build and Pack Preview
run: |
dotnet pack Mythetech.Framework/Mythetech.Framework.csproj -c Release --no-restore -o ./nupkgs --version-suffix "preview.${{ github.run_number }}"
dotnet pack Mythetech.Framework.WebAssembly/Mythetech.Framework.WebAssembly.csproj -c Release --no-restore -o ./nupkgs --version-suffix "preview.${{ github.run_number }}"
dotnet pack Mythetech.Framework.Desktop/Mythetech.Framework.Desktop.csproj -c Release --no-restore -o ./nupkgs --version-suffix "preview.${{ github.run_number }}"
dotnet pack Mythetech.Framework.Observability/Mythetech.Framework.Observability.csproj -c Release --no-restore -o ./nupkgs --version-suffix "preview.${{ github.run_number }}"

- name: Publish to NuGet.org
run: dotnet nuget push ./nupkgs/*.nupkg --source https://api.nuget.org/v3/index.json --api-key ${{ secrets.NUGET_ORG_API_KEY }} --skip-duplicate
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
<!-- Prevent static asset conflicts with consuming app -->
<StaticWebAssetsEnabled>false</StaticWebAssetsEnabled>
<PackageId>Mythetech.Framework.Desktop</PackageId>
<Version>0.10.1</Version>
<VersionPrefix>0.11.0</VersionPrefix>
<Authors>Mythetech</Authors>
<Description>Desktop-specific components for cross platform Blazor applications using Photino or Hermes</Description>
<PackageTags>blazor;desktop;photino;hermes;components;ui;framework;cross-platform</PackageTags>
Expand Down
337 changes: 337 additions & 0 deletions Mythetech.Framework.Desktop/Queue/LiteDbQueue.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,337 @@
using LiteDB;
using JsonSerializer = System.Text.Json.JsonSerializer;
using Microsoft.Extensions.Logging;
using Mythetech.Framework.Infrastructure.Queue;

namespace Mythetech.Framework.Desktop.Queue;

/// <summary>
/// LiteDB-based queue implementation for Desktop applications.
/// Provides persistent queue storage with retry semantics.
/// </summary>
/// <typeparam name="T">The type of items in the queue.</typeparam>
public class LiteDbQueue<T> : IQueue<T> where T : class
{
private readonly ILiteDatabase _database;
private readonly string _collectionName;
private readonly ILogger? _logger;
private readonly object _lock = new();

/// <summary>
/// Creates a new LiteDB queue instance.
/// </summary>
/// <param name="database">The LiteDB database instance.</param>
/// <param name="collectionName">Name of the collection to use for this queue.</param>
/// <param name="logger">Optional logger for error reporting.</param>
public LiteDbQueue(ILiteDatabase database, string collectionName, ILogger? logger = null)
{
_database = database ?? throw new ArgumentNullException(nameof(database));
_collectionName = collectionName ?? throw new ArgumentNullException(nameof(collectionName));
_logger = logger;

EnsureIndexes();
}

private void EnsureIndexes()
{
try
{
var collection = GetCollection();

collection.EnsureIndex(x => x.Status);
collection.EnsureIndex(x => x.CreatedAt);
}
catch (Exception ex)
{
_logger?.LogWarning(ex, "Failed to ensure indexes for queue {CollectionName}", _collectionName);
}
}

private ILiteCollection<LiteDbQueueDocument> GetCollection()
{
return _database.GetCollection<LiteDbQueueDocument>(_collectionName);
}

/// <inheritdoc />
public Task<string> EnqueueAsync(T item, CancellationToken ct = default)
{
ct.ThrowIfCancellationRequested();

try
{
var id = Guid.NewGuid().ToString("N");
var document = new LiteDbQueueDocument
{
Id = id,
ItemJson = JsonSerializer.Serialize(item),
Status = QueueEntryStatus.Pending,
CreatedAt = DateTime.UtcNow,
RetryCount = 0
};

lock (_lock)
{
GetCollection().Insert(document);
}

_logger?.LogDebug("Enqueued item {Id} to queue {QueueName}", id, _collectionName);
return Task.FromResult(id);
}
catch (Exception ex)
{
_logger?.LogError(ex, "Failed to enqueue item to queue {QueueName}", _collectionName);
throw;
}
}

/// <inheritdoc />
public Task<QueueEntry<T>?> DequeueAsync(CancellationToken ct = default)
{
ct.ThrowIfCancellationRequested();

try
{
lock (_lock)
{
var collection = GetCollection();

var document = collection
.Find(x => x.Status == QueueEntryStatus.Pending)
.OrderBy(x => x.CreatedAt)
.FirstOrDefault();

if (document == null)
{
return Task.FromResult<QueueEntry<T>?>(null);
}

document.Status = QueueEntryStatus.Processing;
collection.Update(document);

var entry = ToQueueEntry(document);
_logger?.LogDebug("Dequeued item {Id} from queue {QueueName}", document.Id, _collectionName);
return Task.FromResult<QueueEntry<T>?>(entry);
}
}
catch (Exception ex)
{
_logger?.LogError(ex, "Failed to dequeue from queue {QueueName}", _collectionName);
throw;
}
}

/// <inheritdoc />
public Task<QueueEntry<T>?> PeekAsync(CancellationToken ct = default)
{
ct.ThrowIfCancellationRequested();

try
{
var collection = GetCollection();

var document = collection
.Find(x => x.Status == QueueEntryStatus.Pending)
.OrderBy(x => x.CreatedAt)
.FirstOrDefault();

if (document == null)
{
return Task.FromResult<QueueEntry<T>?>(null);
}

return Task.FromResult<QueueEntry<T>?>(ToQueueEntry(document));
}
catch (Exception ex)
{
_logger?.LogError(ex, "Failed to peek queue {QueueName}", _collectionName);
throw;
}
}

/// <inheritdoc />
public Task CompleteAsync(string entryId, CancellationToken ct = default)
{
ct.ThrowIfCancellationRequested();

try
{
lock (_lock)
{
var collection = GetCollection();
var document = collection.FindById(entryId);

if (document == null)
{
_logger?.LogWarning("Attempted to complete non-existent entry {Id} in queue {QueueName}", entryId, _collectionName);
return Task.CompletedTask;
}

document.Status = QueueEntryStatus.Completed;
document.ProcessedAt = DateTime.UtcNow;
collection.Update(document);

_logger?.LogDebug("Completed entry {Id} in queue {QueueName}", entryId, _collectionName);
}

return Task.CompletedTask;
}
catch (Exception ex)
{
_logger?.LogError(ex, "Failed to complete entry {Id} in queue {QueueName}", entryId, _collectionName);
throw;
}
}

/// <inheritdoc />
public Task FailAsync(string entryId, string? reason = null, CancellationToken ct = default)
{
ct.ThrowIfCancellationRequested();

try
{
lock (_lock)
{
var collection = GetCollection();
var document = collection.FindById(entryId);

if (document == null)
{
_logger?.LogWarning("Attempted to fail non-existent entry {Id} in queue {QueueName}", entryId, _collectionName);
return Task.CompletedTask;
}

document.Status = QueueEntryStatus.Failed;
document.ProcessedAt = DateTime.UtcNow;
document.FailureReason = reason;
document.RetryCount++;
collection.Update(document);

_logger?.LogDebug("Failed entry {Id} in queue {QueueName}: {Reason}", entryId, _collectionName, reason);
}

return Task.CompletedTask;
}
catch (Exception ex)
{
_logger?.LogError(ex, "Failed to mark entry {Id} as failed in queue {QueueName}", entryId, _collectionName);
throw;
}
}

/// <inheritdoc />
public Task<int> GetPendingCountAsync(CancellationToken ct = default)
{
ct.ThrowIfCancellationRequested();

try
{
var count = GetCollection().Count(x => x.Status == QueueEntryStatus.Pending);
return Task.FromResult(count);
}
catch (Exception ex)
{
_logger?.LogError(ex, "Failed to get pending count for queue {QueueName}", _collectionName);
throw;
}
}

/// <inheritdoc />
public Task<IReadOnlyList<QueueEntry<T>>> GetFailedAsync(int limit = 100, CancellationToken ct = default)
{
ct.ThrowIfCancellationRequested();

try
{
var documents = GetCollection()
.Find(x => x.Status == QueueEntryStatus.Failed)
.OrderByDescending(x => x.ProcessedAt)
.Take(limit)
.ToList();

var entries = documents.Select(ToQueueEntry).ToList();
return Task.FromResult<IReadOnlyList<QueueEntry<T>>>(entries);
}
catch (Exception ex)
{
_logger?.LogError(ex, "Failed to get failed entries from queue {QueueName}", _collectionName);
throw;
}
}

/// <inheritdoc />
public Task RetryAsync(string entryId, CancellationToken ct = default)
{
ct.ThrowIfCancellationRequested();

try
{
lock (_lock)
{
var collection = GetCollection();
var document = collection.FindById(entryId);

if (document == null)
{
_logger?.LogWarning("Attempted to retry non-existent entry {Id} in queue {QueueName}", entryId, _collectionName);
return Task.CompletedTask;
}

document.Status = QueueEntryStatus.Pending;
document.ProcessedAt = null;
document.FailureReason = null;
collection.Update(document);

_logger?.LogDebug("Retrying entry {Id} in queue {QueueName} (attempt {RetryCount})", entryId, _collectionName, document.RetryCount + 1);
}

return Task.CompletedTask;
}
catch (Exception ex)
{
_logger?.LogError(ex, "Failed to retry entry {Id} in queue {QueueName}", entryId, _collectionName);
throw;
}
}

/// <inheritdoc />
public Task<int> PurgeCompletedAsync(DateTime olderThan, CancellationToken ct = default)
{
ct.ThrowIfCancellationRequested();

try
{
lock (_lock)
{
var collection = GetCollection();
var count = collection.DeleteMany(x =>
x.Status == QueueEntryStatus.Completed &&
x.ProcessedAt != null &&
x.ProcessedAt < olderThan);

_logger?.LogDebug("Purged {Count} completed entries from queue {QueueName}", count, _collectionName);
return Task.FromResult(count);
}
}
catch (Exception ex)
{
_logger?.LogError(ex, "Failed to purge completed entries from queue {QueueName}", _collectionName);
throw;
}
}

private QueueEntry<T> ToQueueEntry(LiteDbQueueDocument document)
{
var item = JsonSerializer.Deserialize<T>(document.ItemJson)!;

return new QueueEntry<T>
{
Id = document.Id,
Item = item,
Status = document.Status,
CreatedAt = document.CreatedAt,
ProcessedAt = document.ProcessedAt,
RetryCount = document.RetryCount,
FailureReason = document.FailureReason
};
}
}
Loading
Loading