Phase 4: Integration with AgenticAggregatePartition
Replaces: #308 (please close that issue)
Incorporates changes from: PR #303
Tracking issue: #311
Module: main/agentic
Can start: After Phase 2 (#313) and Phase 3 (#314) are merged
Blocks: Phase 5
📋 Plan reference: Phase 4: Integration with AgenticAggregatePartition (lines 573–677)
Overview
Wire everything together so that agent-handled commands and events flow through the Embabel agent loop end-to-end. This phase connects the handler adapters (from Phase 1), the Goals/Actions/Conditions (from Phases 2 & 3), and the AgenticAggregatePartition into a cohesive processing pipeline. Includes transactional boundary configuration and memory command cleanup.
Tasks
4.1 Agent-Assisted Command Processing via Handler Adapters
- Agent-handled commands/events are processed through the standard KafkaAggregateRuntime handler flow using the agentic adapters
- When a command type is declared in agentHandledCommands:
  - handleCommand() looks up the CommandHandlerFunction → AgenticCommandHandlerFunctionAdapter
  - Adapter's apply(command, state):
    - Sets up Blackboard with command, state, memories, aggregate service records
    - Creates AgentProcess via agentPlatform.createAgentProcess(processCommandGoal, blackboard)
    - Calls tick() in loop until end state
    - Collects DomainEvent instances from blackboard via AgentProcessResultTranslator
    - Returns as Stream<DomainEvent>
  - Standard handleCommand() iterates returned events, calls processDomainEvent() for each
  - State-changing events applied via @EventSourcingHandler, error events emitted without state change
- For standard @CommandHandler commands, existing CommandHandlerFunctionAdapter path is unchanged
- Same adapter-based flow applies for agentHandledEvents via AgenticEventHandlerFunctionAdapter with reactToExternalEventGoal
- No incremental tick support — deferred to separate plan
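The adapter flow above can be sketched roughly as follows. This is a minimal illustration, not the Akces or Embabel API: AgentProcess, Blackboard and DomainEvent are hypothetical stand-ins declared locally, and isFinished() is an assumed name for the end-state check.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Stream;

// Hypothetical stand-ins; the real Akces/Embabel types differ.
interface DomainEvent {}

final class Blackboard {
    private final List<Object> objects = new ArrayList<>();
    void add(Object o) { objects.add(o); }
    List<Object> objects() { return List.copyOf(objects); }
}

interface AgentProcess {
    void tick();            // advance the agent by one step
    boolean isFinished();   // assumed name for "reached an end state"
    Blackboard blackboard();
}

final class TickToCompletionSketch {
    // Mirrors the adapter's apply(): tick until done, then collect events.
    static Stream<DomainEvent> apply(AgentProcess process) {
        while (!process.isFinished()) {
            process.tick();
        }
        // Only DomainEvent instances are returned; other blackboard objects are ignored here.
        return process.blackboard().objects().stream()
                .filter(DomainEvent.class::isInstance)
                .map(DomainEvent.class::cast);
    }
}
```

Because the loop runs to completion before anything is returned, the caller sees either the full set of events or none, which is what lets the existing handleCommand() iteration stay unchanged.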
4.2 AgentProcess Result Translation
- After each tick(), inspect blackboard for:
  - DomainEvent objects → emit as domain events (state-changing and error events)
  - MemoryStoredEvent / MemoryRevokedEvent → apply through built-in @EventSourcingHandler
  - Command objects → send through command bus
- Create AgentProcessResultTranslator utility class for this translation
- Accept all ErrorEvent instances at runtime, even if undeclared in agentProducedErrors: log a warning for undeclared types, do not reject
- No EmitDomainEventsAction: events are collected from Blackboard directly by the translator
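One possible shape for the translation step is sketched below. The types are local, hypothetical stand-ins for the Akces interfaces named above, and warnings are collected in a list rather than logged so the sketch stays self-contained.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Hypothetical stand-ins for the Akces types named above.
interface DomainEvent {}
interface ErrorEvent extends DomainEvent {}
interface Command {}

// Result of one translation pass over the blackboard contents.
record TranslationResult(List<DomainEvent> events, List<Command> commands, List<String> warnings) {}

final class ResultTranslatorSketch {
    // Splits blackboard objects into events to emit and commands to send.
    // Undeclared error events are kept, with a warning recorded for each.
    static TranslationResult translate(List<Object> blackboardObjects,
                                       Set<Class<? extends ErrorEvent>> declaredErrors) {
        List<DomainEvent> events = new ArrayList<>();
        List<Command> commands = new ArrayList<>();
        List<String> warnings = new ArrayList<>();
        for (Object o : blackboardObjects) {
            if (o instanceof ErrorEvent error && !declaredErrors.contains(error.getClass())) {
                warnings.add("Undeclared error event accepted: " + error.getClass().getSimpleName());
            }
            if (o instanceof DomainEvent event) {
                events.add(event);       // includes memory events and error events
            } else if (o instanceof Command command) {
                commands.add(command);   // to be sent through the command bus
            }
            // anything else on the blackboard is ignored
        }
        return new TranslationResult(events, commands, warnings);
    }
}
```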
4.3 Event Application Mechanism — Simplified by Adapter Approach
- Adapters return DomainEvent instances as Stream<DomainEvent> from apply()
- Existing handleCommand() / handleEvent() already iterate and call processDomainEvent()
- No changes needed to KafkaAggregateRuntime: the delegation pattern works as-is
- No need to make processDomainEvent protected or refactor to inheritance
4.4 Transactional Boundaries and Timeout Configuration (NEW in PR #303)
- The agent tick-to-completion loop runs inside the Kafka transaction that AgenticAggregatePartition.processRecords() opens
- All events produced (including MemoryStoredEvent, MemoryRevokedEvent, state-changing events, error events) are part of a single transaction, ensuring atomicity
Transaction lifecycle:
AgenticAggregatePartition.processRecords():
  1. producer.beginTransaction()
  2. Process external events (if any)
  3. Process commands:
     a. handleCommand() → adapter.apply() → tick() loop → Stream<DomainEvent>
     b. processDomainEvent() for each event
     c. (agent may take seconds to minutes for LLM calls, MCP tool invocations)
  4. producer.sendOffsetsToTransaction()
  5. producer.commitTransaction()
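The lifecycle above can be sketched with a simplified producer interface. TxProducer is a hypothetical stand-in that mirrors only the method names of the transactional Kafka producer API; the real sendOffsetsToTransaction takes the consumed offsets and consumer group metadata, and the real processRecords() does considerably more.

```java
import java.util.List;

// Simplified stand-in for the transactional subset of a Kafka producer.
// The real sendOffsetsToTransaction takes offsets and consumer group metadata.
interface TxProducer {
    void beginTransaction();
    void sendOffsetsToTransaction();
    void commitTransaction();
    void abortTransaction();
}

final class ProcessRecordsSketch {
    // Runs all work for one batch inside a single transaction.
    // Returns true if committed, false if aborted.
    static boolean processRecords(TxProducer producer, List<Runnable> handlers) {
        producer.beginTransaction();
        try {
            for (Runnable handler : handlers) {
                handler.run(); // handleCommand() -> adapter.apply() -> tick() loop
            }
            producer.sendOffsetsToTransaction();
            producer.commitTransaction();
            return true;
        } catch (RuntimeException e) {
            producer.abortTransaction(); // nothing from this batch becomes visible
            return false;
        }
    }
}
```

With a real Kafka producer, some failures (a fenced producer, for example) cannot be recovered by aborting and require closing the producer instead; the sketch does not model that distinction.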
Timeout configuration:
- agenticServiceProducerFactory must set transaction.timeout.ms to 300000 (5 minutes, configurable via akces.agentic.transaction-timeout-ms)
- Adapter's apply() should log a warning if the tick loop exceeds 80% of the transaction timeout
- Document this configuration requirement for operators
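A minimal sketch of the timeout wiring and the 80% check, using only java.util.Properties so it runs without Kafka on the classpath. The class and method names are hypothetical; transaction.timeout.ms is the standard Kafka producer setting and akces.agentic.transaction-timeout-ms is the property named above.

```java
import java.time.Duration;
import java.util.Properties;

final class TransactionTimeoutSketch {
    static final String PROPERTY = "akces.agentic.transaction-timeout-ms";
    static final long DEFAULT_TIMEOUT_MS = 300_000L; // 5 minutes

    // Builds producer properties with the (possibly overridden) transaction timeout.
    static Properties producerProperties(Properties appConfig) {
        long timeoutMs = Long.parseLong(
                appConfig.getProperty(PROPERTY, Long.toString(DEFAULT_TIMEOUT_MS)));
        Properties producer = new Properties();
        producer.setProperty("transaction.timeout.ms", Long.toString(timeoutMs));
        return producer;
    }

    // True once the tick loop has used more than 80% of the timeout.
    static boolean shouldWarn(Duration elapsed, long timeoutMs) {
        return elapsed.toMillis() * 10 > timeoutMs * 8;
    }
}
```

Operators should also check the broker's transaction.max.timeout.ms (15 minutes by default): a producer requesting a longer transaction timeout than the broker allows is rejected at initialization.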
Error handling during agent execution:
- If AgentProcess.tick() throws, the adapter catches it, logs the error, and returns an appropriate ErrorEvent (e.g., CommandExecutionError) so the transaction can commit with a meaningful error
- If the Kafka transaction is aborted (timeout), the standard processRecords() catch block handles rollback
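The first rule might look like the following in the adapter. This is a sketch under assumptions: the event types are local stand-ins, the fields of CommandExecutionError are invented for illustration, and the agent run is modeled as a Supplier that completes eagerly.

```java
import java.util.function.Supplier;
import java.util.stream.Stream;

// Hypothetical stand-ins for the Akces event types.
interface DomainEvent {}
interface ErrorEvent extends DomainEvent {}

// The issue names CommandExecutionError as an example; its fields here are assumed.
record CommandExecutionError(String aggregateId, String reason) implements ErrorEvent {}

final class SafeAdapterSketch {
    // Runs the agent and converts any runtime failure into a single error event,
    // so the surrounding transaction can still commit.
    static Stream<DomainEvent> apply(String aggregateId, Supplier<Stream<DomainEvent>> agentRun) {
        try {
            return agentRun.get();
        } catch (RuntimeException e) {
            System.getLogger("agentic").log(System.Logger.Level.ERROR, "Agent execution failed", e);
            return Stream.of(new CommandExecutionError(aggregateId, String.valueOf(e.getMessage())));
        }
    }
}
```

The try block only protects work done before the stream is returned. If the real adapter built its result lazily, a failure during stream traversal would escape this catch, which is one reason to finish the tick loop before constructing the stream.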
4.5 Agent Context Setup
- Standard Blackboard content set up by agentic handler adapters before agent invocation:
  - Command or DomainEvent: the materialized trigger
  - AggregateState: current state (always present for singleton agentic aggregates)
  - List<AgenticAggregateMemory>: current memories from MemoryAwareState.getMemories()
  - String agenticAggregateId: the aggregate identifier
  - List<AggregateServiceRecord>: all registered aggregate services from the Akces-Control topic
  - AggregateRuntime: for type information (including registered agentProducedErrors types)
- Conditions: hasMemories, isCommandProcessing / isExternalEvent
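To make the relationship between the Blackboard content and the conditions concrete, here is a reduced sketch. It models only four of the entries, uses plain Object and String where the real code would use Akces types, and expresses the conditions as plain boolean methods rather than Embabel conditions; all of that is assumed for illustration.

```java
import java.util.List;

// Hypothetical stand-ins for the Akces marker types.
interface Command {}
interface DomainEvent {}

// Bundles a subset of the values the adapter would place on the Blackboard.
record AgentContextSketch(Object trigger,
                          Object state,
                          List<String> memories,
                          String agenticAggregateId) {

    AgentContextSketch {
        if (!(trigger instanceof Command) && !(trigger instanceof DomainEvent)) {
            throw new IllegalArgumentException("trigger must be a Command or DomainEvent");
        }
        memories = List.copyOf(memories); // defensive copy, also rejects null
    }

    boolean hasMemories() { return !memories.isEmpty(); }
    boolean isCommandProcessing() { return trigger instanceof Command; }
    boolean isExternalEvent() { return trigger instanceof DomainEvent; }
}
```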
4.6 Memory Command Cleanup
- Delete StoreMemoryCommand.java: memory storage is now handled by StoreMemoryAction (Embabel @Action) producing MemoryStoredEvent directly
- Delete ForgetMemoryCommand.java: memory revocation is now handled by ForgetMemoryAction producing MemoryRevokedEvent directly
- Remove enforceMemorySlidingWindow() from AgenticAggregatePartition: memory capacity enforcement is now the agent's responsibility inside LearnFromProcessGoal
- Remove maxMemories field from AgenticAggregatePartition
- Update AkcesAgenticAggregateController to remove StoreMemoryCommand/ForgetMemoryCommand from built-in command registrations
Changes from PR #303
- proceedAgentTasks() (old 4.3.1) REMOVED: incremental tick support deferred to agenttasks plan
- New section 4.4: Transactional Boundaries (tick-to-completion loop inside Kafka transaction; increased transaction.timeout.ms; error handling during agent execution)
- Old 4.4 (Agent Context Setup) renumbered to 4.5
- New section 4.6: Memory Command Cleanup (delete StoreMemoryCommand/ForgetMemoryCommand; remove enforceMemorySlidingWindow())
- Concrete goal selection: adapters use specific goals (ProcessCommandGoal / ReactToExternalEventGoal) instead of runtime condition evaluation
Key Design Decisions
- No changes to KafkaAggregateRuntime internals: Adapter pattern plugs into existing handler contracts
- Tick-to-completion inside Kafka transaction: Ensures atomicity of all agent-produced events
- Increased transaction timeout: Configurable via akces.agentic.transaction-timeout-ms (default 5 min)
- Error event tolerance: All ErrorEvents accepted at runtime; warnings logged for undeclared types
- No incremental tick support: Deferred to separate plan
- Memory commands deleted: Actions produce events directly
Acceptance Criteria
- AgenticCommandHandlerFunctionAdapter → AgentProcess → domain events → state
- AgenticEventHandlerFunctionAdapter → AgentProcess → domain events → state
- AgentProcessResultTranslator correctly extracts events, memory events, and commands from blackboard
- @CommandHandler commands still work unchanged
- agenticServiceProducerFactory configured with increased transaction.timeout.ms
- akces.agentic.transaction-timeout-ms property is exposed and documented
- ErrorEvent
- StoreMemoryCommand.java and ForgetMemoryCommand.java deleted
- enforceMemorySlidingWindow() removed from AgenticAggregatePartition