Skip to content

feat(embabel): Phase 4 — Integration with AgenticAggregatePartition #315

@jwijgerd

Description

@jwijgerd

Phase 4: Integration with AgenticAggregatePartition

Replaces: #308 (please close that issue)
Incorporates changes from: PR #303

Tracking issue: #311
Module: main/agentic
Can start: After Phase 2 (#313) and Phase 3 (#314) are merged
Blocks: Phase 5

📋 Plan reference: Phase 4: Integration with AgenticAggregatePartition (lines 573–677)


Overview

Wire everything together so that agent-handled commands and events flow through the Embabel agent loop end-to-end. This phase connects the handler adapters (from Phase 1), the Goals/Actions/Conditions (from Phases 2 & 3), and the AgenticAggregatePartition into a cohesive processing pipeline. Includes transactional boundary configuration and memory command cleanup.


Tasks

4.1 Agent-Assisted Command Processing via Handler Adapters

  • Agent-handled commands/events are processed through the standard KafkaAggregateRuntime handler flow using the agentic adapters
  • When a command type is declared in agentHandledCommands:
    1. handleCommand() looks up the CommandHandlerFunctionAgenticCommandHandlerFunctionAdapter
    2. Adapter's apply(command, state):
      • Sets up Blackboard with command, state, memories, aggregate service records
      • Creates AgentProcess via agentPlatform.createAgentProcess(processCommandGoal, blackboard)
      • Calls tick() in loop until end state
      • Collects DomainEvent instances from blackboard via AgentProcessResultTranslator
      • Returns as Stream<DomainEvent>
    3. Standard handleCommand() iterates returned events, calls processDomainEvent() for each
    4. State-changing events applied via @EventSourcingHandler, error events emitted without state change
  • For standard @CommandHandler commands, existing CommandHandlerFunctionAdapter path is unchanged
  • Same adapter-based flow applies for agentHandledEvents via AgenticEventHandlerFunctionAdapter with reactToExternalEventGoal
  • No incremental tick support — deferred to separate plan

4.2 AgentProcess Result Translation

  • After each tick(), inspect blackboard for:
    • DomainEvent objects → emit as domain events (state-changing and error events)
    • MemoryStoredEvent / MemoryRevokedEvent → apply through built-in @EventSourcingHandler
    • Command objects → send through command bus
  • Create AgentProcessResultTranslator utility class for this translation
  • Accept all ErrorEvent instances at runtime, even if undeclared in agentProducedErrors — log warning for undeclared, do not reject
  • No EmitDomainEventsAction — events are collected from Blackboard directly by the translator

4.3 Event Application Mechanism — Simplified by Adapter Approach

  • Adapters return DomainEvent instances as Stream<DomainEvent> from apply()
  • Existing handleCommand() / handleEvent() already iterate and call processDomainEvent()
  • No changes needed to KafkaAggregateRuntime — delegation pattern works as-is
  • No need to make processDomainEvent protected or refactor to inheritance

4.4 Transactional Boundaries and Timeout Configuration (NEW in PR #303)

  • The agent tick-to-completion loop runs inside the Kafka transaction that AgenticAggregatePartition.processRecords() opens
  • All events produced (including MemoryStoredEvent, MemoryRevokedEvent, state-changing events, error events) are part of a single transaction — ensuring atomicity

Transaction lifecycle:

AgenticAggregatePartition.processRecords():
  1. producer.beginTransaction()
  2. Process external events (if any)
  3. Process commands:
     a. handleCommand() → adapter.apply() → tick() loop → Stream<DomainEvent>
     b. processDomainEvent() for each event
     c. (agent may take seconds to minutes for LLM calls, MCP tool invocations)
  4. producer.sendOffsetsToTransaction()
  5. producer.commitTransaction()

Timeout configuration:

  • agenticServiceProducerFactory must set transaction.timeout.ms to 300000 (5 minutes, configurable via akces.agentic.transaction-timeout-ms)
  • Adapter's apply() should log a warning if tick loop exceeds 80% of transaction timeout
  • Document this configuration requirement for operators

Error handling during agent execution:

  • If AgentProcess.tick() throws, adapter catches it, logs error, returns appropriate ErrorEvent (e.g., CommandExecutionError) so transaction can commit with meaningful error
  • If Kafka transaction is aborted (timeout), standard processRecords() catch block handles rollback

4.5 Agent Context Setup

  • Standard Blackboard content set up by agentic handler adapters before agent invocation:
    • Command or DomainEvent — the materialized trigger
    • AggregateState — current state (always present for singleton agentic aggregates)
    • List<AgenticAggregateMemory> — current memories from MemoryAwareState.getMemories()
    • String agenticAggregateId — the aggregate identifier
    • List<AggregateServiceRecord> — all registered aggregate services from Akces-Control topic
    • AggregateRuntime — for type information (including registered agentProducedErrors types)
    • Conditions: hasMemories, isCommandProcessing / isExternalEvent

4.6 Memory Command Cleanup

  • Delete StoreMemoryCommand.java — memory storage now handled by StoreMemoryAction (Embabel @Action) producing MemoryStoredEvent directly
  • Delete ForgetMemoryCommand.java — memory revocation now handled by ForgetMemoryAction producing MemoryRevokedEvent directly
  • Remove enforceMemorySlidingWindow() from AgenticAggregatePartition — memory capacity enforcement is now the agent's responsibility inside LearnFromProcessGoal
  • Remove maxMemories field from AgenticAggregatePartition
  • Update AkcesAgenticAggregateController to remove StoreMemoryCommand/ForgetMemoryCommand from built-in command registrations

Changes from PR #303

  • proceedAgentTasks() (old 4.3.1) REMOVED — incremental tick support deferred to agenttasks plan
  • New section 4.4: Transactional Boundaries — tick-to-completion loop inside Kafka transaction; increased transaction.timeout.ms; error handling during agent execution
  • Old 4.4 (Agent Context Setup) renumbered to 4.5
  • New section 4.6: Memory Command Cleanup — delete StoreMemoryCommand/ForgetMemoryCommand; remove enforceMemorySlidingWindow()
  • Concrete goal selection — adapters use specific goals (ProcessCommandGoal / ReactToExternalEventGoal) instead of runtime condition evaluation

Key Design Decisions

  • No KafkaAggregateRuntime internals changes: Adapter pattern plugs into existing handler contracts
  • Tick-to-completion inside Kafka transaction: Ensures atomicity of all agent-produced events
  • Increased transaction timeout: Configurable via akces.agentic.transaction-timeout-ms (default 5 min)
  • Error event tolerance: All ErrorEvents accepted at runtime; warnings for undeclared
  • No incremental tick support: Deferred to separate plan
  • Memory commands deleted: Actions produce events directly

Acceptance Criteria

  • Agent-handled commands flow through AgenticCommandHandlerFunctionAdapterAgentProcess → domain events → state
  • Agent-handled external events flow through AgenticEventHandlerFunctionAdapterAgentProcess → domain events → state
  • AgentProcessResultTranslator correctly extracts events, memory events, and commands from blackboard
  • Standard @CommandHandler commands still work unchanged
  • Error events are emitted without changing state; undeclared error events log a warning
  • agenticServiceProducerFactory configured with increased transaction.timeout.ms
  • akces.agentic.transaction-timeout-ms property is exposed and documented
  • Adapter logs warning when tick loop exceeds 80% of transaction timeout
  • tick() exceptions caught and returned as ErrorEvent
  • StoreMemoryCommand.java and ForgetMemoryCommand.java deleted
  • enforceMemorySlidingWindow() removed from AgenticAggregatePartition
  • Blackboard is populated with all required context before agent invocation
  • All existing tests still pass
  • Code follows Java 25 conventions with JavaDoc

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions