diff --git a/.config/dotnet-tools.json b/.config/dotnet-tools.json
index c37c1b5..9045ea0 100644
--- a/.config/dotnet-tools.json
+++ b/.config/dotnet-tools.json
@@ -3,13 +3,13 @@
"isRoot": true,
"tools": {
"dotnet-sonarscanner": {
- "version": "5.9.2",
+ "version": "6.2.0",
"commands": [
"dotnet-sonarscanner"
]
},
"gitversion.tool": {
- "version": "5.11.1",
+ "version": "5.12.0",
"commands": [
"dotnet-gitversion"
]
diff --git a/.editorconfig b/.editorconfig
index b10163a..fb75f21 100644
--- a/.editorconfig
+++ b/.editorconfig
@@ -8,8 +8,11 @@ root = true
[*]
indent_style = space
+[*.sln]
+indent_style = tab
+
# Code files
-[*.{cs,csx,vb,vbx}]
+[*.{cs,csx,vb,vbx,fs,fsx,fsi}]
indent_size = 4
max_line_length = 120
insert_final_newline = true
@@ -33,3 +36,17 @@ indent_size = 4
[*.md]
trim_trailing_whitespace = true
insert_final_newline = true
+
+# See https://github.com/dotnet/aspnetcore/blob/main/.editorconfig
+[src/**/*.{cs,csx,vb,vbx,fs,fsx,fsi}]
+
+# See https://www.jetbrains.com/help/resharper/ConfigureAwait_Analysis.html
+configure_await_analysis_mode = library
+# CA2007: Consider calling ConfigureAwait on the awaited task
+#dotnet_diagnostic.CA2007.severity = warning
+
+# CA2012: Use ValueTask correctly
+dotnet_diagnostic.CA2012.severity = warning
+
+# CA2013: Do not use ReferenceEquals with value types
+dotnet_diagnostic.CA2013.severity = warning
diff --git a/.github/dependabot.yml b/.github/dependabot.yml
index 4c5c935..6fff16c 100644
--- a/.github/dependabot.yml
+++ b/.github/dependabot.yml
@@ -1,11 +1,6 @@
----
version: 2
updates:
- package-ecosystem: github-actions
directory: "/"
schedule:
- interval: weekly
- - package-ecosystem: nuget
- directory: "/"
- schedule:
- interval: weekly
+ interval: monthly
diff --git a/.github/workflows/publish.yaml b/.github/workflows/publish.yaml
index bfce883..d646d95 100644
--- a/.github/workflows/publish.yaml
+++ b/.github/workflows/publish.yaml
@@ -1,25 +1,24 @@
----
name: Publish
on:
release:
types: [ published ]
+
jobs:
test:
runs-on: ubuntu-latest
env:
DOTNET_NOLOGO: true
steps:
- - uses: actions/checkout@v3
+ - uses: actions/checkout@v4
with:
# Required for GitVersion
fetch-depth: 0
- - uses: actions/setup-dotnet@v3
+ - uses: actions/setup-dotnet@v4
with:
dotnet-version: |
- 3.1.x
- 6.0.x
- 7.0.x
+ 8.0.x
+ 9.0.x
- run: dotnet restore
- run: dotnet build -c Release --no-restore
- run: dotnet test -c Release --no-build --verbosity=minimal
@@ -29,14 +28,14 @@ jobs:
env:
DOTNET_NOLOGO: true
steps:
- - uses: actions/checkout@v3
+ - uses: actions/checkout@v4
with:
# Required for GitVersion
fetch-depth: 0
- - uses: actions/setup-dotnet@v3
+ - uses: actions/setup-dotnet@v4
with:
dotnet-version: |
- 7.0.x
+ 9.0.x
- run: dotnet pack -c Release
- name: Publish
run: |
diff --git a/.github/workflows/qa.yml b/.github/workflows/qa.yml
index 1f8e284..37815a1 100644
--- a/.github/workflows/qa.yml
+++ b/.github/workflows/qa.yml
@@ -1,4 +1,3 @@
----
name: QA
on:
@@ -6,21 +5,22 @@ on:
branches: [ master, main ]
pull_request:
branches: [ master, main ]
+
jobs:
lint:
runs-on: ubuntu-latest
steps:
- - uses: actions/checkout@v3
+ - uses: actions/checkout@v4
with:
# Full history is needed to get a proper list of changed files
fetch-depth: 0
- - uses: github/super-linter@v4
+ - uses: github/super-linter@v7
env:
DEFAULT_BRANCH: main
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
- VALIDATE_ALL_CODEBASE: false # Only changed files
+ VALIDATE_ALL_CODEBASE: false # Only changed files
VALIDATE_EDITORCONFIG: true
- VALIDATE_CSHARP: true
+ VALIDATE_CSHARP: false # Checked by SonarQube
VALIDATE_JSON: true
VALIDATE_MARKDOWN: true
VALIDATE_YAML: true
@@ -30,20 +30,19 @@ jobs:
env:
DOTNET_NOLOGO: true
steps:
- - uses: actions/checkout@v3
+ - uses: actions/checkout@v4
with:
- # Disabling shallow clone is recommended by SonarCloud for improving relevancy of reporting
+ # Disabling shallow clone is recommended by SonarQube for improving relevancy of reporting
fetch-depth: 0
- - uses: actions/setup-java@v3
+ - uses: actions/setup-java@v4
with:
distribution: temurin
- java-version: 17
- - uses: actions/setup-dotnet@v3
+ java-version: 21
+ - uses: actions/setup-dotnet@v4
with:
dotnet-version: |
- 3.1.x
- 6.0.x
- 7.0.x
+ 8.0.x
+ 9.0.x
- run: dotnet tool restore
- run: dotnet gitversion /output buildserver
- run: ./sonar-scan.sh
diff --git a/CHANGELOG.md b/CHANGELOG.md
new file mode 100644
index 0000000..dffd79e
--- /dev/null
+++ b/CHANGELOG.md
@@ -0,0 +1,32 @@
+# Changelog
+
+All notable changes to this project will be documented in this file.
+
+The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/),
+and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
+
+## [Unreleased]
+
+### Fixed
+
+### Added
+
+### Changed
+
+### Removed
+
+## [0.2.0] - 2025-03-12
+
+### Changed
+
+- Whole new design
+
+### Removed
+
+- LocalPost.SnsPublisher package
+
+## [0.1.0] - 2023-01-01
+
+### Added
+
+- Initial release
diff --git a/Directory.Build.props b/Directory.Build.props
deleted file mode 100644
index 8bb8702..0000000
--- a/Directory.Build.props
+++ /dev/null
@@ -1,18 +0,0 @@
-
-
-
- 11
- enable
- enable
- true
-
-
-
-
- <_Parameter1>$(MSBuildProjectName).Tests
-
-
- <_Parameter1>DynamicProxyGenAssembly2
-
-
-
diff --git a/LocalPost.sln b/LocalPost.sln
index 2c7c844..76f0263 100644
--- a/LocalPost.sln
+++ b/LocalPost.sln
@@ -2,18 +2,24 @@
Microsoft Visual Studio Solution File, Format Version 12.00
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "LocalPost", "src\LocalPost\LocalPost.csproj", "{474D2C1A-5557-4ED9-AF20-FE195D4C1AF7}"
EndProject
-Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "SampleWebApp", "samples\SampleWebApp\SampleWebApp.csproj", "{46FC61E6-D0FB-4D7D-A81B-20EF8D8D1F4E}"
-EndProject
-Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "LocalPost.SnsPublisher", "src\LocalPost.SnsPublisher\LocalPost.SnsPublisher.csproj", "{D256C568-2B42-4DCC-AB54-15B512A99C44}"
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "BackgroundQueueApp", "examples\BackgroundQueueApp\BackgroundQueueApp.csproj", "{46FC61E6-D0FB-4D7D-A81B-20EF8D8D1F4E}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "LocalPost.Tests", "tests\LocalPost.Tests\LocalPost.Tests.csproj", "{0E69A423-5F70-4BA7-8015-0AB0BC4B6FD2}"
EndProject
-Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "LocalPost.SnsPublisher.Tests", "tests\LocalPost.SnsPublisher.Tests\LocalPost.SnsPublisher.Tests.csproj", "{0B8929F4-E220-45A9-A279-41F5D94A8C1B}"
-EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "LocalPost.SqsConsumer", "src\LocalPost.SqsConsumer\LocalPost.SqsConsumer.csproj", "{30232703-C103-4F7A-9822-80F2F680A88D}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "LocalPost.SqsConsumer.Tests", "tests\LocalPost.SqsConsumer.Tests\LocalPost.SqsConsumer.Tests.csproj", "{2F61DCD7-E4CB-4ECC-B24E-A663D12D9C03}"
EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "LocalPost.KafkaConsumer", "src\LocalPost.KafkaConsumer\LocalPost.KafkaConsumer.csproj", "{D9139C53-5B9F-49E7-80DF-41C995C37E2F}"
+EndProject
+Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Examples", "Examples", "{405721DC-F290-4191-B638-9907D5EB042B}"
+EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "KafkaConsumerApp", "examples\KafkaConsumerApp\KafkaConsumerApp.csproj", "{C310487A-B976-4D3E-80AF-4ADBE1C63139}"
+EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "SqsConsumerApp", "examples\SqsConsumerApp\SqsConsumerApp.csproj", "{2778AEBD-0345-4F79-9E93-73AFAB6C7BCD}"
+EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "LocalPost.KafkaConsumer.Tests", "tests\LocalPost.KafkaConsumer.Tests\LocalPost.KafkaConsumer.Tests.csproj", "{734C9C76-B3D8-4AD7-8E76-B14539C3CB4D}"
+EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
@@ -28,18 +34,10 @@ Global
{46FC61E6-D0FB-4D7D-A81B-20EF8D8D1F4E}.Debug|Any CPU.Build.0 = Debug|Any CPU
{46FC61E6-D0FB-4D7D-A81B-20EF8D8D1F4E}.Release|Any CPU.ActiveCfg = Release|Any CPU
{46FC61E6-D0FB-4D7D-A81B-20EF8D8D1F4E}.Release|Any CPU.Build.0 = Release|Any CPU
- {D256C568-2B42-4DCC-AB54-15B512A99C44}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
- {D256C568-2B42-4DCC-AB54-15B512A99C44}.Debug|Any CPU.Build.0 = Debug|Any CPU
- {D256C568-2B42-4DCC-AB54-15B512A99C44}.Release|Any CPU.ActiveCfg = Release|Any CPU
- {D256C568-2B42-4DCC-AB54-15B512A99C44}.Release|Any CPU.Build.0 = Release|Any CPU
{0E69A423-5F70-4BA7-8015-0AB0BC4B6FD2}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{0E69A423-5F70-4BA7-8015-0AB0BC4B6FD2}.Debug|Any CPU.Build.0 = Debug|Any CPU
{0E69A423-5F70-4BA7-8015-0AB0BC4B6FD2}.Release|Any CPU.ActiveCfg = Release|Any CPU
{0E69A423-5F70-4BA7-8015-0AB0BC4B6FD2}.Release|Any CPU.Build.0 = Release|Any CPU
- {0B8929F4-E220-45A9-A279-41F5D94A8C1B}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
- {0B8929F4-E220-45A9-A279-41F5D94A8C1B}.Debug|Any CPU.Build.0 = Debug|Any CPU
- {0B8929F4-E220-45A9-A279-41F5D94A8C1B}.Release|Any CPU.ActiveCfg = Release|Any CPU
- {0B8929F4-E220-45A9-A279-41F5D94A8C1B}.Release|Any CPU.Build.0 = Release|Any CPU
{30232703-C103-4F7A-9822-80F2F680A88D}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{30232703-C103-4F7A-9822-80F2F680A88D}.Debug|Any CPU.Build.0 = Debug|Any CPU
{30232703-C103-4F7A-9822-80F2F680A88D}.Release|Any CPU.ActiveCfg = Release|Any CPU
@@ -48,5 +46,26 @@ Global
{2F61DCD7-E4CB-4ECC-B24E-A663D12D9C03}.Debug|Any CPU.Build.0 = Debug|Any CPU
{2F61DCD7-E4CB-4ECC-B24E-A663D12D9C03}.Release|Any CPU.ActiveCfg = Release|Any CPU
{2F61DCD7-E4CB-4ECC-B24E-A663D12D9C03}.Release|Any CPU.Build.0 = Release|Any CPU
+ {D9139C53-5B9F-49E7-80DF-41C995C37E2F}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {D9139C53-5B9F-49E7-80DF-41C995C37E2F}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {D9139C53-5B9F-49E7-80DF-41C995C37E2F}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {D9139C53-5B9F-49E7-80DF-41C995C37E2F}.Release|Any CPU.Build.0 = Release|Any CPU
+ {C310487A-B976-4D3E-80AF-4ADBE1C63139}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {C310487A-B976-4D3E-80AF-4ADBE1C63139}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {C310487A-B976-4D3E-80AF-4ADBE1C63139}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {C310487A-B976-4D3E-80AF-4ADBE1C63139}.Release|Any CPU.Build.0 = Release|Any CPU
+ {2778AEBD-0345-4F79-9E93-73AFAB6C7BCD}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {2778AEBD-0345-4F79-9E93-73AFAB6C7BCD}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {2778AEBD-0345-4F79-9E93-73AFAB6C7BCD}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {2778AEBD-0345-4F79-9E93-73AFAB6C7BCD}.Release|Any CPU.Build.0 = Release|Any CPU
+ {734C9C76-B3D8-4AD7-8E76-B14539C3CB4D}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {734C9C76-B3D8-4AD7-8E76-B14539C3CB4D}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {734C9C76-B3D8-4AD7-8E76-B14539C3CB4D}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {734C9C76-B3D8-4AD7-8E76-B14539C3CB4D}.Release|Any CPU.Build.0 = Release|Any CPU
+ EndGlobalSection
+ GlobalSection(NestedProjects) = preSolution
+ {46FC61E6-D0FB-4D7D-A81B-20EF8D8D1F4E} = {405721DC-F290-4191-B638-9907D5EB042B}
+ {C310487A-B976-4D3E-80AF-4ADBE1C63139} = {405721DC-F290-4191-B638-9907D5EB042B}
+ {2778AEBD-0345-4F79-9E93-73AFAB6C7BCD} = {405721DC-F290-4191-B638-9907D5EB042B}
EndGlobalSection
EndGlobal
diff --git a/README.md b/README.md
index 7ecd402..b7c09c9 100644
--- a/README.md
+++ b/README.md
@@ -1,8 +1,53 @@
# LocalPost
+[](https://www.nuget.org/packages/LocalPost/)
+[](https://sonarcloud.io/project/overview?id=alexeyshockov_LocalPost.NET)
+
Simple .NET in-memory background queue ([System.Threading.Channels](https://learn.microsoft.com/de-de/dotnet/api/system.threading.channels?view=net-6.0) based).
-## Alternatives
+## Background tasks
+
+There are multiple ways to run background tasks in .NET; LocalPost implements the simplest one — an in-memory queue.
+
+## Usage
+
+### Installation
+
+For the core library:
+
+```shell
+dotnet add package LocalPost
+```
+
+AWS SQS, Kafka and other integrations are provided as separate packages, like:
+
+```shell
+dotnet add package LocalPost.SqsConsumer
+dotnet add package LocalPost.KafkaConsumer
+```
+
+### .NET 8 asynchronous background services handling
+
+Before version 8, the .NET runtime started and stopped hosted services only sequentially, but now it is possible to
+enable concurrent handling of the services. This is done by setting the `HostOptions` properties
+`ServicesStartConcurrently` and `ServicesStopConcurrently` to `true`.
+
+For details, see:
+- https://github.com/dotnet/runtime/blob/v8.0.0/src/libraries/Microsoft.Extensions.Hosting/src/Internal/Host.cs
+- https://github.com/dotnet/runtime/blob/main/src/libraries/Microsoft.Extensions.Hosting/src/HostOptions.cs
+
+## Similar projects
+
+- [Coravel queue](https://docs.coravel.net/Queuing/) — a simple job queue
+
+More complex jobs management / scheduling:
+- [Hangfire](https://www.hangfire.io/) — background job scheduler. Supports advanced scheduling, persistence and jobs distribution across multiple workers.
+
+Service bus (for bigger solutions):
+- [JustSaying](https://github.com/justeattakeaway/JustSaying)
+- [NServiceBus](https://docs.particular.net/nservicebus/)
+- [MassTransit](https://masstransit.io/)
+
+## Inspiration
-- [Coravel queue](https://docs.coravel.net/Queuing/)/event broadcasting — only invocable queueing, event broadcasting is different from consuming a queue
-- [Hangfire](https://www.hangfire.io/) — for persistent queues (means payload serialisation), LocalPost is completely about in-memory ones
+- [FastStream](https://github.com/airtai/faststream)
diff --git a/docker-compose.yml b/docker-compose.yml
new file mode 100644
index 0000000..2d3f7a3
--- /dev/null
+++ b/docker-compose.yml
@@ -0,0 +1,105 @@
+name: localpost
+networks:
+ redpanda_network:
+ driver: bridge
+volumes:
+ redpanda:
+ driver: local
+ localstack:
+ driver: local
+services:
+ # https://learn.microsoft.com/en-us/dotnet/aspire/fundamentals/dashboard/standalone
+ # https://hub.docker.com/r/microsoft/dotnet-aspire-dashboard
+ aspire:
+ image: mcr.microsoft.com/dotnet/aspire-dashboard:9.0
+ ports:
+ - "18888:18888" # HTTP
+ - "18889:18889" # OTEL collector GRPC
+ - "18890:18890" # OTEL collector HTTP
+ environment:
+ - DOTNET_DASHBOARD_UNSECURED_ALLOW_ANONYMOUS=true
+# - Dashboard__Otlp__AuthMode=Unsecured
+ # This setting is a shortcut to configuring Dashboard:Frontend:AuthMode and Dashboard:Otlp:AuthMode to Unsecured
+ - ASPIRE_ALLOW_UNSECURED_TRANSPORT=true
+ localstack:
+ # https://docs.localstack.cloud/getting-started/installation/#docker-compose
+ image: localstack/localstack:4
+ ports:
+ - "127.0.0.1:4566:4566" # LocalStack Gateway
+ - "127.0.0.1:4510-4559:4510-4559" # External services port range
+ environment:
+ # LocalStack configuration: https://docs.localstack.cloud/references/configuration/
+ - DEBUG=${DEBUG:-0}
+ - SERVICES=sqs
+ volumes:
+ # Local volume
+ - localstack:/var/lib/localstack
+ # Fixtures, see https://docs.localstack.cloud/references/init-hooks/
+      - ./localstack/init/ready.d:/etc/localstack/init/ready.d # SQS hooks
+ # Only needed for Lambdas
+# - /var/run/docker.sock:/var/run/docker.sock
+ redpanda:
+ # Mainly from: https://docs.redpanda.com/redpanda-labs/docker-compose/single-broker/
+ # See also: https://docs.redpanda.com/current/deploy/deployment-option/self-hosted/docker-image/
+ image: docker.redpanda.com/redpandadata/redpanda:v24.3.3
+ container_name: redpanda
+ command:
+ - redpanda start
+ - --mode dev-container
+ - --smp 1
+ - --kafka-addr internal://0.0.0.0:9092,external://0.0.0.0:19092
+ # Address the broker advertises to clients that connect to the Kafka API.
+ # Use the internal addresses to connect to the Redpanda brokers
+ # from inside the same Docker network.
+ # Use the external addresses to connect to the Redpanda brokers
+ # from outside the Docker network.
+ - --advertise-kafka-addr internal://redpanda:9092,external://127.0.0.1:19092
+ - --pandaproxy-addr internal://0.0.0.0:8082,external://0.0.0.0:18082
+ # Address the broker advertises to clients that connect to the HTTP Proxy.
+ - --advertise-pandaproxy-addr internal://redpanda:8082,external://127.0.0.1:18082
+ - --schema-registry-addr internal://0.0.0.0:8081,external://0.0.0.0:18081
+ # Redpanda brokers use the RPC API to communicate with each other internally.
+ - --rpc-addr redpanda:33145
+ - --advertise-rpc-addr redpanda:33145
+ ports:
+ - "18081:18081"
+ - "18082:18082"
+ - "19092:19092"
+ - "19644:9644"
+ volumes:
+ - redpanda:/var/lib/redpanda/data
+ networks:
+ - redpanda_network
+# healthcheck:
+# test: [ "CMD-SHELL", "rpk cluster health | grep -E 'Healthy:.+true' || exit 1" ]
+# interval: 15s
+# timeout: 3s
+# retries: 5
+# start_period: 5s
+ redpanda-console:
+ image: docker.redpanda.com/redpandadata/console:v2.8.2
+ entrypoint: /bin/sh
+ command: -c "echo \"$$CONSOLE_CONFIG_FILE\" > /tmp/config.yml; /app/console"
+ environment:
+ CONFIG_FILEPATH: /tmp/config.yml
+ CONSOLE_CONFIG_FILE: |
+ kafka:
+ brokers: ["redpanda:9092"]
+ schemaRegistry:
+ enabled: true
+ urls: ["http://redpanda:8081"]
+ redpanda:
+ adminApi:
+ enabled: true
+ urls: ["http://redpanda:9644"]
+ connect:
+ enabled: true
+ clusters:
+ - name: local-connect-cluster
+ url: http://connect:8083
+ ports:
+ - "8080:8080"
+ networks:
+ - redpanda_network
+ depends_on:
+ - redpanda
diff --git a/examples/BackgroundQueueApp/BackgroundQueueApp.csproj b/examples/BackgroundQueueApp/BackgroundQueueApp.csproj
new file mode 100644
index 0000000..b0638c4
--- /dev/null
+++ b/examples/BackgroundQueueApp/BackgroundQueueApp.csproj
@@ -0,0 +1,26 @@
+
+
+
+ net8
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+ appsettings.json
+
+
+
+
diff --git a/samples/SampleWebApp/Controllers/WeatherForecastController.cs b/examples/BackgroundQueueApp/Controllers/WeatherForecastController.cs
similarity index 51%
rename from samples/SampleWebApp/Controllers/WeatherForecastController.cs
rename to examples/BackgroundQueueApp/Controllers/WeatherForecastController.cs
index 05c8458..c3e88e1 100644
--- a/samples/SampleWebApp/Controllers/WeatherForecastController.cs
+++ b/examples/BackgroundQueueApp/Controllers/WeatherForecastController.cs
@@ -1,27 +1,16 @@
-using Amazon.SimpleNotificationService.Model;
using LocalPost;
-using LocalPost.SnsPublisher;
using Microsoft.AspNetCore.Mvc;
-namespace SampleWebApp.Controllers;
+namespace BackgroundQueueApp.Controllers;
[ApiController]
[Route("[controller]")]
-public class WeatherForecastController : ControllerBase
+public class WeatherForecastController(IBackgroundQueue queue) : ControllerBase
{
private static readonly string[] Summaries =
- {
+ [
"Freezing", "Bracing", "Chilly", "Cool", "Mild", "Warm", "Balmy", "Hot", "Sweltering", "Scorching"
- };
-
- private readonly IBackgroundQueue _queue;
- private readonly ISnsPublisher _sns;
-
- public WeatherForecastController(IBackgroundQueue queue, ISnsPublisher sns)
- {
- _queue = queue;
- _sns = sns;
- }
+ ];
[HttpGet(Name = "GetWeatherForecast")]
public async ValueTask> Get()
@@ -33,12 +22,7 @@ public async ValueTask> Get()
Summary = Summaries[Random.Shared.Next(Summaries.Length)]
}).ToArray();
- await _queue.Enqueue(forecasts[0]);
-
- await _sns.ForTopic("arn:aws:sns:eu-central-1:703886664977:test").Enqueue(new PublishBatchRequestEntry
- {
- Message = forecasts[0].Summary
- });
+ await queue.Enqueue(forecasts[0]);
return forecasts;
}
diff --git a/examples/BackgroundQueueApp/Program.cs b/examples/BackgroundQueueApp/Program.cs
new file mode 100644
index 0000000..b3247e9
--- /dev/null
+++ b/examples/BackgroundQueueApp/Program.cs
@@ -0,0 +1,53 @@
+using BackgroundQueueApp;
+using LocalPost;
+using LocalPost.BackgroundQueue;
+using LocalPost.BackgroundQueue.DependencyInjection;
+using LocalPost.Resilience;
+using Polly;
+using Polly.Retry;
+
+var builder = WebApplication.CreateBuilder(args);
+
+// See https://github.com/App-vNext/Polly/blob/main/docs/migration-v8.md
+var resiliencePipeline = new ResiliencePipelineBuilder()
+ .AddRetry(new RetryStrategyOptions
+ {
+ MaxRetryAttempts = 3,
+ Delay = TimeSpan.FromSeconds(1),
+ BackoffType = DelayBackoffType.Constant,
+ ShouldHandle = new PredicateBuilder().Handle()
+ })
+ .AddTimeout(TimeSpan.FromSeconds(3))
+ .Build();
+
+// A background queue with an inline handler
+builder.Services.AddBackgroundQueues(bq =>
+ bq.AddQueue(HandlerStack.For(async (weather, ct) =>
+ {
+ await Task.Delay(TimeSpan.FromSeconds(2), ct);
+ Console.WriteLine(weather.Summary);
+ })
+ .UseMessagePayload()
+ .Scoped()
+ .Trace()
+ .UsePollyPipeline(resiliencePipeline)
+ .LogExceptions()
+ )
+);
+
+builder.Services.AddControllers();
+// Learn more about configuring Swagger/OpenAPI at https://aka.ms/aspnetcore/swashbuckle
+builder.Services.AddEndpointsApiExplorer();
+builder.Services.AddSwaggerGen();
+
+var app = builder.Build();
+if (app.Environment.IsDevelopment())
+{
+ app.UseSwagger();
+ app.UseSwaggerUI();
+}
+
+app.UseHttpsRedirection();
+app.UseAuthorization();
+app.MapControllers();
+app.Run();
diff --git a/samples/SampleWebApp/Properties/launchSettings.json b/examples/BackgroundQueueApp/Properties/launchSettings.json
similarity index 81%
rename from samples/SampleWebApp/Properties/launchSettings.json
rename to examples/BackgroundQueueApp/Properties/launchSettings.json
index af9bed4..70edaa4 100644
--- a/samples/SampleWebApp/Properties/launchSettings.json
+++ b/examples/BackgroundQueueApp/Properties/launchSettings.json
@@ -8,8 +8,7 @@
"launchUrl": "swagger",
"applicationUrl": "https://localhost:7003;http://localhost:5103",
"environmentVariables": {
- "ASPNETCORE_ENVIRONMENT": "Development",
- "AWS_PROFILE": "kw-test"
+ "ASPNETCORE_ENVIRONMENT": "Development"
}
}
}
diff --git a/samples/SampleWebApp/WeatherForecast.cs b/examples/BackgroundQueueApp/WeatherForecast.cs
similarity index 88%
rename from samples/SampleWebApp/WeatherForecast.cs
rename to examples/BackgroundQueueApp/WeatherForecast.cs
index 72eee54..a7947f4 100644
--- a/samples/SampleWebApp/WeatherForecast.cs
+++ b/examples/BackgroundQueueApp/WeatherForecast.cs
@@ -1,4 +1,4 @@
-namespace SampleWebApp;
+namespace BackgroundQueueApp;
public class WeatherForecast
{
diff --git a/samples/SampleWebApp/appsettings.Development.json b/examples/BackgroundQueueApp/appsettings.Development.json
similarity index 100%
rename from samples/SampleWebApp/appsettings.Development.json
rename to examples/BackgroundQueueApp/appsettings.Development.json
diff --git a/samples/SampleWebApp/appsettings.json b/examples/BackgroundQueueApp/appsettings.json
similarity index 100%
rename from samples/SampleWebApp/appsettings.json
rename to examples/BackgroundQueueApp/appsettings.json
diff --git a/examples/Directory.Build.props b/examples/Directory.Build.props
new file mode 100644
index 0000000..aad1d7c
--- /dev/null
+++ b/examples/Directory.Build.props
@@ -0,0 +1,16 @@
+
+
+
+ 13
+ enable
+ enable
+ true
+
+ false
+
+
+
+
+
+
+
diff --git a/examples/KafkaConsumerApp/KafkaConsumerApp.csproj b/examples/KafkaConsumerApp/KafkaConsumerApp.csproj
new file mode 100644
index 0000000..d25a35e
--- /dev/null
+++ b/examples/KafkaConsumerApp/KafkaConsumerApp.csproj
@@ -0,0 +1,32 @@
+
+
+
+ net8
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+ appsettings.json
+
+
+
+
diff --git a/examples/KafkaConsumerApp/Program.cs b/examples/KafkaConsumerApp/Program.cs
new file mode 100644
index 0000000..2cd5b02
--- /dev/null
+++ b/examples/KafkaConsumerApp/Program.cs
@@ -0,0 +1,55 @@
+using System.Text.Json;
+using Confluent.Kafka;
+using JetBrains.Annotations;
+using LocalPost;
+using LocalPost.KafkaConsumer;
+using LocalPost.KafkaConsumer.DependencyInjection;
+
+var builder = Host.CreateApplicationBuilder(args);
+
+builder.Services.Configure(options =>
+{
+ options.ServicesStartConcurrently = true;
+ options.ServicesStopConcurrently = true;
+});
+
+builder.Services
+ .AddScoped()
+ .AddKafkaConsumers(kafka =>
+ {
+ kafka.Defaults
+ .Bind(builder.Configuration.GetSection("Kafka"))
+ .ValidateDataAnnotations();
+ kafka.AddConsumer("example-consumer-group",
+ HandlerStack.From()
+ .Scoped()
+ .UseKafkaPayload()
+ .Deserialize(context => JsonSerializer.Deserialize(context.Payload)!)
+ .Trace()
+ // .Acknowledge()
+ .LogExceptions()
+ )
+ .Bind(builder.Configuration.GetSection("Kafka:Consumer"))
+ .Configure(options =>
+ {
+ options.ClientConfig.AutoOffsetReset = AutoOffsetReset.Earliest;
+ // options.ClientConfig.EnableAutoCommit = false; // DryRun
+ // options.ClientConfig.EnableAutoOffsetStore = false; // Manually acknowledge every message
+ })
+ .ValidateDataAnnotations();
+ });
+
+await builder.Build().RunAsync();
+
+
+[UsedImplicitly]
+public record WeatherForecast(int TemperatureC, int TemperatureF, string Summary);
+
+internal sealed class MessageHandler : IHandler
+{
+ public ValueTask InvokeAsync(WeatherForecast payload, CancellationToken ct)
+ {
+ Console.WriteLine(payload);
+ return ValueTask.CompletedTask;
+ }
+}
diff --git a/examples/KafkaConsumerApp/Properties/launchSettings.json b/examples/KafkaConsumerApp/Properties/launchSettings.json
new file mode 100644
index 0000000..0f2fe16
--- /dev/null
+++ b/examples/KafkaConsumerApp/Properties/launchSettings.json
@@ -0,0 +1,11 @@
+{
+ "profiles": {
+ "KafkaConsumerApp": {
+ "commandName": "Project",
+ "dotnetRunMessages": true,
+ "environmentVariables": {
+ "DOTNET_ENVIRONMENT": "Development"
+ }
+ }
+ }
+}
diff --git a/examples/KafkaConsumerApp/README.md b/examples/KafkaConsumerApp/README.md
new file mode 100644
index 0000000..3a9614d
--- /dev/null
+++ b/examples/KafkaConsumerApp/README.md
@@ -0,0 +1,3 @@
+# Kafka Consumer
+
+See https://github.com/confluentinc/confluent-kafka-dotnet/tree/master/examples/Protobuf
diff --git a/examples/KafkaConsumerApp/appsettings.Development.json b/examples/KafkaConsumerApp/appsettings.Development.json
new file mode 100644
index 0000000..0967ef4
--- /dev/null
+++ b/examples/KafkaConsumerApp/appsettings.Development.json
@@ -0,0 +1 @@
+{}
diff --git a/examples/KafkaConsumerApp/appsettings.json b/examples/KafkaConsumerApp/appsettings.json
new file mode 100644
index 0000000..c3e961b
--- /dev/null
+++ b/examples/KafkaConsumerApp/appsettings.json
@@ -0,0 +1,14 @@
+{
+ "Logging": {
+ "LogLevel": {
+ "Default": "Information",
+ "Microsoft.Hosting.Lifetime": "Information"
+ }
+ },
+ "Kafka": {
+ "BootstrapServers": "127.0.0.1:19092",
+ "Consumer": {
+ "Topic": "weather-forecasts"
+ }
+ }
+}
diff --git a/examples/SqsConsumerApp/Program.cs b/examples/SqsConsumerApp/Program.cs
new file mode 100644
index 0000000..4aae3c1
--- /dev/null
+++ b/examples/SqsConsumerApp/Program.cs
@@ -0,0 +1,103 @@
+using Amazon.SQS;
+using JetBrains.Annotations;
+using LocalPost;
+using LocalPost.SqsConsumer;
+using LocalPost.SqsConsumer.DependencyInjection;
+using OpenTelemetry;
+using OpenTelemetry.Metrics;
+using OpenTelemetry.Trace;
+using Serilog;
+using Serilog.Sinks.FingersCrossed;
+
+var builder = Host.CreateApplicationBuilder(args);
+
+builder.Services
+ .AddSerilog() // See https://nblumhardt.com/2024/04/serilog-net8-0-minimal/#hooking-up-aspnet-core-and-iloggert
+ .AddDefaultAWSOptions(builder.Configuration.GetAWSOptions())
+ .AddAWSService();
+
+builder.Services.Configure(options =>
+{
+ options.ServicesStartConcurrently = true;
+ options.ServicesStopConcurrently = true;
+});
+
+#region OpenTelemetry
+
+// See also: https://learn.microsoft.com/en-us/dotnet/core/diagnostics/observability-otlp-example
+
+// To use full potential of Serilog, it's better to use Serilog.Sinks.OpenTelemetry,
+// see https://github.com/Blind-Striker/dotnet-otel-aspire-localstack-demo as an example
+// builder.Logging.AddOpenTelemetry(logging =>
+// {
+// logging.IncludeFormattedMessage = true;
+// logging.IncludeScopes = true;
+// });
+
+builder.Services.AddOpenTelemetry()
+ .WithMetrics(metrics => metrics
+ .AddAWSInstrumentation())
+ .WithTracing(tracing => tracing
+ .AddSource("LocalPost.*")
+ .AddAWSInstrumentation())
+ .UseOtlpExporter();
+
+#endregion
+
+builder.Services
+ .AddScoped()
+ .AddSqsConsumers(sqs =>
+ {
+ sqs.Defaults.Configure(options => options.MaxNumberOfMessages = 1);
+ sqs.AddConsumer("weather-forecasts", // Also acts as a queue name
+ HandlerStack.From()
+ .Scoped()
+ .UseSqsPayload()
+ .DeserializeJson()
+ .Trace()
+ .Acknowledge() // Do not include DeleteMessage call in the OpenTelemetry root span (transaction)
+ .LogFingersCrossed()
+ .LogExceptions()
+ );
+ });
+
+await builder.Build().RunAsync();
+
+
+[UsedImplicitly]
+public record WeatherForecast(int TemperatureC, int TemperatureF, string Summary);
+
+public class MessageHandler : IHandler
+{
+ public async ValueTask InvokeAsync(WeatherForecast payload, CancellationToken ct)
+ {
+ await Task.Delay(1_000, ct);
+ Console.WriteLine(payload);
+
+ // To show the failure handling
+ if (payload.TemperatureC > 35)
+ throw new InvalidOperationException("Too hot");
+ }
+}
+
+public static class HandlerStackEx
+{
+ public static HandlerManagerFactory LogFingersCrossed(this HandlerManagerFactory hmf) =>
+ hmf.TouchHandler(next => async (context, ct) =>
+ {
+ using var logBuffer = LogBuffer.BeginScope();
+ try
+ {
+ await next(context, ct);
+ }
+ catch (OperationCanceledException e) when (e.CancellationToken == ct)
+ {
+ throw; // Do not treat cancellation as an error
+ }
+ catch (Exception)
+ {
+ logBuffer.Flush();
+ throw;
+ }
+ });
+}
diff --git a/examples/SqsConsumerApp/Properties/launchSettings.json b/examples/SqsConsumerApp/Properties/launchSettings.json
new file mode 100644
index 0000000..4328853
--- /dev/null
+++ b/examples/SqsConsumerApp/Properties/launchSettings.json
@@ -0,0 +1,16 @@
+{
+ "profiles": {
+ "SqsConsumerApp": {
+ "commandName": "Project",
+ "dotnetRunMessages": true,
+ "environmentVariables": {
+ "DOTNET_ENVIRONMENT": "Development",
+ "AWS_ACCESS_KEY_ID": "test",
+ "AWS_SECRET_ACCESS_KEY": "test",
+ "OTEL_SERVICE_NAME": "SampleSqsConsumer",
+ "OTEL_EXPORTER_OTLP_PROTOCOL": "grpc",
+ "OTEL_EXPORTER_OTLP_ENDPOINT": "http://127.0.0.1:18889"
+ }
+ }
+ }
+}
diff --git a/examples/SqsConsumerApp/README.md b/examples/SqsConsumerApp/README.md
new file mode 100644
index 0000000..6261c32
--- /dev/null
+++ b/examples/SqsConsumerApp/README.md
@@ -0,0 +1,32 @@
+# SQS Consumer Sample App
+
+## Setup
+
+### Local infrastructure
+
+`docker compose up -d` to spin up the localstack & Aspire containers
+
+### SQS queue
+
+```shell
+aws --endpoint-url=http://localhost:4566 --region=us-east-1 --no-sign-request \
+ sqs create-queue --queue-name "weather-forecasts"
+```
+
+To get the queue URL:
+
+```shell
+aws --endpoint-url=http://localhost:4566 --region=us-east-1 --no-sign-request \
+ sqs get-queue-url --queue-name "weather-forecasts" --query "QueueUrl"
+```
+
+## Run
+
+To see that the consumer is working, you can send a message to the queue using the AWS CLI:
+
+```shell
+aws --endpoint-url=http://localhost:4566 --region=us-east-1 --no-sign-request \
+ sqs send-message \
+ --queue-url "http://sqs.us-east-1.localhost.localstack.cloud:4566/000000000000/weather-forecasts" \
+ --message-body '{"TemperatureC": 25, "TemperatureF": 77, "Summary": "not hot, not cold, perfect"}'
+```
diff --git a/examples/SqsConsumerApp/SqsConsumerApp.csproj b/examples/SqsConsumerApp/SqsConsumerApp.csproj
new file mode 100644
index 0000000..6105d44
--- /dev/null
+++ b/examples/SqsConsumerApp/SqsConsumerApp.csproj
@@ -0,0 +1,37 @@
+
+
+
+ net8
+
+ CA1050
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+ appsettings.json
+
+
+
+
diff --git a/examples/SqsConsumerApp/appsettings.Development.json b/examples/SqsConsumerApp/appsettings.Development.json
new file mode 100644
index 0000000..ef8e3e4
--- /dev/null
+++ b/examples/SqsConsumerApp/appsettings.Development.json
@@ -0,0 +1,5 @@
+{
+ "AWS": {
+ "ServiceURL": "http://127.0.0.1:4566"
+ }
+}
diff --git a/examples/SqsConsumerApp/appsettings.json b/examples/SqsConsumerApp/appsettings.json
new file mode 100644
index 0000000..b2dcdb6
--- /dev/null
+++ b/examples/SqsConsumerApp/appsettings.json
@@ -0,0 +1,8 @@
+{
+ "Logging": {
+ "LogLevel": {
+ "Default": "Information",
+ "Microsoft.Hosting.Lifetime": "Information"
+ }
+ }
+}
diff --git a/localstack/init/ready.d/sqs.sh b/localstack/init/ready.d/sqs.sh
new file mode 100644
index 0000000..d47ca0d
--- /dev/null
+++ b/localstack/init/ready.d/sqs.sh
@@ -0,0 +1,13 @@
+#!/usr/bin/env bash
+
+set -euo pipefail
+
+# Enable debug
+#set -x
+
+awslocal sqs create-queue --queue-name lp-test
+QUEUE_URL=$(awslocal sqs get-queue-url --queue-name lp-test --query 'QueueUrl' --output text)
+
+awslocal sqs send-message \
+ --queue-url "$QUEUE_URL" \
+ --message-body '{"TemperatureC": 25, "TemperatureF": 77, "Summary": "not hot, not cold, perfect"}'
diff --git a/samples/SampleWebApp/Program.cs b/samples/SampleWebApp/Program.cs
deleted file mode 100644
index 2b58949..0000000
--- a/samples/SampleWebApp/Program.cs
+++ /dev/null
@@ -1,51 +0,0 @@
-using Amazon.SimpleNotificationService;
-using Amazon.SQS;
-using LocalPost.SnsPublisher.DependencyInjection;
-using LocalPost.DependencyInjection;
-using SampleWebApp;
-using LocalPost.SqsConsumer.DependencyInjection;
-
-var builder = WebApplication.CreateBuilder(args);
-
-
-
-// A background queue with an inline handler
-builder.Services.AddBackgroundQueue(_ => async (w, ct) =>
-{
- await Task.Delay(TimeSpan.FromSeconds(2), ct);
- Console.WriteLine(w.Summary);
-});
-
-
-
-// An async Amazon SNS sender, buffers messages and sends them in batches in the background
-builder.Services.AddAWSService();
-builder.Services.AddAmazonSnsBatchPublisher();
-
-
-
-// An Amazon SQS consumer
-builder.Services.AddAWSService();
-builder.Services.AddAmazonSqsMinimalConsumer("test", async (context, ct) =>
-{
- await Task.Delay(1_000, ct);
- Console.WriteLine(context.Body);
-});
-
-
-
-builder.Services.AddControllers();
-// Learn more about configuring Swagger/OpenAPI at https://aka.ms/aspnetcore/swashbuckle
-builder.Services.AddEndpointsApiExplorer();
-builder.Services.AddSwaggerGen();
-
-var app = builder.Build();
-if (app.Environment.IsDevelopment())
-{
- app.UseSwagger();
- app.UseSwaggerUI();
-}
-app.UseHttpsRedirection();
-app.UseAuthorization();
-app.MapControllers();
-app.Run();
diff --git a/samples/SampleWebApp/SampleWebApp.csproj b/samples/SampleWebApp/SampleWebApp.csproj
deleted file mode 100644
index 06fa56d..0000000
--- a/samples/SampleWebApp/SampleWebApp.csproj
+++ /dev/null
@@ -1,20 +0,0 @@
-
-
-
- net7
- enable
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
diff --git a/sonar-scan.sh b/sonar-scan.sh
index 3252ee4..7296c1f 100755
--- a/sonar-scan.sh
+++ b/sonar-scan.sh
@@ -1,9 +1,6 @@
#!/usr/bin/env bash
-# Print a command before actually executing it
-set -x
-# Break the script if one of the command fails (returns non-zero status code)
-set -e
+set -o xtrace,errexit
# $SONAR_TOKEN must be defined
# $GitVersion_FullSemVer can be used to specify the current version (see GitVersion)
@@ -15,9 +12,10 @@ fi
dotnet build-server shutdown
dotnet sonarscanner begin \
- /d:sonar.host.url="https://sonarcloud.io" /d:sonar.login="$SONAR_TOKEN" \
- /o:"alexeyshockov" /k:"alexeyshockov_LocalPost" $VERSION \
+ /d:sonar.host.url="https://sonarcloud.io" /d:sonar.token="$SONAR_TOKEN" \
+ /o:"alexeyshockov" /k:"alexeyshockov_LocalPost.NET" "$VERSION" \
/d:sonar.dotnet.excludeTestProjects=true \
+ /d:sonar.coverage.exclusions="**/examples/**" \
/d:sonar.cs.opencover.reportsPaths="tests/*/TestResults/*/coverage.opencover.xml" \
/d:sonar.cs.vstest.reportsPaths="tests/*/TestResults/*.trx"
@@ -25,4 +23,4 @@ dotnet sonarscanner begin \
dotnet build
dotnet test --no-build --collect:"XPlat Code Coverage" --settings coverlet.runsettings --logger=trx
-dotnet sonarscanner end /d:sonar.login="$SONAR_TOKEN"
+dotnet sonarscanner end /d:sonar.token="$SONAR_TOKEN"
diff --git a/src/LocalPost.SnsPublisher/LocalPost.SnsPublisher.csproj b/src/Directory.Build.props
similarity index 50%
rename from src/LocalPost.SnsPublisher/LocalPost.SnsPublisher.csproj
rename to src/Directory.Build.props
index bf2064c..6320993 100644
--- a/src/LocalPost.SnsPublisher/LocalPost.SnsPublisher.csproj
+++ b/src/Directory.Build.props
@@ -1,28 +1,23 @@
-
+
- netstandard2.0
+ 13
+ enable
+ enable
+ true
+
true
false
- LocalPost.SnsPublisher
- background;task;queue;amazon;sns;aws
- Local (in-process) background queue for sending to Amazon SNS.
Alexey Shokov
-
- README.md
+ https://github.com/alexeyshockov/LocalPost.NET/releases/tag/v$(Version)
MIT
- https://github.com/alexeyshockov/LocalPost
+ https://github.com/alexeyshockov/LocalPost.NET
git
true
-
-
-
-
-
true
@@ -40,17 +35,11 @@
-
-
-
-
-
-
-
+
+ <_Parameter1>$(MSBuildProjectName).Tests
+
+
+ <_Parameter1>DynamicProxyGenAssembly2
+
-
-
-
-
-
diff --git a/src/LocalPost.KafkaConsumer/Client.cs b/src/LocalPost.KafkaConsumer/Client.cs
new file mode 100644
index 0000000..f31938e
--- /dev/null
+++ b/src/LocalPost.KafkaConsumer/Client.cs
@@ -0,0 +1,98 @@
+using System.Collections;
+using Confluent.Kafka;
+
+namespace LocalPost.KafkaConsumer;
+
+internal sealed class ClientFactory(ILogger logger, ConsumerOptions settings)
+{
+ public async Task Create(CancellationToken ct)
+ {
+ return new Clients(await Task.WhenAll(Enumerable
+ .Range(0, settings.Consumers)
+ .Select(_ => Task.Run(CreateClient, ct))
+ ).ConfigureAwait(false));
+
+ Client CreateClient()
+ {
+ var consumer = new ConsumerBuilder(settings.ClientConfig)
+ .SetErrorHandler((_, e) => logger.LogError("{Error}", e))
+ .SetLogHandler((_, m) => logger.LogDebug(m.Message))
+ .Build();
+ consumer.Subscribe(settings.Topics);
+ return new Client(logger, consumer, settings.ClientConfig);
+ }
+ }
+}
+
+internal sealed class Clients(Client[] clients) : IReadOnlyCollection
+{
+ public Task Close(CancellationToken ct) => Task.WhenAll(clients.Select(client => Task.Run(client.Close, ct)));
+
+ public IEnumerator GetEnumerator() => ((IEnumerable)clients).GetEnumerator();
+
+ IEnumerator IEnumerable.GetEnumerator() => clients.GetEnumerator();
+
+ public int Count => clients.Length;
+}
+
+internal sealed class Client
+{
+ private readonly ILogger _logger;
+
+ public Client(ILogger logger, IConsumer consumer, ConsumerConfig config)
+ {
+ _logger = logger;
+ Consumer = consumer;
+ Config = config;
+ var server = config.BootstrapServers.Split(',')[0].Split(':');
+ ServerAddress = server[0];
+ if (server.Length > 1)
+ ServerPort = int.Parse(server[1]);
+ }
+
+ public ConsumeResult Consume(CancellationToken ct)
+ {
+ while (true)
+ {
+ try
+ {
+ var result = Consumer.Consume(ct);
+
+ if (result is not null && result.IsPartitionEOF)
+ _logger.LogInformation("End of {Partition} on {Topic}",
+ result.TopicPartition.Partition, result.TopicPartition.Topic);
+ else if (result?.Message is not null)
+ return result;
+ else
+ _logger.LogWarning("Kafka consumer empty receive");
+ }
+ // catch (ConsumeException e)
+ catch (KafkaException e) when (!e.Error.IsFatal)
+ {
+ _logger.LogCritical(e, "Kafka consumer (retryable) error: {Reason}", e.Error.Reason);
+ // Just continue receiving
+
+ // "generally, the producer should recover from all errors, except where marked fatal" as per
+ // https://github.com/confluentinc/confluent-kafka-dotnet/issues/1213#issuecomment-599772818, so
+ // just continue polling
+ }
+ }
+ }
+
+ public void Close()
+ {
+ try
+ {
+ Consumer.Close();
+ }
+ finally
+ {
+ Consumer.Dispose();
+ }
+ }
+
+ public IConsumer Consumer { get; }
+ public ConsumerConfig Config { get; }
+ public string ServerAddress { get; }
+ public int ServerPort { get; } = 9092;
+}
diff --git a/src/LocalPost.KafkaConsumer/ConsumeContext.cs b/src/LocalPost.KafkaConsumer/ConsumeContext.cs
new file mode 100644
index 0000000..6674045
--- /dev/null
+++ b/src/LocalPost.KafkaConsumer/ConsumeContext.cs
@@ -0,0 +1,52 @@
+using Confluent.Kafka;
+
+namespace LocalPost.KafkaConsumer;
+
+[PublicAPI]
+public readonly record struct ConsumeContext
+{
+ // librdkafka docs:
+ // When consumer restarts this is where it will start consuming from.
+ // The committed offset should be last_message_offset+1.
+ // See https://github.com/confluentinc/librdkafka/wiki/Consumer-offset-management#terminology
+// internal readonly TopicPartitionOffset NextOffset;
+
+ internal readonly Client Client;
+ internal readonly ConsumeResult ConsumeResult;
+ public readonly T Payload;
+
+ internal ConsumeContext(Client client, ConsumeResult consumeResult, T payload)
+ {
+ Client = client;
+ ConsumeResult = consumeResult;
+ Payload = payload;
+ }
+
+ public void Deconstruct(out T payload, out IReadOnlyList headers)
+ {
+ payload = Payload;
+ headers = Headers;
+ }
+
+ public Offset NextOffset => ConsumeResult.Offset + 1;
+
+ public Message Message => ConsumeResult.Message;
+
+ public string Topic => ConsumeResult.Topic;
+
+ public IReadOnlyList Headers => Message.Headers.BackingList;
+
+ public ConsumeContext Transform(TOut payload) => new(Client, ConsumeResult, payload);
+
+ public ConsumeContext Transform(Func, TOut> transform) => Transform(transform(this));
+
+ public async Task> Transform(Func, Task> transform) =>
+ Transform(await transform(this).ConfigureAwait(false));
+
+ public static implicit operator T(ConsumeContext context) => context.Payload;
+
+ public void StoreOffset() => Client.Consumer.StoreOffset(ConsumeResult);
+
+ // To be consistent across different message brokers
+ public void Acknowledge() => StoreOffset();
+}
diff --git a/src/LocalPost.KafkaConsumer/Consumer.cs b/src/LocalPost.KafkaConsumer/Consumer.cs
new file mode 100644
index 0000000..a8d4737
--- /dev/null
+++ b/src/LocalPost.KafkaConsumer/Consumer.cs
@@ -0,0 +1,126 @@
+using Confluent.Kafka;
+using LocalPost.DependencyInjection;
+using LocalPost.Flow;
+using Microsoft.Extensions.Diagnostics.HealthChecks;
+using Microsoft.Extensions.Hosting;
+
+namespace LocalPost.KafkaConsumer;
+
+internal sealed class Consumer(string name, ILogger logger,
+ ClientFactory clientFactory, IHandlerManager> hm)
+ : IHostedService, IHealthAwareService, IDisposable
+{
+ private Clients _clients = new([]);
+
+ private CancellationTokenSource? _execTokenSource;
+ private Task? _exec;
+ private Exception? _execException;
+ private string? _execExceptionDescription;
+
+ private CancellationToken _completionToken = CancellationToken.None;
+
+ private HealthCheckResult Ready => (_execTokenSource, _execution: _exec, _execException) switch
+ {
+ (null, _, _) => HealthCheckResult.Unhealthy("Not started"),
+ (_, { IsCompleted: true }, _) => HealthCheckResult.Unhealthy("Stopped"),
+ (not null, null, _) => HealthCheckResult.Degraded("Starting"),
+ (not null, not null, null) => HealthCheckResult.Healthy("Running"),
+ (_, _, not null) => HealthCheckResult.Unhealthy(_execExceptionDescription, _execException),
+ };
+
+ public IHealthCheck ReadinessCheck => HealthChecks.From(() => Ready);
+
+ private async Task RunConsumerAsync(Client client, Handler> handler, CancellationToken execToken)
+ {
+ // (Optionally) wait for app start
+
+ try
+ {
+ while (!execToken.IsCancellationRequested)
+ {
+ var result = client.Consume(execToken);
+ var context = new ConsumeContext(client, result, result.Message.Value);
+ await handler(context, CancellationToken.None).ConfigureAwait(false);
+ }
+ }
+ catch (OperationCanceledException e) when (e.CancellationToken == execToken)
+ {
+ // logger.LogInformation("Kafka consumer shutdown");
+ }
+ catch (KafkaException e)
+ {
+ logger.LogCritical(e, "Kafka consumer error: {Reason} (see {HelpLink})", e.Error.Reason, e.HelpLink);
+ (_execException, _execExceptionDescription) = (e, "Kafka consumer failed");
+ }
+ catch (Exception e)
+ {
+ logger.LogCritical(e, "Kafka message handler error");
+ // TODO Include headers or the partition key in check result's data
+ (_execException, _execExceptionDescription) = (e, "Message handler failed");
+ }
+ finally
+ {
+ CancelExecution(); // Stop other consumers too
+ }
+ }
+
+ public async Task StartAsync(CancellationToken ct)
+ {
+ if (_execTokenSource is not null)
+ throw new InvalidOperationException("Service is already started");
+
+ var execTokenSource = _execTokenSource = new CancellationTokenSource();
+
+ logger.LogInformation("Starting Kafka consumer...");
+ var clients = _clients = await clientFactory.Create(ct).ConfigureAwait(false);
+ logger.LogInformation("Kafka consumer started");
+
+ logger.LogDebug("Invoking the event handler...");
+ var handler = await hm.Start(ct).ConfigureAwait(false);
+ logger.LogDebug("Event handler started");
+
+ _exec = ObserveExecution();
+ return;
+
+ async Task ObserveExecution()
+ {
+ try
+ {
+ var executions = clients.Select(client =>
+ Task.Run(() => RunConsumerAsync(client, handler, execTokenSource.Token), ct)
+ ).ToArray();
+ await (executions.Length == 1 ? executions[0] : Task.WhenAll(executions)).ConfigureAwait(false);
+
+ await hm.Stop(_execException, _completionToken).ConfigureAwait(false);
+ }
+ finally
+ {
+ // Can happen before the service shutdown, in case of an error
+ await _clients.Close(_completionToken).ConfigureAwait(false);
+ logger.LogInformation("Kafka consumer stopped");
+ }
+ }
+ }
+
+ // await _execTokenSource.CancelAsync(); // .NET 8+
+ private void CancelExecution() => _execTokenSource?.Cancel();
+
+ public async Task StopAsync(CancellationToken forceShutdownToken)
+ {
+ if (_execTokenSource is null)
+ throw new InvalidOperationException("Service has not been started");
+
+ logger.LogInformation("Shutting down Kafka consumer...");
+
+ _completionToken = forceShutdownToken;
+ CancelExecution();
+ if (_exec is not null)
+ await _exec.ConfigureAwait(false);
+ }
+
+ public void Dispose()
+ {
+ _execTokenSource?.Dispose();
+ _exec?.Dispose();
+ }
+}
diff --git a/src/LocalPost.KafkaConsumer/DependencyInjection/HealthChecksBuilderEx.cs b/src/LocalPost.KafkaConsumer/DependencyInjection/HealthChecksBuilderEx.cs
new file mode 100644
index 0000000..a4fb1c3
--- /dev/null
+++ b/src/LocalPost.KafkaConsumer/DependencyInjection/HealthChecksBuilderEx.cs
@@ -0,0 +1,22 @@
+using LocalPost.DependencyInjection;
+using Microsoft.Extensions.DependencyInjection;
+using Microsoft.Extensions.Diagnostics.HealthChecks;
+
+namespace LocalPost.KafkaConsumer.DependencyInjection;
+
+[PublicAPI]
+public static class HealthChecksBuilderEx
+{
+ public static IHealthChecksBuilder AddKafkaConsumer(this IHealthChecksBuilder builder,
+ string name, HealthStatus? failureStatus = null, IEnumerable? tags = null) =>
+ builder.Add(HealthChecks.Readiness(name, failureStatus, tags));
+
+ public static IHealthChecksBuilder AddKafkaConsumers(this IHealthChecksBuilder builder,
+ HealthStatus? failureStatus = null, IEnumerable? tags = null)
+ {
+ foreach (var name in builder.Services.GetKeysFor().OfType())
+ AddKafkaConsumer(builder, name, failureStatus, tags);
+
+ return builder;
+ }
+}
diff --git a/src/LocalPost.KafkaConsumer/DependencyInjection/KafkaBuilder.cs b/src/LocalPost.KafkaConsumer/DependencyInjection/KafkaBuilder.cs
new file mode 100644
index 0000000..cd5e200
--- /dev/null
+++ b/src/LocalPost.KafkaConsumer/DependencyInjection/KafkaBuilder.cs
@@ -0,0 +1,58 @@
+using Confluent.Kafka;
+using LocalPost.DependencyInjection;
+using LocalPost.Flow;
+using Microsoft.Extensions.DependencyInjection;
+using Microsoft.Extensions.Options;
+
+namespace LocalPost.KafkaConsumer.DependencyInjection;
+
+[PublicAPI]
+public sealed class KafkaBuilder(IServiceCollection services)
+{
+ public OptionsBuilder Defaults { get; } = services.AddOptions();
+
+ ///
+ /// Add a Kafka consumer with a custom message handler.
+ ///
+ /// Message handler factory.
+ /// Consumer options builder.
+ public OptionsBuilder AddConsumer(HandlerManagerFactory> hmf) =>
+ AddConsumer(Options.DefaultName, hmf);
+
+ ///
+ /// Add a Kafka consumer with a custom message handler.
+ ///
+ /// Consumer name (should be unique in the application). Also, the default group ID.
+ /// Message handler factory.
+ /// Consumer options builder.
+ public OptionsBuilder AddConsumer(string name, HandlerManagerFactory> hmf)
+ {
+ var added = services.TryAddKeyedSingleton(name, (provider, _) =>
+ {
+ var clientFactory = new ClientFactory(
+ provider.GetLoggerFor(),
+ provider.GetOptions(name)
+ );
+
+ return new Consumer(name,
+ provider.GetLoggerFor(),
+ clientFactory,
+ hmf(provider)
+ );
+ });
+
+ if (!added)
+ throw new ArgumentException("Consumer is already registered", nameof(name));
+
+ services.AddHostedService(provider => provider.GetRequiredKeyedService(name));
+
+ return OptionsFor(name).Configure>((co, defaults) =>
+ {
+ co.EnrichFrom(defaults.Value);
+ if (!string.IsNullOrEmpty(name))
+ co.ClientConfig.GroupId = name;
+ });
+ }
+
+ public OptionsBuilder OptionsFor(string name) => services.AddOptions(name);
+}
diff --git a/src/LocalPost.KafkaConsumer/DependencyInjection/ServiceCollectionEx.cs b/src/LocalPost.KafkaConsumer/DependencyInjection/ServiceCollectionEx.cs
new file mode 100644
index 0000000..67d65f4
--- /dev/null
+++ b/src/LocalPost.KafkaConsumer/DependencyInjection/ServiceCollectionEx.cs
@@ -0,0 +1,14 @@
+using Microsoft.Extensions.DependencyInjection;
+
+namespace LocalPost.KafkaConsumer.DependencyInjection;
+
+[PublicAPI]
+public static class ServiceCollectionEx
+{
+ public static IServiceCollection AddKafkaConsumers(this IServiceCollection services, Action configure)
+ {
+ configure(new KafkaBuilder(services));
+
+ return services;
+ }
+}
diff --git a/src/LocalPost.KafkaConsumer/Exceptions.cs b/src/LocalPost.KafkaConsumer/Exceptions.cs
new file mode 100644
index 0000000..782d421
--- /dev/null
+++ b/src/LocalPost.KafkaConsumer/Exceptions.cs
@@ -0,0 +1,10 @@
+using Confluent.Kafka;
+
+namespace LocalPost.KafkaConsumer;
+
+internal static class Exceptions
+{
+ public static bool IsTransient(this ConsumeException exception) =>
+ // See https://github.com/confluentinc/confluent-kafka-dotnet/issues/1424#issuecomment-705749252
+ exception.Error.Code is ErrorCode.Local_KeyDeserialization or ErrorCode.Local_ValueDeserialization;
+}
diff --git a/src/LocalPost.KafkaConsumer/HandlerStackEx.cs b/src/LocalPost.KafkaConsumer/HandlerStackEx.cs
new file mode 100644
index 0000000..8d77325
--- /dev/null
+++ b/src/LocalPost.KafkaConsumer/HandlerStackEx.cs
@@ -0,0 +1,150 @@
+using Confluent.Kafka;
+
+namespace LocalPost.KafkaConsumer;
+
+using MessageHmf = HandlerManagerFactory>;
+using MessagesHmf = HandlerManagerFactory>>;
+
+[PublicAPI]
+public static class HandlerStackEx
+{
+ public static HandlerManagerFactory> UseKafkaPayload(this HandlerManagerFactory hmf) =>
+ hmf.MapHandler, T>(next => async (context, ct) =>
+ await next(context.Payload, ct).ConfigureAwait(false));
+
+ public static HandlerManagerFactory>> UseKafkaPayload(
+ this HandlerManagerFactory> hmf) =>
+ hmf.MapHandler>, IReadOnlyCollection>(next => async (batch, ct) =>
+ await next(batch.Select(context => context.Payload).ToArray(), ct).ConfigureAwait(false));
+
+ public static HandlerManagerFactory> Trace(
+ this HandlerManagerFactory> hmf) =>
+ hmf.TouchHandler(next => async (context, ct) =>
+ {
+ using var activity = Tracing.StartProcessing(context);
+ try
+ {
+ await next(context, ct).ConfigureAwait(false);
+ activity?.Success();
+ }
+ catch (Exception e)
+ {
+ activity?.Error(e);
+ throw;
+ }
+ });
+
+ public static HandlerManagerFactory>> Trace(
+ this HandlerManagerFactory>> hmf) =>
+ hmf.TouchHandler(next => async (batch, ct) =>
+ {
+ using var activity = Tracing.StartProcessing(batch);
+ try
+ {
+ await next(batch, ct).ConfigureAwait(false);
+ activity?.Success();
+ }
+ catch (Exception e)
+ {
+ activity?.Error(e);
+ throw;
+ }
+ });
+
+ ///
+ /// Manually acknowledge every message (store offset).
+ ///
+ /// Works only when EnableAutoOffsetStore is false!
+ ///
+ /// Message handler factory.
+ /// Message type.
+ /// Wrapped handler factory.
+ public static HandlerManagerFactory> Acknowledge(
+ this HandlerManagerFactory> hmf) =>
+ hmf.TouchHandler(next => async (context, ct) =>
+ {
+ await next(context, ct).ConfigureAwait(false);
+ context.StoreOffset();
+ });
+
+ ///
+ /// Manually acknowledge every message (store offset).
+ ///
+ /// Works only when EnableAutoOffsetStore is false!
+ ///
+ /// Message handler factory.
+ /// Message type.
+ /// Wrapped handler factory.
+ public static HandlerManagerFactory>> Acknowledge(
+ this HandlerManagerFactory>> hmf) =>
+ hmf.TouchHandler(next => async (batch, ct) =>
+ {
+ await next(batch, ct).ConfigureAwait(false);
+ foreach (var context in batch)
+ context.StoreOffset();
+ });
+
+ private static Func, Task> AsyncDeserializer(IAsyncDeserializer deserializer) =>
+ context => deserializer.DeserializeAsync(context.Payload, false, new SerializationContext(
+ MessageComponentType.Value, context.Topic, context.Message.Headers));
+
+ private static Func, T> Deserializer(IDeserializer deserializer) =>
+ context => deserializer.Deserialize(context.Payload, false, new SerializationContext(
+ MessageComponentType.Value, context.Topic, context.Message.Headers));
+
+ #region Deserialize()
+
+ public static MessageHmf Deserialize(this HandlerManagerFactory> hmf,
+ Func, T> deserialize) =>
+ hmf.MapHandler, ConsumeContext>(next => async (context, ct) =>
+ await next(context.Transform(deserialize), ct).ConfigureAwait(false));
+
+ public static MessageHmf Deserialize(this HandlerManagerFactory> hmf,
+ Func, Task> deserialize) =>
+ hmf.MapHandler, ConsumeContext>(next => async (context, ct) =>
+ await next(await context.Transform(deserialize).ConfigureAwait(false), ct).ConfigureAwait(false));
+
+ public static MessageHmf Deserialize(this HandlerManagerFactory> hmf,
+ IAsyncDeserializer deserializer) => hmf.Deserialize(AsyncDeserializer(deserializer));
+
+ public static MessageHmf Deserialize(this HandlerManagerFactory> hmf,
+ IDeserializer deserializer) => hmf.Deserialize(Deserializer(deserializer));
+
+
+
+ public static MessagesHmf Deserialize(
+ this HandlerManagerFactory>> hmf,
+ Func, T> deserialize) =>
+ hmf.MapHandler>, IReadOnlyCollection>>(next =>
+ async (batch, ct) =>
+ {
+ var modBatch = batch.Select(context => context.Transform(deserialize)).ToArray();
+ await next(modBatch, ct).ConfigureAwait(false);
+ });
+
+ public static MessagesHmf Deserialize(
+ this HandlerManagerFactory>> hmf,
+ Func, Task> deserialize) =>
+ hmf.MapHandler>, IReadOnlyCollection>>(next =>
+ async (batch, ct) =>
+ {
+ var modifications = batch.Select(context => context.Transform(deserialize));
+ // Task.WhenAll() preserves the order
+ var modBatch = await Task.WhenAll(modifications).ConfigureAwait(false);
+ await next(modBatch, ct).ConfigureAwait(false);
+ });
+
+ public static MessagesHmf Deserialize(
+ this HandlerManagerFactory>> hmf,
+ IAsyncDeserializer deserializer) => hmf.Deserialize(AsyncDeserializer(deserializer));
+
+ public static MessagesHmf Deserialize(
+ this HandlerManagerFactory>> hmf,
+ IDeserializer deserializer) => hmf.Deserialize(Deserializer(deserializer));
+
+ #endregion
+
+ // public static HandlerFactory> DeserializeJson(
+ // this HandlerFactory> hf, JsonSerializerOptions? options = null) =>
+ // hf.Deserialize(context => JsonSerializer.Deserialize(context.Payload, options)!);
+}
diff --git a/src/LocalPost.KafkaConsumer/LocalPost.KafkaConsumer.csproj b/src/LocalPost.KafkaConsumer/LocalPost.KafkaConsumer.csproj
new file mode 100644
index 0000000..9e4add7
--- /dev/null
+++ b/src/LocalPost.KafkaConsumer/LocalPost.KafkaConsumer.csproj
@@ -0,0 +1,34 @@
+
+
+
+ net6;net8
+
+ LocalPost.KafkaConsumer
+
+ Opinionated Kafka consumer library, build to be simple, but yet flexible.
+ background;task;queue;kafka
+
+ README.md
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/src/LocalPost.KafkaConsumer/Options.cs b/src/LocalPost.KafkaConsumer/Options.cs
new file mode 100644
index 0000000..368d500
--- /dev/null
+++ b/src/LocalPost.KafkaConsumer/Options.cs
@@ -0,0 +1,25 @@
+using System.ComponentModel.DataAnnotations;
+using Confluent.Kafka;
+
+namespace LocalPost.KafkaConsumer;
+
+[UsedImplicitly]
+public sealed record ConsumerOptions
+{
+ public ConsumerConfig ClientConfig { get; set; } = new();
+ // {
+ // EnableAutoOffsetStore = false // Store offsets manually, see Acknowledge middleware
+ // };
+
+ [MinLength(1)]
+ public ISet Topics { get; set; } = new HashSet();
+
+ [Range(1, ushort.MaxValue)]
+ public ushort Consumers { get; set; } = 1;
+
+ internal void EnrichFrom(Config config)
+ {
+ foreach (var kv in config)
+ ClientConfig.Set(kv.Key, kv.Value);
+ }
+}
diff --git a/src/LocalPost.KafkaConsumer/README.md b/src/LocalPost.KafkaConsumer/README.md
new file mode 100644
index 0000000..84e199c
--- /dev/null
+++ b/src/LocalPost.KafkaConsumer/README.md
@@ -0,0 +1,25 @@
+# LocalPost Kafka Consumer
+
+## librdkafka's background prefetching
+
+The Kafka client automatically prefetches messages in the background. This is done by the background thread that is
+started when the client is created. The background thread will fetch messages from the broker and enqueue them on the
+internal queue, so `Consume()` calls will return faster.
+
+Because of this behavior, there is no need to maintain our own in memory queue (channel).
+
+## Concurrent processing
+
+A Kafka consumer is designed to handle messages _from one partition_ sequentially, as it commits the offset of the last
+processed message.
+
+One of the common ways to speed up things (increase throughput) is to have multiple partitions for a topic and multiple
+parallel consumers.
+
+Another way is to batch process messages.
+
+## Message key ignorance
+
+Kafka's message key is used for almost one and only one purpose: to determine the partition for the message, when
+publishing. And in almost all the cases this information is also available (serialized) in the message itself
+(message value in Kafka terms). That's why we are ignoring the message key in this consumer.
diff --git a/src/LocalPost.KafkaConsumer/Tracing.cs b/src/LocalPost.KafkaConsumer/Tracing.cs
new file mode 100644
index 0000000..e67f1a3
--- /dev/null
+++ b/src/LocalPost.KafkaConsumer/Tracing.cs
@@ -0,0 +1,137 @@
+using System.Diagnostics;
+using System.Reflection;
+using System.Text;
+using Confluent.Kafka;
+
+namespace LocalPost.KafkaConsumer;
+
+internal static class MessageUtils
+{
+ public static void ExtractTraceFieldFromHeaders(object? carrier, string fieldName,
+ out string? fieldValue, out IEnumerable? fieldValues)
+ {
+ fieldValues = null;
+ fieldValue = null;
+ if (carrier is not IEnumerable message)
+ return;
+
+ var headerValue = message.FirstOrDefault(header => header.Key == fieldName)?.GetValueBytes();
+ if (headerValue is not null)
+ fieldValue = Encoding.UTF8.GetString(headerValue);
+ }
+}
+
+// See https://opentelemetry.io/docs/specs/semconv/messaging/kafka/
+internal static class KafkaActivityExtensions
+{
+ public static void AcceptDistributedTracingFrom(this Activity activity, Message message)
+ {
+ var propagator = DistributedContextPropagator.Current;
+ propagator.ExtractTraceIdAndState(message.Headers, MessageUtils.ExtractTraceFieldFromHeaders,
+ out var traceParent, out var traceState);
+
+ if (string.IsNullOrEmpty(traceParent))
+ return;
+ activity.SetParentId(traceParent!);
+ if (!string.IsNullOrEmpty(traceState))
+ activity.TraceStateString = traceState;
+
+ var baggage = propagator.ExtractBaggage(message.Headers, MessageUtils.ExtractTraceFieldFromHeaders);
+ if (baggage is null)
+ return;
+ foreach (var baggageItem in baggage)
+ activity.AddBaggage(baggageItem.Key, baggageItem.Value);
+ }
+
+ public static Activity? SetDefaultTags(this Activity? activity, Client client)
+ {
+ // See https://opentelemetry.io/docs/specs/semconv/messaging/messaging-spans/#messaging-attributes
+ activity?.SetTag("messaging.system", "kafka");
+
+ // activity?.SetTag("messaging.kafka.consumer.group", context.ClientConfig.GroupId);
+ activity?.SetTag("messaging.consumer.group.name", client.Config.GroupId);
+
+ // activity?.SetTag("messaging.client.id", "service_name");
+ // activity?.SetTag("server.address", context.ClientConfig.BootstrapServers);
+ // activity?.SetTag("server.port", context.ClientConfig.BootstrapServers);
+
+ return activity;
+ }
+
+ public static Activity? SetTagsFor(this Activity? activity, ConsumeContext context)
+ {
+ // See https://github.com/open-telemetry/opentelemetry-specification/issues/2971#issuecomment-1324621326
+ // activity?.SetTag("messaging.message.id", context.MessageId);
+ activity?.SetTag("messaging.destination.name", context.Topic);
+ activity?.SetTag("messaging.destination.partition.id", context.ConsumeResult.Partition.Value);
+ activity?.SetTag("messaging.kafka.message.offset", (long)context.ConsumeResult.Offset);
+
+ activity?.SetTag("messaging.message.body.size", context.Message.Value.Length);
+
+ // Skip, as we always ignore the key on consumption
+ // activity.SetTag("messaging.kafka.message.key", context.Message.Key);
+
+ // TODO error.type
+
+ return activity;
+ }
+
+ public static Activity? SetTagsFor(this Activity? activity, IReadOnlyCollection> batch)
+ {
+ activity?.SetTag("messaging.batch.message_count", batch.Count);
+ if (batch.Count > 0)
+ activity?.SetTag("messaging.destination.name", batch.First().Topic);
+
+ return activity;
+ }
+
+ // public static Activity? SetTagsFor(this Activity? activity, IReadOnlyCollection> batch) =>
+ // activity?.SetTag("messaging.batch.message_count", batch.Count);
+}
+
+// Based on Semantic Conventions 1.30.0, see
+// https://opentelemetry.io/docs/specs/semconv/messaging/messaging-spans/
+// Also Npgsql as an inspiration:
+// - https://github.com/npgsql/npgsql/blob/main/src/Npgsql/NpgsqlActivitySource.cs
+// - https://github.com/npgsql/npgsql/blob/main/src/Npgsql/NpgsqlCommand.cs
+internal static class Tracing
+{
+ private static readonly ActivitySource Source;
+
+ public static bool IsEnabled => Source.HasListeners();
+
+ static Tracing()
+ {
+ var assembly = Assembly.GetExecutingAssembly();
+ var version = assembly.GetName().Version?.ToString() ?? "0.0.0";
+ Source = new ActivitySource("LocalPost.KafkaConsumer", version);
+ }
+
+ public static Activity? StartProcessing(IReadOnlyCollection> batch)
+ {
+ Debug.Assert(batch.Count > 0);
+ var activity = Source.StartActivity($"process {batch.First().Topic}", ActivityKind.Consumer);
+ if (activity is not { IsAllDataRequested: true })
+ return activity;
+
+ activity.SetTag("messaging.operation.type", "process");
+ activity.SetDefaultTags(batch.First().Client);
+ activity.SetTagsFor(batch);
+
+ return activity;
+ }
+
+ public static Activity? StartProcessing(ConsumeContext context)
+ {
+ var activity = Source.StartActivity($"process {context.Topic}", ActivityKind.Consumer);
+ if (activity is not { IsAllDataRequested: true })
+ return activity;
+
+ activity.SetTag("messaging.operation.type", "process");
+ activity.SetDefaultTags(context.Client);
+ activity.SetTagsFor(context);
+ activity.AcceptDistributedTracingFrom(context.Message);
+
+ return activity;
+ }
+}
diff --git a/src/LocalPost.KafkaConsumer/globalusings.cs b/src/LocalPost.KafkaConsumer/globalusings.cs
new file mode 100644
index 0000000..2f865c2
--- /dev/null
+++ b/src/LocalPost.KafkaConsumer/globalusings.cs
@@ -0,0 +1,3 @@
+global using JetBrains.Annotations;
+global using System.Diagnostics.CodeAnalysis;
+global using Microsoft.Extensions.Logging;
diff --git a/src/LocalPost.SnsPublisher/DependencyInjection/ServiceCollectionExtensions.cs b/src/LocalPost.SnsPublisher/DependencyInjection/ServiceCollectionExtensions.cs
deleted file mode 100644
index f44a234..0000000
--- a/src/LocalPost.SnsPublisher/DependencyInjection/ServiceCollectionExtensions.cs
+++ /dev/null
@@ -1,27 +0,0 @@
-using Amazon.SimpleNotificationService.Model;
-using LocalPost.DependencyInjection;
-using Microsoft.Extensions.DependencyInjection;
-using Microsoft.Extensions.DependencyInjection.Extensions;
-using Microsoft.Extensions.Options;
-
-namespace LocalPost.SnsPublisher.DependencyInjection;
-
-public static class ServiceCollectionExtensions
-{
- public static OptionsBuilder AddAmazonSnsBatchPublisher(this IServiceCollection services)
- {
- services.TryAddSingleton();
-
- return services
- .AddAmazonSnsBatchPublisher(provider => provider.GetRequiredService().Send);
- }
-
- public static OptionsBuilder AddAmazonSnsBatchPublisher(this IServiceCollection services,
- Func> handlerFactory)
- {
- services.TryAddSingleton();
- services.TryAddSingleton(provider => provider.GetRequiredService());
-
- return services.AddCustomBackgroundQueue(handlerFactory);
- }
-}
diff --git a/src/LocalPost.SnsPublisher/PublishBatchRequestEntryExtensions.cs b/src/LocalPost.SnsPublisher/PublishBatchRequestEntryExtensions.cs
deleted file mode 100644
index 4abb6b6..0000000
--- a/src/LocalPost.SnsPublisher/PublishBatchRequestEntryExtensions.cs
+++ /dev/null
@@ -1,10 +0,0 @@
-using System.Text;
-using Amazon.SimpleNotificationService.Model;
-
-namespace LocalPost.SnsPublisher;
-
-internal static class PublishBatchRequestEntryExtensions
-{
- // Include attributes in the calculation later?..
- public static int CalculateSize(this PublishBatchRequestEntry entry) => Encoding.UTF8.GetByteCount(entry.Message);
-}
diff --git a/src/LocalPost.SnsPublisher/Publisher.cs b/src/LocalPost.SnsPublisher/Publisher.cs
deleted file mode 100644
index 6115d8d..0000000
--- a/src/LocalPost.SnsPublisher/Publisher.cs
+++ /dev/null
@@ -1,59 +0,0 @@
-using System.Threading.Channels;
-using Amazon.SimpleNotificationService.Model;
-
-namespace LocalPost.SnsPublisher;
-
-public interface ISnsPublisher
-{
- IBackgroundQueue ForTopic(string arn);
-}
-
-internal sealed class Publisher : ISnsPublisher, IAsyncEnumerable, IDisposable
-{
- private sealed class TopicPublishingQueue : IBackgroundQueue
- {
- private readonly Channel _batchEntries;
-
- public TopicPublishingQueue(string arn)
- {
- _batchEntries = Channel.CreateUnbounded(new UnboundedChannelOptions
- {
- SingleReader = true,
- SingleWriter = false
- });
- Results = _batchEntries.Reader.ReadAllAsync().Batch(() => new SnsBatchBuilder(arn));
- }
-
- public IAsyncEnumerable Results { get; }
-
- public ValueTask Enqueue(PublishBatchRequestEntry item, CancellationToken ct = default)
- {
- if (item.CalculateSize() > PublisherOptions.RequestMaxSize)
- throw new ArgumentOutOfRangeException(nameof(item), "Message is too big");
-
- return _batchEntries.Writer.WriteAsync(item, ct);
- }
- }
-
- private readonly Dictionary _channels = new();
- private readonly AsyncEnumerableMerger _combinedReader = new(true);
-
- private TopicPublishingQueue Create(string arn)
- {
- var q = _channels[arn] = new TopicPublishingQueue(arn);
- _combinedReader.Add(q.Results);
-
- return q;
- }
-
- public IBackgroundQueue ForTopic(string arn) =>
- _channels.TryGetValue(arn, out var queue) ? queue : Create(arn);
-
- public void Dispose()
- {
- _combinedReader.Dispose();
- }
-
- public IAsyncEnumerator GetAsyncEnumerator(CancellationToken ct = default) =>
- _combinedReader.GetAsyncEnumerator(ct);
-}
diff --git a/src/LocalPost.SnsPublisher/PublisherOptions.cs b/src/LocalPost.SnsPublisher/PublisherOptions.cs
deleted file mode 100644
index dd16e71..0000000
--- a/src/LocalPost.SnsPublisher/PublisherOptions.cs
+++ /dev/null
@@ -1,9 +0,0 @@
-namespace LocalPost.SnsPublisher;
-
-public sealed record PublisherOptions
-{
- // Same for Publish and PublishBatch
- public const int RequestMaxSize = 262_144;
-
- public const int BatchMaxSize = 10;
-}
diff --git a/src/LocalPost.SnsPublisher/Sender.cs b/src/LocalPost.SnsPublisher/Sender.cs
deleted file mode 100644
index da477cc..0000000
--- a/src/LocalPost.SnsPublisher/Sender.cs
+++ /dev/null
@@ -1,30 +0,0 @@
-using Amazon.SimpleNotificationService;
-using Amazon.SimpleNotificationService.Model;
-using Microsoft.Extensions.Logging;
-
-namespace LocalPost.SnsPublisher;
-
-///
-/// Default implementation
-///
-internal sealed class Sender
-{
- private readonly ILogger _logger;
- private readonly IAmazonSimpleNotificationService _sns;
-
- public Sender(ILogger logger, IAmazonSimpleNotificationService sns)
- {
- _logger = logger;
- _sns = sns;
- }
-
- public async Task Send(PublishBatchRequest payload, CancellationToken ct)
- {
- _logger.LogTrace("Sending a batch of {Amount} publish request(s) to SNS...", payload.PublishBatchRequestEntries.Count);
- var batchResponse = await _sns.PublishBatchAsync(payload, ct);
-
- if (batchResponse.Failed.Any())
- _logger.LogError("Batch entries failed: {FailedAmount} from {Amount}",
- batchResponse.Failed.Count, payload.PublishBatchRequestEntries.Count);
- }
-}
diff --git a/src/LocalPost.SnsPublisher/SnsBatchBuilder.cs b/src/LocalPost.SnsPublisher/SnsBatchBuilder.cs
deleted file mode 100644
index fa3b2b5..0000000
--- a/src/LocalPost.SnsPublisher/SnsBatchBuilder.cs
+++ /dev/null
@@ -1,56 +0,0 @@
-using Amazon.SimpleNotificationService.Model;
-using Nito.AsyncEx;
-
-namespace LocalPost.SnsPublisher;
-
-internal sealed class SnsBatchBuilder : IBatchBuilder
-{
- private readonly CancellationTokenSource _timeWindow = new(TimeSpan.FromSeconds(1)); // TODO Configurable
- private readonly CancellationTokenTaskSource _timeWindowTrigger;
- private PublishBatchRequest? _batchRequest;
-
- public SnsBatchBuilder(string topicArn)
- {
- _batchRequest = new PublishBatchRequest
- {
- TopicArn = topicArn
- };
-
- _timeWindowTrigger = new CancellationTokenTaskSource(_timeWindow.Token);
- }
-
- private PublishBatchRequest BatchRequest => _batchRequest ?? throw new ObjectDisposedException(nameof(SnsBatchBuilder));
-
- public CancellationToken TimeWindow => _timeWindow.Token;
- public Task TimeWindowTrigger => _timeWindowTrigger.Task;
- public bool IsEmpty => BatchRequest.PublishBatchRequestEntries.Count == 0;
-
- private bool CanFit(PublishBatchRequestEntry entry) =>
- PublisherOptions.BatchMaxSize > BatchRequest.PublishBatchRequestEntries.Count
- &&
- PublisherOptions.RequestMaxSize > BatchRequest.PublishBatchRequestEntries.Append(entry)
- .Aggregate(0, (total, e) => total + e.CalculateSize());
-
- public bool TryAdd(PublishBatchRequestEntry entry)
- {
- var canFit = CanFit(entry);
- if (!canFit)
- return false;
-
- if (string.IsNullOrEmpty(entry.Id))
- entry.Id = Guid.NewGuid().ToString();
-
- BatchRequest.PublishBatchRequestEntries.Add(entry);
-
- return true;
- }
-
- public PublishBatchRequest Build() => BatchRequest;
-
- public void Dispose()
- {
- _timeWindow.Dispose();
- _timeWindowTrigger.Dispose();
- _batchRequest = null; // Just make it unusable
- }
-}
diff --git a/src/LocalPost.SqsConsumer/ConsumeContext.cs b/src/LocalPost.SqsConsumer/ConsumeContext.cs
new file mode 100644
index 0000000..5dc8e58
--- /dev/null
+++ b/src/LocalPost.SqsConsumer/ConsumeContext.cs
@@ -0,0 +1,56 @@
+using Amazon.SQS.Model;
+
+namespace LocalPost.SqsConsumer;
+
+[PublicAPI]
+public readonly record struct ConsumeContext<T>
+{
+ internal readonly QueueClient Client;
+ internal readonly Message Message;
+ public readonly T Payload;
+
+ public DateTimeOffset ReceivedAt { get; init; } = DateTimeOffset.Now;
+
+ internal ConsumeContext(QueueClient client, Message message, T payload)
+ {
+ Client = client;
+ Payload = payload;
+ Message = message;
+ }
+
+ public void Deconstruct(out T payload, out Message message)
+ {
+ payload = Payload;
+ message = Message;
+ }
+
+ public string MessageId => Message.MessageId;
+
+ public string ReceiptHandle => Message.ReceiptHandle;
+
+ public IReadOnlyDictionary<string, string> Attributes => Message.Attributes;
+
+ public IReadOnlyDictionary<string, MessageAttributeValue> MessageAttributes => Message.MessageAttributes;
+
+ public bool IsStale => false; // TODO Check the visibility timeout
+
+ public ConsumeContext<TOut> Transform<TOut>(TOut payload) =>
+ new(Client, Message, payload)
+ {
+ ReceivedAt = ReceivedAt
+ };
+
+ public ConsumeContext<TOut> Transform<TOut>(Func<ConsumeContext<T>, TOut> transform) => Transform(transform(this));
+
+ public async Task<ConsumeContext<TOut>> Transform<TOut>(Func<ConsumeContext<T>, Task<TOut>> transform) =>
+ Transform(await transform(this).ConfigureAwait(false));
+
+ public static implicit operator T(ConsumeContext<T> context) => context.Payload;
+
+ public Task DeleteMessage(CancellationToken ct = default) => Client.DeleteMessage(this, ct);
+
+ public Task Acknowledge(CancellationToken ct = default) => DeleteMessage(ct);
+
+ // public Task ChangeMessageVisibility(TimeSpan visibilityTimeout, CancellationToken ct = default) =>
+ // Client.ChangeMessageVisibility(this, visibilityTimeout, ct);
+}
diff --git a/src/LocalPost.SqsConsumer/Consumer.cs b/src/LocalPost.SqsConsumer/Consumer.cs
new file mode 100644
index 0000000..7ee6752
--- /dev/null
+++ b/src/LocalPost.SqsConsumer/Consumer.cs
@@ -0,0 +1,127 @@
+using Amazon.Runtime;
+using Amazon.SQS;
+using LocalPost.DependencyInjection;
+using LocalPost.Flow;
+using Microsoft.Extensions.Diagnostics.HealthChecks;
+using Microsoft.Extensions.Hosting;
+
+namespace LocalPost.SqsConsumer;
+
+internal sealed class Consumer(string name, ILogger logger, IAmazonSQS sqs,
+ ConsumerOptions settings, IHandlerManager<ConsumeContext<string>> hm)
+ : IHostedService, IHealthAwareService, IDisposable
+{
+ private CancellationTokenSource? _execTokenSource;
+ private Task? _exec;
+ private Exception? _execException;
+ private string? _execExceptionDescription;
+
+ private CancellationToken _completionToken = CancellationToken.None;
+
+ private HealthCheckResult Ready => (_execTokenSource, _execution: _exec, _execException) switch
+ {
+ (null, _, _) => HealthCheckResult.Unhealthy("Not started"),
+ (_, { IsCompleted: true }, _) => HealthCheckResult.Unhealthy("Stopped"),
+ (not null, null, _) => HealthCheckResult.Degraded("Starting"),
+ (not null, not null, null) => HealthCheckResult.Healthy("Running"),
+ (_, _, not null) => HealthCheckResult.Unhealthy(_execExceptionDescription, _execException),
+ };
+
+ public IHealthCheck ReadinessCheck => HealthChecks.From(() => Ready);
+
+ private async Task RunConsumerAsync(
+ QueueClient client, Handler<ConsumeContext<string>> handler, CancellationToken execToken)
+ {
+ // (Optionally) wait for app start
+
+ try
+ {
+ while (!execToken.IsCancellationRequested)
+ {
+ var messages = await client.PullMessages(execToken).ConfigureAwait(false);
+ await Task.WhenAll(messages
+ .Select(message => new ConsumeContext<string>(client, message, message.Body))
+ .Select(context => handler(context, CancellationToken.None).AsTask()))
+ .ConfigureAwait(false);
+ }
+ }
+ catch (OperationCanceledException e) when (e.CancellationToken == execToken)
+ {
+ // logger.LogInformation("SQS consumer shutdown");
+ }
+ catch (AmazonServiceException e)
+ {
+ logger.LogCritical(e, "SQS consumer error: {ErrorCode} (see {HelpLink})", e.ErrorCode, e.HelpLink);
+ (_execException, _execExceptionDescription) = (e, "SQS consumer failed");
+ }
+ catch (Exception e)
+ {
+ logger.LogCritical(e, "SQS message handler error");
+ (_execException, _execExceptionDescription) = (e, "Message handler failed");
+ }
+ finally
+ {
+ CancelExecution(); // Stop other consumers too
+ }
+ }
+
+ public async Task StartAsync(CancellationToken ct)
+ {
+ if (_execTokenSource is not null)
+ throw new InvalidOperationException("Already started");
+
+ var execTokenSource = _execTokenSource = new CancellationTokenSource();
+
+ var client = new QueueClient(logger, sqs, settings);
+ await client.Connect(ct).ConfigureAwait(false);
+
+ var handler = await hm.Start(ct).ConfigureAwait(false);
+
+ _exec = ObserveExecution();
+ return;
+
+ async Task ObserveExecution()
+ {
+ try
+ {
+ var execution = settings.Consumers switch
+ {
+ 1 => RunConsumerAsync(client, handler, execTokenSource.Token),
+ _ => Task.WhenAll(Enumerable
+ .Range(0, settings.Consumers)
+ .Select(_ => RunConsumerAsync(client, handler, execTokenSource.Token)))
+ };
+ await execution.ConfigureAwait(false);
+
+ await hm.Stop(_execException, _completionToken).ConfigureAwait(false);
+ }
+ finally
+ {
+ // Can happen before the service shutdown, in case of an error
+ logger.LogInformation("SQS consumer stopped");
+ }
+ }
+ }
+
+ // await _execTokenSource.CancelAsync(); // .NET 8+
+ private void CancelExecution() => _execTokenSource?.Cancel();
+
+ public async Task StopAsync(CancellationToken forceShutdownToken)
+ {
+ if (_execTokenSource is null)
+ throw new InvalidOperationException("Has not been started");
+
+ logger.LogInformation("Shutting down SQS consumer...");
+
+ _completionToken = forceShutdownToken;
+ CancelExecution();
+ if (_exec is not null)
+ await _exec.ConfigureAwait(false);
+ }
+
+ public void Dispose()
+ {
+ _execTokenSource?.Dispose();
+ _exec?.Dispose();
+ }
+}
diff --git a/src/LocalPost.SqsConsumer/DependencyInjection/HealthChecksBuilderEx.cs b/src/LocalPost.SqsConsumer/DependencyInjection/HealthChecksBuilderEx.cs
new file mode 100644
index 0000000..591cd0e
--- /dev/null
+++ b/src/LocalPost.SqsConsumer/DependencyInjection/HealthChecksBuilderEx.cs
@@ -0,0 +1,22 @@
+using LocalPost.DependencyInjection;
+using Microsoft.Extensions.DependencyInjection;
+using Microsoft.Extensions.Diagnostics.HealthChecks;
+
+namespace LocalPost.SqsConsumer.DependencyInjection;
+
+[PublicAPI]
+public static class HealthChecksBuilderEx
+{
+ public static IHealthChecksBuilder AddSqsConsumer(this IHealthChecksBuilder builder,
+ string name, HealthStatus? failureStatus = null, IEnumerable<string>? tags = null) =>
+ builder.Add(HealthChecks.Readiness(name, failureStatus, tags));
+
+ public static IHealthChecksBuilder AddSqsConsumers(this IHealthChecksBuilder builder,
+ HealthStatus? failureStatus = null, IEnumerable<string>? tags = null)
+ {
+ foreach (var name in builder.Services.GetKeysFor<Consumer>().OfType<string>())
+ AddSqsConsumer(builder, name, failureStatus, tags);
+
+ return builder;
+ }
+}
diff --git a/src/LocalPost.SqsConsumer/DependencyInjection/ServiceCollectionEx.cs b/src/LocalPost.SqsConsumer/DependencyInjection/ServiceCollectionEx.cs
new file mode 100644
index 0000000..8fa53ec
--- /dev/null
+++ b/src/LocalPost.SqsConsumer/DependencyInjection/ServiceCollectionEx.cs
@@ -0,0 +1,14 @@
+using Microsoft.Extensions.DependencyInjection;
+
+namespace LocalPost.SqsConsumer.DependencyInjection;
+
+[PublicAPI]
+public static class ServiceCollectionEx
+{
+ public static IServiceCollection AddSqsConsumers(this IServiceCollection services, Action configure)
+ {
+ configure(new SqsBuilder(services));
+
+ return services;
+ }
+}
diff --git a/src/LocalPost.SqsConsumer/DependencyInjection/ServiceCollectionExtensions.cs b/src/LocalPost.SqsConsumer/DependencyInjection/ServiceCollectionExtensions.cs
deleted file mode 100644
index 71c0961..0000000
--- a/src/LocalPost.SqsConsumer/DependencyInjection/ServiceCollectionExtensions.cs
+++ /dev/null
@@ -1,83 +0,0 @@
-using Amazon.SQS.Model;
-using LocalPost;
-using LocalPost.DependencyInjection;
-using Microsoft.Extensions.DependencyInjection;
-using Microsoft.Extensions.DependencyInjection.Extensions;
-using Microsoft.Extensions.Options;
-
-namespace LocalPost.SqsConsumer.DependencyInjection;
-
-public static class ServiceCollectionExtensions
-{
- public static OptionsBuilder AddAmazonSqsMinimalConsumer(this IServiceCollection services,
- string name, MessageHandler handler) =>
- services.AddAmazonSqsConsumer(name, _ => handler);
-
- public static OptionsBuilder AddAmazonSqsMinimalConsumer(this IServiceCollection services,
- string name, Func handler) where TDep1 : notnull =>
- services.AddAmazonSqsConsumer(name, provider => (context, ct) =>
- {
- var dep1 = provider.GetRequiredService();
-
- return handler(dep1, context, ct);
- });
-
- public static OptionsBuilder AddAmazonSqsMinimalConsumer(this IServiceCollection services,
- string name, Func handler)
- where TDep1 : notnull
- where TDep2 : notnull =>
- services.AddAmazonSqsConsumer(name, provider => (context, ct) =>
- {
- var dep1 = provider.GetRequiredService();
- var dep2 = provider.GetRequiredService();
-
- return handler(dep1, dep2, context, ct);
- });
-
- public static OptionsBuilder AddAmazonSqsMinimalConsumer(this IServiceCollection services,
- string name, Func handler)
- where TDep1 : notnull
- where TDep2 : notnull
- where TDep3 : notnull =>
- services.AddAmazonSqsConsumer(name, provider => (context, ct) =>
- {
- var dep1 = provider.GetRequiredService();
- var dep2 = provider.GetRequiredService();
- var dep3 = provider.GetRequiredService();
-
- return handler(dep1, dep2, dep3, context, ct);
- });
-
- public static OptionsBuilder AddAmazonSqsConsumer(this IServiceCollection services,
- string name) where THandler : IMessageHandler =>
- services
- .AddAmazonSqsConsumer(name, provider => provider.GetRequiredService().Process);
-
- public static OptionsBuilder AddAmazonSqsConsumer(this IServiceCollection services,
- string name, Func> handlerFactory)
- {
- services.TryAddSingleton, SqsConsumerOptionsResolver>();
-
- services.TryAddSingleton();
- services.AddSingleton(provider => ActivatorUtilities.CreateInstance(provider, name));
-
- services
- .AddCustomBackgroundQueue($"SQS/{name}",
- provider => provider.GetSqs(name),
- provider => provider.GetSqs(name).Handler(handlerFactory(provider)))
- .Configure>(
- (options, sqsOptions) => { options.MaxConcurrency = sqsOptions.Get(name).MaxConcurrency; });
-
- services.TryAddSingleton();
- services
- .AddCustomBackgroundQueue($"SQS/{name}/ProcessedMessages",
- provider => provider.GetSqs(name).ProcessedMessages,
- provider => provider.GetRequiredService().Process)
- .Configure>(
- (options, sqsOptions) => { options.MaxConcurrency = sqsOptions.Get(name).MaxConcurrency; });
-
- // TODO Health check, metrics
-
- return services.AddOptions(name).Configure(options => options.QueueName = name);
- }
-}
diff --git a/src/LocalPost.SqsConsumer/DependencyInjection/SqsBuilder.cs b/src/LocalPost.SqsConsumer/DependencyInjection/SqsBuilder.cs
new file mode 100644
index 0000000..6668476
--- /dev/null
+++ b/src/LocalPost.SqsConsumer/DependencyInjection/SqsBuilder.cs
@@ -0,0 +1,50 @@
+using Amazon.SQS;
+using LocalPost.DependencyInjection;
+using LocalPost.Flow;
+using Microsoft.Extensions.DependencyInjection;
+using Microsoft.Extensions.Options;
+
+namespace LocalPost.SqsConsumer.DependencyInjection;
+
+[PublicAPI]
+public sealed class SqsBuilder(IServiceCollection services)
+{
+ public OptionsBuilder<ConsumerOptions> Defaults { get; } = services.AddOptions<ConsumerOptions>();
+
+ ///
+ /// Add an SQS consumer with a custom message handler.
+ ///
+ /// Message handler factory.
+ /// Consumer options builder.
+ public OptionsBuilder<ConsumerOptions> AddConsumer(HandlerManagerFactory<ConsumeContext<string>> hmf) =>
+ AddConsumer(Options.DefaultName, hmf);
+
+ ///
+ /// Add an SQS consumer with a custom message handler.
+ ///
+ /// Consumer name (should be unique in the application). Also, the default queue name.
+ /// Message handler factory.
+ /// Consumer options builder.
+ public OptionsBuilder<ConsumerOptions> AddConsumer(string name, HandlerManagerFactory<ConsumeContext<string>> hmf)
+ {
+ var added = services.TryAddKeyedSingleton(name, (provider, _) => new Consumer(name,
+ provider.GetLoggerFor<Consumer>(),
+ provider.GetRequiredService(),
+ provider.GetOptions<ConsumerOptions>(name),
+ hmf(provider)
+ ));
+
+ if (!added)
+ throw new ArgumentException("Consumer is already registered", nameof(name));
+
+ services.AddHostedService(provider => provider.GetRequiredKeyedService<Consumer>(name));
+
+ return OptionsFor(name).Configure<IOptions<ConsumerOptions>>((co, defaults) =>
+ {
+ co.UpdateFrom(defaults.Value);
+ co.QueueName = name;
+ });
+ }
+
+ public OptionsBuilder<ConsumerOptions> OptionsFor(string name) => services.AddOptions<ConsumerOptions>(name);
+}
diff --git a/src/LocalPost.SqsConsumer/HandlerStackEx.cs b/src/LocalPost.SqsConsumer/HandlerStackEx.cs
new file mode 100644
index 0000000..2a83086
--- /dev/null
+++ b/src/LocalPost.SqsConsumer/HandlerStackEx.cs
@@ -0,0 +1,104 @@
+using System.Text.Json;
+
+namespace LocalPost.SqsConsumer;
+
+using MessageHmf = HandlerManagerFactory>;
+using MessagesHmf = HandlerManagerFactory>>;
+
+[PublicAPI]
+public static class HandlerStackEx
+{
+ public static HandlerManagerFactory<ConsumeContext<T>> UseSqsPayload<T>(this HandlerManagerFactory<T> hmf) =>
+ hmf.MapHandler<ConsumeContext<T>, T>(next => async (context, ct) =>
+ await next(context.Payload, ct).ConfigureAwait(false));
+
+ public static HandlerManagerFactory>> UseSqsPayload(
+ this HandlerManagerFactory