Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
9183384
initial commit
stijnmoreels Jan 30, 2025
4492162
pr-fix: correct role assignments
stijnmoreels Jan 30, 2025
438da16
pr-add: remaining event filtering system + feature docs
stijnmoreels Jan 31, 2025
a63635a
pr-fix: correct xml docs for param
stijnmoreels Jan 31, 2025
a33c519
Update appsettings.json
stijnmoreels Jan 31, 2025
a9e4772
pr-add: event hubs resource code example in feature docs
stijnmoreels Jan 31, 2025
d859100
Merge branch 'feature/add-eventhubs-test-components' of https://githu…
stijnmoreels Jan 31, 2025
4421216
Merge branch 'main' into feature/add-eventhubs-test-components
stijnmoreels Feb 21, 2025
8cf60ef
fix(.sln): re-add service-bus to /Messaging solution folder
stijnmoreels Feb 21, 2025
3b6d78a
Merge branch 'main' into feature/add-eventhubs-test-components
stijnmoreels Apr 14, 2025
f4710db
chore(deps): use package ranges in csproj
stijnmoreels Apr 14, 2025
e719da2
chore(deps): remove net6.0 target framework
stijnmoreels Apr 14, 2025
5d184ff
Merge branch 'main' into feature/add-eventhubs-test-components
stijnmoreels Jun 24, 2025
007c340
Merge branch 'main' into feature/add-eventhubs-test-components
stijnmoreels Jan 2, 2026
ddb569e
chore(roslyn): apply latest roslyn analyser recommendations to project
stijnmoreels Jan 2, 2026
5382c70
docs(az.eh): use good-writing recommendations
stijnmoreels Jan 2, 2026
6c24f63
chore(az.eh): use correct xunit relationship
stijnmoreels Jan 2, 2026
9029c80
chore(az.eh): remove unit test project change
stijnmoreels Jan 2, 2026
1cfd58e
Update src/Arcus.Testing.Messaging.EventHubs/EventHubEventFilter.cs
stijnmoreels Jan 7, 2026
8348f5c
Update src/Arcus.Testing.Messaging.EventHubs/EventHubEventFilter.cs
stijnmoreels Jan 7, 2026
c63314a
Update src/Arcus.Testing.Messaging.EventHubs/EventHubEventFilter.cs
stijnmoreels Jan 7, 2026
5db46cd
chore(az.eh): use read-only list
stijnmoreels Jan 7, 2026
fb77eaa
Update docs/preview/03-Features/04-Azure/05-Messaging/02-eventhubs.md
stijnmoreels Jan 22, 2026
9194a1b
Update docs/preview/03-Features/04-Azure/05-Messaging/02-eventhubs.md
stijnmoreels Jan 27, 2026
b86cacb
chore(az.eh): provide cancellation support
stijnmoreels Feb 26, 2026
eb47de7
chore(az.eh): provide cancellation support
stijnmoreels Feb 26, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions build/deploy-test-resources.yml
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ stages:
--parameters cosmosDbMongoDbDatabaseName=${{ variables['Arcus.Testing.Cosmos.MongoDb.DatabaseName'] }} `
--parameters cosmosDbnoSqlName=${{ variables['Arcus.Testing.Cosmos.NoSql.Name'] }} `
--parameters cosmosDbNoSqlDatabaseName=${{ variables['Arcus.Testing.Cosmos.NoSql.DatabaseName'] }} `
--parameters eventHubsNamespaceName=${{ variables['Arcus.Testing.EventHubs.Namespace'] }} `
--parameters serviceBusNamespaceName=${{ variables['Arcus.Testing.ServiceBus.Namespace'] }} `
--parameters keyVaultName=${{ parameters.keyVaultName }} `
--parameters servicePrincipalObjectId=$objectId
Expand Down
26 changes: 25 additions & 1 deletion build/templates/test-resources.bicep
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@ param cosmosDbNoSqlName string
// Define the name of the Cosmos DB for NoSQL database that will be created.
param cosmosDbNoSqlDatabaseName string

// Define the name of the Service Bus namespace resource that will be created.
// Define the name of the Azure Event Hubs namespace that will be created.
param eventHubsNamespaceName string

// Define the name of the Service bus namespace resource that will be created.
param serviceBusNamespaceName string

// Define the name of the Key Vault where the necessary secrets will be stored to access the deployed test resources.
Expand Down Expand Up @@ -128,6 +131,27 @@ module cosmosDb_noSql 'br/public:avm/res/document-db/database-account:0.6.0' = {
}
}

module eventHubsNamespace 'br/public:avm/res/event-hub/namespace:0.9.1' = {
name: 'eventHubsNamespaceDeployment'
params: {
name: eventHubsNamespaceName
location: location
skuName: 'Basic'
skuCapacity: 1
publicNetworkAccess: 'Enabled'
roleAssignments: [
{
principalId: servicePrincipal_objectId
roleDefinitionIdOrName: 'Azure Event Hubs Data Sender'
}
{
principalId: servicePrincipal_objectId
roleDefinitionIdOrName: 'Azure Event Hubs Data Receiver'
}
]
}
}

module serviceBusNamespace 'br/public:avm/res/service-bus/namespace:0.10.1' = {
name: 'serviceBusNamespaceDeployment'
params: {
Expand Down
3 changes: 2 additions & 1 deletion build/variables/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@ variables:
Arcus.Testing.Cosmos.MongoDb.DatabaseName: 'arcus-testing-cosmos-mongo-db'
Arcus.Testing.Cosmos.NoSql.Name: 'arcus-testing-cosmos-nosql'
Arcus.Testing.Cosmos.NoSql.DatabaseName: 'arcus-testing-cosmos-nosql-db'
Arcus.Testing.KeyVault.Name: 'arcus-testing-kv'
Arcus.Testing.EventHubs.Namespace: 'arcus-testing-eventhubs'
Arcus.Testing.ServiceBus.Namespace: 'arcus-testing-servicebus'
Arcus.Testing.KeyVault.Name: 'arcus-testing-kv'
Arcus.Testing.StorageAccount.Name: 'arcustestingstorage'
Arcus.Testing.StorageAccount.Key.SecretName: 'Arcus-Testing-StorageAccount-Key'
82 changes: 82 additions & 0 deletions docs/preview/03-Features/04-Azure/05-Messaging/02-eventhubs.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
# Event Hubs
The `Arcus.Testing.Messaging.EventHubs` package provides test fixtures related to Azure Event Hubs. It adheres to the 'clean environment' testing practice by providing a temporary Event Hub.

## Installation
The following functionality is available when installing this package:

```powershell
PM> Install-Package -Name Arcus.Testing.Messaging.EventHubs
```

## Temporary hub
The `TemporaryEventHub` provides a solution when the integration test requires an Azure Event Hub during the test run. The test fixture creates a hub upon the setup of the fixture and deletes it again when the fixture disposes.

> ✨ When the test fixture was responsible for creating the hub, the fixture will delete the hub when the test disposes the fixture. This follows the 'clean environment' testing principle that describes that a test should revert the environment to its original state after the test has run.

```csharp
using Arcus.Testing;

ResourceIdentifier eventHubsNamespaceResourceId =
EventHubsNamespaceResource.CreateResourceIdentifier("<subscription-id", "<resource-group>", "<namespace-name>");

await using var hub = await TemporaryEventHub.CreateIfNotExistsAsync(
eventHubsNamespaceResourceId, consumerGroup: "$Default", "<event-hub-name>", logger, CancellationToken.None);
```

> ⚡ Uses by default the [`DefaultAzureCredential`](https://learn.microsoft.com/en-us/dotnet/api/azure.identity.defaultazurecredential) but supports other authentication mechanisms with overloads that take in the `EventHubsNamespaceResource` directly:
> ```csharp
> var credential = new DefaultAzureCredential();
> var arm = new ArmClient(credential);
>
> EventHubsNamespaceResource eventHubsNamespace =
> await arm.GetEventHubsNamespaceResource(eventHubNamespaceResourceId)
> .GetAsync();
> ```

### Customization
The `TemporaryEventHub` allows testers to configure setup operations to manipulate the test fixture's behavior.

```csharp
using Arcus.Testing;

await TemporaryEventHub.CreateIfNotExistsAsync(..., options =>
{
// Options related to when the test fixture is set up.
// ---------------------------------------------------

// Change the default hub-creation behavior.
options.OnSetup.CreateHubWith((EventHubData hub) =>
{
hub.PartitionCount = 4;
});
});
```

### Search for events
The `TemporaryEventHub` has an event filtering system that allows testers to search for events during the lifetime of the test fixture. This can be useful to verify the current state of a hub, or as a test assertion to verify EventHubs-related implementations.

```csharp
using Arcus.Testing;

await using TemporaryEventHub hub = ...

IEnumerable<PartitionEvent> events =
await hub.Events

// Get subset events currently on the hub.
.Where(ev => ev.Data.Properties.ContainsKey("<my-key"))
.Where(ev => ev.Data.ContentType == "application/json")

// Get events only from a single partition.
.FromPartition("<partition-id>", EventPosition.Earliest)

// Configures the read options that will be associated with the search operation.
.ReadWith((ReadEventOptions opt) =>
{
opt.MaximumWaitTime = TimeSpan.FromSeconds(10);
opt.OwnerLevel = 10;
})

// Start searching for events.
.ToListAsync(CancellationToken.None);
```
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFramework>net10.0</TargetFramework>
<RootNamespace>Arcus.Testing</RootNamespace>
<Authors>Arcus</Authors>
<Company>Arcus</Company>
<Description>Provides messaging capabilities for Azure Event Hubs during Arcus testing</Description>
<Copyright>Copyright (c) Arcus</Copyright>
<PackageProjectUrl>https://github.com/arcus-azure/arcus.testing</PackageProjectUrl>
<RepositoryUrl>https://github.com/arcus-azure/arcus.testing</RepositoryUrl>
<PackageLicenseFile>LICENSE</PackageLicenseFile>
<PackageIcon>icon.png</PackageIcon>
<PackageReadmeFile>README.md</PackageReadmeFile>
<RepositoryType>Git</RepositoryType>
<PackageTags>Azure;Testing;Messaging;EventHubs</PackageTags>
<PackageId>Arcus.Testing.Messaging.EventHubs</PackageId>
<GeneratePackageOnBuild>true</GeneratePackageOnBuild>
<GenerateDocumentationFile>true</GenerateDocumentationFile>
<TreatWarningsAsErrors>true</TreatWarningsAsErrors>
<WarningsNotAsErrors>$(WarningsNotAsErrors);NU1901;NU1902;NU1903;NU1904</WarningsNotAsErrors>
<AnalysisMode>All</AnalysisMode>
</PropertyGroup>

<ItemGroup>
<None Include="..\..\README.md" Pack="true" PackagePath="\" />
<None Include="..\..\LICENSE" Pack="true" PackagePath="\" />
<None Include="..\..\docs\static\img\icon.png" Pack="true" PackagePath="\" />
</ItemGroup>

<ItemGroup>
<PackageReference Include="Azure.Identity" Version="[1.*,2.0.0)" />
<PackageReference Include="Azure.Messaging.EventHubs" Version="[5.*,6.0.0)" />
<PackageReference Include="Azure.Messaging.EventHubs.Processor" Version="[5.*,6.0.0)" />
<PackageReference Include="Azure.ResourceManager.EventHubs" Version="[1.*,2.0.0)" />
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\Arcus.Testing.Core\Arcus.Testing.Core.csproj" />
</ItemGroup>

</Project>
143 changes: 143 additions & 0 deletions src/Arcus.Testing.Messaging.EventHubs/EventHubEventFilter.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
using System;
using System.Collections.Generic;
using System.Collections.ObjectModel;
using System.Linq;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
using Azure.Messaging.EventHubs.Consumer;

namespace Arcus.Testing
{
/// <summary>
/// Represents a configurable filter instance that selects a subset of <see cref="PartitionEvent"/>s on an Azure EventHubs hub
/// (a.k.a. 'spy test fixture').
/// </summary>
public class EventHubEventFilter
{
private readonly EventHubConsumerClient _client;
private readonly ReadEventOptions _options = new() { MaximumWaitTime = TimeSpan.FromMinutes(1) };
private readonly Collection<Func<PartitionEvent, bool>> _predicates = [];

private string _partitionId;
private EventPosition _startingPosition;

internal EventHubEventFilter(EventHubConsumerClient client)
{
ArgumentNullException.ThrowIfNull(client);
_client = client;
}

/// <summary>
/// Indicate that only events from a given <paramref name="partitionId"/> should be searched for.
/// </summary>
/// <param name="partitionId">The identifier of the Event Hub partition from which events will be received.</param>
/// <param name="startingPosition">The position within the partition where the consumer should begin reading events.</param>
/// <exception cref="ArgumentException">Thrown when the <paramref name="partitionId"/> is blank.</exception>
public EventHubEventFilter FromPartition(string partitionId, EventPosition startingPosition)
{
ArgumentException.ThrowIfNullOrWhiteSpace(partitionId);
_partitionId = partitionId;
_startingPosition = startingPosition;

return this;
}

/// <summary>
/// Adds a <paramref name="predicate"/> to which the searched for events should match against.
/// </summary>
/// <param name="predicate">The custom filter function to select a subset of events.</param>
/// <exception cref="ArgumentNullException">Thrown when the <paramref name="predicate"/> is <c>null</c>.</exception>>
public EventHubEventFilter Where(Func<PartitionEvent, bool> predicate)
{
ArgumentNullException.ThrowIfNull(predicate);
_predicates.Add(predicate);

return this;
}

/// <summary>
/// <para>Configures the <see cref="ReadEventOptions"/> that will be associated with the event search operation.</para>
/// <para>Use for example the <see cref="ReadEventOptions.MaximumWaitTime"/> to shortcut the event searching early:</para>
/// <example>
/// <code>
/// .ReadWith(options =>
/// {
/// options.MaximumWaitTime = TimeSpan.FromSeconds(10);
/// })
/// </code>
/// </example>
/// <para>Or to change the <see cref="ReadEventOptions.OwnerLevel"/> when multiple event consumers are involved:</para>
/// <example>
/// <code>
/// .ReadWith(options =>
/// {
/// options.OwnerLevel = 10;
/// })
/// </code>
/// </example>
/// </summary>
/// <param name="configureOptions"></param>
/// <returns></returns>
public EventHubEventFilter ReadWith(Action<ReadEventOptions> configureOptions)
{
ArgumentNullException.ThrowIfNull(configureOptions);
configureOptions(_options);

return this;
}

/// <summary>
/// Gets the awaiter used to await the <see cref="ToListAsync(CancellationToken)"/>.
/// </summary>
public TaskAwaiter<IReadOnlyList<PartitionEvent>> GetAwaiter()
{
return ToListAsync(CancellationToken.None).GetAwaiter();
}

/// <summary>
/// Determines whether the configured Azure Event Hub contains any matching events.
/// </summary>
/// <param name="cancellationToken">An optional <see cref="CancellationToken" /> instance to signal the request to cancel the operation.</param>
/// <returns>
/// <see langword="true" /> if any events are found that matches the previously configured predicates; otherwise, <see langword="false" />.
/// </returns>
public async Task<bool> AnyAsync(CancellationToken cancellationToken)
{
cancellationToken.ThrowIfCancellationRequested();

var events = await ToListAsync(cancellationToken).ConfigureAwait(false);
return events.Count > 0;
}

/// <summary>
/// Collects all events currently matching on the configured Azure Event Hub into a <see cref="List{T}"/>.
/// </summary>
/// <param name="cancellationToken">An optional <see cref="CancellationToken" /> instance to signal the request to cancel the operation.</param>
public async Task<IReadOnlyList<PartitionEvent>> ToListAsync(CancellationToken cancellationToken)
{
cancellationToken.ThrowIfCancellationRequested();

IAsyncEnumerable<PartitionEvent> reading =
_partitionId is null
? _client.ReadEventsAsync(_options, cancellationToken)
: _client.ReadEventsFromPartitionAsync(_partitionId, _startingPosition, _options, cancellationToken);

var events = new Collection<PartitionEvent>();
await foreach (PartitionEvent ev in reading.ConfigureAwait(false).WithCancellation(cancellationToken))
{
if (ev.Data is null)
{
return events.ToList();
}

if (_predicates.All(predicate => predicate(ev)))
{
events.Add(ev);
}
}

return events.ToList();
}
}
}
Loading
Loading