Phase 1 + Annotation Extensions: AgentPlatform Injection & Metadata Foundation
Replaces: #305 (please close that issue)
Incorporates changes from: PR #303
Tracking issue: #311
Module: main/agentic, main/api
Can start: Immediately
Blocks: Phase 2, Phase 3, Phase 4, Phase 5
📋 Plan references: @AgenticAggregateInfo Annotation Extensions (lines 11–296)
Overview
This issue combines two foundational pieces that are independent but closely related:
- Phase 1: Inject the Embabel AgentPlatform into the KafkaAgenticAggregateRuntime so that the runtime can create and run AgentProcess instances during command processing.
- Annotation Extensions: Extend @AgenticAggregateInfo with three new properties (agentHandledCommands, agentHandledEvents, agentProducedErrors) and create the agentic handler adapter classes that plug into the existing handler mechanism.
Tasks
Phase 1: AgentPlatform Injection
1.1 Add AgentPlatform to KafkaAgenticAggregateRuntime
- Inject AgentPlatform (Spring-managed bean from embabel-agent-starter) into the constructor
- Store as a final field
- Expose via getAgentPlatform() method
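The three steps above can be sketched as follows. This is a minimal illustration, not the actual class: the AgentPlatform interface here is an empty stand-in for the Embabel type, everything else the real runtime holds is omitted, and the null check is an assumption rather than a stated requirement.

```java
import java.util.Objects;

// Empty stand-in for the Embabel AgentPlatform type; the real interface is not reproduced here.
interface AgentPlatform {}

// Stripped-down runtime showing only the injected platform (all other members omitted).
class KafkaAgenticAggregateRuntime {
    private final AgentPlatform agentPlatform;

    KafkaAgenticAggregateRuntime(AgentPlatform agentPlatform) {
        // Assumption: reject null at construction so a missing platform surfaces early.
        this.agentPlatform = Objects.requireNonNull(agentPlatform, "agentPlatform");
    }

    AgentPlatform getAgentPlatform() {
        return agentPlatform;
    }
}
```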
1.2 Wire AgentPlatform through AgenticAggregateRuntimeFactory
- Resolve AgentPlatform bean from ApplicationContext
- Pass it to KafkaAgenticAggregateRuntime constructor
- Fail fast if AgentPlatform is not available; agentic aggregates cannot start without it
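A rough sketch of the fail-fast lookup, under stated assumptions: a plain Map stands in for the Spring ApplicationContext so the example has no dependencies, and the AgentPlatformResolver class, the exception type, and the message are all hypothetical.

```java
import java.util.Map;

// Empty stand-in for the Embabel AgentPlatform type.
interface AgentPlatform {}

// Hypothetical helper; the Map is a stand-in for a bean lookup on the ApplicationContext.
class AgentPlatformResolver {
    // Returns the platform bean, or throws so that startup aborts when it is absent.
    static AgentPlatform resolveOrFail(Map<Class<?>, Object> beans) {
        Object bean = beans.get(AgentPlatform.class);
        if (bean == null) {
            throw new IllegalStateException(
                    "No AgentPlatform bean available; agentic aggregates cannot start without it");
        }
        return (AgentPlatform) bean;
    }
}
```

Throwing during factory setup, rather than returning null, keeps the failure at startup instead of at the first agent-handled command.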
1.3 Update AgenticAggregateRuntime Interface
- Add AgentPlatform getAgentPlatform() method to the interface
- Allows AgenticAggregatePartition to access the platform
1.4 Expose Memories on AgentProcess Blackboard — Moved to Adapter
This is now handled inside the agentic handler adapters' apply() method. The adapter already has access to the state (passed as a parameter), so the partition does not need to set up the blackboard separately.
Annotation Extensions
Extend @AgenticAggregateInfo
Add three new properties:
- agentHandledCommands (Class<? extends Command>[]): commands handled by the AI agent (no @CommandHandler method needed)
- agentHandledEvents (Class<? extends DomainEvent>[]): external events handled by the AI agent (no @EventHandler method needed)
- agentProducedErrors (Class<? extends ErrorEvent>[]): error events the AI agent may produce
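As an illustration, the extended annotation might look roughly like this. The marker interfaces are stand-ins, the empty-array defaults and ErrorEvent extending DomainEvent are assumptions, and ReviewOrder, PaymentReceived, ReviewFailed, and OrderReviewAggregate are hypothetical names that do not appear in the plan.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

// Stand-ins for the framework marker types.
interface Command {}
interface DomainEvent {}
interface ErrorEvent extends DomainEvent {} // assumption: error events are domain events

@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@interface AgenticAggregateInfo {
    // Existing properties omitted. Empty defaults are an assumption.
    Class<? extends Command>[] agentHandledCommands() default {};
    Class<? extends DomainEvent>[] agentHandledEvents() default {};
    Class<? extends ErrorEvent>[] agentProducedErrors() default {};
}

// Hypothetical message types and aggregate, for illustration only.
final class ReviewOrder implements Command {}
final class PaymentReceived implements DomainEvent {}
final class ReviewFailed implements ErrorEvent {}

@AgenticAggregateInfo(
        agentHandledCommands = ReviewOrder.class,
        agentHandledEvents = PaymentReceived.class,
        agentProducedErrors = ReviewFailed.class)
class OrderReviewAggregate {}
```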
Registration Flow for Agent Properties
In AgenticAggregateBeanFactoryPostProcessor / AgenticAggregateRuntimeFactory:
- For agentHandledCommands[]: validate, create CommandType, create AgenticCommandHandlerFunctionAdapter, register via runtimeBuilder.addCommandHandler()
- For agentHandledEvents[]: validate, create DomainEventType(external=true), create AgenticEventHandlerFunctionAdapter, register via runtimeBuilder.addExternalEventHandler()
- For agentProducedErrors[]: validate, create DomainEventType(error=true), register JSON schema
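The shape of this flow can be sketched as below. The sketch records what would be registered instead of calling the real runtimeBuilder; the AgentRegistrationSketch class and the message types are hypothetical, and the validation rule (a type may be declared only once) is an assumption, since the plan does not spell out what validation involves.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Stand-ins for the framework marker types.
interface Command {}
interface DomainEvent {}
interface ErrorEvent extends DomainEvent {}

// Hypothetical message types, for illustration only.
final class ReviewOrder implements Command {}
final class PaymentReceived implements DomainEvent {}
final class ReviewFailed implements ErrorEvent {}

// Records what would be registered instead of calling the real runtimeBuilder.
class AgentRegistrationSketch {
    final List<String> registered = new ArrayList<>();
    private final Set<Class<?>> seen = new HashSet<>();

    // Assumed validation rule: a type may be declared at most once per aggregate.
    private void validate(Class<?> type) {
        if (!seen.add(type)) {
            throw new IllegalArgumentException("Declared more than once: " + type.getName());
        }
    }

    void register(List<Class<? extends Command>> commands,
                  List<Class<? extends DomainEvent>> events,
                  List<Class<? extends ErrorEvent>> errors) {
        for (Class<? extends Command> c : commands) {
            validate(c);
            // Real flow: CommandType + AgenticCommandHandlerFunctionAdapter, then addCommandHandler()
            registered.add("command:" + c.getSimpleName());
        }
        for (Class<? extends DomainEvent> e : events) {
            validate(e);
            // Real flow: DomainEventType(external=true) + AgenticEventHandlerFunctionAdapter,
            // then addExternalEventHandler()
            registered.add("external-event:" + e.getSimpleName());
        }
        for (Class<? extends ErrorEvent> e : errors) {
            validate(e);
            // Real flow: DomainEventType(error=true), then JSON schema registration
            registered.add("error-event:" + e.getSimpleName());
        }
    }
}
```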
Create AgenticCommandHandlerFunctionAdapter
- Implements CommandHandlerFunction<S, C, E> (no AgenticProcessHandler; deferred to the agenttasks plan)
- apply(command, state): Sets up Blackboard → creates AgentProcess via agentPlatform.createAgentProcess(processCommandGoal, blackboard) → runs tick() loop → collects DomainEvents via AgentProcessResultTranslator → returns Stream
- Concrete goal: Uses ProcessCommandGoal resolved at construction time
- isCreate() always returns false: agent-handled commands cannot create state
- Plugs into existing KafkaAggregateRuntime.handleCommand() flow unchanged
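A minimal sketch of the adapter's tick-to-completion flow follows. Every interface here is a simplified stand-in whose signatures are assumptions, not the framework's or Embabel's actual API: the blackboard is modeled as a Map, the goal as a String, and the AgentProcessResultTranslator step as a results() accessor on the process.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Stream;

interface Command {}
interface DomainEvent {}

// Stand-in contracts; the real signatures may differ.
interface CommandHandlerFunction<S, C extends Command, E extends DomainEvent> {
    Stream<E> apply(C command, S state);
    boolean isCreate();
}
interface AgentProcess {
    void tick();                 // advance the process by one step
    boolean isFinished();
    List<DomainEvent> results(); // assumed accessor standing in for the result translator
}
interface AgentPlatform {
    AgentProcess createAgentProcess(String goal, Map<String, Object> blackboard);
}

class AgenticCommandHandlerFunctionAdapter<S, C extends Command>
        implements CommandHandlerFunction<S, C, DomainEvent> {
    private final AgentPlatform agentPlatform;
    private final String processCommandGoal; // resolved once, at construction time

    AgenticCommandHandlerFunctionAdapter(AgentPlatform agentPlatform, String processCommandGoal) {
        this.agentPlatform = agentPlatform;
        this.processCommandGoal = processCommandGoal;
    }

    @Override
    public Stream<DomainEvent> apply(C command, S state) {
        // Blackboard setup happens here, not in the partition.
        Map<String, Object> blackboard = new HashMap<>();
        blackboard.put("command", command);
        blackboard.put("state", state);
        AgentProcess process = agentPlatform.createAgentProcess(processCommandGoal, blackboard);
        while (!process.isFinished()) {
            process.tick(); // tick-to-completion; incremental ticking is deferred
        }
        return process.results().stream();
    }

    @Override
    public boolean isCreate() {
        return false; // agent-handled commands cannot create state
    }
}
```

The event adapter would follow the same shape with ReactToExternalEventGoal in place of ProcessCommandGoal.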
Create AgenticEventHandlerFunctionAdapter
- Implements EventHandlerFunction<S, InputEvent, E> (no AgenticProcessHandler; deferred)
- apply(event, state): Sets up Blackboard → creates AgentProcess via agentPlatform.createAgentProcess(reactToExternalEventGoal, blackboard) → runs tick() loop → collects DomainEvents → returns Stream
- Concrete goal: Uses ReactToExternalEventGoal resolved at construction time
- isCreate() always returns false
- Plugs into existing KafkaAggregateRuntime.handleEvent() flow unchanged
Key Design Decisions
- No AgentTask / AgenticProcessHandler: Incremental tick support is deferred to a separate plan. Adapters run tick-to-completion.
- Adapter-based approach: No changes needed to KafkaAggregateRuntime internals; the adapters plug into existing CommandHandlerFunction/EventHandlerFunction contracts
- Concrete goal selection: Each adapter is hardcoded to its specific goal (ProcessCommandGoal / ReactToExternalEventGoal), resolved at construction time
- Agent-handled commands cannot create state: isCreate() is always false. Every AgenticAggregate must have a deterministic @CommandHandler(create = true) method
- Error events: Explicitly declared via agentProducedErrors. Registered as DomainEventType(error=true) for schema validation and service discovery
- Runtime error tolerance: Accept all ErrorEvent instances at runtime, even if undeclared. Log a warning, do not reject.
- AgentPlatform is mandatory: Fatal error if not available in ApplicationContext
- Blackboard setup in adapter: Memories, state, aggregate service records, and conditions are set up inside the adapter's apply() method (not by the partition)
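The runtime error tolerance decision can be illustrated with a small sketch. The ErrorEventTolerance class, the ReviewFailed and ModelTimeout types, and the use of java.util.logging are all hypothetical; the sketch also assumes exact class matching, so a subclass of a declared error type would be logged as undeclared.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.logging.Logger;

// Stand-ins for the framework marker types.
interface DomainEvent {}
interface ErrorEvent extends DomainEvent {}

// Hypothetical error types, for illustration only.
final class ReviewFailed implements ErrorEvent {}
final class ModelTimeout implements ErrorEvent {}

class ErrorEventTolerance {
    private static final Logger LOG = Logger.getLogger(ErrorEventTolerance.class.getName());
    private final Set<Class<? extends ErrorEvent>> declared;
    // Kept only so the behavior is observable in this sketch.
    final List<Class<?>> undeclaredSeen = new ArrayList<>();

    ErrorEventTolerance(Set<Class<? extends ErrorEvent>> declared) {
        this.declared = declared;
    }

    // Always returns the event; undeclared error types are logged, never rejected.
    <E extends DomainEvent> E accept(E event) {
        if (event instanceof ErrorEvent && !declared.contains(event.getClass())) {
            LOG.warning("ErrorEvent " + event.getClass().getName()
                    + " is not declared in agentProducedErrors");
            undeclaredSeen.add(event.getClass());
        }
        return event;
    }
}
```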
Acceptance Criteria
- @AgenticAggregateInfo has agentHandledCommands(), agentHandledEvents(), agentProducedErrors() properties
- AgentPlatform is injected into KafkaAgenticAggregateRuntime and exposed via getAgentPlatform()
- AgenticAggregateRuntimeFactory resolves AgentPlatform and fails fast if absent
- AgenticCommandHandlerFunctionAdapter implements CommandHandlerFunction with concrete ProcessCommandGoal
- AgenticEventHandlerFunctionAdapter implements EventHandlerFunction with concrete ReactToExternalEventGoal
- Both adapters return isCreate() = false
- No AgentTask record or AgenticProcessHandler interface (deferred)
- agentHandledCommands create CommandType entries with adapter
- agentHandledEvents create external DomainEventType entries with adapter
- agentProducedErrors create DomainEventType(error=true) entries with JSON schemas
- Blackboard setup happens inside the adapter's apply()