Conversation
…adata foundation
- Extend @AgenticAggregateInfo with agentHandledCommands, agentHandledEvents, agentProducedErrors
- Add AgentPlatform getAgentPlatform() to AgenticAggregateRuntime interface
- Store/expose AgentPlatform in KafkaAgenticAggregateRuntime (new constructor param)
- Create AgentProcessResultTranslator utility (drain DomainEvents from Blackboard)
- Create AgenticCommandHandlerFunctionAdapter (CommandHandlerFunction via Embabel Agent)
- Create AgenticEventHandlerFunctionAdapter (EventHandlerFunction via Embabel Agent)
- Update AgenticAggregateRuntimeFactory: resolve AgentPlatform (fail-fast), resolve Agent
by '{aggregateName}Agent' bean convention, register adapters for agentHandledCommands/Events,
register DomainEventType(error=true) for agentProducedErrors
- Update KafkaAgenticAggregateRuntimeTest for new constructor + add getAgentPlatform test
- Add 9 new annotation tests for the 3 new @AgenticAggregateInfo properties
Agent-Logs-Url: https://github.com/elasticsoftwarefoundation/akces-framework/sessions/88dfa45b-d88a-42e2-9027-be4b23fea52d
Co-authored-by: jwijgerd <914840+jwijgerd@users.noreply.github.com>
…ervicesSupplier, clarify tick loop intent Agent-Logs-Url: https://github.com/elasticsoftwarefoundation/akces-framework/sessions/88dfa45b-d88a-42e2-9027-be4b23fea52d Co-authored-by: jwijgerd <914840+jwijgerd@users.noreply.github.com>
There was a problem hiding this comment.
Pull request overview
Introduces Phase 1 Embabel integration for agentic aggregates by injecting AgentPlatform into the agentic runtime and adding adapter infrastructure to route selected commands/events through Embabel agent processes.
Changes:
- Extend
@AgenticAggregateInfowith metadata for agent-handled commands/events and agent-produced errors, plus tests for default/configured values. - Inject and expose
AgentPlatformviaAgenticAggregateRuntime/KafkaAgenticAggregateRuntime, and resolve it inAgenticAggregateRuntimeFactory. - Add Embabel adapter implementations (
AgenticCommandHandlerFunctionAdapter,AgenticEventHandlerFunctionAdapter) and a result translator (AgentProcessResultTranslator) to run agent processes and collect emittedDomainEvents.
Reviewed changes
Copilot reviewed 9 out of 9 changed files in this pull request and generated 9 comments.
Show a summary per file
| File | Description |
|---|---|
| main/api/src/test/java/org/elasticsoftware/akces/aggregate/AgenticAggregateInfoTest.java | Adds test fixtures/assertions for the new @AgenticAggregateInfo array properties. |
| main/api/src/main/java/org/elasticsoftware/akces/annotations/AgenticAggregateInfo.java | Extends the annotation with agentHandledCommands, agentHandledEvents, agentProducedErrors metadata + Javadoc. |
| main/agentic/src/test/java/org/elasticsoftware/akces/agentic/runtime/KafkaAgenticAggregateRuntimeTest.java | Updates unit tests for new constructor arg and getAgentPlatform() exposure. |
| main/agentic/src/main/java/org/elasticsoftware/akces/agentic/runtime/KafkaAgenticAggregateRuntime.java | Stores/exposes AgentPlatform on the runtime wrapper. |
| main/agentic/src/main/java/org/elasticsoftware/akces/agentic/runtime/AgentProcessResultTranslator.java | New utility to drain DomainEvents from Embabel blackboard and hide them. |
| main/agentic/src/main/java/org/elasticsoftware/akces/agentic/runtime/AgenticEventHandlerFunctionAdapter.java | New adapter to run Embabel agent process for external event handling. |
| main/agentic/src/main/java/org/elasticsoftware/akces/agentic/runtime/AgenticCommandHandlerFunctionAdapter.java | New adapter to run Embabel agent process for command handling. |
| main/agentic/src/main/java/org/elasticsoftware/akces/agentic/beans/AgenticAggregateRuntimeFactory.java | Registers agent-handled command/event adapters and resolves AgentPlatform/Agent beans. |
| main/agentic/src/main/java/org/elasticsoftware/akces/agentic/AgenticAggregateRuntime.java | Extends the runtime interface with getAgentPlatform(). |
...ain/java/org/elasticsoftware/akces/agentic/runtime/AgenticCommandHandlerFunctionAdapter.java
Outdated
Show resolved
Hide resolved
.../main/java/org/elasticsoftware/akces/agentic/runtime/AgenticEventHandlerFunctionAdapter.java
Outdated
Show resolved
Hide resolved
main/api/src/main/java/org/elasticsoftware/akces/annotations/AgenticAggregateInfo.java
Outdated
Show resolved
Hide resolved
| * <p>Undeclared {@link ErrorEvent} instances are accepted at runtime. A warning is | ||
| * logged so operators can detect unexpected error types without causing the transaction | ||
| * to abort. |
There was a problem hiding this comment.
The class-level Javadoc says undeclared ErrorEvent instances are accepted without aborting the transaction, but the Akces runtime requires emitted events to have a registered DomainEventType (otherwise processing will fail with null dereferences). If the intent is to accept unknown error types, the runtime/registration logic needs to handle that; otherwise this Javadoc should be corrected to avoid promising behavior that isn't supported.
There was a problem hiding this comment.
ensure that the runtime handles the case of unknown ErrorEvent types
There was a problem hiding this comment.
Done in commit 68f371b. AgentProcessResultTranslator.collectEvents() now accepts the set of registered DomainEventTypes and excludes any ErrorEvent whose class isn't registered — logging a WARN and hiding the event from the blackboard so it won't be collected again. This prevents the NPE in KafkaAggregateRuntime.processDomainEvent() that would occur when getDomainEventType() returns null. Eight new tests in AgentProcessResultTranslatorTest cover both the inclusion and exclusion paths.
...ic/src/main/java/org/elasticsoftware/akces/agentic/runtime/KafkaAgenticAggregateRuntime.java
Show resolved
Hide resolved
...ic/src/main/java/org/elasticsoftware/akces/agentic/beans/AgenticAggregateRuntimeFactory.java
Outdated
Show resolved
Hide resolved
...ain/java/org/elasticsoftware/akces/agentic/runtime/AgenticCommandHandlerFunctionAdapter.java
Outdated
Show resolved
Hide resolved
.../main/java/org/elasticsoftware/akces/agentic/runtime/AgenticEventHandlerFunctionAdapter.java
Outdated
Show resolved
Hide resolved
...ic/src/main/java/org/elasticsoftware/akces/agentic/beans/AgenticAggregateRuntimeFactory.java
Show resolved
Hide resolved
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
| * <p>Undeclared {@link ErrorEvent} instances are accepted at runtime. A warning is | ||
| * logged so operators can detect unexpected error types without causing the transaction | ||
| * to abort. |
There was a problem hiding this comment.
ensure that the runtime handles the case of unknown ErrorEvent types
...ic/src/main/java/org/elasticsoftware/akces/agentic/runtime/KafkaAgenticAggregateRuntime.java
Show resolved
Hide resolved
…rorEvent handling, MemoryAwareState guard - KafkaAgenticAggregateRuntime: add Objects.requireNonNull for all 4 constructor params - AgentProcessResultTranslator: accept registeredEventTypes param; exclude unknown ErrorEvents with WARN log instead of passing them to processDomainEvent() where they would NPE - Both adapter apply() methods: use instanceof MemoryAwareState guard instead of requiring S extends MemoryAwareState bound; pass getAllRegisteredEventTypes() to translator - Fix @AgenticAggregateInfo.agentProducedErrors() Javadoc to accurately describe exclusion - Add AgentProcessResultTranslatorTest (8 tests) covering all event/error scenarios - Add 4 constructor null-check tests to KafkaAgenticAggregateRuntimeTest Agent-Logs-Url: https://github.com/elasticsoftwarefoundation/akces-framework/sessions/d56d40b0-ba4b-4381-8b08-568d5b4098e2 Co-authored-by: jwijgerd <914840+jwijgerd@users.noreply.github.com>
| // Timeout configuration and incremental tick support are deferred to a later phase | ||
| // (see plans/agenttasks.md). The transaction timeout is controlled externally via | ||
| // the 'akces.agentic.transaction-timeout-ms' property on the producer factory. | ||
| while (!agentProcess.getFinished()) { |
There was a problem hiding this comment.
implement the same defensice limit as in the AgenticEventHandlerFunctionAdapter
There was a problem hiding this comment.
Done in commit 77c35e5. AgenticCommandHandlerFunctionAdapter.apply() now uses the same 10,000-tick / 30-second deadline loop as the event handler adapter, and throws IllegalStateException with a structured error log if the process doesn't finish within those limits.
.../main/java/org/elasticsoftware/akces/agentic/runtime/AgenticEventHandlerFunctionAdapter.java
Outdated
Show resolved
Hide resolved
...ic/src/main/java/org/elasticsoftware/akces/agentic/beans/AgenticAggregateRuntimeFactory.java
Outdated
Show resolved
Hide resolved
| // Tick to completion. Phase 1 runs the full agent process synchronously. | ||
| // Timeout configuration and incremental tick support are deferred to a later phase | ||
| // (see plans/agenttasks.md). The transaction timeout is controlled externally via | ||
| // the 'akces.agentic.transaction-timeout-ms' property on the producer factory. | ||
| while (!agentProcess.getFinished()) { | ||
| agentProcess.tick(); |
There was a problem hiding this comment.
while (!agentProcess.getFinished()) { agentProcess.tick(); } has no timeout / max-tick guard, so a stuck agent can block command handling indefinitely (tying up Kafka consumer/transaction threads). Please add the same kind of safety limits used in AgenticEventHandlerFunctionAdapter (max ticks + wall-clock timeout and/or interruption handling), and surface a deterministic error outcome when limits are exceeded.
| // Tick to completion. Phase 1 runs the full agent process synchronously. | |
| // Timeout configuration and incremental tick support are deferred to a later phase | |
| // (see plans/agenttasks.md). The transaction timeout is controlled externally via | |
| // the 'akces.agentic.transaction-timeout-ms' property on the producer factory. | |
| while (!agentProcess.getFinished()) { | |
| agentProcess.tick(); | |
| final int maxTicks = 10_000; | |
| final long timeoutNanos = 30_000_000_000L; | |
| final long startNanos = System.nanoTime(); | |
| int ticks = 0; | |
| while (!agentProcess.getFinished()) { | |
| if (Thread.currentThread().isInterrupted()) { | |
| String message = "Agent process interrupted while handling command %s for aggregate %s" | |
| .formatted(commandType.typeName(), aggregate.getClass().getSimpleName()); | |
| logger.warn(message); | |
| throw new IllegalStateException(message); | |
| } | |
| if (ticks >= maxTicks) { | |
| String message = "Agent process exceeded max ticks (%d) while handling command %s for aggregate %s" | |
| .formatted(maxTicks, commandType.typeName(), aggregate.getClass().getSimpleName()); | |
| logger.warn(message); | |
| throw new IllegalStateException(message); | |
| } | |
| if (System.nanoTime() - startNanos >= timeoutNanos) { | |
| String message = "Agent process exceeded timeout (%d ms) while handling command %s for aggregate %s" | |
| .formatted(timeoutNanos / 1_000_000L, commandType.typeName(), aggregate.getClass().getSimpleName()); | |
| logger.warn(message); | |
| throw new IllegalStateException(message); | |
| } | |
| agentProcess.tick(); | |
| ticks++; |
… matching EventHandlerFunctionAdapter Agent-Logs-Url: https://github.com/elasticsoftwarefoundation/akces-framework/sessions/2d9121bb-7262-4cfc-b4fd-a41735f4aa01 Co-authored-by: jwijgerd <914840+jwijgerd@users.noreply.github.com>
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
@AgenticAggregateInfowithagentHandledCommands,agentHandledEvents,agentProducedErrorsgetAgentPlatform()toAgenticAggregateRuntimeinterfaceKafkaAgenticAggregateRuntimeto store and exposeAgentPlatformAgentProcessResultTranslatorutility classAgenticCommandHandlerFunctionAdapter(implementsCommandHandlerFunction)AgenticEventHandlerFunctionAdapter(implementsEventHandlerFunction)AgenticAggregateRuntimeFactoryto wireAgentPlatform, resolveAgent, register adaptersObjects.requireNonNullfor all 4KafkaAgenticAggregateRuntimeconstructor paramsAgentProcessResultTranslatorfilters unknownErrorEvents (warning + exclude) to prevent NPE inprocessDomainEvent()instanceof MemoryAwareStateguard instead of hardS extends MemoryAwareStateboundagentProducedErrorsJavadoc to accurately describe exclusion behaviorAgenticCommandHandlerFunctionAdapter.apply()now uses the same defensive tick limit (10,000 ticks / 30s deadline) asAgenticEventHandlerFunctionAdapter; throwsIllegalStateExceptionon timeout with a structured error logAgentProcessResultTranslatorTest— 8 tests for event draining and unknown-error filteringKafkaAgenticAggregateRuntimeTest