From 2bb3c9b9d9513668e75d5e4dfb5c6e0547438df7 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Fri, 10 Apr 2026 11:06:01 +0000 Subject: [PATCH 1/5] Add detailed plan for AssignTask support in AgenticAggregate Created plans/assign-task/assign-task-plan.md with 5-phase design: - Phase 1: RequestingParty sealed interface (api module) - Phase 2: AssignTaskCommand and AgentTaskAssignedEvent (agentic module) - Phase 3: Built-in handlers and registration (agentic module) - Phase 4: TaskAwareState interface and AssignedTask record (api module) - Phase 5: Integration tests GitHub Issues: #322-#327 (tracking issue: #327) Agent-Logs-Url: https://github.com/elasticsoftwarefoundation/akces-framework/sessions/2c3e5400-d2cc-4fed-a03e-b7478d7dcc71 Co-authored-by: jwijgerd <914840+jwijgerd@users.noreply.github.com> --- plans/assign-task/assign-task-plan.md | 425 ++++++++++++++++++++++++++ 1 file changed, 425 insertions(+) create mode 100644 plans/assign-task/assign-task-plan.md diff --git a/plans/assign-task/assign-task-plan.md b/plans/assign-task/assign-task-plan.md new file mode 100644 index 00000000..f1f000d7 --- /dev/null +++ b/plans/assign-task/assign-task-plan.md @@ -0,0 +1,425 @@ +# AssignTask Support for AgenticAggregate + +## Overview + +This plan adds built-in task assignment support to the Akces Framework's agentic aggregates. When an `AssignTask` command is sent to an `AgenticAggregate`, the framework handles it with a built-in command handler that creates an Embabel `AgentProcess` and emits an `AgentTaskAssigned` event containing the process ID and requesting party information. + +This feature introduces: +1. A **`RequestingParty`** concept — a sealed interface representing who is requesting the task (an Agent or a Human), each with a `String role`. +2. A **built-in `AssignTaskCommand`** — the first framework-owned command, automatically registered for every `AgenticAggregate`. +3. A **built-in `AgentTaskAssignedEvent`** — emitted when the task is accepted, containing the Embabel `AgentProcess.getId()` value. +4. **Built-in command and event-sourcing handlers** — registered automatically by the framework, following the same pattern as the existing `MemoryStored`/`MemoryRevoked` built-in handlers. + +> **Relationship to other plans**: This plan builds on the [Embabel AgentPlatform Integration Plan](../embabel-integration-plan.md) and is a prerequisite for the [Incremental Agent Task Processing Plan](../agenttasks.md). The `AgentTaskAssigned` event creates the link between a requesting party and an active Embabel `AgentProcess`, which the incremental tick model will later use to advance processes across poll loop iterations. + +--- + +## Phase 1: Core Types — `RequestingParty` (api module) + +### What + +Introduce the `RequestingParty` concept as a sealed interface in the `api` module. This represents the entity that requested the task assignment — either another AI agent or a human user. + +### Module + +`main/api` — package `org.elasticsoftware.akces.aggregate` + +### New Files + +| File | Description | +|------|-------------| +| `RequestingParty.java` | Sealed interface with `String role()` method, Jackson `@JsonTypeInfo` / `@JsonSubTypes` for polymorphic serialization | +| `AgentRequestingParty.java` | Record implementing `RequestingParty` for agent requestors — fields: `String agentId`, `String agentName`, `String role` | +| `HumanRequestingParty.java` | Record implementing `RequestingParty` for human requestors — fields: `String userId`, `String displayName`, `String role` | + +### Design Decisions + +**Sealed interface vs. single record with enum discriminator:** + +A sealed interface with `AgentRequestingParty` and `HumanRequestingParty` permits is preferred because: +- It leverages Java 25 sealed types and pattern matching (`switch` expressions with exhaustiveness checking) +- It allows each variant to carry distinct fields (agents have `agentId`/`agentName`, humans have `userId`/`displayName`) +- It aligns with the framework's preference for immutable records +- Jackson 3.x supports sealed type serialization via `@JsonTypeInfo` + +**Location in `api` module:** + +`RequestingParty` is placed in the `api` module (not `agentic`) because: +- It is a core domain concept that may be referenced by application-level aggregates, query models, and database models +- It follows the pattern of `AgenticAggregateMemory` and `MemoryAwareState`, which are also in `api` +- The `agentic` module depends on `api`, not the other way around + +### Serialization + +```java +@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type") +@JsonSubTypes({ + @JsonSubTypes.Type(value = AgentRequestingParty.class, name = "agent"), + @JsonSubTypes.Type(value = HumanRequestingParty.class, name = "human") +}) +public sealed interface RequestingParty permits AgentRequestingParty, HumanRequestingParty { + /** + * The role of this requesting party in the system (e.g., "administrator", "analyst", "orchestrator"). + */ + String role(); +} +``` + +```java +public record AgentRequestingParty( + String agentId, + String agentName, + String role +) implements RequestingParty {} +``` + +```java +public record HumanRequestingParty( + String userId, + String displayName, + String role +) implements RequestingParty {} +``` + +### Dependencies + +None — uses only Jackson annotations already available in the `api` module. + +--- + +## Phase 2: Built-in Command and Event (agentic module) + +### What + +Create the `AssignTaskCommand` and `AgentTaskAssignedEvent` as framework-owned types in the `agentic` module, following the established pattern of `MemoryStoredEvent` and `MemoryRevokedEvent`. + +### Module + +`main/agentic` — new package `org.elasticsoftware.akces.agentic.commands` for the command, existing package `org.elasticsoftware.akces.agentic.events` for the event. + +### New Files + +| File | Package | Description | +|------|---------|-------------| +| `AssignTaskCommand.java` | `o.e.a.agentic.commands` | Built-in command — fields: `agenticAggregateId` (aggregate identifier), `taskDescription` (what the agent should do), `requestingParty` (who requested it), `taskMetadata` (optional Map of additional context) | +| `AgentTaskAssignedEvent.java` | `o.e.a.agentic.events` | Built-in event — fields: `agenticAggregateId`, `agentProcessId` (from `AgentProcess.getId()`), `taskDescription`, `requestingParty`, `taskMetadata`, `assignedAt` (Instant) | + +### AssignTaskCommand + +```java +@CommandInfo(type = "AssignTask", version = 1, description = "Assigns a task to an agentic aggregate for AI-assisted processing") +public record AssignTaskCommand( + @AggregateIdentifier @NotNull String agenticAggregateId, + @NotNull String taskDescription, + @NotNull RequestingParty requestingParty, + Map taskMetadata +) implements Command { + @Override + @Nonnull + public String getAggregateId() { + return agenticAggregateId; + } +} +``` + +### AgentTaskAssignedEvent + +```java +@DomainEventInfo(type = "AgentTaskAssigned", version = 1, description = "Emitted when a task has been assigned to an agentic aggregate and an AgentProcess has been created") +public record AgentTaskAssignedEvent( + @AggregateIdentifier String agenticAggregateId, + @NotNull String agentProcessId, + @NotNull String taskDescription, + @NotNull RequestingParty requestingParty, + Map taskMetadata, + @NotNull Instant assignedAt +) implements DomainEvent { + @Override + @Nonnull + public String getAggregateId() { + return agenticAggregateId; + } +} +``` + +### Design Decisions + +**First built-in command:** + +This is the first framework-owned `Command` implementation. Until now, all commands were application-defined. The `AssignTask` command is a framework concern because it directly interacts with the Embabel runtime to create an `AgentProcess`, which is an infrastructure operation. + +**`taskMetadata` field:** + +A `Map` provides extensibility for passing additional context to the agent process without changing the command schema. Examples: `correlationId`, `priority`, `deadline`, `sourceSystem`. + +**Command module location:** + +A new `commands` package is created in the `agentic` module, mirroring the existing `events` package. This keeps the layout symmetric and predictable. + +### Type Registration Constants + +In `AgenticAggregateRuntime` (or a dedicated constants class), add: + +```java +CommandType ASSIGN_TASK_COMMAND_TYPE = new CommandType<>( + "AssignTask", 1, AssignTaskCommand.class, false, false, false); + +DomainEventType AGENT_TASK_ASSIGNED_TYPE = new DomainEventType<>( + "AgentTaskAssigned", 1, AgentTaskAssignedEvent.class, false, false, false, false); +``` + +--- + +## Phase 3: Built-in Command Handler and Event-Sourcing Handler (agentic module) + +### What + +Implement the built-in handler that processes `AssignTaskCommand` and emits `AgentTaskAssignedEvent`, plus the event-sourcing handler that updates the aggregate state when the event is applied. + +### Module + +`main/agentic` — package `org.elasticsoftware.akces.agentic.runtime` + +### Changes to Existing Files + +#### `KafkaAgenticAggregateRuntime` + +Add two new static methods following the existing `handleMemoryEvent` pattern: + +1. **`handleAssignTask(Command command, AggregateState state)`** — Built-in command handler: + - Casts command to `AssignTaskCommand` + - Creates an `AgentProcess` via `agentPlatform.createAgentProcess(agent, ProcessOptions.DEFAULT, bindings)` with the task description and requesting party in the bindings + - Retrieves the process ID via `agentProcess.getId()` + - Returns a `Stream` containing a single `AgentTaskAssignedEvent` with all relevant fields + + > **Note:** Unlike the memory handlers (which are event-sourcing handlers and are `static`), this is a command handler that needs access to the `AgentPlatform` and `Agent` instances. This means the handler **cannot** be a static method reference. Instead, it will be implemented as a non-static method or as a dedicated adapter class. + +2. **`handleAgentTaskAssignedEvent(DomainEvent event, AggregateState state)`** — Built-in event-sourcing handler: + - Pattern matches on `AgentTaskAssignedEvent` + - Updates the state to track the assigned task (see Phase 4 for state interface) + - Returns the updated state + +#### `AgenticAggregateRuntimeFactory` + +Register the built-in command and event types, following the same pattern as memory events: + +```java +// Register built-in AssignTask command handler +runtimeBuilder + .addCommandHandler(ASSIGN_TASK_COMMAND_TYPE, assignTaskHandler) + .addCommand(ASSIGN_TASK_COMMAND_TYPE); + +// Register built-in AgentTaskAssigned event-sourcing handler +runtimeBuilder + .addEventSourcingHandler(AGENT_TASK_ASSIGNED_TYPE, KafkaAgenticAggregateRuntime::handleTaskEvent) + .addDomainEvent(AGENT_TASK_ASSIGNED_TYPE); +``` + +#### `AkcesAgenticAggregateController` + +Register the `AssignTask` and `AgentTaskAssigned` schemas in the schema registry, following the pattern used for `MemoryStored` and `MemoryRevoked`. + +### Handler Design — Command Handler Adapter + +Because the `AssignTask` handler needs runtime access to `AgentPlatform` and `Agent`, it cannot use the simple `static` method reference pattern of memory handlers. Two options: + +**Option A: Dedicated `AssignTaskCommandHandler` class** (recommended) + +A new `CommandHandlerFunction` implementation in `o.e.a.agentic.runtime`: + +```java +public class AssignTaskCommandHandler + implements CommandHandlerFunction { + + private final AgentPlatform agentPlatform; + private final Agent agent; + private final AgenticAggregate aggregate; + + @Override + public Stream apply(AssignTaskCommand command, S state) { + // 1. Build bindings from command metadata + // 2. Create AgentProcess + // 3. Emit AgentTaskAssignedEvent with process ID + } + + @Override + public boolean isCreate() { + return false; + } +} +``` + +**Option B: Lambda registered inline in the factory** + +Less clean, but avoids a new class. Not recommended due to complexity of captured variables. + +### Dependencies + +- Phase 1 (RequestingParty types for the command and event fields) +- Phase 2 (AssignTaskCommand and AgentTaskAssignedEvent for handler signatures) + +--- + +## Phase 4: State Interface for Task Tracking (api module) + +### What + +Extend the state contracts to support tracking assigned tasks, so the event-sourcing handler for `AgentTaskAssignedEvent` can update the aggregate state. + +### Module + +`main/api` — package `org.elasticsoftware.akces.aggregate` + +### New Files + +| File | Description | +|------|-------------| +| `AssignedTask.java` | Record representing an assigned task: `String agentProcessId`, `String taskDescription`, `RequestingParty requestingParty`, `Map taskMetadata`, `Instant assignedAt` | +| `TaskAwareState.java` | Interface for states that track assigned tasks: `List getAssignedTasks()`, `TaskAwareState withAssignedTask(AssignedTask task)`, `TaskAwareState withoutAssignedTask(String agentProcessId)` | + +### Design Decisions + +**Separate interface vs. extending `MemoryAwareState`:** + +A separate `TaskAwareState` interface is preferred because: +- Not all agentic aggregates need to track both memories and assigned tasks +- It follows the Interface Segregation Principle +- States can implement both `MemoryAwareState` and `TaskAwareState` when needed +- The framework can check `instanceof TaskAwareState` independently + +**Relationship to `MemoryAwareState`:** + +The `TaskAwareState` pattern mirrors `MemoryAwareState`: +- Immutable update methods (`withAssignedTask`, `withoutAssignedTask`) +- The framework's built-in handler checks for the interface and fails with `IllegalStateException` if not implemented + +### Event-Sourcing Handler + +The built-in event-sourcing handler in `KafkaAgenticAggregateRuntime`: + +```java +public static AggregateState onAgentTaskAssigned(AgentTaskAssignedEvent event, AggregateState state) { + if (!(state instanceof TaskAwareState tas)) { + throw new IllegalStateException( + "Aggregate state " + state.getClass().getName() + + " does not implement TaskAwareState"); + } + AssignedTask task = new AssignedTask( + event.agentProcessId(), + event.taskDescription(), + event.requestingParty(), + event.taskMetadata(), + event.assignedAt()); + return (AggregateState) tas.withAssignedTask(task); +} +``` + +--- + +## Phase 5: Tests + +### What + +Add comprehensive tests for all new components across both modules. + +### Module + +`main/api` (for Phase 1 & 4 types) and `main/agentic` (for Phase 2 & 3 handlers) + +### Test Categories + +#### Unit Tests (api module) + +| Test Class | What is Tested | +|------------|----------------| +| `RequestingPartySerializationTest` | Jackson serialization/deserialization of `AgentRequestingParty` and `HumanRequestingParty`, including polymorphic type handling | +| `AssignedTaskTest` | Record equality, construction, immutability | +| `TaskAwareStateTest` | Default behavior of `TaskAwareState` methods with a test implementation | + +#### Unit Tests (agentic module) + +| Test Class | What is Tested | +|------------|----------------| +| `AssignTaskCommandTest` | Command construction, aggregate ID extraction, validation annotations | +| `AgentTaskAssignedEventTest` | Event construction, aggregate ID extraction | +| `AssignTaskCommandHandlerTest` | Handler creates AgentProcess, emits correct event with process ID, handles errors | +| `AgentTaskAssignedEventSourcingTest` | Event-sourcing handler correctly updates TaskAwareState | + +#### Integration Tests (agentic module) + +| Test Class | What is Tested | +|------------|----------------| +| `AssignTaskIntegrationTest` | Full flow: send AssignTaskCommand → handler creates AgentProcess → AgentTaskAssignedEvent emitted → state updated with assigned task | + +### Test Strategy + +- **Mock `AgentPlatform` and `AgentProcess`** for unit tests of the command handler — verify `getId()` is called and the returned ID is used in the event +- **Use existing test infrastructure** (Mockito, AssertJ, TestNG/JUnit 5) — no new test dependencies +- **Follow existing test patterns** from `AgentProcessResultTranslatorTest` and other agentic module tests + +--- + +## Summary of All New and Changed Files + +### New Files + +| Module | Package | File | Phase | +|--------|---------|------|-------| +| api | `o.e.a.aggregate` | `RequestingParty.java` | 1 | +| api | `o.e.a.aggregate` | `AgentRequestingParty.java` | 1 | +| api | `o.e.a.aggregate` | `HumanRequestingParty.java` | 1 | +| api | `o.e.a.aggregate` | `AssignedTask.java` | 4 | +| api | `o.e.a.aggregate` | `TaskAwareState.java` | 4 | +| agentic | `o.e.a.agentic.commands` | `AssignTaskCommand.java` | 2 | +| agentic | `o.e.a.agentic.events` | `AgentTaskAssignedEvent.java` | 2 | +| agentic | `o.e.a.agentic.runtime` | `AssignTaskCommandHandler.java` | 3 | + +### Changed Files + +| Module | File | Phase | Change | +|--------|------|-------|--------| +| agentic | `AgenticAggregateRuntime.java` | 2 | Add `ASSIGN_TASK_COMMAND_TYPE` and `AGENT_TASK_ASSIGNED_TYPE` constants | +| agentic | `KafkaAgenticAggregateRuntime.java` | 3 | Add `onAgentTaskAssigned` and `handleTaskEvent` static methods | +| agentic | `AgenticAggregateRuntimeFactory.java` | 3 | Register built-in command handler and event-sourcing handler | +| agentic | `AkcesAgenticAggregateController.java` | 3 | Register schemas for `AssignTask` and `AgentTaskAssigned` | + +### Test Files + +| Module | Package | File | Phase | +|--------|---------|------|-------| +| api | test | `RequestingPartySerializationTest.java` | 1 | +| api | test | `AssignedTaskTest.java` | 4 | +| api | test | `TaskAwareStateTest.java` | 4 | +| agentic | test | `AssignTaskCommandTest.java` | 2 | +| agentic | test | `AgentTaskAssignedEventTest.java` | 2 | +| agentic | test | `AssignTaskCommandHandlerTest.java` | 3 | +| agentic | test | `AgentTaskAssignedEventSourcingTest.java` | 3 | +| agentic | test | `AssignTaskIntegrationTest.java` | 5 | + +--- + +## Open Questions + +1. **Should `AssignTaskCommand` trigger the actual Embabel `AgentProcess` creation, or should it only record the assignment and defer process creation to a later stage?** The current design creates the process immediately and captures the ID. An alternative is to only emit the event and defer process creation to when the task is actually started (e.g., via a separate `StartTask` command). + +2. **Should there be a `TaskCompleted` / `TaskFailed` event?** The current scope only covers assignment. Task completion tracking would naturally follow and could be planned as a separate feature. + +3. **Should `taskMetadata` be `Map` or `Map`?** `String` values are simpler and schema-safe. `Object` values are more flexible but require careful serialization handling. + +4. **Relationship to `AgentTask` from the incremental tick plan:** The `AssignedTask` record in this plan represents the *persisted* task assignment in aggregate state. The `AgentTask` record from the [agenttasks plan](../agenttasks.md) represents an *in-memory* reference to an active process. These are complementary — `AssignedTask` is the source of truth, `AgentTask` is the runtime handle. + +--- + +## Phase Dependencies + +``` +Phase 1 (RequestingParty) + ↓ +Phase 2 (Command & Event) ← Phase 4 (State Interfaces) + ↓ ↓ +Phase 3 (Handlers & Registration) + ↓ +Phase 5 (Tests) +``` + +Phases 1 and 4 can be implemented in parallel since they are both in the `api` module and have no mutual dependencies. Phases 2 and 3 depend on Phase 1. Phase 3 also depends on Phase 4. Phase 5 depends on all other phases. From b97faa5c8409f1a49bbc56a0c2e09590f91bc951 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Fri, 10 Apr 2026 11:41:53 +0000 Subject: [PATCH 2/5] Add AssignTask built-in command and AgentTaskAssigned event for AgenticAggregate Phase 1: RequestingParty sealed interface with AgentRequestingParty and HumanRequestingParty (api module) Phase 2: AssignTaskCommand and AgentTaskAssignedEvent with type constants (agentic module) Phase 3: AssignTaskCommandHandler, event-sourcing handler, factory registration, controller schema registration Phase 4: AssignedTask record and TaskAwareState interface (api module) Agent-Logs-Url: https://github.com/elasticsoftwarefoundation/akces-framework/sessions/b6306087-e325-41fe-9b0f-63e42ac8bd4b Co-authored-by: jwijgerd <914840+jwijgerd@users.noreply.github.com> --- .../agentic/AgenticAggregateRuntime.java | 14 ++ .../beans/AgenticAggregateRuntimeFactory.java | 15 ++ .../agentic/commands/AssignTaskCommand.java | 65 ++++++ .../events/AgentTaskAssignedEvent.java | 71 +++++++ .../AkcesAgenticAggregateController.java | 37 +++- .../runtime/AssignTaskCommandHandler.java | 190 ++++++++++++++++++ .../runtime/KafkaAgenticAggregateRuntime.java | 56 ++++++ .../akces/aggregate/AgentRequestingParty.java | 34 ++++ .../akces/aggregate/AssignedTask.java | 45 +++++ .../akces/aggregate/HumanRequestingParty.java | 34 ++++ .../akces/aggregate/RequestingParty.java | 56 ++++++ .../akces/aggregate/TaskAwareState.java | 61 ++++++ 12 files changed, 673 insertions(+), 5 deletions(-) create mode 100644 main/agentic/src/main/java/org/elasticsoftware/akces/agentic/commands/AssignTaskCommand.java create mode 100644 main/agentic/src/main/java/org/elasticsoftware/akces/agentic/events/AgentTaskAssignedEvent.java create mode 100644 main/agentic/src/main/java/org/elasticsoftware/akces/agentic/runtime/AssignTaskCommandHandler.java create mode 100644 main/api/src/main/java/org/elasticsoftware/akces/aggregate/AgentRequestingParty.java create mode 100644 main/api/src/main/java/org/elasticsoftware/akces/aggregate/AssignedTask.java create mode 100644 main/api/src/main/java/org/elasticsoftware/akces/aggregate/HumanRequestingParty.java create mode 100644 main/api/src/main/java/org/elasticsoftware/akces/aggregate/RequestingParty.java create mode 100644 main/api/src/main/java/org/elasticsoftware/akces/aggregate/TaskAwareState.java diff --git a/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/AgenticAggregateRuntime.java b/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/AgenticAggregateRuntime.java index 97149be8..7bf0f636 100644 --- a/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/AgenticAggregateRuntime.java +++ b/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/AgenticAggregateRuntime.java @@ -18,10 +18,13 @@ package org.elasticsoftware.akces.agentic; import com.embabel.agent.core.AgentPlatform; +import org.elasticsoftware.akces.agentic.commands.AssignTaskCommand; +import org.elasticsoftware.akces.agentic.events.AgentTaskAssignedEvent; import org.elasticsoftware.akces.agentic.events.MemoryRevokedEvent; import org.elasticsoftware.akces.agentic.events.MemoryStoredEvent; import org.elasticsoftware.akces.aggregate.AgenticAggregateMemory; import org.elasticsoftware.akces.aggregate.AggregateRuntime; +import org.elasticsoftware.akces.aggregate.CommandType; import org.elasticsoftware.akces.aggregate.DomainEventType; import org.elasticsoftware.akces.aggregate.MemoryAwareState; import org.elasticsoftware.akces.protocol.AggregateStateRecord; @@ -47,12 +50,23 @@ * */ public interface AgenticAggregateRuntime extends AggregateRuntime { + + /** Built-in domain event type for {@link MemoryStoredEvent}. */ DomainEventType MEMORY_STORED_TYPE = new DomainEventType<>( "MemoryStored", 1, MemoryStoredEvent.class, false, false, false, false); + /** Built-in domain event type for {@link MemoryRevokedEvent}. */ DomainEventType MEMORY_REVOKED_TYPE = new DomainEventType<>( "MemoryRevoked", 1, MemoryRevokedEvent.class, false, false, false, false); + /** Built-in command type for {@link AssignTaskCommand}. */ + CommandType ASSIGN_TASK_COMMAND_TYPE = new CommandType<>( + "AssignTask", 1, AssignTaskCommand.class, false, false, false); + + /** Built-in domain event type for {@link AgentTaskAssignedEvent}. */ + DomainEventType AGENT_TASK_ASSIGNED_TYPE = new DomainEventType<>( + "AgentTaskAssigned", 1, AgentTaskAssignedEvent.class, false, false, false, false); + /** * Returns the Embabel {@link AgentPlatform} used to create and run agent processes. * diff --git a/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/beans/AgenticAggregateRuntimeFactory.java b/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/beans/AgenticAggregateRuntimeFactory.java index d3a90693..b4f6e2d0 100644 --- a/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/beans/AgenticAggregateRuntimeFactory.java +++ b/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/beans/AgenticAggregateRuntimeFactory.java @@ -22,6 +22,7 @@ import org.elasticsoftware.akces.agentic.AgenticAggregateRuntime; import org.elasticsoftware.akces.agentic.runtime.AgenticCommandHandlerFunctionAdapter; import org.elasticsoftware.akces.agentic.runtime.AgenticEventHandlerFunctionAdapter; +import org.elasticsoftware.akces.agentic.runtime.AssignTaskCommandHandler; import org.elasticsoftware.akces.agentic.runtime.KafkaAgenticAggregateRuntime; import org.elasticsoftware.akces.aggregate.*; import org.elasticsoftware.akces.aggregate.AgenticAggregate; @@ -45,6 +46,8 @@ import java.util.Collections; import java.util.List; +import static org.elasticsoftware.akces.agentic.AgenticAggregateRuntime.AGENT_TASK_ASSIGNED_TYPE; +import static org.elasticsoftware.akces.agentic.AgenticAggregateRuntime.ASSIGN_TASK_COMMAND_TYPE; import static org.elasticsoftware.akces.agentic.AgenticAggregateRuntime.MEMORY_REVOKED_TYPE; import static org.elasticsoftware.akces.agentic.AgenticAggregateRuntime.MEMORY_STORED_TYPE; @@ -277,6 +280,18 @@ private KafkaAggregateRuntime createRuntime(AgenticAggregateInfo agenticInfo, .addEventSourcingHandler(MEMORY_REVOKED_TYPE, KafkaAgenticAggregateRuntime::handleMemoryEvent) .addDomainEvent(MEMORY_REVOKED_TYPE); + // Register built-in AssignTask command handler and AgentTaskAssigned event-sourcing handler. + // The AssignTask command creates an Embabel AgentProcess and emits AgentTaskAssignedEvent. + @SuppressWarnings({"unchecked", "rawtypes"}) + AssignTaskCommandHandler assignTaskHandler = new AssignTaskCommandHandler<>( + (AgenticAggregate) aggregate, agentPlatform, agenticInfo.value()); + runtimeBuilder + .addCommandHandler(ASSIGN_TASK_COMMAND_TYPE, assignTaskHandler) + .addCommand(ASSIGN_TASK_COMMAND_TYPE); + runtimeBuilder + .addEventSourcingHandler(AGENT_TASK_ASSIGNED_TYPE, KafkaAgenticAggregateRuntime::handleTaskEvent) + .addDomainEvent(AGENT_TASK_ASSIGNED_TYPE); + // Collect agent-produced error types for registration and inclusion in adapters. List> agentProducedErrorTypes = buildAgentProducedErrorTypes(agenticInfo.agentProducedErrors()); diff --git a/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/commands/AssignTaskCommand.java b/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/commands/AssignTaskCommand.java new file mode 100644 index 00000000..32a8f4d4 --- /dev/null +++ b/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/commands/AssignTaskCommand.java @@ -0,0 +1,65 @@ +/* + * Copyright 2022 - 2026 The Original Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.elasticsoftware.akces.agentic.commands; + +import jakarta.annotation.Nonnull; +import jakarta.validation.constraints.NotNull; +import org.elasticsoftware.akces.aggregate.RequestingParty; +import org.elasticsoftware.akces.annotations.AggregateIdentifier; +import org.elasticsoftware.akces.annotations.CommandInfo; +import org.elasticsoftware.akces.commands.Command; + +import java.util.Map; + +/** + * Built-in framework command that assigns a task to an + * {@link org.elasticsoftware.akces.aggregate.AgenticAggregate} for AI-assisted processing. + * + *

When processed by the built-in command handler, this command triggers the creation + * of an Embabel {@code AgentProcess} and emits an + * {@link org.elasticsoftware.akces.agentic.events.AgentTaskAssignedEvent} containing the + * process identifier. + * + *

This is the first framework-owned {@link Command} implementation. Unlike + * application-defined commands, it is automatically registered for every + * {@link org.elasticsoftware.akces.aggregate.AgenticAggregate}. + * + * @param agenticAggregateId the identifier of the target agentic aggregate + * @param taskDescription a human-readable description of what the agent should do + * @param requestingParty the entity (agent or human) requesting this task + * @param taskMetadata optional key-value metadata for additional context + * (e.g. correlationId, priority, deadline); may be {@code null} + */ +@CommandInfo(type = "AssignTask", version = 1, + description = "Assigns a task to an agentic aggregate for AI-assisted processing") +public record AssignTaskCommand( + @AggregateIdentifier @NotNull String agenticAggregateId, + @NotNull String taskDescription, + @NotNull RequestingParty requestingParty, + Map taskMetadata +) implements Command { + + /** + * {@inheritDoc} + */ + @Override + @Nonnull + public String getAggregateId() { + return agenticAggregateId; + } +} diff --git a/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/events/AgentTaskAssignedEvent.java b/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/events/AgentTaskAssignedEvent.java new file mode 100644 index 00000000..e7e12b33 --- /dev/null +++ b/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/events/AgentTaskAssignedEvent.java @@ -0,0 +1,71 @@ +/* + * Copyright 2022 - 2026 The Original Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.elasticsoftware.akces.agentic.events; + +import jakarta.annotation.Nonnull; +import jakarta.validation.constraints.NotNull; +import org.elasticsoftware.akces.aggregate.RequestingParty; +import org.elasticsoftware.akces.annotations.AggregateIdentifier; +import org.elasticsoftware.akces.annotations.DomainEventInfo; +import org.elasticsoftware.akces.events.DomainEvent; + +import java.time.Instant; +import java.util.Map; + +/** + * Domain event emitted when a task has been successfully assigned to an + * {@link org.elasticsoftware.akces.aggregate.AgenticAggregate} and an Embabel + * {@code AgentProcess} has been created. + * + *

This event is produced by the built-in command handler when processing an + * {@link org.elasticsoftware.akces.agentic.commands.AssignTaskCommand}. It contains + * the Embabel {@code AgentProcess} identifier, which links the Akces domain to the + * Embabel runtime. + * + *

The built-in event-sourcing handler processes this event by updating the aggregate + * state's {@link org.elasticsoftware.akces.aggregate.TaskAwareState} with a new + * {@link org.elasticsoftware.akces.aggregate.AssignedTask}. + * + * @param agenticAggregateId the identifier of the agentic aggregate + * @param agentProcessId the Embabel {@code AgentProcess.getId()} value + * @param taskDescription the task description from the originating command + * @param requestingParty the entity that requested the task assignment + * @param taskMetadata optional key-value metadata from the originating command; + * may be {@code null} + * @param assignedAt the instant at which the task was assigned + */ +@DomainEventInfo(type = "AgentTaskAssigned", version = 1, + description = "Emitted when a task has been assigned to an agentic aggregate and an AgentProcess has been created") +public record AgentTaskAssignedEvent( + @AggregateIdentifier String agenticAggregateId, + @NotNull String agentProcessId, + @NotNull String taskDescription, + @NotNull RequestingParty requestingParty, + Map taskMetadata, + @NotNull Instant assignedAt +) implements DomainEvent { + + /** + * {@inheritDoc} + */ + @Override + @Nonnull + public String getAggregateId() { + return agenticAggregateId; + } +} diff --git a/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/runtime/AkcesAgenticAggregateController.java b/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/runtime/AkcesAgenticAggregateController.java index 6dfe17a2..cafb03d8 100644 --- a/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/runtime/AkcesAgenticAggregateController.java +++ b/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/runtime/AkcesAgenticAggregateController.java @@ -119,7 +119,14 @@ public class AkcesAgenticAggregateController extends Thread @SuppressWarnings("unchecked") private static final List> BUILTIN_EVENT_TYPES = List.of( AgenticAggregateRuntime.MEMORY_STORED_TYPE, - AgenticAggregateRuntime.MEMORY_REVOKED_TYPE + AgenticAggregateRuntime.MEMORY_REVOKED_TYPE, + AgenticAggregateRuntime.AGENT_TASK_ASSIGNED_TYPE + ); + + /** Built-in command types provided by the agentic framework. */ + @SuppressWarnings("unchecked") + private static final List> BUILTIN_COMMAND_TYPES = List.of( + AgenticAggregateRuntime.ASSIGN_TASK_COMMAND_TYPE ); private final ConsumerFactory schemaConsumerFactory; @@ -265,10 +272,11 @@ public void run() { } /** - * Registers the built-in {@link org.elasticsoftware.akces.agentic.events.MemoryStoredEvent} - * and {@link org.elasticsoftware.akces.agentic.events.MemoryRevokedEvent} schemas with the - * schema registry. These events are produced internally by the Embabel layer when the - * agent stores or revokes memories. + * Registers the built-in agentic schemas with the schema registry: the memory events + * ({@link org.elasticsoftware.akces.agentic.events.MemoryStoredEvent}, + * {@link org.elasticsoftware.akces.agentic.events.MemoryRevokedEvent}), + * the {@link org.elasticsoftware.akces.agentic.events.AgentTaskAssignedEvent}, and + * the {@link org.elasticsoftware.akces.agentic.commands.AssignTaskCommand}. */ private void registerBuiltinSchemas() { logger.info("Registering built-in agentic schemas for {}Aggregate", @@ -292,6 +300,25 @@ private void registerBuiltinSchemas() { + eventType.typeName(), e); } } + for (CommandType commandType : BUILTIN_COMMAND_TYPES) { + try { + aggregateRuntime.registerAndValidate(commandType, schemaRegistry); + } catch (IncompatibleSchemaException e) { + logger.warn("Built-in command schema {} is incompatible — attempting force-register", + commandType.typeName(), e); + try { + aggregateRuntime.registerAndValidate(commandType, schemaRegistry, true); + } catch (Exception ex) { + logger.error("Failed to force-register built-in command schema {}", + commandType.typeName(), ex); + throw new RuntimeException("Failed to register built-in command schema: " + + commandType.typeName(), ex); + } + } catch (Exception e) { + throw new RuntimeException("Failed to register built-in command schema: " + + commandType.typeName(), e); + } + } } /** diff --git a/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/runtime/AssignTaskCommandHandler.java b/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/runtime/AssignTaskCommandHandler.java new file mode 100644 index 00000000..8b7d806d --- /dev/null +++ b/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/runtime/AssignTaskCommandHandler.java @@ -0,0 +1,190 @@ +/* + * Copyright 2022 - 2026 The Original Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.elasticsoftware.akces.agentic.runtime; + +import com.embabel.agent.core.Agent; +import com.embabel.agent.core.AgentPlatform; +import com.embabel.agent.core.AgentProcess; +import com.embabel.agent.core.ProcessOptions; +import jakarta.annotation.Nonnull; +import org.elasticsoftware.akces.agentic.commands.AssignTaskCommand; +import org.elasticsoftware.akces.agentic.events.AgentTaskAssignedEvent; +import org.elasticsoftware.akces.aggregate.*; +import org.elasticsoftware.akces.events.DomainEvent; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.Instant; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.stream.Stream; + +import static org.elasticsoftware.akces.agentic.AgenticAggregateRuntime.AGENT_TASK_ASSIGNED_TYPE; +import static org.elasticsoftware.akces.agentic.AgenticAggregateRuntime.ASSIGN_TASK_COMMAND_TYPE; + +/** + * Built-in {@link CommandHandlerFunction} that processes + * {@link AssignTaskCommand} by creating an Embabel {@link AgentProcess} and + * emitting an {@link AgentTaskAssignedEvent}. + * + *

This handler needs runtime access to the Embabel {@link AgentPlatform} and + * {@link Agent}, so it cannot use the simple static method reference pattern used + * by the memory event-sourcing handlers. Instead, it is a dedicated class that + * captures these dependencies at construction time. + * + *

The handler populates the agent's blackboard with the task description, + * requesting party, and current aggregate state, then creates an agent process. + * The process ID from {@link AgentProcess#getId()} is included in the emitted event, + * linking the Akces domain to the Embabel runtime. + * + * @param the aggregate state type; must implement {@link AggregateState} + */ +public class AssignTaskCommandHandler + implements CommandHandlerFunction { + + private static final Logger logger = + LoggerFactory.getLogger(AssignTaskCommandHandler.class); + + private final AgenticAggregate aggregate; + private final AgentPlatform agentPlatform; + private final String aggregateName; + + /** + * Creates a new {@code AssignTaskCommandHandler}. + * + * @param aggregate the owning agentic aggregate instance + * @param agentPlatform the Embabel platform used to create agent processes + * @param aggregateName the name of the aggregate (used for agent resolution) + */ + public AssignTaskCommandHandler( + AgenticAggregate aggregate, + AgentPlatform agentPlatform, + String aggregateName) { + this.aggregate = Objects.requireNonNull(aggregate, "aggregate must not be null"); + this.agentPlatform = Objects.requireNonNull(agentPlatform, "agentPlatform must not be null"); + this.aggregateName = Objects.requireNonNull(aggregateName, "aggregateName must not be null"); + } + + /** + * Processes the {@link AssignTaskCommand} by creating an Embabel {@link AgentProcess} + * and emitting an {@link AgentTaskAssignedEvent}. + * + *

The agent process is created with bindings containing the task description, + * requesting party information, and the current aggregate state. The handler resolves + * the appropriate {@link Agent} by name from the platform. + * + * @param command the assign-task command; never {@code null} + * @param state the current aggregate state + * @return a stream containing a single {@link AgentTaskAssignedEvent} + */ + @Nonnull + @Override + public Stream apply(@Nonnull AssignTaskCommand command, S state) { + logger.debug("Processing AssignTask command for aggregate {}, taskDescription='{}'", + aggregateName, command.taskDescription()); + + Map bindings = new LinkedHashMap<>(); + bindings.put("command", command); + bindings.put("state", state); + bindings.put("agenticAggregateId", command.agenticAggregateId()); + bindings.put("taskDescription", command.taskDescription()); + bindings.put("requestingParty", command.requestingParty()); + if (command.taskMetadata() != null) { + bindings.put("taskMetadata", command.taskMetadata()); + } + + // Resolve the agent for this aggregate + Agent agent = resolveAgent(); + + AgentProcess agentProcess = + agentPlatform.createAgentProcess(agent, ProcessOptions.DEFAULT, bindings); + + String processId = agentProcess.getId(); + + logger.debug("Created AgentProcess with id={} for AssignTask on aggregate {}", + processId, aggregateName); + + AgentTaskAssignedEvent event = new AgentTaskAssignedEvent( + command.agenticAggregateId(), + processId, + command.taskDescription(), + command.requestingParty(), + command.taskMetadata(), + Instant.now()); + + return Stream.of(event); + } + + /** + * {@inheritDoc} + * + *

Always returns {@code false} — built-in commands cannot create aggregate state. + */ + @Override + public boolean isCreate() { + return false; + } + + @Override + public CommandType getCommandType() { + return ASSIGN_TASK_COMMAND_TYPE; + } + + @Override + public Aggregate getAggregate() { + return aggregate; + } + + @Override + public List> getProducedDomainEventTypes() { + @SuppressWarnings("unchecked") + DomainEventType type = (DomainEventType) (DomainEventType) AGENT_TASK_ASSIGNED_TYPE; + return List.of(type); + } + + @Override + public List> getErrorEventTypes() { + return List.of(); + } + + /** + * Resolves the {@link Agent} for the aggregate from the platform's registered agents. + * + *

Looks for an agent matching either the exact aggregate name or the + * {@code {aggregateName}Agent} convention. + * + * @return the resolved {@link Agent}; never {@code null} + * @throws IllegalStateException if no matching agent is found + */ + private Agent resolveAgent() { + String agentBeanName = aggregateName + "Agent"; + for (Agent candidate : agentPlatform.agents()) { + String candidateName = candidate.getName(); + if (aggregateName.equals(candidateName) || agentBeanName.equals(candidateName)) { + return candidate; + } + } + throw new IllegalStateException( + "No Agent found with name '" + aggregateName + "' or '" + agentBeanName + + "' in the AgentPlatform for AssignTask handling. " + + "The implementing application must provide an Agent named '" + + agentBeanName + "'."); + } +} diff --git a/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/runtime/KafkaAgenticAggregateRuntime.java b/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/runtime/KafkaAgenticAggregateRuntime.java index 3597ea8b..614bbb7c 100644 --- a/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/runtime/KafkaAgenticAggregateRuntime.java +++ b/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/runtime/KafkaAgenticAggregateRuntime.java @@ -20,6 +20,7 @@ import com.embabel.agent.core.AgentPlatform; import org.apache.kafka.common.errors.SerializationException; import org.elasticsoftware.akces.agentic.AgenticAggregateRuntime; +import org.elasticsoftware.akces.agentic.events.AgentTaskAssignedEvent; import org.elasticsoftware.akces.agentic.events.MemoryRevokedEvent; import org.elasticsoftware.akces.agentic.events.MemoryStoredEvent; import org.elasticsoftware.akces.events.DomainEvent; @@ -35,6 +36,7 @@ import tools.jackson.databind.ObjectMapper; import java.io.IOException; +import java.util.ArrayList; import java.util.Collection; import java.util.List; import java.util.Objects; @@ -300,4 +302,58 @@ public static AggregateState handleMemoryEvent(DomainEvent event, AggregateState "Unsupported memory event type: " + event.getClass().getName()); } } + + // ------------------------------------------------------------------------- + // Built-in EventSourcingHandler for task assignment + // ------------------------------------------------------------------------- + + /** + * Built-in event-sourcing handler for {@link AgentTaskAssignedEvent}. + * + *

Creates an {@link AssignedTask} from the event and appends it to the state's + * assigned tasks list. The state must implement {@link TaskAwareState}; otherwise an + * {@link IllegalStateException} is thrown. + * + * @param event the {@code AgentTaskAssignedEvent} to apply + * @param state the current aggregate state + * @return a new state instance with the assigned task added + * @throws IllegalStateException if {@code state} does not implement {@link TaskAwareState} + */ + @SuppressWarnings("unchecked") + public static AggregateState onAgentTaskAssigned(AgentTaskAssignedEvent event, AggregateState state) { + if (!(state instanceof TaskAwareState tas)) { + throw new IllegalStateException( + "Aggregate state " + state.getClass().getName() + + " does not implement TaskAwareState"); + } + AssignedTask task = new AssignedTask( + event.agentProcessId(), + event.taskDescription(), + event.requestingParty(), + event.taskMetadata(), + event.assignedAt()); + return (AggregateState) tas.withAssignedTask(task); + } + + /** + * Single-dispatch event-sourcing handler that routes {@link AgentTaskAssignedEvent} + * to the appropriate typed handler. + * + *

Intended to be used as a method reference + * ({@code KafkaAgenticAggregateRuntime::handleTaskEvent}) so that no anonymous adapter + * class is required at the registration site. + * + * @param event the task domain event to apply; must be an {@code AgentTaskAssignedEvent} + * @param state the current aggregate state + * @return the updated aggregate state + * @throws IllegalArgumentException if {@code event} is not a recognised task event type + */ + public static AggregateState handleTaskEvent(DomainEvent event, AggregateState state) { + if (event instanceof AgentTaskAssignedEvent assigned) { + return onAgentTaskAssigned(assigned, state); + } else { + throw new IllegalArgumentException( + "Unsupported task event type: " + event.getClass().getName()); + } + } } diff --git a/main/api/src/main/java/org/elasticsoftware/akces/aggregate/AgentRequestingParty.java b/main/api/src/main/java/org/elasticsoftware/akces/aggregate/AgentRequestingParty.java new file mode 100644 index 00000000..f65bbc59 --- /dev/null +++ b/main/api/src/main/java/org/elasticsoftware/akces/aggregate/AgentRequestingParty.java @@ -0,0 +1,34 @@ +/* + * Copyright 2022 - 2026 The Original Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.elasticsoftware.akces.aggregate; + +/** + * A {@link RequestingParty} implementation representing an AI agent that requested + * a task assignment. + * + * @param agentId the unique identifier of the requesting agent + * @param agentName the display name of the requesting agent + * @param role the role of the requesting agent in the system + * (e.g. "orchestrator", "supervisor", "analyst") + */ +public record AgentRequestingParty( + String agentId, + String agentName, + String role +) implements RequestingParty { +} diff --git a/main/api/src/main/java/org/elasticsoftware/akces/aggregate/AssignedTask.java b/main/api/src/main/java/org/elasticsoftware/akces/aggregate/AssignedTask.java new file mode 100644 index 00000000..538642f7 --- /dev/null +++ b/main/api/src/main/java/org/elasticsoftware/akces/aggregate/AssignedTask.java @@ -0,0 +1,45 @@ +/* + * Copyright 2022 - 2026 The Original Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.elasticsoftware.akces.aggregate; + +import java.time.Instant; +import java.util.Map; + +/** + * An immutable record representing a task that has been assigned to an + * {@link AgenticAggregate} for AI-assisted processing. + * + *

Instances are created by the built-in event-sourcing handler when an + * {@code AgentTaskAssignedEvent} is applied, and are stored in aggregate states + * that implement {@link TaskAwareState}. + * + * @param agentProcessId the Embabel {@code AgentProcess} identifier created for this task + * @param taskDescription a human-readable description of what the agent should do + * @param requestingParty the entity (agent or human) that requested this task + * @param taskMetadata optional key-value metadata for additional context + * (e.g. correlationId, priority, deadline) + * @param assignedAt the instant at which the task was assigned + */ +public record AssignedTask( + String agentProcessId, + String taskDescription, + RequestingParty requestingParty, + Map taskMetadata, + Instant assignedAt +) { +} diff --git a/main/api/src/main/java/org/elasticsoftware/akces/aggregate/HumanRequestingParty.java b/main/api/src/main/java/org/elasticsoftware/akces/aggregate/HumanRequestingParty.java new file mode 100644 index 00000000..d0fbf5ec --- /dev/null +++ b/main/api/src/main/java/org/elasticsoftware/akces/aggregate/HumanRequestingParty.java @@ -0,0 +1,34 @@ +/* + * Copyright 2022 - 2026 The Original Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.elasticsoftware.akces.aggregate; + +/** + * A {@link RequestingParty} implementation representing a human user that requested + * a task assignment. + * + * @param userId the unique identifier of the human user + * @param displayName the display name of the human user + * @param role the role of the human user in the system + * (e.g. "administrator", "analyst", "end-user") + */ +public record HumanRequestingParty( + String userId, + String displayName, + String role +) implements RequestingParty { +} diff --git a/main/api/src/main/java/org/elasticsoftware/akces/aggregate/RequestingParty.java b/main/api/src/main/java/org/elasticsoftware/akces/aggregate/RequestingParty.java new file mode 100644 index 00000000..d2999349 --- /dev/null +++ b/main/api/src/main/java/org/elasticsoftware/akces/aggregate/RequestingParty.java @@ -0,0 +1,56 @@ +/* + * Copyright 2022 - 2026 The Original Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.elasticsoftware.akces.aggregate; + +import com.fasterxml.jackson.annotation.JsonSubTypes; +import com.fasterxml.jackson.annotation.JsonTypeInfo; + +/** + * Represents the entity that requested a task assignment to an + * {@link AgenticAggregate} — either another AI agent or a human user. + * + *

This is a sealed interface with two permitted implementations: + *

    + *
  • {@link AgentRequestingParty} — when the requesting party is another AI agent
  • + *
  • {@link HumanRequestingParty} — when the requesting party is a human user
  • + *
+ * + *

Each variant carries a {@link #role()} describing the requesting party's role in the + * system (e.g. "administrator", "analyst", "orchestrator"). + * + *

Jackson polymorphic serialization is configured via {@link JsonTypeInfo} and + * {@link JsonSubTypes}, using a {@code "type"} discriminator property with values + * {@code "agent"} and {@code "human"}. + */ +@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type") +@JsonSubTypes({ + @JsonSubTypes.Type(value = AgentRequestingParty.class, name = "agent"), + @JsonSubTypes.Type(value = HumanRequestingParty.class, name = "human") +}) +public sealed interface RequestingParty permits AgentRequestingParty, HumanRequestingParty { + + /** + * The role of this requesting party in the system. + * + *

Examples: {@code "administrator"}, {@code "analyst"}, {@code "orchestrator"}, + * {@code "supervisor"}. + * + * @return the role; never {@code null} + */ + String role(); +} diff --git a/main/api/src/main/java/org/elasticsoftware/akces/aggregate/TaskAwareState.java b/main/api/src/main/java/org/elasticsoftware/akces/aggregate/TaskAwareState.java new file mode 100644 index 00000000..8d5281dd --- /dev/null +++ b/main/api/src/main/java/org/elasticsoftware/akces/aggregate/TaskAwareState.java @@ -0,0 +1,61 @@ +/* + * Copyright 2022 - 2026 The Original Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.elasticsoftware.akces.aggregate; + +import java.util.List; + +/** + * State interface for agentic aggregates that track assigned tasks. + * + *

Mirrors the {@link MemoryAwareState} pattern: the framework's built-in event-sourcing + * handler for {@code AgentTaskAssignedEvent} checks whether the aggregate state implements + * this interface, and if so, calls {@link #withAssignedTask(AssignedTask)} to update the + * state. If the state does not implement this interface, an {@link IllegalStateException} + * is thrown. + * + *

States may implement both {@link MemoryAwareState} and {@code TaskAwareState} when + * an agentic aggregate needs to track both memories and assigned tasks. + * + *

All mutation methods return new instances — implementations must be immutable. + */ +public interface TaskAwareState { + + /** + * Returns the list of currently assigned tasks. + * + * @return an unmodifiable list of assigned tasks; never {@code null} + */ + List getAssignedTasks(); + + /** + * Returns a new state instance with the given task appended to the assigned tasks list. + * + * @param task the task to add; must not be {@code null} + * @return a new state instance with the task added + */ + TaskAwareState withAssignedTask(AssignedTask task); + + /** + * Returns a new state instance with the task identified by the given + * {@code agentProcessId} removed from the assigned tasks list. + * + * @param agentProcessId the Embabel AgentProcess ID of the task to remove + * @return a new state instance with the matching task removed + */ + TaskAwareState withoutAssignedTask(String agentProcessId); +} From 1940d3859a6d5d7456cb73e2ed24fcac0010dfaa Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Fri, 10 Apr 2026 11:47:40 +0000 Subject: [PATCH 3/5] Add comprehensive tests for AssignTask support API module tests: - RequestingPartySerializationTest: sealed interface, annotations, equality, pattern matching - TaskAwareStateTest: AssignedTask record, TaskAwareState interface contract Agentic module tests: - AssignTaskCommandHandlerTest: handler creates AgentProcess, emits event, error cases - TaskEventSourcingTest: event-sourcing handler updates TaskAwareState, dispatch, sequences - AssignTaskTypeConstantsTest: type registration constant verification Agent-Logs-Url: https://github.com/elasticsoftwarefoundation/akces-framework/sessions/b6306087-e325-41fe-9b0f-63e42ac8bd4b Co-authored-by: jwijgerd <914840+jwijgerd@users.noreply.github.com> --- .../runtime/AssignTaskCommandHandlerTest.java | 210 ++++++++++++++++++ .../runtime/AssignTaskTypeConstantsTest.java | 53 +++++ .../runtime/TaskEventSourcingTest.java | 190 ++++++++++++++++ .../RequestingPartySerializationTest.java | 178 +++++++++++++++ .../akces/aggregate/TaskAwareStateTest.java | 194 ++++++++++++++++ 5 files changed, 825 insertions(+) create mode 100644 main/agentic/src/test/java/org/elasticsoftware/akces/agentic/runtime/AssignTaskCommandHandlerTest.java create mode 100644 main/agentic/src/test/java/org/elasticsoftware/akces/agentic/runtime/AssignTaskTypeConstantsTest.java create mode 100644 main/agentic/src/test/java/org/elasticsoftware/akces/agentic/runtime/TaskEventSourcingTest.java create mode 100644 main/api/src/test/java/org/elasticsoftware/akces/aggregate/RequestingPartySerializationTest.java create mode 100644 main/api/src/test/java/org/elasticsoftware/akces/aggregate/TaskAwareStateTest.java diff --git a/main/agentic/src/test/java/org/elasticsoftware/akces/agentic/runtime/AssignTaskCommandHandlerTest.java b/main/agentic/src/test/java/org/elasticsoftware/akces/agentic/runtime/AssignTaskCommandHandlerTest.java new file mode 100644 index 00000000..106299fc --- /dev/null +++ b/main/agentic/src/test/java/org/elasticsoftware/akces/agentic/runtime/AssignTaskCommandHandlerTest.java @@ -0,0 +1,210 @@ +/* + * Copyright 2022 - 2026 The Original Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.elasticsoftware.akces.agentic.runtime; + +import com.embabel.agent.core.Agent; +import com.embabel.agent.core.AgentPlatform; +import com.embabel.agent.core.AgentProcess; +import com.embabel.agent.core.ProcessOptions; +import org.elasticsoftware.akces.agentic.commands.AssignTaskCommand; +import org.elasticsoftware.akces.agentic.events.AgentTaskAssignedEvent; +import org.elasticsoftware.akces.aggregate.*; +import org.elasticsoftware.akces.events.DomainEvent; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +import java.util.List; +import java.util.Map; +import java.util.stream.Stream; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.*; + +/** + * Unit tests for {@link AssignTaskCommandHandler}, verifying that it creates an + * Embabel {@link AgentProcess} and emits an {@link AgentTaskAssignedEvent} with + * the correct process ID. + */ +@ExtendWith(MockitoExtension.class) +class AssignTaskCommandHandlerTest { + + /** Simple test state implementing AggregateState. */ + record TestState(String id) implements AggregateState { + @Override + public String getAggregateId() { + return id; + } + } + + /** Test AgenticAggregate. */ + static class TestAggregate implements AgenticAggregate { + @Override + public Class getStateClass() { + return TestState.class; + } + } + + @Mock + private AgentPlatform agentPlatform; + + @Mock + private AgentProcess agentProcess; + + @Mock + private Agent agent; + + private AssignTaskCommandHandler handler; + + @BeforeEach + void setUp() { + handler = new AssignTaskCommandHandler<>(new TestAggregate(), agentPlatform, "TestAggregate"); + } + + private void setUpAgentResolution() { + when(agent.getName()).thenReturn("TestAggregateAgent"); + when(agentPlatform.agents()).thenReturn(List.of(agent)); + } + + @Test + void applyShouldCreateAgentProcessAndEmitEvent() { + setUpAgentResolution(); + var party = new HumanRequestingParty("user-1", "Alice", "analyst"); + var command = new AssignTaskCommand("agg-1", "Analyze data", party, Map.of("key", "value")); + var state = new TestState("agg-1"); + + when(agentPlatform.createAgentProcess(eq(agent), eq(ProcessOptions.DEFAULT), any())) + .thenReturn(agentProcess); + when(agentProcess.getId()).thenReturn("embabel-proc-42"); + + Stream result = handler.apply(command, state); + List events = result.toList(); + + assertThat(events).hasSize(1); + assertThat(events.getFirst()).isInstanceOf(AgentTaskAssignedEvent.class); + + var event = (AgentTaskAssignedEvent) events.getFirst(); + assertThat(event.agenticAggregateId()).isEqualTo("agg-1"); + assertThat(event.agentProcessId()).isEqualTo("embabel-proc-42"); + assertThat(event.taskDescription()).isEqualTo("Analyze data"); + assertThat(event.requestingParty()).isEqualTo(party); + assertThat(event.taskMetadata()).isEqualTo(Map.of("key", "value")); + assertThat(event.assignedAt()).isNotNull(); + + verify(agentPlatform).createAgentProcess(eq(agent), eq(ProcessOptions.DEFAULT), any()); + verify(agentProcess).getId(); + } + + @Test + void applyShouldPropagateAgentRequestingParty() { + setUpAgentResolution(); + var party = new AgentRequestingParty("agent-99", "Orchestrator", "supervisor"); + var command = new AssignTaskCommand("agg-1", "Process task", party, null); + var state = new TestState("agg-1"); + + when(agentPlatform.createAgentProcess(eq(agent), eq(ProcessOptions.DEFAULT), any())) + .thenReturn(agentProcess); + when(agentProcess.getId()).thenReturn("proc-abc"); + + List events = handler.apply(command, state).toList(); + + var event = (AgentTaskAssignedEvent) events.getFirst(); + assertThat(event.requestingParty()).isInstanceOf(AgentRequestingParty.class); + assertThat(event.requestingParty().role()).isEqualTo("supervisor"); + } + + @Test + void applyShouldHandleNullMetadata() { + setUpAgentResolution(); + var party = new HumanRequestingParty("user-1", "Bob", "admin"); + var command = new AssignTaskCommand("agg-1", "Simple task", party, null); + var state = new TestState("agg-1"); + + when(agentPlatform.createAgentProcess(eq(agent), eq(ProcessOptions.DEFAULT), any())) + .thenReturn(agentProcess); + when(agentProcess.getId()).thenReturn("proc-xyz"); + + List events = handler.apply(command, state).toList(); + + var event = (AgentTaskAssignedEvent) events.getFirst(); + assertThat(event.taskMetadata()).isNull(); + } + + @Test + void isCreateShouldReturnFalse() { + assertThat(handler.isCreate()).isFalse(); + } + + @Test + void getCommandTypeShouldReturnAssignTaskType() { + assertThat(handler.getCommandType().typeName()).isEqualTo("AssignTask"); + assertThat(handler.getCommandType().version()).isEqualTo(1); + } + + @Test + void getProducedDomainEventTypesShouldContainAgentTaskAssigned() { + assertThat(handler.getProducedDomainEventTypes()).hasSize(1); + assertThat(handler.getProducedDomainEventTypes().getFirst().typeName()) + .isEqualTo("AgentTaskAssigned"); + } + + @Test + void getErrorEventTypesShouldBeEmpty() { + assertThat(handler.getErrorEventTypes()).isEmpty(); + } + + @Test + void constructorShouldRejectNullAggregate() { + assertThatThrownBy(() -> new AssignTaskCommandHandler<>(null, agentPlatform, "TestAggregate")) + .isInstanceOf(NullPointerException.class) + .hasMessageContaining("aggregate"); + } + + @Test + void constructorShouldRejectNullAgentPlatform() { + assertThatThrownBy(() -> new AssignTaskCommandHandler<>(new TestAggregate(), null, "TestAggregate")) + .isInstanceOf(NullPointerException.class) + .hasMessageContaining("agentPlatform"); + } + + @Test + void constructorShouldRejectNullAggregateName() { + assertThatThrownBy(() -> new AssignTaskCommandHandler<>(new TestAggregate(), agentPlatform, null)) + .isInstanceOf(NullPointerException.class) + .hasMessageContaining("aggregateName"); + } + + @Test + void applyShouldThrowWhenNoAgentFound() { + // Agent platform has no matching agents + when(agentPlatform.agents()).thenReturn(List.of()); + var noAgentHandler = new AssignTaskCommandHandler<>(new TestAggregate(), agentPlatform, "Unknown"); + var party = new HumanRequestingParty("user-1", "Alice", "analyst"); + var command = new AssignTaskCommand("agg-1", "task", party, null); + var state = new TestState("agg-1"); + + assertThatThrownBy(() -> noAgentHandler.apply(command, state)) + .isInstanceOf(IllegalStateException.class) + .hasMessageContaining("No Agent found"); + } +} diff --git a/main/agentic/src/test/java/org/elasticsoftware/akces/agentic/runtime/AssignTaskTypeConstantsTest.java b/main/agentic/src/test/java/org/elasticsoftware/akces/agentic/runtime/AssignTaskTypeConstantsTest.java new file mode 100644 index 00000000..c5670244 --- /dev/null +++ b/main/agentic/src/test/java/org/elasticsoftware/akces/agentic/runtime/AssignTaskTypeConstantsTest.java @@ -0,0 +1,53 @@ +/* + * Copyright 2022 - 2026 The Original Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.elasticsoftware.akces.agentic.runtime; + +import org.elasticsoftware.akces.agentic.AgenticAggregateRuntime; +import org.elasticsoftware.akces.agentic.commands.AssignTaskCommand; +import org.elasticsoftware.akces.agentic.events.AgentTaskAssignedEvent; +import org.elasticsoftware.akces.aggregate.CommandType; +import org.elasticsoftware.akces.aggregate.DomainEventType; +import org.junit.jupiter.api.Test; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * Tests for the built-in type registration constants on {@link AgenticAggregateRuntime}, + * verifying the {@code ASSIGN_TASK_COMMAND_TYPE} and {@code AGENT_TASK_ASSIGNED_TYPE} + * constants are correctly configured. + */ +class AssignTaskTypeConstantsTest { + + @Test + void assignTaskCommandTypeShouldBeConfiguredCorrectly() { + CommandType type = AgenticAggregateRuntime.ASSIGN_TASK_COMMAND_TYPE; + + assertThat(type.typeName()).isEqualTo("AssignTask"); + assertThat(type.version()).isEqualTo(1); + assertThat(type.typeClass()).isEqualTo(AssignTaskCommand.class); + } + + @Test + void agentTaskAssignedTypeShouldBeConfiguredCorrectly() { + DomainEventType type = AgenticAggregateRuntime.AGENT_TASK_ASSIGNED_TYPE; + + assertThat(type.typeName()).isEqualTo("AgentTaskAssigned"); + assertThat(type.version()).isEqualTo(1); + assertThat(type.typeClass()).isEqualTo(AgentTaskAssignedEvent.class); + } +} diff --git a/main/agentic/src/test/java/org/elasticsoftware/akces/agentic/runtime/TaskEventSourcingTest.java b/main/agentic/src/test/java/org/elasticsoftware/akces/agentic/runtime/TaskEventSourcingTest.java new file mode 100644 index 00000000..f43c6fe5 --- /dev/null +++ b/main/agentic/src/test/java/org/elasticsoftware/akces/agentic/runtime/TaskEventSourcingTest.java @@ -0,0 +1,190 @@ +/* + * Copyright 2022 - 2026 The Original Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.elasticsoftware.akces.agentic.runtime; + +import org.elasticsoftware.akces.agentic.events.AgentTaskAssignedEvent; +import org.elasticsoftware.akces.aggregate.*; +import org.elasticsoftware.akces.events.DomainEvent; +import org.junit.jupiter.api.Test; + +import java.time.Instant; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** + * Tests for the built-in event-sourcing handler in {@link KafkaAgenticAggregateRuntime} + * that processes {@link AgentTaskAssignedEvent} by updating {@link TaskAwareState}. + * + *

Follows the same pattern as {@link MemoryEventSourcingTest}: event-sourcing handler + * methods are tested through their actual invocations. + */ +class TaskEventSourcingTest { + + /** Concrete {@link TaskAwareState} implementation for test assertions. */ + record TestTaskState( + String id, + List assignedTasks + ) implements AggregateState, TaskAwareState { + + @Override + public String getAggregateId() { + return id; + } + + @Override + public List getAssignedTasks() { + return assignedTasks; + } + + @Override + public TaskAwareState withAssignedTask(AssignedTask task) { + var updated = new ArrayList<>(assignedTasks); + updated.add(task); + return new TestTaskState(id, List.copyOf(updated)); + } + + @Override + public TaskAwareState withoutAssignedTask(String agentProcessId) { + var updated = assignedTasks.stream() + .filter(t -> !t.agentProcessId().equals(agentProcessId)) + .toList(); + return new TestTaskState(id, updated); + } + } + + /** A non-TaskAwareState state for testing error paths. */ + record PlainState(String id) implements AggregateState { + @Override + public String getAggregateId() { + return id; + } + } + + // ------------------------------------------------------------------------- + // onAgentTaskAssigned tests + // ------------------------------------------------------------------------- + + @Test + void onAgentTaskAssignedShouldAppendTaskToState() { + Instant now = Instant.parse("2026-04-10T12:00:00Z"); + var party = new HumanRequestingParty("user-1", "Alice", "analyst"); + var metadata = Map.of("correlationId", "corr-123"); + var event = new AgentTaskAssignedEvent("agg-1", "proc-1", "Analyze data", + party, metadata, now); + var initialState = new TestTaskState("agg-1", List.of()); + + AggregateState result = KafkaAgenticAggregateRuntime.onAgentTaskAssigned(event, initialState); + + assertThat(result).isInstanceOf(TestTaskState.class); + var state = (TestTaskState) result; + assertThat(state.getAssignedTasks()).hasSize(1); + AssignedTask assigned = state.getAssignedTasks().getFirst(); + assertThat(assigned.agentProcessId()).isEqualTo("proc-1"); + assertThat(assigned.taskDescription()).isEqualTo("Analyze data"); + assertThat(assigned.requestingParty()).isEqualTo(party); + assertThat(assigned.taskMetadata()).isEqualTo(metadata); + assertThat(assigned.assignedAt()).isEqualTo(now); + } + + @Test + void onAgentTaskAssignedShouldThrowWhenStateIsNotTaskAware() { + var event = new AgentTaskAssignedEvent("agg-1", "proc-1", "task", + new HumanRequestingParty("u", "n", "r"), null, Instant.now()); + var plainState = new PlainState("agg-1"); + + assertThatThrownBy(() -> KafkaAgenticAggregateRuntime.onAgentTaskAssigned(event, plainState)) + .isInstanceOf(IllegalStateException.class) + .hasMessageContaining("does not implement TaskAwareState"); + } + + @Test + void onAgentTaskAssignedShouldPreserveExistingTasks() { + Instant t1 = Instant.parse("2026-01-01T00:00:00Z"); + Instant t2 = Instant.parse("2026-01-02T00:00:00Z"); + var party = new AgentRequestingParty("agent-1", "TestAgent", "supervisor"); + var existing = new AssignedTask("proc-old", "Old task", party, null, t1); + var state = new TestTaskState("agg-1", List.of(existing)); + + var event = new AgentTaskAssignedEvent("agg-1", "proc-new", "New task", + party, Map.of("priority", "high"), t2); + + AggregateState result = KafkaAgenticAggregateRuntime.onAgentTaskAssigned(event, state); + + var newState = (TestTaskState) result; + assertThat(newState.getAssignedTasks()).hasSize(2); + assertThat(newState.getAssignedTasks().get(0).agentProcessId()).isEqualTo("proc-old"); + assertThat(newState.getAssignedTasks().get(1).agentProcessId()).isEqualTo("proc-new"); + } + + // ------------------------------------------------------------------------- + // handleTaskEvent dispatch tests + // ------------------------------------------------------------------------- + + @Test + void handleTaskEventShouldDispatchAgentTaskAssignedEvent() { + Instant now = Instant.now(); + var party = new HumanRequestingParty("user-1", "Alice", "analyst"); + var event = new AgentTaskAssignedEvent("agg-1", "proc-1", "task", party, null, now); + var state = new TestTaskState("agg-1", List.of()); + + AggregateState result = KafkaAgenticAggregateRuntime.handleTaskEvent(event, state); + + assertThat(result).isInstanceOf(TestTaskState.class); + assertThat(((TestTaskState) result).getAssignedTasks()).hasSize(1); + } + + @Test + void handleTaskEventShouldThrowForUnknownEventType() { + var unknownEvent = new DomainEvent() { + @Override + public String getAggregateId() { + return "agg-1"; + } + }; + var state = new TestTaskState("agg-1", List.of()); + + assertThatThrownBy(() -> KafkaAgenticAggregateRuntime.handleTaskEvent(unknownEvent, state)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("Unsupported task event type"); + } + + // ------------------------------------------------------------------------- + // State reconstruction from event sequences + // ------------------------------------------------------------------------- + + @Test + void shouldReconstructStateFromMultipleTaskAssignedEvents() { + AggregateState state = new TestTaskState("agg-1", List.of()); + var party = new HumanRequestingParty("user-1", "Alice", "analyst"); + + for (int i = 1; i <= 3; i++) { + var event = new AgentTaskAssignedEvent("agg-1", "proc-" + i, "Task " + i, + party, null, Instant.now().plusSeconds(i)); + state = KafkaAgenticAggregateRuntime.handleTaskEvent(event, state); + } + + var finalState = (TestTaskState) state; + assertThat(finalState.getAssignedTasks()).hasSize(3); + assertThat(finalState.getAssignedTasks()).extracting(AssignedTask::agentProcessId) + .containsExactly("proc-1", "proc-2", "proc-3"); + } +} diff --git a/main/api/src/test/java/org/elasticsoftware/akces/aggregate/RequestingPartySerializationTest.java b/main/api/src/test/java/org/elasticsoftware/akces/aggregate/RequestingPartySerializationTest.java new file mode 100644 index 00000000..8319cf4c --- /dev/null +++ b/main/api/src/test/java/org/elasticsoftware/akces/aggregate/RequestingPartySerializationTest.java @@ -0,0 +1,178 @@ +/* + * Copyright 2022 - 2026 The Original Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.elasticsoftware.akces.aggregate; + +import com.fasterxml.jackson.annotation.JsonSubTypes; +import com.fasterxml.jackson.annotation.JsonTypeInfo; +import org.junit.jupiter.api.Test; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * Tests for the {@link RequestingParty} sealed interface and its implementations + * ({@link AgentRequestingParty} and {@link HumanRequestingParty}), verifying + * record construction, equality, and Jackson annotation configuration. + * + *

Since the api module only has {@code jackson-annotations} (not {@code jackson-databind}), + * serialization round-trips are verified in the agentic module tests where a full Jackson + * {@code ObjectMapper} is available. + */ +class RequestingPartySerializationTest { + + // ------------------------------------------------------------------------- + // AgentRequestingParty tests + // ------------------------------------------------------------------------- + + @Test + void agentRequestingPartyShouldPopulateAllFields() { + var agent = new AgentRequestingParty("agent-1", "Orchestrator", "supervisor"); + + assertThat(agent.agentId()).isEqualTo("agent-1"); + assertThat(agent.agentName()).isEqualTo("Orchestrator"); + assertThat(agent.role()).isEqualTo("supervisor"); + } + + @Test + void agentRequestingPartyShouldImplementRequestingParty() { + var agent = new AgentRequestingParty("agent-1", "Orchestrator", "supervisor"); + + assertThat(agent).isInstanceOf(RequestingParty.class); + assertThat(((RequestingParty) agent).role()).isEqualTo("supervisor"); + } + + // ------------------------------------------------------------------------- + // HumanRequestingParty tests + // ------------------------------------------------------------------------- + + @Test + void humanRequestingPartyShouldPopulateAllFields() { + var human = new HumanRequestingParty("user-1", "John Doe", "administrator"); + + assertThat(human.userId()).isEqualTo("user-1"); + assertThat(human.displayName()).isEqualTo("John Doe"); + assertThat(human.role()).isEqualTo("administrator"); + } + + @Test + void humanRequestingPartyShouldImplementRequestingParty() { + var human = new HumanRequestingParty("user-1", "John Doe", "administrator"); + + assertThat(human).isInstanceOf(RequestingParty.class); + assertThat(((RequestingParty) human).role()).isEqualTo("administrator"); + } + + // ------------------------------------------------------------------------- + // Equality tests + // ------------------------------------------------------------------------- + + @Test + void agentRequestingPartyEqualityShouldHold() { + var a = new AgentRequestingParty("id-1", "Name", "role"); + var b = new AgentRequestingParty("id-1", "Name", "role"); + + assertThat(a).isEqualTo(b); + assertThat(a.hashCode()).isEqualTo(b.hashCode()); + } + + @Test + void humanRequestingPartyEqualityShouldHold() { + var a = new HumanRequestingParty("id-1", "Name", "role"); + var b = new HumanRequestingParty("id-1", "Name", "role"); + + assertThat(a).isEqualTo(b); + assertThat(a.hashCode()).isEqualTo(b.hashCode()); + } + + @Test + void agentAndHumanShouldNotBeEqual() { + var agent = new AgentRequestingParty("id-1", "Name", "role"); + var human = new HumanRequestingParty("id-1", "Name", "role"); + + assertThat(agent).isNotEqualTo(human); + } + + @Test + void agentRequestingPartyInequalityWhenIdDiffers() { + var a = new AgentRequestingParty("id-1", "Name", "role"); + var b = new AgentRequestingParty("id-2", "Name", "role"); + + assertThat(a).isNotEqualTo(b); + } + + @Test + void humanRequestingPartyInequalityWhenIdDiffers() { + var a = new HumanRequestingParty("id-1", "Name", "role"); + var b = new HumanRequestingParty("id-2", "Name", "role"); + + assertThat(a).isNotEqualTo(b); + } + + // ------------------------------------------------------------------------- + // Jackson annotation tests + // ------------------------------------------------------------------------- + + @Test + void requestingPartyShouldHaveJsonTypeInfoAnnotation() { + JsonTypeInfo typeInfo = RequestingParty.class.getAnnotation(JsonTypeInfo.class); + + assertThat(typeInfo).isNotNull(); + assertThat(typeInfo.use()).isEqualTo(JsonTypeInfo.Id.NAME); + assertThat(typeInfo.property()).isEqualTo("type"); + } + + @Test + void requestingPartyShouldHaveJsonSubTypesAnnotation() { + JsonSubTypes subTypes = RequestingParty.class.getAnnotation(JsonSubTypes.class); + + assertThat(subTypes).isNotNull(); + assertThat(subTypes.value()).hasSize(2); + + JsonSubTypes.Type[] types = subTypes.value(); + assertThat(types[0].value()).isEqualTo(AgentRequestingParty.class); + assertThat(types[0].name()).isEqualTo("agent"); + assertThat(types[1].value()).isEqualTo(HumanRequestingParty.class); + assertThat(types[1].name()).isEqualTo("human"); + } + + // ------------------------------------------------------------------------- + // Sealed interface tests + // ------------------------------------------------------------------------- + + @Test + void requestingPartyShouldBeSealed() { + assertThat(RequestingParty.class.isSealed()).isTrue(); + assertThat(RequestingParty.class.getPermittedSubclasses()) + .containsExactlyInAnyOrder(AgentRequestingParty.class, HumanRequestingParty.class); + } + + @Test + void patternMatchingShouldWorkWithSwitch() { + RequestingParty agent = new AgentRequestingParty("a-1", "TestAgent", "supervisor"); + RequestingParty human = new HumanRequestingParty("u-1", "TestUser", "analyst"); + + assertThat(describe(agent)).isEqualTo("Agent: TestAgent (supervisor)"); + assertThat(describe(human)).isEqualTo("Human: TestUser (analyst)"); + } + + private static String describe(RequestingParty party) { + return switch (party) { + case AgentRequestingParty a -> "Agent: " + a.agentName() + " (" + a.role() + ")"; + case HumanRequestingParty h -> "Human: " + h.displayName() + " (" + h.role() + ")"; + }; + } +} diff --git a/main/api/src/test/java/org/elasticsoftware/akces/aggregate/TaskAwareStateTest.java b/main/api/src/test/java/org/elasticsoftware/akces/aggregate/TaskAwareStateTest.java new file mode 100644 index 00000000..36c1f266 --- /dev/null +++ b/main/api/src/test/java/org/elasticsoftware/akces/aggregate/TaskAwareStateTest.java @@ -0,0 +1,194 @@ +/* + * Copyright 2022 - 2026 The Original Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.elasticsoftware.akces.aggregate; + +import org.junit.jupiter.api.Test; + +import java.time.Instant; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * Tests for the {@link TaskAwareState} interface contract and the {@link AssignedTask} record, + * following the same pattern as {@link MemoryAwareStateTest}. Per framework testing guidelines, + * interfaces and records are tested through concrete implementations rather than directly. + */ +class TaskAwareStateTest { + + /** Concrete {@link TaskAwareState} implementation for testing. */ + record TestTaskState( + String id, + List assignedTasks + ) implements AggregateState, TaskAwareState { + + @Override + public String getAggregateId() { + return id; + } + + @Override + public List getAssignedTasks() { + return assignedTasks; + } + + @Override + public TaskAwareState withAssignedTask(AssignedTask task) { + var updated = new ArrayList<>(assignedTasks); + updated.add(task); + return new TestTaskState(id, List.copyOf(updated)); + } + + @Override + public TaskAwareState withoutAssignedTask(String agentProcessId) { + var updated = assignedTasks.stream() + .filter(t -> !t.agentProcessId().equals(agentProcessId)) + .toList(); + return new TestTaskState(id, updated); + } + } + + // ------------------------------------------------------------------------- + // AssignedTask record tests + // ------------------------------------------------------------------------- + + @Test + void assignedTaskShouldPopulateAllFields() { + Instant now = Instant.now(); + var party = new HumanRequestingParty("user-1", "Alice", "analyst"); + var metadata = Map.of("correlationId", "corr-123"); + + var task = new AssignedTask("proc-1", "Analyze data", party, metadata, now); + + assertThat(task.agentProcessId()).isEqualTo("proc-1"); + assertThat(task.taskDescription()).isEqualTo("Analyze data"); + assertThat(task.requestingParty()).isEqualTo(party); + assertThat(task.taskMetadata()).isEqualTo(metadata); + assertThat(task.assignedAt()).isEqualTo(now); + } + + @Test + void assignedTaskEqualityShouldHold() { + Instant now = Instant.parse("2026-04-10T12:00:00Z"); + var party = new AgentRequestingParty("agent-1", "TestAgent", "supervisor"); + var a = new AssignedTask("proc-1", "Do task", party, Map.of(), now); + var b = new AssignedTask("proc-1", "Do task", party, Map.of(), now); + + assertThat(a).isEqualTo(b); + assertThat(a.hashCode()).isEqualTo(b.hashCode()); + } + + @Test + void assignedTaskInequalityWhenProcessIdDiffers() { + Instant now = Instant.now(); + var party = new HumanRequestingParty("user-1", "Alice", "analyst"); + var a = new AssignedTask("proc-1", "task", party, null, now); + var b = new AssignedTask("proc-2", "task", party, null, now); + + assertThat(a).isNotEqualTo(b); + } + + @Test + void assignedTaskShouldAllowNullMetadata() { + Instant now = Instant.now(); + var party = new HumanRequestingParty("user-1", "Alice", "analyst"); + + var task = new AssignedTask("proc-1", "task", party, null, now); + + assertThat(task.taskMetadata()).isNull(); + } + + // ------------------------------------------------------------------------- + // TaskAwareState contract tests + // ------------------------------------------------------------------------- + + @Test + void emptyStateShouldReturnEmptyTaskList() { + var state = new TestTaskState("agg-1", List.of()); + + assertThat(state.getAssignedTasks()).isEmpty(); + } + + @Test + void withAssignedTaskShouldAppendToEnd() { + Instant now = Instant.now(); + var party = new HumanRequestingParty("user-1", "Alice", "analyst"); + var task1 = new AssignedTask("proc-1", "Task 1", party, null, now); + var task2 = new AssignedTask("proc-2", "Task 2", party, null, now.plusSeconds(1)); + + TaskAwareState state = new TestTaskState("agg-1", List.of()); + state = state.withAssignedTask(task1); + state = state.withAssignedTask(task2); + + assertThat(state.getAssignedTasks()).hasSize(2); + assertThat(state.getAssignedTasks().get(0).agentProcessId()).isEqualTo("proc-1"); + assertThat(state.getAssignedTasks().get(1).agentProcessId()).isEqualTo("proc-2"); + } + + @Test + void withoutAssignedTaskShouldRemoveByProcessId() { + Instant now = Instant.now(); + var party = new AgentRequestingParty("agent-1", "Agent", "supervisor"); + var task1 = new AssignedTask("proc-1", "Task 1", party, null, now); + var task2 = new AssignedTask("proc-2", "Task 2", party, null, now); + var task3 = new AssignedTask("proc-3", "Task 3", party, null, now); + + TaskAwareState state = new TestTaskState("agg-1", List.of(task1, task2, task3)); + state = state.withoutAssignedTask("proc-2"); + + assertThat(state.getAssignedTasks()).hasSize(2); + assertThat(state.getAssignedTasks()).extracting(AssignedTask::agentProcessId) + .containsExactly("proc-1", "proc-3"); + } + + @Test + void withoutNonExistentProcessIdShouldReturnEquivalentState() { + Instant now = Instant.now(); + var party = new HumanRequestingParty("user-1", "Alice", "analyst"); + var task = new AssignedTask("proc-1", "Task 1", party, null, now); + + TaskAwareState state = new TestTaskState("agg-1", List.of(task)); + TaskAwareState unchanged = state.withoutAssignedTask("non-existent"); + + assertThat(unchanged.getAssignedTasks()).isEqualTo(state.getAssignedTasks()); + } + + @Test + void taskRoundTripThroughTaskAwareState() { + Instant now = Instant.now(); + var party = new HumanRequestingParty("user-42", "Bob", "manager"); + var metadata = Map.of("priority", "high", "deadline", "2026-05-01"); + var task = new AssignedTask("proc-42", "Urgent analysis", party, metadata, now); + + var emptyState = new TestTaskState("agg-1", List.of()); + var stateWithTask = (TestTaskState) emptyState.withAssignedTask(task); + + assertThat(stateWithTask.getAssignedTasks()).hasSize(1); + assertThat(stateWithTask.getAssignedTasks().getFirst()).isEqualTo(task); + + // Verify all fields survived the round-trip + AssignedTask retrieved = stateWithTask.getAssignedTasks().getFirst(); + assertThat(retrieved.agentProcessId()).isEqualTo("proc-42"); + assertThat(retrieved.taskDescription()).isEqualTo("Urgent analysis"); + assertThat(retrieved.requestingParty()).isEqualTo(party); + assertThat(retrieved.taskMetadata()).isEqualTo(metadata); + assertThat(retrieved.assignedAt()).isEqualTo(now); + } +} From 424e30b20aa20269c0f8319fceb95555e5d3593d Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Fri, 10 Apr 2026 12:52:55 +0000 Subject: [PATCH 4/5] Address PR review feedback: GDPR, unified handlers, forceRegister flag - Remove displayName from HumanRequestingParty for GDPR (comment #3064274600) - Replace AssignTaskCommandHandler class with static builtInCommandHandler() factory method on KafkaAgenticAggregateRuntime (comment #3064259933) - Rename handleMemoryEvent to handleBuiltInEvent, merge AgentTaskAssignedEvent support into it, remove handleTaskEvent (comment #3064268880) - Use forceRegisterOnIncompatible flag from environment property instead of automatic force-register in registerBuiltinSchemas (comment #3064241901) - Remove unused ArrayList import from KafkaAgenticAggregateRuntime - Update all tests for new method signatures and 2-arg HumanRequestingParty Agent-Logs-Url: https://github.com/elasticsoftwarefoundation/akces-framework/sessions/83c4f48e-f510-429f-8c85-0f14c17d75d4 Co-authored-by: jwijgerd <914840+jwijgerd@users.noreply.github.com> --- .../beans/AgenticAggregateRuntimeFactory.java | 28 ++- .../AkcesAgenticAggregateController.java | 56 ++++-- .../runtime/AssignTaskCommandHandler.java | 190 ------------------ .../runtime/KafkaAgenticAggregateRuntime.java | 164 +++++++++++---- .../runtime/AssignTaskCommandHandlerTest.java | 89 +++----- .../runtime/MemoryEventSourcingTest.java | 26 +-- .../runtime/TaskEventSourcingTest.java | 22 +- .../akces/aggregate/HumanRequestingParty.java | 11 +- .../RequestingPartySerializationTest.java | 21 +- .../akces/aggregate/TaskAwareStateTest.java | 12 +- 10 files changed, 248 insertions(+), 371 deletions(-) delete mode 100644 main/agentic/src/main/java/org/elasticsoftware/akces/agentic/runtime/AssignTaskCommandHandler.java diff --git a/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/beans/AgenticAggregateRuntimeFactory.java b/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/beans/AgenticAggregateRuntimeFactory.java index b4f6e2d0..d990d0f9 100644 --- a/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/beans/AgenticAggregateRuntimeFactory.java +++ b/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/beans/AgenticAggregateRuntimeFactory.java @@ -22,7 +22,6 @@ import org.elasticsoftware.akces.agentic.AgenticAggregateRuntime; import org.elasticsoftware.akces.agentic.runtime.AgenticCommandHandlerFunctionAdapter; import org.elasticsoftware.akces.agentic.runtime.AgenticEventHandlerFunctionAdapter; -import org.elasticsoftware.akces.agentic.runtime.AssignTaskCommandHandler; import org.elasticsoftware.akces.agentic.runtime.KafkaAgenticAggregateRuntime; import org.elasticsoftware.akces.aggregate.*; import org.elasticsoftware.akces.aggregate.AgenticAggregate; @@ -270,27 +269,26 @@ private KafkaAggregateRuntime createRuntime(AgenticAggregateInfo agenticInfo, } }); - // Register built-in event-sourcing handlers for memory management. - // These handle the framework-owned MemoryStored and MemoryRevoked events, - // which every AgenticAggregate state must process via MemoryAwareState. + // Register built-in event-sourcing handlers for memory management and task assignment. + // These handle the framework-owned MemoryStored, MemoryRevoked, and AgentTaskAssigned + // events, using a unified single-dispatch handler. runtimeBuilder - .addEventSourcingHandler(MEMORY_STORED_TYPE, KafkaAgenticAggregateRuntime::handleMemoryEvent) + .addEventSourcingHandler(MEMORY_STORED_TYPE, KafkaAgenticAggregateRuntime::handleBuiltInEvent) .addDomainEvent(MEMORY_STORED_TYPE); runtimeBuilder - .addEventSourcingHandler(MEMORY_REVOKED_TYPE, KafkaAgenticAggregateRuntime::handleMemoryEvent) + .addEventSourcingHandler(MEMORY_REVOKED_TYPE, KafkaAgenticAggregateRuntime::handleBuiltInEvent) .addDomainEvent(MEMORY_REVOKED_TYPE); + runtimeBuilder + .addEventSourcingHandler(AGENT_TASK_ASSIGNED_TYPE, KafkaAgenticAggregateRuntime::handleBuiltInEvent) + .addDomainEvent(AGENT_TASK_ASSIGNED_TYPE); - // Register built-in AssignTask command handler and AgentTaskAssigned event-sourcing handler. - // The AssignTask command creates an Embabel AgentProcess and emits AgentTaskAssignedEvent. - @SuppressWarnings({"unchecked", "rawtypes"}) - AssignTaskCommandHandler assignTaskHandler = new AssignTaskCommandHandler<>( - (AgenticAggregate) aggregate, agentPlatform, agenticInfo.value()); + // Register built-in AssignTask command handler using a single-dispatch handler on + // KafkaAgenticAggregateRuntime. The handler resolves the agent from the platform, + // creates an AgentProcess, and emits AgentTaskAssignedEvent. runtimeBuilder - .addCommandHandler(ASSIGN_TASK_COMMAND_TYPE, assignTaskHandler) + .addCommandHandler(ASSIGN_TASK_COMMAND_TYPE, + KafkaAgenticAggregateRuntime.builtInCommandHandler(agentPlatform, agenticInfo.value())) .addCommand(ASSIGN_TASK_COMMAND_TYPE); - runtimeBuilder - .addEventSourcingHandler(AGENT_TASK_ASSIGNED_TYPE, KafkaAgenticAggregateRuntime::handleTaskEvent) - .addDomainEvent(AGENT_TASK_ASSIGNED_TYPE); // Collect agent-produced error types for registration and inclusion in adapters. List> agentProducedErrorTypes = diff --git a/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/runtime/AkcesAgenticAggregateController.java b/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/runtime/AkcesAgenticAggregateController.java index cafb03d8..ab4eb4f8 100644 --- a/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/runtime/AkcesAgenticAggregateController.java +++ b/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/runtime/AkcesAgenticAggregateController.java @@ -157,6 +157,10 @@ public class AkcesAgenticAggregateController extends Thread * subclasses or future extensions that may need environment properties. */ private Environment environment; + /** Whether to force-register built-in schemas when they are incompatible with existing ones. + * Configured via the {@code akces.aggregate.schemas.forceRegister} environment property. */ + private boolean forceRegisterOnIncompatible = false; + /** * Creates a new {@code AkcesAgenticAggregateController}. * @@ -277,6 +281,11 @@ public void run() { * {@link org.elasticsoftware.akces.agentic.events.MemoryRevokedEvent}), * the {@link org.elasticsoftware.akces.agentic.events.AgentTaskAssignedEvent}, and * the {@link org.elasticsoftware.akces.agentic.commands.AssignTaskCommand}. + * + *

If a schema is incompatible with the existing registered schema, the behaviour + * depends on the {@code akces.aggregate.schemas.forceRegister} environment property: + * when {@code true}, the schema is force-registered; otherwise the incompatibility is + * propagated as a fatal error. */ private void registerBuiltinSchemas() { logger.info("Registering built-in agentic schemas for {}Aggregate", @@ -285,15 +294,20 @@ private void registerBuiltinSchemas() { try { aggregateRuntime.registerAndValidate(eventType, schemaRegistry); } catch (IncompatibleSchemaException e) { - logger.warn("Built-in event schema {} is incompatible — attempting force-register", - eventType.typeName(), e); - try { - aggregateRuntime.registerAndValidate(eventType, schemaRegistry, true); - } catch (Exception ex) { - logger.error("Failed to force-register built-in event schema {}", - eventType.typeName(), ex); - throw new RuntimeException("Failed to register built-in event schema: " - + eventType.typeName(), ex); + if (forceRegisterOnIncompatible) { + logger.warn("Built-in event schema {} is incompatible — force-registering (forceRegister=true)", + eventType.typeName(), e); + try { + aggregateRuntime.registerAndValidate(eventType, schemaRegistry, true); + } catch (Exception ex) { + logger.error("Failed to force-register built-in event schema {}", + eventType.typeName(), ex); + throw new RuntimeException("Failed to register built-in event schema: " + + eventType.typeName(), ex); + } + } else { + throw new RuntimeException("Built-in event schema " + eventType.typeName() + + " is incompatible. Set akces.aggregate.schemas.forceRegister=true to override.", e); } } catch (Exception e) { throw new RuntimeException("Failed to register built-in event schema: " @@ -304,15 +318,20 @@ private void registerBuiltinSchemas() { try { aggregateRuntime.registerAndValidate(commandType, schemaRegistry); } catch (IncompatibleSchemaException e) { - logger.warn("Built-in command schema {} is incompatible — attempting force-register", - commandType.typeName(), e); - try { - aggregateRuntime.registerAndValidate(commandType, schemaRegistry, true); - } catch (Exception ex) { - logger.error("Failed to force-register built-in command schema {}", - commandType.typeName(), ex); - throw new RuntimeException("Failed to register built-in command schema: " - + commandType.typeName(), ex); + if (forceRegisterOnIncompatible) { + logger.warn("Built-in command schema {} is incompatible — force-registering (forceRegister=true)", + commandType.typeName(), e); + try { + aggregateRuntime.registerAndValidate(commandType, schemaRegistry, true); + } catch (Exception ex) { + logger.error("Failed to force-register built-in command schema {}", + commandType.typeName(), ex); + throw new RuntimeException("Failed to register built-in command schema: " + + commandType.typeName(), ex); + } + } else { + throw new RuntimeException("Built-in command schema " + commandType.typeName() + + " is incompatible. Set akces.aggregate.schemas.forceRegister=true to override.", e); } } catch (Exception e) { throw new RuntimeException("Failed to register built-in command schema: " @@ -540,6 +559,7 @@ public void setApplicationContext(ApplicationContext ctx) { @Override public void setEnvironment(Environment env) { this.environment = env; + this.forceRegisterOnIncompatible = env.getProperty("akces.aggregate.schemas.forceRegister", Boolean.class, false); } // ------------------------------------------------------------------------- diff --git a/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/runtime/AssignTaskCommandHandler.java b/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/runtime/AssignTaskCommandHandler.java deleted file mode 100644 index 8b7d806d..00000000 --- a/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/runtime/AssignTaskCommandHandler.java +++ /dev/null @@ -1,190 +0,0 @@ -/* - * Copyright 2022 - 2026 The Original Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.elasticsoftware.akces.agentic.runtime; - -import com.embabel.agent.core.Agent; -import com.embabel.agent.core.AgentPlatform; -import com.embabel.agent.core.AgentProcess; -import com.embabel.agent.core.ProcessOptions; -import jakarta.annotation.Nonnull; -import org.elasticsoftware.akces.agentic.commands.AssignTaskCommand; -import org.elasticsoftware.akces.agentic.events.AgentTaskAssignedEvent; -import org.elasticsoftware.akces.aggregate.*; -import org.elasticsoftware.akces.events.DomainEvent; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.time.Instant; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.stream.Stream; - -import static org.elasticsoftware.akces.agentic.AgenticAggregateRuntime.AGENT_TASK_ASSIGNED_TYPE; -import static org.elasticsoftware.akces.agentic.AgenticAggregateRuntime.ASSIGN_TASK_COMMAND_TYPE; - -/** - * Built-in {@link CommandHandlerFunction} that processes - * {@link AssignTaskCommand} by creating an Embabel {@link AgentProcess} and - * emitting an {@link AgentTaskAssignedEvent}. - * - *

This handler needs runtime access to the Embabel {@link AgentPlatform} and - * {@link Agent}, so it cannot use the simple static method reference pattern used - * by the memory event-sourcing handlers. Instead, it is a dedicated class that - * captures these dependencies at construction time. - * - *

The handler populates the agent's blackboard with the task description, - * requesting party, and current aggregate state, then creates an agent process. - * The process ID from {@link AgentProcess#getId()} is included in the emitted event, - * linking the Akces domain to the Embabel runtime. - * - * @param the aggregate state type; must implement {@link AggregateState} - */ -public class AssignTaskCommandHandler - implements CommandHandlerFunction { - - private static final Logger logger = - LoggerFactory.getLogger(AssignTaskCommandHandler.class); - - private final AgenticAggregate aggregate; - private final AgentPlatform agentPlatform; - private final String aggregateName; - - /** - * Creates a new {@code AssignTaskCommandHandler}. - * - * @param aggregate the owning agentic aggregate instance - * @param agentPlatform the Embabel platform used to create agent processes - * @param aggregateName the name of the aggregate (used for agent resolution) - */ - public AssignTaskCommandHandler( - AgenticAggregate aggregate, - AgentPlatform agentPlatform, - String aggregateName) { - this.aggregate = Objects.requireNonNull(aggregate, "aggregate must not be null"); - this.agentPlatform = Objects.requireNonNull(agentPlatform, "agentPlatform must not be null"); - this.aggregateName = Objects.requireNonNull(aggregateName, "aggregateName must not be null"); - } - - /** - * Processes the {@link AssignTaskCommand} by creating an Embabel {@link AgentProcess} - * and emitting an {@link AgentTaskAssignedEvent}. - * - *

The agent process is created with bindings containing the task description, - * requesting party information, and the current aggregate state. The handler resolves - * the appropriate {@link Agent} by name from the platform. - * - * @param command the assign-task command; never {@code null} - * @param state the current aggregate state - * @return a stream containing a single {@link AgentTaskAssignedEvent} - */ - @Nonnull - @Override - public Stream apply(@Nonnull AssignTaskCommand command, S state) { - logger.debug("Processing AssignTask command for aggregate {}, taskDescription='{}'", - aggregateName, command.taskDescription()); - - Map bindings = new LinkedHashMap<>(); - bindings.put("command", command); - bindings.put("state", state); - bindings.put("agenticAggregateId", command.agenticAggregateId()); - bindings.put("taskDescription", command.taskDescription()); - bindings.put("requestingParty", command.requestingParty()); - if (command.taskMetadata() != null) { - bindings.put("taskMetadata", command.taskMetadata()); - } - - // Resolve the agent for this aggregate - Agent agent = resolveAgent(); - - AgentProcess agentProcess = - agentPlatform.createAgentProcess(agent, ProcessOptions.DEFAULT, bindings); - - String processId = agentProcess.getId(); - - logger.debug("Created AgentProcess with id={} for AssignTask on aggregate {}", - processId, aggregateName); - - AgentTaskAssignedEvent event = new AgentTaskAssignedEvent( - command.agenticAggregateId(), - processId, - command.taskDescription(), - command.requestingParty(), - command.taskMetadata(), - Instant.now()); - - return Stream.of(event); - } - - /** - * {@inheritDoc} - * - *

Always returns {@code false} — built-in commands cannot create aggregate state. - */ - @Override - public boolean isCreate() { - return false; - } - - @Override - public CommandType getCommandType() { - return ASSIGN_TASK_COMMAND_TYPE; - } - - @Override - public Aggregate getAggregate() { - return aggregate; - } - - @Override - public List> getProducedDomainEventTypes() { - @SuppressWarnings("unchecked") - DomainEventType type = (DomainEventType) (DomainEventType) AGENT_TASK_ASSIGNED_TYPE; - return List.of(type); - } - - @Override - public List> getErrorEventTypes() { - return List.of(); - } - - /** - * Resolves the {@link Agent} for the aggregate from the platform's registered agents. - * - *

Looks for an agent matching either the exact aggregate name or the - * {@code {aggregateName}Agent} convention. - * - * @return the resolved {@link Agent}; never {@code null} - * @throws IllegalStateException if no matching agent is found - */ - private Agent resolveAgent() { - String agentBeanName = aggregateName + "Agent"; - for (Agent candidate : agentPlatform.agents()) { - String candidateName = candidate.getName(); - if (aggregateName.equals(candidateName) || agentBeanName.equals(candidateName)) { - return candidate; - } - } - throw new IllegalStateException( - "No Agent found with name '" + aggregateName + "' or '" + agentBeanName - + "' in the AgentPlatform for AssignTask handling. " - + "The implementing application must provide an Agent named '" - + agentBeanName + "'."); - } -} diff --git a/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/runtime/KafkaAgenticAggregateRuntime.java b/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/runtime/KafkaAgenticAggregateRuntime.java index 614bbb7c..37fb0dc1 100644 --- a/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/runtime/KafkaAgenticAggregateRuntime.java +++ b/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/runtime/KafkaAgenticAggregateRuntime.java @@ -17,9 +17,13 @@ package org.elasticsoftware.akces.agentic.runtime; +import com.embabel.agent.core.Agent; import com.embabel.agent.core.AgentPlatform; +import com.embabel.agent.core.AgentProcess; +import com.embabel.agent.core.ProcessOptions; import org.apache.kafka.common.errors.SerializationException; import org.elasticsoftware.akces.agentic.AgenticAggregateRuntime; +import org.elasticsoftware.akces.agentic.commands.AssignTaskCommand; import org.elasticsoftware.akces.agentic.events.AgentTaskAssignedEvent; import org.elasticsoftware.akces.agentic.events.MemoryRevokedEvent; import org.elasticsoftware.akces.agentic.events.MemoryStoredEvent; @@ -33,16 +37,21 @@ import org.elasticsoftware.akces.protocol.ProtocolRecord; import org.elasticsoftware.akces.schemas.SchemaException; import org.elasticsoftware.akces.schemas.SchemaRegistry; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import tools.jackson.databind.ObjectMapper; import java.io.IOException; -import java.util.ArrayList; +import java.time.Instant; import java.util.Collection; +import java.util.LinkedHashMap; import java.util.List; +import java.util.Map; import java.util.Objects; import java.util.function.BiConsumer; import java.util.function.Consumer; import java.util.function.Supplier; +import java.util.stream.Stream; /** * Kafka-backed implementation of {@link AgenticAggregateRuntime}. @@ -53,6 +62,9 @@ */ public class KafkaAgenticAggregateRuntime implements AgenticAggregateRuntime { + private static final Logger logger = + LoggerFactory.getLogger(KafkaAgenticAggregateRuntime.class); + private final AggregateRuntime delegate; private final ObjectMapper objectMapper; private final Class stateClass; @@ -224,7 +236,7 @@ public boolean shouldHandlePIIData() { } // ------------------------------------------------------------------------- - // Built-in EventSourcingHandler implementations for memory management + // Built-in EventSourcingHandler implementations // ------------------------------------------------------------------------- /** @@ -278,35 +290,6 @@ public static AggregateState onMemoryRevoked(MemoryRevokedEvent event, Aggregate return (AggregateState) mas.withoutMemory(event.memoryId()); } - /** - * Single-dispatch event-sourcing handler that routes {@link MemoryStoredEvent} and - * {@link MemoryRevokedEvent} to the appropriate typed handler. - * - *

Intended to be used as a method reference - * ({@code KafkaAgenticAggregateRuntime::handleMemoryEvent}) so that no anonymous adapter - * class is required at the registration site. - * - * @param event the memory domain event to apply; must be a {@code MemoryStoredEvent} or - * {@code MemoryRevokedEvent} - * @param state the current aggregate state - * @return the updated aggregate state - * @throws IllegalArgumentException if {@code event} is not a recognised memory event type - */ - public static AggregateState handleMemoryEvent(DomainEvent event, AggregateState state) { - if (event instanceof MemoryStoredEvent stored) { - return onMemoryStored(stored, state); - } else if (event instanceof MemoryRevokedEvent revoked) { - return onMemoryRevoked(revoked, state); - } else { - throw new IllegalArgumentException( - "Unsupported memory event type: " + event.getClass().getName()); - } - } - - // ------------------------------------------------------------------------- - // Built-in EventSourcingHandler for task assignment - // ------------------------------------------------------------------------- - /** * Built-in event-sourcing handler for {@link AgentTaskAssignedEvent}. * @@ -336,24 +319,127 @@ public static AggregateState onAgentTaskAssigned(AgentTaskAssignedEvent event, A } /** - * Single-dispatch event-sourcing handler that routes {@link AgentTaskAssignedEvent} - * to the appropriate typed handler. + * Single-dispatch event-sourcing handler for all built-in agentic domain events. + * + *

Routes {@link MemoryStoredEvent}, {@link MemoryRevokedEvent}, and + * {@link AgentTaskAssignedEvent} to the appropriate typed handler. * *

Intended to be used as a method reference - * ({@code KafkaAgenticAggregateRuntime::handleTaskEvent}) so that no anonymous adapter + * ({@code KafkaAgenticAggregateRuntime::handleBuiltInEvent}) so that no anonymous adapter * class is required at the registration site. * - * @param event the task domain event to apply; must be an {@code AgentTaskAssignedEvent} + * @param event the built-in domain event to apply * @param state the current aggregate state * @return the updated aggregate state - * @throws IllegalArgumentException if {@code event} is not a recognised task event type + * @throws IllegalArgumentException if {@code event} is not a recognised built-in event type */ - public static AggregateState handleTaskEvent(DomainEvent event, AggregateState state) { - if (event instanceof AgentTaskAssignedEvent assigned) { + public static AggregateState handleBuiltInEvent(DomainEvent event, AggregateState state) { + if (event instanceof MemoryStoredEvent stored) { + return onMemoryStored(stored, state); + } else if (event instanceof MemoryRevokedEvent revoked) { + return onMemoryRevoked(revoked, state); + } else if (event instanceof AgentTaskAssignedEvent assigned) { return onAgentTaskAssigned(assigned, state); } else { throw new IllegalArgumentException( - "Unsupported task event type: " + event.getClass().getName()); + "Unsupported built-in event type: " + event.getClass().getName()); + } + } + + // ------------------------------------------------------------------------- + // Built-in CommandHandler for task assignment + // ------------------------------------------------------------------------- + + /** + * Creates a {@link CommandHandlerFunction} for built-in agentic commands. + * + *

The returned function handles {@link AssignTaskCommand} by resolving the appropriate + * {@link Agent} from the {@link AgentPlatform}, creating an {@link AgentProcess}, + * and emitting an {@link AgentTaskAssignedEvent} with the process ID. + * + *

This is a factory method rather than a static method reference because it needs + * runtime access to the {@link AgentPlatform} and aggregate name for agent resolution. + * + * @param agentPlatform the Embabel platform used to create agent processes + * @param aggregateName the name of the aggregate (used for agent resolution) + * @return a command handler function for built-in agentic commands + */ + @SuppressWarnings("unchecked") + public static CommandHandlerFunction builtInCommandHandler( + AgentPlatform agentPlatform, String aggregateName) { + return (command, state) -> { + if (command instanceof AssignTaskCommand assignTask) { + return (Stream) (Stream) handleAssignTask(assignTask, state, agentPlatform, aggregateName); + } else { + throw new IllegalArgumentException( + "Unsupported built-in command type: " + command.getClass().getName()); + } + }; + } + + /** + * Handles the {@link AssignTaskCommand} by resolving the agent, creating an + * {@link AgentProcess}, and emitting an {@link AgentTaskAssignedEvent}. + */ + private static Stream handleAssignTask(AssignTaskCommand command, AggregateState state, + AgentPlatform agentPlatform, String aggregateName) { + logger.debug("Processing AssignTask command for aggregate {}, taskDescription='{}'", + aggregateName, command.taskDescription()); + + Map bindings = new LinkedHashMap<>(); + bindings.put("command", command); + bindings.put("state", state); + bindings.put("agenticAggregateId", command.agenticAggregateId()); + bindings.put("taskDescription", command.taskDescription()); + bindings.put("requestingParty", command.requestingParty()); + if (command.taskMetadata() != null) { + bindings.put("taskMetadata", command.taskMetadata()); + } + + Agent agent = resolveAgentByName(agentPlatform, aggregateName); + + AgentProcess agentProcess = + agentPlatform.createAgentProcess(agent, ProcessOptions.DEFAULT, bindings); + + String processId = agentProcess.getId(); + + logger.debug("Created AgentProcess with id={} for AssignTask on aggregate {}", + processId, aggregateName); + + AgentTaskAssignedEvent event = new AgentTaskAssignedEvent( + command.agenticAggregateId(), + processId, + command.taskDescription(), + command.requestingParty(), + command.taskMetadata(), + Instant.now()); + + return Stream.of(event); + } + + /** + * Resolves the {@link Agent} for the aggregate from the platform's registered agents. + * + *

Looks for an agent matching either the exact aggregate name or the + * {@code {aggregateName}Agent} convention. + * + * @param agentPlatform the platform containing registered agents + * @param aggregateName the aggregate name to resolve an agent for + * @return the resolved {@link Agent}; never {@code null} + * @throws IllegalStateException if no matching agent is found + */ + private static Agent resolveAgentByName(AgentPlatform agentPlatform, String aggregateName) { + String agentBeanName = aggregateName + "Agent"; + for (Agent candidate : agentPlatform.agents()) { + String candidateName = candidate.getName(); + if (aggregateName.equals(candidateName) || agentBeanName.equals(candidateName)) { + return candidate; + } } + throw new IllegalStateException( + "No Agent found with name '" + aggregateName + "' or '" + agentBeanName + + "' in the AgentPlatform for built-in command handling. " + + "The implementing application must provide an Agent named '" + + agentBeanName + "'."); } } diff --git a/main/agentic/src/test/java/org/elasticsoftware/akces/agentic/runtime/AssignTaskCommandHandlerTest.java b/main/agentic/src/test/java/org/elasticsoftware/akces/agentic/runtime/AssignTaskCommandHandlerTest.java index 106299fc..44de1cfd 100644 --- a/main/agentic/src/test/java/org/elasticsoftware/akces/agentic/runtime/AssignTaskCommandHandlerTest.java +++ b/main/agentic/src/test/java/org/elasticsoftware/akces/agentic/runtime/AssignTaskCommandHandlerTest.java @@ -24,6 +24,7 @@ import org.elasticsoftware.akces.agentic.commands.AssignTaskCommand; import org.elasticsoftware.akces.agentic.events.AgentTaskAssignedEvent; import org.elasticsoftware.akces.aggregate.*; +import org.elasticsoftware.akces.commands.Command; import org.elasticsoftware.akces.events.DomainEvent; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -42,9 +43,9 @@ import static org.mockito.Mockito.*; /** - * Unit tests for {@link AssignTaskCommandHandler}, verifying that it creates an - * Embabel {@link AgentProcess} and emits an {@link AgentTaskAssignedEvent} with - * the correct process ID. + * Unit tests for {@link KafkaAgenticAggregateRuntime#builtInCommandHandler(AgentPlatform, String)}, + * verifying that it creates an Embabel {@link AgentProcess} and emits an + * {@link AgentTaskAssignedEvent} with the correct process ID. */ @ExtendWith(MockitoExtension.class) class AssignTaskCommandHandlerTest { @@ -57,14 +58,6 @@ public String getAggregateId() { } } - /** Test AgenticAggregate. */ - static class TestAggregate implements AgenticAggregate { - @Override - public Class getStateClass() { - return TestState.class; - } - } - @Mock private AgentPlatform agentPlatform; @@ -74,11 +67,11 @@ public Class getStateClass() { @Mock private Agent agent; - private AssignTaskCommandHandler handler; + private CommandHandlerFunction handler; @BeforeEach void setUp() { - handler = new AssignTaskCommandHandler<>(new TestAggregate(), agentPlatform, "TestAggregate"); + handler = KafkaAgenticAggregateRuntime.builtInCommandHandler(agentPlatform, "TestAggregate"); } private void setUpAgentResolution() { @@ -89,7 +82,7 @@ private void setUpAgentResolution() { @Test void applyShouldCreateAgentProcessAndEmitEvent() { setUpAgentResolution(); - var party = new HumanRequestingParty("user-1", "Alice", "analyst"); + var party = new HumanRequestingParty("user-1", "analyst"); var command = new AssignTaskCommand("agg-1", "Analyze data", party, Map.of("key", "value")); var state = new TestState("agg-1"); @@ -136,7 +129,7 @@ void applyShouldPropagateAgentRequestingParty() { @Test void applyShouldHandleNullMetadata() { setUpAgentResolution(); - var party = new HumanRequestingParty("user-1", "Bob", "admin"); + var party = new HumanRequestingParty("user-1", "admin"); var command = new AssignTaskCommand("agg-1", "Simple task", party, null); var state = new TestState("agg-1"); @@ -150,61 +143,31 @@ void applyShouldHandleNullMetadata() { assertThat(event.taskMetadata()).isNull(); } - @Test - void isCreateShouldReturnFalse() { - assertThat(handler.isCreate()).isFalse(); - } - - @Test - void getCommandTypeShouldReturnAssignTaskType() { - assertThat(handler.getCommandType().typeName()).isEqualTo("AssignTask"); - assertThat(handler.getCommandType().version()).isEqualTo(1); - } - - @Test - void getProducedDomainEventTypesShouldContainAgentTaskAssigned() { - assertThat(handler.getProducedDomainEventTypes()).hasSize(1); - assertThat(handler.getProducedDomainEventTypes().getFirst().typeName()) - .isEqualTo("AgentTaskAssigned"); - } - - @Test - void getErrorEventTypesShouldBeEmpty() { - assertThat(handler.getErrorEventTypes()).isEmpty(); - } - - @Test - void constructorShouldRejectNullAggregate() { - assertThatThrownBy(() -> new AssignTaskCommandHandler<>(null, agentPlatform, "TestAggregate")) - .isInstanceOf(NullPointerException.class) - .hasMessageContaining("aggregate"); - } - - @Test - void constructorShouldRejectNullAgentPlatform() { - assertThatThrownBy(() -> new AssignTaskCommandHandler<>(new TestAggregate(), null, "TestAggregate")) - .isInstanceOf(NullPointerException.class) - .hasMessageContaining("agentPlatform"); - } - - @Test - void constructorShouldRejectNullAggregateName() { - assertThatThrownBy(() -> new AssignTaskCommandHandler<>(new TestAggregate(), agentPlatform, null)) - .isInstanceOf(NullPointerException.class) - .hasMessageContaining("aggregateName"); - } - @Test void applyShouldThrowWhenNoAgentFound() { - // Agent platform has no matching agents when(agentPlatform.agents()).thenReturn(List.of()); - var noAgentHandler = new AssignTaskCommandHandler<>(new TestAggregate(), agentPlatform, "Unknown"); - var party = new HumanRequestingParty("user-1", "Alice", "analyst"); + var noAgentHandler = KafkaAgenticAggregateRuntime.builtInCommandHandler(agentPlatform, "Unknown"); + var party = new HumanRequestingParty("user-1", "analyst"); var command = new AssignTaskCommand("agg-1", "task", party, null); var state = new TestState("agg-1"); - assertThatThrownBy(() -> noAgentHandler.apply(command, state)) + assertThatThrownBy(() -> noAgentHandler.apply(command, state).toList()) .isInstanceOf(IllegalStateException.class) .hasMessageContaining("No Agent found"); } + + @Test + void applyShouldThrowForUnknownCommandType() { + Command unknownCommand = new Command() { + @Override + public String getAggregateId() { + return "agg-1"; + } + }; + var state = new TestState("agg-1"); + + assertThatThrownBy(() -> handler.apply(unknownCommand, state).toList()) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("Unsupported built-in command type"); + } } diff --git a/main/agentic/src/test/java/org/elasticsoftware/akces/agentic/runtime/MemoryEventSourcingTest.java b/main/agentic/src/test/java/org/elasticsoftware/akces/agentic/runtime/MemoryEventSourcingTest.java index 371b3a5b..20777a1d 100644 --- a/main/agentic/src/test/java/org/elasticsoftware/akces/agentic/runtime/MemoryEventSourcingTest.java +++ b/main/agentic/src/test/java/org/elasticsoftware/akces/agentic/runtime/MemoryEventSourcingTest.java @@ -38,7 +38,7 @@ * {@link MemoryRevokedEvent} sequences. * *

Per framework testing guidelines, the event-sourcing handler methods - * ({@code onMemoryStored}, {@code onMemoryRevoked}, {@code handleMemoryEvent}) are + * ({@code onMemoryStored}, {@code onMemoryRevoked}, {@code handleBuiltInEvent}) are * tested through their actual invocations rather than testing the event records directly. */ class MemoryEventSourcingTest { @@ -150,35 +150,35 @@ void onMemoryRevokedShouldThrowWhenStateIsNotMemoryAware() { } // ------------------------------------------------------------------------- - // handleMemoryEvent dispatch tests + // handleMemoryEvent dispatch tests (via handleBuiltInEvent) // ------------------------------------------------------------------------- @Test - void handleMemoryEventShouldDispatchMemoryStoredEvent() { + void handleBuiltInEventShouldDispatchMemoryStoredEvent() { Instant now = Instant.now(); var event = new MemoryStoredEvent("agg-1", "mem-1", "sub", "fact", "cite", "reason", now); var state = new TestMemoryState("agg-1", List.of()); - AggregateState result = KafkaAgenticAggregateRuntime.handleMemoryEvent(event, state); + AggregateState result = KafkaAgenticAggregateRuntime.handleBuiltInEvent(event, state); assertThat(result).isInstanceOf(TestMemoryState.class); assertThat(((TestMemoryState) result).getMemories()).hasSize(1); } @Test - void handleMemoryEventShouldDispatchMemoryRevokedEvent() { + void handleBuiltInEventShouldDispatchMemoryRevokedEvent() { var m1 = new AgenticAggregateMemory("m1", "s", "f", "c", "r", Instant.now()); var state = new TestMemoryState("agg-1", List.of(m1)); var event = new MemoryRevokedEvent("agg-1", "m1", "evicted", Instant.now()); - AggregateState result = KafkaAgenticAggregateRuntime.handleMemoryEvent(event, state); + AggregateState result = KafkaAgenticAggregateRuntime.handleBuiltInEvent(event, state); assertThat(result).isInstanceOf(TestMemoryState.class); assertThat(((TestMemoryState) result).getMemories()).isEmpty(); } @Test - void handleMemoryEventShouldThrowForUnknownEventType() { + void handleBuiltInEventShouldThrowForUnknownEventType() { var unknownEvent = new DomainEvent() { @Override public String getAggregateId() { @@ -187,9 +187,9 @@ public String getAggregateId() { }; var state = new TestMemoryState("agg-1", List.of()); - assertThatThrownBy(() -> KafkaAgenticAggregateRuntime.handleMemoryEvent(unknownEvent, state)) + assertThatThrownBy(() -> KafkaAgenticAggregateRuntime.handleBuiltInEvent(unknownEvent, state)) .isInstanceOf(IllegalArgumentException.class) - .hasMessageContaining("Unsupported memory event type"); + .hasMessageContaining("Unsupported built-in event type"); } // ------------------------------------------------------------------------- @@ -215,7 +215,7 @@ void shouldReconstructStateFromEventSequence() { // Replay events to reconstruct state for (DomainEvent event : events) { - state = KafkaAgenticAggregateRuntime.handleMemoryEvent(event, state); + state = KafkaAgenticAggregateRuntime.handleBuiltInEvent(event, state); } var finalState = (TestMemoryState) state; @@ -229,9 +229,9 @@ void shouldHandleStoreAndRevokeOfSameMemoryInSequence() { AggregateState state = new TestMemoryState("agg-1", List.of()); // Store then immediately revoke - state = KafkaAgenticAggregateRuntime.handleMemoryEvent( + state = KafkaAgenticAggregateRuntime.handleBuiltInEvent( new MemoryStoredEvent("agg-1", "m1", "s", "f", "c", "r", Instant.now()), state); - state = KafkaAgenticAggregateRuntime.handleMemoryEvent( + state = KafkaAgenticAggregateRuntime.handleBuiltInEvent( new MemoryRevokedEvent("agg-1", "m1", "undo", Instant.now()), state); assertThat(((TestMemoryState) state).getMemories()).isEmpty(); @@ -242,7 +242,7 @@ void shouldPreserveInsertionOrderAfterMultipleStoredEvents() { AggregateState state = new TestMemoryState("agg-1", List.of()); for (int i = 1; i <= 5; i++) { - state = KafkaAgenticAggregateRuntime.handleMemoryEvent( + state = KafkaAgenticAggregateRuntime.handleBuiltInEvent( new MemoryStoredEvent("agg-1", "m" + i, "s" + i, "f" + i, "c" + i, "r" + i, Instant.now().plusSeconds(i)), state); diff --git a/main/agentic/src/test/java/org/elasticsoftware/akces/agentic/runtime/TaskEventSourcingTest.java b/main/agentic/src/test/java/org/elasticsoftware/akces/agentic/runtime/TaskEventSourcingTest.java index f43c6fe5..e9d8eb12 100644 --- a/main/agentic/src/test/java/org/elasticsoftware/akces/agentic/runtime/TaskEventSourcingTest.java +++ b/main/agentic/src/test/java/org/elasticsoftware/akces/agentic/runtime/TaskEventSourcingTest.java @@ -86,7 +86,7 @@ public String getAggregateId() { @Test void onAgentTaskAssignedShouldAppendTaskToState() { Instant now = Instant.parse("2026-04-10T12:00:00Z"); - var party = new HumanRequestingParty("user-1", "Alice", "analyst"); + var party = new HumanRequestingParty("user-1", "analyst"); var metadata = Map.of("correlationId", "corr-123"); var event = new AgentTaskAssignedEvent("agg-1", "proc-1", "Analyze data", party, metadata, now); @@ -108,7 +108,7 @@ void onAgentTaskAssignedShouldAppendTaskToState() { @Test void onAgentTaskAssignedShouldThrowWhenStateIsNotTaskAware() { var event = new AgentTaskAssignedEvent("agg-1", "proc-1", "task", - new HumanRequestingParty("u", "n", "r"), null, Instant.now()); + new HumanRequestingParty("u", "r"), null, Instant.now()); var plainState = new PlainState("agg-1"); assertThatThrownBy(() -> KafkaAgenticAggregateRuntime.onAgentTaskAssigned(event, plainState)) @@ -136,24 +136,24 @@ void onAgentTaskAssignedShouldPreserveExistingTasks() { } // ------------------------------------------------------------------------- - // handleTaskEvent dispatch tests + // handleBuiltInEvent dispatch tests (AgentTaskAssigned path) // ------------------------------------------------------------------------- @Test - void handleTaskEventShouldDispatchAgentTaskAssignedEvent() { + void handleBuiltInEventShouldDispatchAgentTaskAssignedEvent() { Instant now = Instant.now(); - var party = new HumanRequestingParty("user-1", "Alice", "analyst"); + var party = new HumanRequestingParty("user-1", "analyst"); var event = new AgentTaskAssignedEvent("agg-1", "proc-1", "task", party, null, now); var state = new TestTaskState("agg-1", List.of()); - AggregateState result = KafkaAgenticAggregateRuntime.handleTaskEvent(event, state); + AggregateState result = KafkaAgenticAggregateRuntime.handleBuiltInEvent(event, state); assertThat(result).isInstanceOf(TestTaskState.class); assertThat(((TestTaskState) result).getAssignedTasks()).hasSize(1); } @Test - void handleTaskEventShouldThrowForUnknownEventType() { + void handleBuiltInEventShouldThrowForUnknownEventType() { var unknownEvent = new DomainEvent() { @Override public String getAggregateId() { @@ -162,9 +162,9 @@ public String getAggregateId() { }; var state = new TestTaskState("agg-1", List.of()); - assertThatThrownBy(() -> KafkaAgenticAggregateRuntime.handleTaskEvent(unknownEvent, state)) + assertThatThrownBy(() -> KafkaAgenticAggregateRuntime.handleBuiltInEvent(unknownEvent, state)) .isInstanceOf(IllegalArgumentException.class) - .hasMessageContaining("Unsupported task event type"); + .hasMessageContaining("Unsupported built-in event type"); } // ------------------------------------------------------------------------- @@ -174,12 +174,12 @@ public String getAggregateId() { @Test void shouldReconstructStateFromMultipleTaskAssignedEvents() { AggregateState state = new TestTaskState("agg-1", List.of()); - var party = new HumanRequestingParty("user-1", "Alice", "analyst"); + var party = new HumanRequestingParty("user-1", "analyst"); for (int i = 1; i <= 3; i++) { var event = new AgentTaskAssignedEvent("agg-1", "proc-" + i, "Task " + i, party, null, Instant.now().plusSeconds(i)); - state = KafkaAgenticAggregateRuntime.handleTaskEvent(event, state); + state = KafkaAgenticAggregateRuntime.handleBuiltInEvent(event, state); } var finalState = (TestTaskState) state; diff --git a/main/api/src/main/java/org/elasticsoftware/akces/aggregate/HumanRequestingParty.java b/main/api/src/main/java/org/elasticsoftware/akces/aggregate/HumanRequestingParty.java index d0fbf5ec..e55a065e 100644 --- a/main/api/src/main/java/org/elasticsoftware/akces/aggregate/HumanRequestingParty.java +++ b/main/api/src/main/java/org/elasticsoftware/akces/aggregate/HumanRequestingParty.java @@ -21,14 +21,15 @@ * A {@link RequestingParty} implementation representing a human user that requested * a task assignment. * - * @param userId the unique identifier of the human user - * @param displayName the display name of the human user - * @param role the role of the human user in the system - * (e.g. "administrator", "analyst", "end-user") + *

For GDPR reasons, this record does not carry any personally identifiable + * information beyond a system-level user identifier and role. + * + * @param userId the unique identifier of the human user + * @param role the role of the human user in the system + * (e.g. "administrator", "analyst", "end-user") */ public record HumanRequestingParty( String userId, - String displayName, String role ) implements RequestingParty { } diff --git a/main/api/src/test/java/org/elasticsoftware/akces/aggregate/RequestingPartySerializationTest.java b/main/api/src/test/java/org/elasticsoftware/akces/aggregate/RequestingPartySerializationTest.java index 8319cf4c..4db0a372 100644 --- a/main/api/src/test/java/org/elasticsoftware/akces/aggregate/RequestingPartySerializationTest.java +++ b/main/api/src/test/java/org/elasticsoftware/akces/aggregate/RequestingPartySerializationTest.java @@ -61,16 +61,15 @@ void agentRequestingPartyShouldImplementRequestingParty() { @Test void humanRequestingPartyShouldPopulateAllFields() { - var human = new HumanRequestingParty("user-1", "John Doe", "administrator"); + var human = new HumanRequestingParty("user-1", "administrator"); assertThat(human.userId()).isEqualTo("user-1"); - assertThat(human.displayName()).isEqualTo("John Doe"); assertThat(human.role()).isEqualTo("administrator"); } @Test void humanRequestingPartyShouldImplementRequestingParty() { - var human = new HumanRequestingParty("user-1", "John Doe", "administrator"); + var human = new HumanRequestingParty("user-1", "administrator"); assertThat(human).isInstanceOf(RequestingParty.class); assertThat(((RequestingParty) human).role()).isEqualTo("administrator"); @@ -91,8 +90,8 @@ void agentRequestingPartyEqualityShouldHold() { @Test void humanRequestingPartyEqualityShouldHold() { - var a = new HumanRequestingParty("id-1", "Name", "role"); - var b = new HumanRequestingParty("id-1", "Name", "role"); + var a = new HumanRequestingParty("id-1", "role"); + var b = new HumanRequestingParty("id-1", "role"); assertThat(a).isEqualTo(b); assertThat(a.hashCode()).isEqualTo(b.hashCode()); @@ -101,7 +100,7 @@ void humanRequestingPartyEqualityShouldHold() { @Test void agentAndHumanShouldNotBeEqual() { var agent = new AgentRequestingParty("id-1", "Name", "role"); - var human = new HumanRequestingParty("id-1", "Name", "role"); + var human = new HumanRequestingParty("id-1", "role"); assertThat(agent).isNotEqualTo(human); } @@ -116,8 +115,8 @@ void agentRequestingPartyInequalityWhenIdDiffers() { @Test void humanRequestingPartyInequalityWhenIdDiffers() { - var a = new HumanRequestingParty("id-1", "Name", "role"); - var b = new HumanRequestingParty("id-2", "Name", "role"); + var a = new HumanRequestingParty("id-1", "role"); + var b = new HumanRequestingParty("id-2", "role"); assertThat(a).isNotEqualTo(b); } @@ -163,16 +162,16 @@ void requestingPartyShouldBeSealed() { @Test void patternMatchingShouldWorkWithSwitch() { RequestingParty agent = new AgentRequestingParty("a-1", "TestAgent", "supervisor"); - RequestingParty human = new HumanRequestingParty("u-1", "TestUser", "analyst"); + RequestingParty human = new HumanRequestingParty("u-1", "analyst"); assertThat(describe(agent)).isEqualTo("Agent: TestAgent (supervisor)"); - assertThat(describe(human)).isEqualTo("Human: TestUser (analyst)"); + assertThat(describe(human)).isEqualTo("Human: u-1 (analyst)"); } private static String describe(RequestingParty party) { return switch (party) { case AgentRequestingParty a -> "Agent: " + a.agentName() + " (" + a.role() + ")"; - case HumanRequestingParty h -> "Human: " + h.displayName() + " (" + h.role() + ")"; + case HumanRequestingParty h -> "Human: " + h.userId() + " (" + h.role() + ")"; }; } } diff --git a/main/api/src/test/java/org/elasticsoftware/akces/aggregate/TaskAwareStateTest.java b/main/api/src/test/java/org/elasticsoftware/akces/aggregate/TaskAwareStateTest.java index 36c1f266..b60ea053 100644 --- a/main/api/src/test/java/org/elasticsoftware/akces/aggregate/TaskAwareStateTest.java +++ b/main/api/src/test/java/org/elasticsoftware/akces/aggregate/TaskAwareStateTest.java @@ -72,7 +72,7 @@ public TaskAwareState withoutAssignedTask(String agentProcessId) { @Test void assignedTaskShouldPopulateAllFields() { Instant now = Instant.now(); - var party = new HumanRequestingParty("user-1", "Alice", "analyst"); + var party = new HumanRequestingParty("user-1", "analyst"); var metadata = Map.of("correlationId", "corr-123"); var task = new AssignedTask("proc-1", "Analyze data", party, metadata, now); @@ -98,7 +98,7 @@ void assignedTaskEqualityShouldHold() { @Test void assignedTaskInequalityWhenProcessIdDiffers() { Instant now = Instant.now(); - var party = new HumanRequestingParty("user-1", "Alice", "analyst"); + var party = new HumanRequestingParty("user-1", "analyst"); var a = new AssignedTask("proc-1", "task", party, null, now); var b = new AssignedTask("proc-2", "task", party, null, now); @@ -108,7 +108,7 @@ void assignedTaskInequalityWhenProcessIdDiffers() { @Test void assignedTaskShouldAllowNullMetadata() { Instant now = Instant.now(); - var party = new HumanRequestingParty("user-1", "Alice", "analyst"); + var party = new HumanRequestingParty("user-1", "analyst"); var task = new AssignedTask("proc-1", "task", party, null, now); @@ -129,7 +129,7 @@ void emptyStateShouldReturnEmptyTaskList() { @Test void withAssignedTaskShouldAppendToEnd() { Instant now = Instant.now(); - var party = new HumanRequestingParty("user-1", "Alice", "analyst"); + var party = new HumanRequestingParty("user-1", "analyst"); var task1 = new AssignedTask("proc-1", "Task 1", party, null, now); var task2 = new AssignedTask("proc-2", "Task 2", party, null, now.plusSeconds(1)); @@ -161,7 +161,7 @@ void withoutAssignedTaskShouldRemoveByProcessId() { @Test void withoutNonExistentProcessIdShouldReturnEquivalentState() { Instant now = Instant.now(); - var party = new HumanRequestingParty("user-1", "Alice", "analyst"); + var party = new HumanRequestingParty("user-1", "analyst"); var task = new AssignedTask("proc-1", "Task 1", party, null, now); TaskAwareState state = new TestTaskState("agg-1", List.of(task)); @@ -173,7 +173,7 @@ void withoutNonExistentProcessIdShouldReturnEquivalentState() { @Test void taskRoundTripThroughTaskAwareState() { Instant now = Instant.now(); - var party = new HumanRequestingParty("user-42", "Bob", "manager"); + var party = new HumanRequestingParty("user-42", "manager"); var metadata = Map.of("priority", "high", "deadline", "2026-05-01"); var task = new AssignedTask("proc-42", "Urgent analysis", party, metadata, now); From 8e263e4cd4953ac95d830ffad4885a6623a81fd5 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Fri, 10 Apr 2026 12:56:09 +0000 Subject: [PATCH 5/5] Remove unnecessary unchecked cast in builtInCommandHandler Agent-Logs-Url: https://github.com/elasticsoftwarefoundation/akces-framework/sessions/83c4f48e-f510-429f-8c85-0f14c17d75d4 Co-authored-by: jwijgerd <914840+jwijgerd@users.noreply.github.com> --- .../akces/agentic/runtime/KafkaAgenticAggregateRuntime.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/runtime/KafkaAgenticAggregateRuntime.java b/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/runtime/KafkaAgenticAggregateRuntime.java index 37fb0dc1..878cb946 100644 --- a/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/runtime/KafkaAgenticAggregateRuntime.java +++ b/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/runtime/KafkaAgenticAggregateRuntime.java @@ -364,12 +364,11 @@ public static AggregateState handleBuiltInEvent(DomainEvent event, AggregateStat * @param aggregateName the name of the aggregate (used for agent resolution) * @return a command handler function for built-in agentic commands */ - @SuppressWarnings("unchecked") public static CommandHandlerFunction builtInCommandHandler( AgentPlatform agentPlatform, String aggregateName) { return (command, state) -> { if (command instanceof AssignTaskCommand assignTask) { - return (Stream) (Stream) handleAssignTask(assignTask, state, agentPlatform, aggregateName); + return handleAssignTask(assignTask, state, agentPlatform, aggregateName); } else { throw new IllegalArgumentException( "Unsupported built-in command type: " + command.getClass().getName());