diff --git a/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/AgenticAggregateRuntime.java b/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/AgenticAggregateRuntime.java index fa6e3854..97149be8 100644 --- a/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/AgenticAggregateRuntime.java +++ b/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/AgenticAggregateRuntime.java @@ -17,6 +17,7 @@ package org.elasticsoftware.akces.agentic; +import com.embabel.agent.core.AgentPlatform; import org.elasticsoftware.akces.agentic.events.MemoryRevokedEvent; import org.elasticsoftware.akces.agentic.events.MemoryStoredEvent; import org.elasticsoftware.akces.aggregate.AgenticAggregateMemory; @@ -31,13 +32,19 @@ /** * Extended runtime interface for {@link org.elasticsoftware.akces.aggregate.AgenticAggregate}s. * - *

Extends {@link AggregateRuntime} with memory-specific operations. Analogous to how - * {@link org.elasticsoftware.akces.aggregate.AgenticAggregate} extends + *

Extends {@link AggregateRuntime} with memory-specific and agent-platform operations. + * Analogous to how {@link org.elasticsoftware.akces.aggregate.AgenticAggregate} extends * {@link org.elasticsoftware.akces.aggregate.Aggregate} to add memory awareness. * - *

The key addition is {@link #getMemories(AggregateStateRecord)}, which allows the partition - * to derive current memory state directly from a loaded state record — avoiding a separate - * in-memory deque that must be rebuilt from the event log after restarts. + *

Key additions over the base interface: + *

*/ public interface AgenticAggregateRuntime extends AggregateRuntime { DomainEventType MEMORY_STORED_TYPE = new DomainEventType<>( @@ -45,6 +52,17 @@ public interface AgenticAggregateRuntime extends AggregateRuntime { DomainEventType MEMORY_REVOKED_TYPE = new DomainEventType<>( "MemoryRevoked", 1, MemoryRevokedEvent.class, false, false, false, false); + + /** + * Returns the Embabel {@link AgentPlatform} used to create and run agent processes. + * + *

This platform is the entry point for GOAP-based planning, LLM reasoning, and tool use + * during agent-handled command and event processing. + * + * @return the {@link AgentPlatform}; never {@code null} + */ + AgentPlatform getAgentPlatform(); + /** * Returns the memories from the given aggregate state record. * diff --git a/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/beans/AgenticAggregateRuntimeFactory.java b/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/beans/AgenticAggregateRuntimeFactory.java index a0a2eeaa..d3a90693 100644 --- a/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/beans/AgenticAggregateRuntimeFactory.java +++ b/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/beans/AgenticAggregateRuntimeFactory.java @@ -17,19 +17,34 @@ package org.elasticsoftware.akces.agentic.beans; +import com.embabel.agent.core.Agent; +import com.embabel.agent.core.AgentPlatform; import org.elasticsoftware.akces.agentic.AgenticAggregateRuntime; +import org.elasticsoftware.akces.agentic.runtime.AgenticCommandHandlerFunctionAdapter; +import org.elasticsoftware.akces.agentic.runtime.AgenticEventHandlerFunctionAdapter; import org.elasticsoftware.akces.agentic.runtime.KafkaAgenticAggregateRuntime; import org.elasticsoftware.akces.aggregate.*; import org.elasticsoftware.akces.aggregate.AgenticAggregate; import org.elasticsoftware.akces.annotations.AgenticAggregateInfo; import org.elasticsoftware.akces.annotations.AggregateStateInfo; +import org.elasticsoftware.akces.annotations.CommandInfo; +import org.elasticsoftware.akces.annotations.DomainEventInfo; +import org.elasticsoftware.akces.commands.Command; +import org.elasticsoftware.akces.events.DomainEvent; +import org.elasticsoftware.akces.events.ErrorEvent; import org.elasticsoftware.akces.kafka.KafkaAggregateRuntime; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.beans.BeansException; import org.springframework.beans.factory.FactoryBean; import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContextAware; import tools.jackson.databind.ObjectMapper; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + import static org.elasticsoftware.akces.agentic.AgenticAggregateRuntime.MEMORY_REVOKED_TYPE; import static org.elasticsoftware.akces.agentic.AgenticAggregateRuntime.MEMORY_STORED_TYPE; @@ -43,12 +58,45 @@ * the agentic module: agentic aggregates are never GDPR-keyed, never indexed, and * never have PII-annotated state.

* + *

In addition to wiring standard command, event, and event-sourcing handler function + * adapters, this factory also processes the three agentic annotation properties: + *

+ * + *

Resolves the {@link AgentPlatform} from the {@link ApplicationContext} and fails + * fast if it is not available — agentic aggregates cannot start without a configured + * Embabel agent platform.

+ * + *

When {@code agentHandledCommands} or {@code agentHandledEvents} are declared, the + * factory looks up an {@link Agent} bean named {@code {aggregateName}Agent} from the + * {@link ApplicationContext}. This {@link Agent} is provided by the implementing + * application and must be registered as a Spring bean before this factory is invoked. + * A fatal error is raised if the bean cannot be found.

+ * *

Produces a {@link KafkaAgenticAggregateRuntime} wrapping the internally built - * {@link KafkaAggregateRuntime}, adding memory-aware operations.

+ * {@link KafkaAggregateRuntime}, adding memory-aware and agent-platform operations.

* * @param the aggregate state type */ -public class AgenticAggregateRuntimeFactory implements FactoryBean, ApplicationContextAware { +public class AgenticAggregateRuntimeFactory + implements FactoryBean, ApplicationContextAware { + + private static final Logger logger = + LoggerFactory.getLogger(AgenticAggregateRuntimeFactory.class); + + /** System error types added to every non-create command handler (including agent-handled). */ + private static final List> COMMAND_HANDLER_SYSTEM_ERRORS = + AgenticAggregateBeanFactoryPostProcessor.COMMAND_HANDLER_SYSTEM_ERRORS; + + /** System error types added to every non-create event handler (including agent-handled). */ + private static final List> EVENT_HANDLER_SYSTEM_ERRORS = + AgenticAggregateBeanFactoryPostProcessor.EVENT_HANDLER_SYSTEM_ERRORS; private ApplicationContext applicationContext; private final ObjectMapper objectMapper; @@ -72,13 +120,19 @@ public void setApplicationContext(ApplicationContext applicationContext) throws @Override public AgenticAggregateRuntime getObject() { - AgenticAggregateInfo agenticInfo = aggregate.getClass().getAnnotation(AgenticAggregateInfo.class); + AgenticAggregateInfo agenticInfo = + aggregate.getClass().getAnnotation(AgenticAggregateInfo.class); if (agenticInfo == null) { throw new IllegalStateException( "Class implementing AgenticAggregate must be annotated with @AgenticAggregateInfo"); } - KafkaAggregateRuntime kafkaRuntime = createRuntime(agenticInfo, aggregate); - return new KafkaAgenticAggregateRuntime(kafkaRuntime, objectMapper, agenticInfo.stateClass()); + + // Resolve AgentPlatform — mandatory for all agentic aggregates. + AgentPlatform agentPlatform = resolveAgentPlatform(agenticInfo.value()); + + KafkaAggregateRuntime kafkaRuntime = createRuntime(agenticInfo, aggregate, agentPlatform); + return new KafkaAgenticAggregateRuntime( + kafkaRuntime, objectMapper, agenticInfo.stateClass(), agentPlatform); } @Override @@ -86,9 +140,52 @@ public Class getObjectType() { return AgenticAggregateRuntime.class; } + /** + * Resolves the {@link AgentPlatform} from the {@link ApplicationContext}. + * + * @param aggregateName the aggregate name (for error messages) + * @return the resolved {@link AgentPlatform}; never {@code null} + * @throws IllegalStateException if no {@link AgentPlatform} bean is available + */ + private AgentPlatform resolveAgentPlatform(String aggregateName) { + try { + return applicationContext.getBean(AgentPlatform.class); + } catch (BeansException e) { + throw new IllegalStateException( + "AgentPlatform is required for agentic aggregate '" + aggregateName + + "' but was not found in the ApplicationContext. " + + "Please ensure embabel-agent-starter is configured.", e); + } + } + + /** + * Resolves the {@link Agent} for this aggregate from the {@link ApplicationContext}. + * + *

Looks for a Spring bean of type {@link Agent} named {@code {aggregateName}Agent}. + * The implementing application is responsible for registering this bean. If no such + * bean is found, a fatal error is raised. + * + * @param aggregateName the aggregate name used to derive the agent bean name + * @return the resolved {@link Agent}; never {@code null} + * @throws IllegalStateException if no matching {@link Agent} bean is found + */ + private Agent resolveAgent(String aggregateName) { + String agentBeanName = aggregateName + "Agent"; + try { + return applicationContext.getBean(agentBeanName, Agent.class); + } catch (BeansException e) { + throw new IllegalStateException( + "No Agent bean found with name '" + agentBeanName + "' for agentic aggregate '" + + aggregateName + "'. " + + "The implementing application must provide a Spring bean of type " + + "com.embabel.agent.core.Agent named '" + agentBeanName + "'.", e); + } + } + @SuppressWarnings("unchecked") private KafkaAggregateRuntime createRuntime(AgenticAggregateInfo agenticInfo, - AgenticAggregate aggregate) { + AgenticAggregate aggregate, + AgentPlatform agentPlatform) { KafkaAggregateRuntime.Builder runtimeBuilder = new KafkaAggregateRuntime.Builder(); AggregateStateInfo stateInfo = agenticInfo.stateClass().getAnnotation(AggregateStateInfo.class); @@ -110,7 +207,7 @@ private KafkaAggregateRuntime createRuntime(AgenticAggregateInfo agenticInfo, .setAggregateClass((Class>) aggregate.getClass()) .setObjectMapper(objectMapper); - // CommandHandlerFunctions + // CommandHandlerFunctions (deterministic, from @CommandHandler methods) applicationContext.getBeansOfType(CommandHandlerFunction.class).values().stream() .filter(adapter -> adapter.getAggregate().equals(aggregate)) .forEach(adapter -> { @@ -165,7 +262,8 @@ private KafkaAggregateRuntime createRuntime(AgenticAggregateInfo agenticInfo, if (adapter.getInputType() instanceof AggregateStateType stateType) { runtimeBuilder.addStateUpcastingHandler(stateType, adapter); } else if (adapter.getInputType() instanceof DomainEventType eventType) { - runtimeBuilder.addEventUpcastingHandler(eventType, adapter).addDomainEvent(eventType); + runtimeBuilder.addEventUpcastingHandler(eventType, adapter) + .addDomainEvent(eventType); } }); @@ -179,6 +277,178 @@ private KafkaAggregateRuntime createRuntime(AgenticAggregateInfo agenticInfo, .addEventSourcingHandler(MEMORY_REVOKED_TYPE, KafkaAgenticAggregateRuntime::handleMemoryEvent) .addDomainEvent(MEMORY_REVOKED_TYPE); + // Collect agent-produced error types for registration and inclusion in adapters. + List> agentProducedErrorTypes = + buildAgentProducedErrorTypes(agenticInfo.agentProducedErrors()); + agentProducedErrorTypes.forEach(runtimeBuilder::addDomainEvent); + + // Lazy Agent resolution: only needed when agent-handled commands or events exist. + boolean hasAgentHandlers = agenticInfo.agentHandledCommands().length > 0 + || agenticInfo.agentHandledEvents().length > 0; + Agent agent = hasAgentHandlers ? resolveAgent(agenticInfo.value()) : null; + + // Process agentHandledCommands — register AgenticCommandHandlerFunctionAdapter + for (Class commandClass : agenticInfo.agentHandledCommands()) { + processAgentHandledCommand( + commandClass, aggregate, agentPlatform, agent, + agentProducedErrorTypes, runtimeBuilder); + } + + // Process agentHandledEvents — register AgenticEventHandlerFunctionAdapter + for (Class eventClass : agenticInfo.agentHandledEvents()) { + processAgentHandledEvent( + eventClass, aggregate, agentPlatform, agent, + agentProducedErrorTypes, runtimeBuilder); + } + return runtimeBuilder.validateAndBuild(); } + + /** + * Builds the list of {@link DomainEventType} entries for all classes declared in + * {@code agentProducedErrors}. + * + * @param errorClasses the error event classes from the annotation + * @return a list of {@code DomainEventType(error=true)} entries + * @throws IllegalArgumentException if any class is not annotated with {@code @DomainEventInfo} + * or does not implement {@link ErrorEvent} + */ + private List> buildAgentProducedErrorTypes( + Class[] errorClasses) { + List> result = new ArrayList<>(errorClasses.length); + for (Class errorClass : errorClasses) { + DomainEventInfo info = errorClass.getAnnotation(DomainEventInfo.class); + if (info == null) { + throw new IllegalArgumentException( + "Agent-produced error class " + errorClass.getName() + + " must be annotated with @DomainEventInfo"); + } + result.add(new DomainEventType<>( + info.type(), info.version(), errorClass, + false, // create + false, // external + true, // error + false)); // piiData + } + return Collections.unmodifiableList(result); + } + + /** + * Validates and registers an agent-handled command, creating an + * {@link AgenticCommandHandlerFunctionAdapter} and registering it with the runtime builder. + * + * @param commandClass the command class to register + * @param aggregate the owning aggregate instance + * @param agentPlatform the Embabel platform + * @param agent the deployed {@link Agent} for this aggregate + * @param agentProducedErrors the error types the agent may produce + * @param runtimeBuilder the runtime builder to register with + */ + @SuppressWarnings({"unchecked", "rawtypes"}) + private void processAgentHandledCommand( + Class commandClass, + AgenticAggregate aggregate, + AgentPlatform agentPlatform, + Agent agent, + List> agentProducedErrors, + KafkaAggregateRuntime.Builder runtimeBuilder) { + + CommandInfo commandInfo = commandClass.getAnnotation(CommandInfo.class); + if (commandInfo == null) { + throw new IllegalArgumentException( + "Agent-handled command class " + commandClass.getName() + + " must be annotated with @CommandInfo"); + } + + // Agent-handled commands always have create=false (design constraint). + CommandType commandType = new CommandType<>( + commandInfo.type(), commandInfo.version(), commandClass, + false, // create — agent-handled commands cannot create state + false, // external + false); // piiData + + // Error types: system errors + agent-produced errors + List> errorTypes = new ArrayList<>(COMMAND_HANDLER_SYSTEM_ERRORS); + errorTypes.addAll(agentProducedErrors); + + AgenticCommandHandlerFunctionAdapter adapter = new AgenticCommandHandlerFunctionAdapter<>( + (AgenticAggregate) aggregate, + commandType, + agentPlatform, + agent, + (List) List.of(), // producedDomainEventTypes: empty — events are registered via EventSourcingHandler adapters + (List) errorTypes, + // TODO: wire aggregateServicesSupplier from AkcesAgenticAggregateController (Phase 3) + Collections::emptyList); + + runtimeBuilder.addCommandHandler(commandType, adapter).addCommand(commandType); + errorTypes.forEach(runtimeBuilder::addDomainEvent); + + logger.debug("Registered agent-handled command {} v{} for aggregate {}", + commandInfo.type(), commandInfo.version(), + aggregate.getClass().getSimpleName()); + } + + /** + * Validates and registers an agent-handled external event, creating an + * {@link AgenticEventHandlerFunctionAdapter} and registering it with the runtime builder. + * + * @param eventClass the external domain event class to register + * @param aggregate the owning aggregate instance + * @param agentPlatform the Embabel platform + * @param agent the deployed {@link Agent} for this aggregate + * @param agentProducedErrors the error types the agent may produce + * @param runtimeBuilder the runtime builder to register with + */ + @SuppressWarnings({"unchecked", "rawtypes"}) + private void processAgentHandledEvent( + Class eventClass, + AgenticAggregate aggregate, + AgentPlatform agentPlatform, + Agent agent, + List> agentProducedErrors, + KafkaAggregateRuntime.Builder runtimeBuilder) { + + if (ErrorEvent.class.isAssignableFrom(eventClass)) { + throw new IllegalArgumentException( + "Agent-handled event class " + eventClass.getName() + + " must not implement ErrorEvent. " + + "Error events should be declared in agentProducedErrors instead."); + } + + DomainEventInfo eventInfo = eventClass.getAnnotation(DomainEventInfo.class); + if (eventInfo == null) { + throw new IllegalArgumentException( + "Agent-handled event class " + eventClass.getName() + + " must be annotated with @DomainEventInfo"); + } + + DomainEventType eventType = new DomainEventType<>( + eventInfo.type(), eventInfo.version(), eventClass, + false, // create + true, // external — agent-handled events are always external + false, // error + false); // piiData + + // Error types: system errors + agent-produced errors + List> errorTypes = new ArrayList<>(EVENT_HANDLER_SYSTEM_ERRORS); + errorTypes.addAll(agentProducedErrors); + + AgenticEventHandlerFunctionAdapter adapter = new AgenticEventHandlerFunctionAdapter<>( + (AgenticAggregate) aggregate, + eventType, + agentPlatform, + agent, + (List) List.of(), // producedDomainEventTypes: empty — events are registered via EventSourcingHandler adapters + (List) errorTypes, + // TODO: wire aggregateServicesSupplier from AkcesAgenticAggregateController (Phase 3) + Collections::emptyList); + + runtimeBuilder.addExternalEventHandler(eventType, adapter).addDomainEvent(eventType); + errorTypes.forEach(runtimeBuilder::addDomainEvent); + + logger.debug("Registered agent-handled external event {} v{} for aggregate {}", + eventInfo.type(), eventInfo.version(), + aggregate.getClass().getSimpleName()); + } } diff --git a/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/runtime/AgentProcessResultTranslator.java b/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/runtime/AgentProcessResultTranslator.java new file mode 100644 index 00000000..a227788b --- /dev/null +++ b/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/runtime/AgentProcessResultTranslator.java @@ -0,0 +1,113 @@ +/* + * Copyright 2022 - 2026 The Original Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.elasticsoftware.akces.agentic.runtime; + +import com.embabel.agent.core.Blackboard; +import org.elasticsoftware.akces.aggregate.DomainEventType; +import org.elasticsoftware.akces.events.DomainEvent; +import org.elasticsoftware.akces.events.ErrorEvent; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * Utility class that translates the results of an Embabel {@code AgentProcess} back + * into Akces {@link DomainEvent} instances. + * + *

After each agent tick (or after the process reaches an end state), call + * {@link #collectEvents(Blackboard, Collection)} to drain {@link DomainEvent} objects + * from the blackboard. Events are marked as hidden on the blackboard after collection, + * so subsequent calls to this method will not return the same events again — this + * supports both tick-to-completion and future incremental-tick processing patterns. + * + *

Unknown {@link ErrorEvent} types (not declared in {@code agentProducedErrors} and + * therefore not registered as {@link DomainEventType}s in the runtime) are logged at + * {@code WARN} level and excluded from the returned list. This prevents + * {@code processDomainEvent()} from encountering a {@code null} type look-up and + * NPE-ing, while still allowing the transaction to commit with a meaningful result. + * Standard (non-error) {@link DomainEvent} types are always included and must be + * registered; passing an unregistered non-error event to the runtime will result in + * a {@code NullPointerException} in {@code processDomainEvent()}. + */ +public final class AgentProcessResultTranslator { + + private static final Logger logger = LoggerFactory.getLogger(AgentProcessResultTranslator.class); + + private AgentProcessResultTranslator() { + // utility class + } + + /** + * Collects all {@link DomainEvent} objects currently visible on the blackboard, + * removes them from visible scope (via {@link Blackboard#hide(Object)}), and returns + * the subset that can safely be passed to the runtime's {@code processDomainEvent()} + * method. + * + *

For every collected event: + *

    + *
  • Non-error {@link DomainEvent}s are always included and passed through as-is.
  • + *
  • {@link ErrorEvent}s whose class is present in {@code registeredEventTypes} are + * included.
  • + *
  • {@link ErrorEvent}s whose class is not present in + * {@code registeredEventTypes} are logged at {@code WARN} level and excluded. + * Excluding them prevents a {@code NullPointerException} inside + * {@code KafkaAggregateRuntime.processDomainEvent()} that would otherwise occur + * when the runtime looks up the unregistered type. The transaction still commits + * normally; only the unknown error event is silently dropped.
  • + *
+ * + * @param blackboard the agent process blackboard to drain events from + * @param registeredEventTypes all {@link DomainEventType}s registered with the runtime; + * used to verify that agent-produced {@link ErrorEvent}s are + * known before passing them downstream + * @return an unmodifiable list of domain events that are safe to pass to the runtime; + * never {@code null}, may be empty + */ + public static List collectEvents(Blackboard blackboard, + Collection> registeredEventTypes) { + Set> registeredClasses = registeredEventTypes.stream() + .map(DomainEventType::typeClass) + .collect(Collectors.toSet()); + + List allEvents = blackboard.getObjects().stream() + .filter(o -> o instanceof DomainEvent) + .map(o -> (DomainEvent) o) + .toList(); + + List result = new ArrayList<>(allEvents.size()); + for (DomainEvent event : allEvents) { + blackboard.hide(event); + if (event instanceof ErrorEvent && !registeredClasses.contains(event.getClass())) { + logger.warn( + "Agent produced undeclared error event of type '{}' which is not registered " + + "as a DomainEventType. The event will be excluded from processing to " + + "prevent a runtime failure. Declare it in agentProducedErrors to enable " + + "serialization and service discovery. Event: {}", + event.getClass().getName(), event); + } else { + result.add(event); + } + } + return List.copyOf(result); + } +} diff --git a/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/runtime/AgenticCommandHandlerFunctionAdapter.java b/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/runtime/AgenticCommandHandlerFunctionAdapter.java new file mode 100644 index 00000000..718f90cf --- /dev/null +++ b/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/runtime/AgenticCommandHandlerFunctionAdapter.java @@ -0,0 +1,240 @@ +/* + * Copyright 2022 - 2026 The Original Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.elasticsoftware.akces.agentic.runtime; + +import com.embabel.agent.core.Agent; +import com.embabel.agent.core.AgentPlatform; +import com.embabel.agent.core.AgentProcess; +import com.embabel.agent.core.ProcessOptions; +import jakarta.annotation.Nonnull; +import org.elasticsoftware.akces.aggregate.*; +import org.elasticsoftware.akces.commands.Command; +import org.elasticsoftware.akces.control.AggregateServiceRecord; +import org.elasticsoftware.akces.events.DomainEvent; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.function.Supplier; +import java.util.stream.Stream; + +/** + * A {@link CommandHandlerFunction} implementation that routes an agent-handled command + * through the Embabel AI agent framework instead of a deterministic + * {@code @CommandHandler} method. + * + *

When {@link #apply(Command, AggregateState)} is called the adapter: + *

    + *
  1. Assembles a bindings {@link Map} containing the command, current aggregate state, + * memories, aggregate service records, and condition flags.
  2. + *
  3. Creates an {@link AgentProcess} via + * {@link AgentPlatform#createAgentProcess(Agent, ProcessOptions, Map)} using the + * provided {@link Agent} and the assembled bindings.
  4. + *
  5. Calls {@link AgentProcess#tick()} in a loop until the process reaches an end + * state (completed, failed, terminated, or killed).
  6. + *
  7. Collects {@link DomainEvent} objects placed on the agent's blackboard via + * {@link AgentProcessResultTranslator#collectEvents} and returns them as a + * {@link Stream}.
  8. + *
+ * + *

{@link #isCreate()} always returns {@code false} — agent-handled commands cannot + * create aggregate state. Every agentic aggregate must have a separate deterministic + * {@code @CommandHandler(create = true)} method. + * + *

The returned stream of events is fed back into the standard + * {@code KafkaAggregateRuntime.handleCommand()} flow unchanged, where each event is + * applied through the registered {@code @EventSourcingHandler} methods. + * + * @param the aggregate state type; must implement {@link AggregateState} + * @param the command type handled by this adapter + * @param the domain event type produced by this adapter + */ +public class AgenticCommandHandlerFunctionAdapter + implements CommandHandlerFunction { + + private static final Logger logger = + LoggerFactory.getLogger(AgenticCommandHandlerFunctionAdapter.class); + + private final AgenticAggregate aggregate; + private final CommandType commandType; + private final AgentPlatform agentPlatform; + private final Agent agent; + private final List> producedDomainEventTypes; + private final List> errorEventTypes; + private final Supplier> aggregateServicesSupplier; + + /** + * Creates a new {@code AgenticCommandHandlerFunctionAdapter}. + * + * @param aggregate the owning agentic aggregate instance + * @param commandType the command type this adapter handles + * @param agentPlatform the Embabel platform used to create agent processes + * @param agent the deployed {@link Agent} for this aggregate, + * provided by the implementing application + * @param producedDomainEventTypes domain event types this adapter may produce + * @param errorEventTypes error event types this adapter may produce + * @param aggregateServicesSupplier supplier of all known {@link AggregateServiceRecord}s; + * used to populate the blackboard for service discovery + */ + public AgenticCommandHandlerFunctionAdapter( + AgenticAggregate aggregate, + CommandType commandType, + AgentPlatform agentPlatform, + Agent agent, + List> producedDomainEventTypes, + List> errorEventTypes, + Supplier> aggregateServicesSupplier) { + this.aggregate = aggregate; + this.commandType = commandType; + this.agentPlatform = agentPlatform; + this.agent = agent; + this.producedDomainEventTypes = List.copyOf(producedDomainEventTypes); + this.errorEventTypes = List.copyOf(errorEventTypes); + this.aggregateServicesSupplier = aggregateServicesSupplier; + } + + /** + * Processes the command through the Embabel AI agent framework. + * + *

Populates the agent blackboard with: + *

    + *
  • {@code "command"} — the command being processed
  • + *
  • {@code "state"} — the current aggregate state
  • + *
  • {@code "agenticAggregateId"} — the aggregate identifier
  • + *
  • {@code "memories"} — the list of current memories from state
  • + *
  • {@code "aggregateServices"} — all known aggregate service records
  • + *
  • {@code "isCommandProcessing"} (condition) — {@code true}
  • + *
  • {@code "isExternalEvent"} (condition) — {@code false}
  • + *
  • {@code "hasMemories"} (condition) — whether any memories are present
  • + *
+ * + * @param command the command to process; never {@code null} + * @param state the current aggregate state + * @return a stream of domain events produced by the agent; may be empty + */ + @Nonnull + @Override + @SuppressWarnings("unchecked") + public Stream apply(@Nonnull C command, S state) { + logger.debug("Processing agent-handled command {} for aggregate {}", + commandType.typeName(), aggregate.getClass().getSimpleName()); + + Map bindings = new LinkedHashMap<>(); + bindings.put("command", command); + bindings.put("state", state); + bindings.put("agenticAggregateId", state.getAggregateId()); + List memories = state instanceof MemoryAwareState mas + ? mas.getMemories() + : List.of(); + bindings.put("memories", memories); + bindings.put("aggregateServices", aggregateServicesSupplier.get()); + bindings.put("isCommandProcessing", true); + bindings.put("isExternalEvent", false); + bindings.put("hasMemories", !memories.isEmpty()); + + AgentProcess agentProcess = + agentPlatform.createAgentProcess(agent, ProcessOptions.DEFAULT, bindings); + + // Tick to completion with defensive limits so a stuck agent process cannot + // block command handling indefinitely. + final long maxTicks = 10_000L; + final long timeoutNanos = java.util.concurrent.TimeUnit.SECONDS.toNanos(30); + final long deadlineNanos = System.nanoTime() + timeoutNanos; + long tickCount = 0L; + + while (!agentProcess.getFinished() + && tickCount < maxTicks + && System.nanoTime() < deadlineNanos) { + agentProcess.tick(); + tickCount++; + } + + if (!agentProcess.getFinished()) { + logger.error( + "Agent process did not finish within safety limits for command {} on aggregate {}. " + + "aggregateId={}, tickCount={}, maxTicks={}, timeoutSeconds={}, status={}", + commandType.typeName(), + aggregate.getClass().getSimpleName(), + state.getAggregateId(), + tickCount, + maxTicks, + java.util.concurrent.TimeUnit.NANOSECONDS.toSeconds(timeoutNanos), + agentProcess.getStatus()); + throw new IllegalStateException( + "Agent process exceeded execution limits for command " + commandType.typeName() + + " on aggregate " + aggregate.getClass().getSimpleName()); + } + + logger.debug("Agent process completed with status {} for command {} on aggregate {}", + agentProcess.getStatus(), commandType.typeName(), aggregate.getClass().getSimpleName()); + + return (Stream) AgentProcessResultTranslator + .collectEvents(agentProcess.getBlackboard(), getAllRegisteredEventTypes()) + .stream(); + } + + /** + * {@inheritDoc} + * + *

Always returns {@code false} — agent-handled commands cannot create aggregate + * state. A deterministic {@code @CommandHandler(create = true)} must exist on the + * aggregate class to handle creation. + */ + @Override + public boolean isCreate() { + return false; + } + + @Override + public CommandType getCommandType() { + return commandType; + } + + @Override + public Aggregate getAggregate() { + return aggregate; + } + + @Override + public List> getProducedDomainEventTypes() { + return producedDomainEventTypes; + } + + @Override + public List> getErrorEventTypes() { + return errorEventTypes; + } + + /** + * Returns all domain event types this adapter may produce (both state-changing and error types). + * Used by {@link AgentProcessResultTranslator} to filter out unknown {@link org.elasticsoftware.akces.events.ErrorEvent} + * instances that are not registered with the runtime. + * + * @return combined list of produced and error domain event types + */ + private List> getAllRegisteredEventTypes() { + List> all = new ArrayList<>(producedDomainEventTypes.size() + errorEventTypes.size()); + all.addAll(producedDomainEventTypes); + all.addAll(errorEventTypes); + return all; + } +} diff --git a/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/runtime/AgenticEventHandlerFunctionAdapter.java b/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/runtime/AgenticEventHandlerFunctionAdapter.java new file mode 100644 index 00000000..5a07ec43 --- /dev/null +++ b/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/runtime/AgenticEventHandlerFunctionAdapter.java @@ -0,0 +1,237 @@ +/* + * Copyright 2022 - 2026 The Original Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.elasticsoftware.akces.agentic.runtime; + +import com.embabel.agent.core.Agent; +import com.embabel.agent.core.AgentPlatform; +import com.embabel.agent.core.AgentProcess; +import com.embabel.agent.core.ProcessOptions; +import jakarta.annotation.Nonnull; +import org.elasticsoftware.akces.aggregate.*; +import org.elasticsoftware.akces.control.AggregateServiceRecord; +import org.elasticsoftware.akces.events.DomainEvent; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.function.Supplier; +import java.util.stream.Stream; + +/** + * An {@link EventHandlerFunction} implementation that routes an agent-handled external + * domain event through the Embabel AI agent framework instead of a deterministic + * {@code @EventHandler} method. + * + *

When {@link #apply(DomainEvent, AggregateState)} is called the adapter: + *

    + *
  1. Assembles a bindings {@link Map} containing the event, current aggregate state, + * memories, aggregate service records, and condition flags.
  2. + *
  3. Creates an {@link AgentProcess} via + * {@link AgentPlatform#createAgentProcess(Agent, ProcessOptions, Map)} using the + * provided {@link Agent} and the assembled bindings.
  4. + *
  5. Calls {@link AgentProcess#tick()} in a loop until the process reaches an end + * state (completed, failed, terminated, or killed).
  6. + *
  7. Collects {@link DomainEvent} objects placed on the agent's blackboard via + * {@link AgentProcessResultTranslator#collectEvents} and returns them as a + * {@link Stream}.
  8. + *
+ * + *

{@link #isCreate()} always returns {@code false} — agent-handled events cannot + * create aggregate state. Every agentic aggregate must have a separate deterministic + * {@code @CommandHandler(create = true)} method. + * + *

The returned stream of events is fed back into the standard + * {@code KafkaAggregateRuntime.handleEvent()} flow unchanged, where each event is + * applied through the registered {@code @EventSourcingHandler} methods. + * + * @param the aggregate state type; must implement {@link AggregateState} + * @param the external domain event type handled by this adapter + * @param the domain event type produced by this adapter + */ +public class AgenticEventHandlerFunctionAdapter + implements EventHandlerFunction { + + private static final Logger logger = + LoggerFactory.getLogger(AgenticEventHandlerFunctionAdapter.class); + + private final AgenticAggregate aggregate; + private final DomainEventType eventType; + private final AgentPlatform agentPlatform; + private final Agent agent; + private final List> producedDomainEventTypes; + private final List> errorEventTypes; + private final Supplier> aggregateServicesSupplier; + + /** + * Creates a new {@code AgenticEventHandlerFunctionAdapter}. + * + * @param aggregate the owning agentic aggregate instance + * @param eventType the external domain event type this adapter handles + * @param agentPlatform the Embabel platform used to create agent processes + * @param agent the deployed {@link Agent} for this aggregate, + * provided by the implementing application + * @param producedDomainEventTypes domain event types this adapter may produce + * @param errorEventTypes error event types this adapter may produce + * @param aggregateServicesSupplier supplier of all known {@link AggregateServiceRecord}s; + * used to populate the blackboard for service discovery + */ + public AgenticEventHandlerFunctionAdapter( + AgenticAggregate aggregate, + DomainEventType eventType, + AgentPlatform agentPlatform, + Agent agent, + List> producedDomainEventTypes, + List> errorEventTypes, + Supplier> aggregateServicesSupplier) { + this.aggregate = aggregate; + this.eventType = eventType; + this.agentPlatform = agentPlatform; + this.agent = agent; + this.producedDomainEventTypes = List.copyOf(producedDomainEventTypes); + this.errorEventTypes = List.copyOf(errorEventTypes); + this.aggregateServicesSupplier = aggregateServicesSupplier; + } + + /** + * Processes the external domain event through the Embabel AI agent framework. + * + *

Populates the agent blackboard with: + *

    + *
  • {@code "event"} — the external domain event being processed
  • + *
  • {@code "state"} — the current aggregate state
  • + *
  • {@code "agenticAggregateId"} — the aggregate identifier
  • + *
  • {@code "memories"} — the list of current memories from state
  • + *
  • {@code "aggregateServices"} — all known aggregate service records
  • + *
  • {@code "isCommandProcessing"} (condition) — {@code false}
  • + *
  • {@code "isExternalEvent"} (condition) — {@code true}
  • + *
  • {@code "hasMemories"} (condition) — whether any memories are present
  • + *
+ * + * @param event the external domain event to process; never {@code null} + * @param state the current aggregate state + * @return a stream of domain events produced by the agent; may be empty + */ + @Nonnull + @Override + @SuppressWarnings("unchecked") + public Stream apply(@Nonnull InputEvent event, S state) { + logger.debug("Processing agent-handled external event {} for aggregate {}", + eventType.typeName(), aggregate.getClass().getSimpleName()); + + Map bindings = new LinkedHashMap<>(); + bindings.put("event", event); + bindings.put("state", state); + bindings.put("agenticAggregateId", state.getAggregateId()); + List memories = state instanceof MemoryAwareState mas + ? mas.getMemories() + : List.of(); + bindings.put("memories", memories); + bindings.put("aggregateServices", aggregateServicesSupplier.get()); + bindings.put("isCommandProcessing", false); + bindings.put("isExternalEvent", true); + bindings.put("hasMemories", !memories.isEmpty()); + + AgentProcess agentProcess = + agentPlatform.createAgentProcess(agent, ProcessOptions.DEFAULT, bindings); + + // Tick to completion with defensive limits so a stuck agent process cannot + // block external event handling indefinitely. + final long maxTicks = 10_000L; + final long timeoutNanos = java.util.concurrent.TimeUnit.SECONDS.toNanos(30); + final long deadlineNanos = System.nanoTime() + timeoutNanos; + long tickCount = 0L; + + while (!agentProcess.getFinished() + && tickCount < maxTicks + && System.nanoTime() < deadlineNanos) { + agentProcess.tick(); + tickCount++; + } + + if (!agentProcess.getFinished()) { + logger.error( + "Agent process did not finish within safety limits for event {} on aggregate {}. " + + "aggregateId={}, tickCount={}, maxTicks={}, timeoutSeconds={}, status={}", + eventType.typeName(), + aggregate.getClass().getSimpleName(), + state.getAggregateId(), + tickCount, + maxTicks, + java.util.concurrent.TimeUnit.NANOSECONDS.toSeconds(timeoutNanos), + agentProcess.getStatus()); + throw new IllegalStateException( + "Agent process exceeded execution limits for event " + eventType.typeName() + + " on aggregate " + aggregate.getClass().getSimpleName()); + } + logger.debug("Agent process completed with status {} for event {} on aggregate {}", + agentProcess.getStatus(), eventType.typeName(), aggregate.getClass().getSimpleName()); + + return (Stream) AgentProcessResultTranslator + .collectEvents(agentProcess.getBlackboard(), getAllRegisteredEventTypes()) + .stream(); + } + + /** + * {@inheritDoc} + * + *

Always returns {@code false} — agent-handled external events cannot create + * aggregate state. + */ + @Override + public boolean isCreate() { + return false; + } + + @Override + public DomainEventType getEventType() { + return eventType; + } + + @Override + public Aggregate getAggregate() { + return aggregate; + } + + @Override + public List> getProducedDomainEventTypes() { + return producedDomainEventTypes; + } + + @Override + public List> getErrorEventTypes() { + return errorEventTypes; + } + + /** + * Returns all domain event types this adapter may produce (both state-changing and error types). + * Used by {@link AgentProcessResultTranslator} to filter out unknown {@link org.elasticsoftware.akces.events.ErrorEvent} + * instances that are not registered with the runtime. + * + * @return combined list of produced and error domain event types + */ + private List> getAllRegisteredEventTypes() { + List> all = new ArrayList<>(producedDomainEventTypes.size() + errorEventTypes.size()); + all.addAll(producedDomainEventTypes); + all.addAll(errorEventTypes); + return all; + } +} diff --git a/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/runtime/KafkaAgenticAggregateRuntime.java b/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/runtime/KafkaAgenticAggregateRuntime.java index 8d75f3de..3597ea8b 100644 --- a/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/runtime/KafkaAgenticAggregateRuntime.java +++ b/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/runtime/KafkaAgenticAggregateRuntime.java @@ -17,6 +17,7 @@ package org.elasticsoftware.akces.agentic.runtime; +import com.embabel.agent.core.AgentPlatform; import org.apache.kafka.common.errors.SerializationException; import org.elasticsoftware.akces.agentic.AgenticAggregateRuntime; import org.elasticsoftware.akces.agentic.events.MemoryRevokedEvent; @@ -36,6 +37,7 @@ import java.io.IOException; import java.util.Collection; import java.util.List; +import java.util.Objects; import java.util.function.BiConsumer; import java.util.function.Consumer; import java.util.function.Supplier; @@ -52,26 +54,39 @@ public class KafkaAgenticAggregateRuntime implements AgenticAggregateRuntime { private final AggregateRuntime delegate; private final ObjectMapper objectMapper; private final Class stateClass; + private final AgentPlatform agentPlatform; /** * Creates a new {@code KafkaAgenticAggregateRuntime}. * - * @param delegate the underlying aggregate runtime to delegate to - * @param objectMapper Jackson object mapper for state deserialization - * @param stateClass the concrete state class used by this aggregate + * @param delegate the underlying aggregate runtime to delegate to + * @param objectMapper Jackson object mapper for state deserialization + * @param stateClass the concrete state class used by this aggregate + * @param agentPlatform the Embabel {@link AgentPlatform} used for AI-assisted processing; + * must not be {@code null} */ public KafkaAgenticAggregateRuntime(AggregateRuntime delegate, ObjectMapper objectMapper, - Class stateClass) { - this.delegate = delegate; - this.objectMapper = objectMapper; - this.stateClass = stateClass; + Class stateClass, + AgentPlatform agentPlatform) { + this.delegate = Objects.requireNonNull(delegate, "delegate must not be null"); + this.objectMapper = Objects.requireNonNull(objectMapper, "objectMapper must not be null"); + this.stateClass = Objects.requireNonNull(stateClass, "stateClass must not be null"); + this.agentPlatform = Objects.requireNonNull(agentPlatform, "agentPlatform must not be null"); } // ------------------------------------------------------------------------- // AgenticAggregateRuntime extension // ------------------------------------------------------------------------- + /** + * {@inheritDoc} + */ + @Override + public AgentPlatform getAgentPlatform() { + return agentPlatform; + } + /** * {@inheritDoc} * diff --git a/main/agentic/src/test/java/org/elasticsoftware/akces/agentic/runtime/AgentProcessResultTranslatorTest.java b/main/agentic/src/test/java/org/elasticsoftware/akces/agentic/runtime/AgentProcessResultTranslatorTest.java new file mode 100644 index 00000000..116a8d66 --- /dev/null +++ b/main/agentic/src/test/java/org/elasticsoftware/akces/agentic/runtime/AgentProcessResultTranslatorTest.java @@ -0,0 +1,194 @@ +/* + * Copyright 2022 - 2026 The Original Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.elasticsoftware.akces.agentic.runtime; + +import com.embabel.agent.core.Blackboard; +import com.embabel.agent.core.support.InMemoryBlackboard; +import org.elasticsoftware.akces.aggregate.DomainEventType; +import org.elasticsoftware.akces.annotations.AggregateIdentifier; +import org.elasticsoftware.akces.annotations.DomainEventInfo; +import org.elasticsoftware.akces.events.DomainEvent; +import org.elasticsoftware.akces.events.ErrorEvent; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.util.List; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * Unit tests for {@link AgentProcessResultTranslator}, verifying event collection, + * blackboard draining, and handling of registered vs. unregistered error events. + */ +class AgentProcessResultTranslatorTest { + + // ------------------------------------------------------------------------- + // Test fixtures + // ------------------------------------------------------------------------- + + @DomainEventInfo(type = "TestStateChanged", version = 1) + record TestStateChangedEvent(@AggregateIdentifier String id) implements DomainEvent { + @Override + public String getAggregateId() { + return id; + } + } + + @DomainEventInfo(type = "TestRegisteredError", version = 1) + record TestRegisteredErrorEvent(@AggregateIdentifier String id) implements ErrorEvent { + @Override + public String getAggregateId() { + return id; + } + } + + @DomainEventInfo(type = "TestUnregisteredError", version = 1) + record TestUnregisteredErrorEvent(@AggregateIdentifier String id) implements ErrorEvent { + @Override + public String getAggregateId() { + return id; + } + } + + private static final DomainEventType STATE_CHANGED_TYPE = + new DomainEventType<>("TestStateChanged", 1, TestStateChangedEvent.class, + false, false, false, false); + + private static final DomainEventType REGISTERED_ERROR_TYPE = + new DomainEventType<>("TestRegisteredError", 1, TestRegisteredErrorEvent.class, + false, false, true, false); + + private Blackboard blackboard; + + @BeforeEach + void setUp() { + blackboard = new InMemoryBlackboard(); + } + + // ------------------------------------------------------------------------- + // Empty blackboard + // ------------------------------------------------------------------------- + + @Test + void collectEventsShouldReturnEmptyListForEmptyBlackboard() { + List events = AgentProcessResultTranslator.collectEvents(blackboard, List.of()); + assertThat(events).isEmpty(); + } + + // ------------------------------------------------------------------------- + // Non-error domain events + // ------------------------------------------------------------------------- + + @Test + void collectEventsShouldIncludeRegisteredStateChangingEvent() { + TestStateChangedEvent event = new TestStateChangedEvent("agg-1"); + blackboard.addObject(event); + + List events = AgentProcessResultTranslator.collectEvents( + blackboard, List.of(STATE_CHANGED_TYPE)); + + assertThat(events).containsExactly(event); + } + + @Test + void collectEventsShouldHideEventsAfterCollection() { + TestStateChangedEvent event = new TestStateChangedEvent("agg-1"); + blackboard.addObject(event); + + AgentProcessResultTranslator.collectEvents(blackboard, List.of(STATE_CHANGED_TYPE)); + + // Second call should return nothing — events were hidden on first call + List second = AgentProcessResultTranslator.collectEvents( + blackboard, List.of(STATE_CHANGED_TYPE)); + assertThat(second).isEmpty(); + } + + @Test + void collectEventsShouldNotIncludeNonDomainEventObjects() { + blackboard.addObject("not an event"); + blackboard.addObject(42); + blackboard.addObject(new TestStateChangedEvent("agg-1")); + + List events = AgentProcessResultTranslator.collectEvents( + blackboard, List.of(STATE_CHANGED_TYPE)); + + assertThat(events).hasSize(1); + assertThat(events.getFirst()).isInstanceOf(TestStateChangedEvent.class); + } + + // ------------------------------------------------------------------------- + // Registered error events — must be included + // ------------------------------------------------------------------------- + + @Test + void collectEventsShouldIncludeRegisteredErrorEvent() { + TestRegisteredErrorEvent error = new TestRegisteredErrorEvent("agg-1"); + blackboard.addObject(error); + + List events = AgentProcessResultTranslator.collectEvents( + blackboard, List.of(REGISTERED_ERROR_TYPE)); + + assertThat(events).containsExactly(error); + } + + // ------------------------------------------------------------------------- + // Unregistered error events — must be excluded with warning + // ------------------------------------------------------------------------- + + @Test + void collectEventsShouldExcludeUnregisteredErrorEvent() { + TestUnregisteredErrorEvent unknownError = new TestUnregisteredErrorEvent("agg-1"); + blackboard.addObject(unknownError); + + // Only STATE_CHANGED_TYPE and REGISTERED_ERROR_TYPE are registered — not the unknown one + List events = AgentProcessResultTranslator.collectEvents( + blackboard, List.of(STATE_CHANGED_TYPE, REGISTERED_ERROR_TYPE)); + + assertThat(events).isEmpty(); + } + + @Test + void collectEventsShouldExcludeUnregisteredErrorAndKeepRegisteredEvents() { + TestStateChangedEvent stateEvent = new TestStateChangedEvent("agg-1"); + TestRegisteredErrorEvent registeredError = new TestRegisteredErrorEvent("agg-1"); + TestUnregisteredErrorEvent unknownError = new TestUnregisteredErrorEvent("agg-1"); + blackboard.addObject(stateEvent); + blackboard.addObject(registeredError); + blackboard.addObject(unknownError); + + List events = AgentProcessResultTranslator.collectEvents( + blackboard, List.of(STATE_CHANGED_TYPE, REGISTERED_ERROR_TYPE)); + + assertThat(events).containsExactlyInAnyOrder(stateEvent, registeredError); + assertThat(events).doesNotContain(unknownError); + } + + @Test + void collectEventsShouldHideUnregisteredErrorEventSoItIsNotReturnedAgain() { + TestUnregisteredErrorEvent unknownError = new TestUnregisteredErrorEvent("agg-1"); + blackboard.addObject(unknownError); + + AgentProcessResultTranslator.collectEvents( + blackboard, List.of(STATE_CHANGED_TYPE, REGISTERED_ERROR_TYPE)); + + // The unknown error was hidden — second call also returns nothing + List second = AgentProcessResultTranslator.collectEvents( + blackboard, List.of(STATE_CHANGED_TYPE, REGISTERED_ERROR_TYPE)); + assertThat(second).isEmpty(); + } +} diff --git a/main/agentic/src/test/java/org/elasticsoftware/akces/agentic/runtime/KafkaAgenticAggregateRuntimeTest.java b/main/agentic/src/test/java/org/elasticsoftware/akces/agentic/runtime/KafkaAgenticAggregateRuntimeTest.java index 30d1003f..f3b15b88 100644 --- a/main/agentic/src/test/java/org/elasticsoftware/akces/agentic/runtime/KafkaAgenticAggregateRuntimeTest.java +++ b/main/agentic/src/test/java/org/elasticsoftware/akces/agentic/runtime/KafkaAgenticAggregateRuntimeTest.java @@ -17,6 +17,7 @@ package org.elasticsoftware.akces.agentic.runtime; +import com.embabel.agent.core.AgentPlatform; import org.elasticsoftware.akces.agentic.AgenticAggregateRuntime; import org.elasticsoftware.akces.aggregate.*; import org.elasticsoftware.akces.protocol.AggregateStateRecord; @@ -36,14 +37,16 @@ import java.util.List; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatNullPointerException; import static org.mockito.Mockito.*; /** * Unit tests for {@link KafkaAgenticAggregateRuntime}, verifying the delegation pattern, - * memory extraction from state records, and the built-in domain event type constants. + * memory extraction from state records, AgentPlatform exposure, and the built-in domain + * event type constants. * - *

Uses Mockito to mock the underlying {@link AggregateRuntime} delegate, avoiding any - * Kafka or Spring dependencies. + *

Uses Mockito to mock the underlying {@link AggregateRuntime} delegate and the + * {@link com.embabel.agent.core.AgentPlatform}, avoiding any Kafka or Spring dependencies. */ @ExtendWith(MockitoExtension.class) class KafkaAgenticAggregateRuntimeTest { @@ -91,13 +94,16 @@ public String getAggregateId() { @Mock private AggregateRuntime delegate; + @Mock + private AgentPlatform agentPlatform; + private ObjectMapper objectMapper; private KafkaAgenticAggregateRuntime runtime; @BeforeEach void setUp() { objectMapper = JsonMapper.builder().build(); - runtime = new KafkaAgenticAggregateRuntime(delegate, objectMapper, TestMemoryState.class); + runtime = new KafkaAgenticAggregateRuntime(delegate, objectMapper, TestMemoryState.class, agentPlatform); } // ------------------------------------------------------------------------- @@ -133,7 +139,7 @@ void getMemoriesShouldReturnMemoriesFromMemoryAwareState() throws IOException { @Test void getMemoriesShouldReturnEmptyListForNonMemoryAwareState() throws IOException { - var plainRuntime = new KafkaAgenticAggregateRuntime(delegate, objectMapper, PlainState.class); + var plainRuntime = new KafkaAgenticAggregateRuntime(delegate, objectMapper, PlainState.class, agentPlatform); var state = new PlainState("agg-1"); byte[] payload = objectMapper.writeValueAsBytes(state); @@ -211,6 +217,43 @@ void getAllCommandTypesShouldDelegateToUnderlyingRuntime() { verify(delegate).getAllCommandTypes(); } + @Test + void getAgentPlatformShouldReturnInjectedPlatform() { + assertThat(runtime.getAgentPlatform()).isSameAs(agentPlatform); + } + + @Test + void constructorShouldRejectNullDelegate() { + assertThatNullPointerException() + .isThrownBy(() -> new KafkaAgenticAggregateRuntime( + null, objectMapper, TestMemoryState.class, agentPlatform)) + .withMessageContaining("delegate"); + } + + @Test + void constructorShouldRejectNullObjectMapper() { + assertThatNullPointerException() + .isThrownBy(() -> new KafkaAgenticAggregateRuntime( + delegate, null, TestMemoryState.class, agentPlatform)) + .withMessageContaining("objectMapper"); + } + + @Test + void constructorShouldRejectNullStateClass() { + assertThatNullPointerException() + .isThrownBy(() -> new KafkaAgenticAggregateRuntime( + delegate, objectMapper, null, agentPlatform)) + .withMessageContaining("stateClass"); + } + + @Test + void constructorShouldRejectNullAgentPlatform() { + assertThatNullPointerException() + .isThrownBy(() -> new KafkaAgenticAggregateRuntime( + delegate, objectMapper, TestMemoryState.class, null)) + .withMessageContaining("agentPlatform"); + } + // ------------------------------------------------------------------------- // Built-in DomainEventType constants // ------------------------------------------------------------------------- diff --git a/main/api/src/main/java/org/elasticsoftware/akces/annotations/AgenticAggregateInfo.java b/main/api/src/main/java/org/elasticsoftware/akces/annotations/AgenticAggregateInfo.java index fb3ea7c7..e941312f 100644 --- a/main/api/src/main/java/org/elasticsoftware/akces/annotations/AgenticAggregateInfo.java +++ b/main/api/src/main/java/org/elasticsoftware/akces/annotations/AgenticAggregateInfo.java @@ -18,6 +18,9 @@ package org.elasticsoftware.akces.annotations; import org.elasticsoftware.akces.aggregate.AggregateState; +import org.elasticsoftware.akces.commands.Command; +import org.elasticsoftware.akces.events.DomainEvent; +import org.elasticsoftware.akces.events.ErrorEvent; import org.springframework.core.annotation.AliasFor; import org.springframework.stereotype.Component; @@ -26,6 +29,24 @@ import java.lang.annotation.RetentionPolicy; import java.lang.annotation.Target; +/** + * Marks a class as an agentic aggregate, a special type of aggregate that integrates with + * the Embabel AI agent framework for AI-assisted command and event processing. + * + *

In addition to the standard aggregate capabilities, an agentic aggregate may declare: + *

    + *
  • {@link #agentHandledCommands} — commands processed entirely by the AI agent (no + * {@code @CommandHandler} method required)
  • + *
  • {@link #agentHandledEvents} — external domain events processed by the AI agent (no + * {@code @EventHandler} method required)
  • + *
  • {@link #agentProducedErrors} — error event types the AI agent may emit during + * processing (registered for schema validation and service discovery)
  • + *
+ * + *

Every {@code AgenticAggregate} must also have at least one deterministic + * {@code @CommandHandler(create = true)} method so that the aggregate can be created + * before the AI agent handles subsequent commands. + */ @Retention(RetentionPolicy.RUNTIME) @Target({ElementType.TYPE}) @Component @@ -42,4 +63,37 @@ * exceeds this limit the oldest entries are evicted to make room for new ones. */ int maxMemories() default 100; + + /** + * Command classes to be processed by the AI agent instead of a deterministic + * {@code @CommandHandler} method. Each class must implement {@link Command} and be + * annotated with {@code @CommandInfo}. + * + *

Agent-handled commands cannot create aggregate state — every agentic aggregate must + * have a separate deterministic {@code @CommandHandler(create = true)} method. + */ + Class[] agentHandledCommands() default {}; + + /** + * External domain event classes to be processed by the AI agent instead of a + * deterministic {@code @EventHandler} method. Each class must implement + * {@link DomainEvent} and be annotated with {@code @DomainEventInfo}. + */ + Class[] agentHandledEvents() default {}; + + /** + * Error event classes that the AI agent may produce during command or event processing. + * Each class must implement {@link ErrorEvent} and be annotated with + * {@code @DomainEventInfo}. + * + *

These types are registered as {@code DomainEventType(error=true)} for JSON schema + * validation and service discovery. Agent-produced error events that are not + * declared here are not registered with the runtime and will be silently excluded from + * processing — they will not be serialized to the Kafka domain-events topic. A warning + * is logged for each excluded event so that operators can detect the misconfiguration. + * + *

To ensure an error event produced by the agent is persisted and visible to + * downstream consumers, it must be declared here. + */ + Class[] agentProducedErrors() default {}; } diff --git a/main/api/src/test/java/org/elasticsoftware/akces/aggregate/AgenticAggregateInfoTest.java b/main/api/src/test/java/org/elasticsoftware/akces/aggregate/AgenticAggregateInfoTest.java index fa513f68..ed0db235 100644 --- a/main/api/src/test/java/org/elasticsoftware/akces/aggregate/AgenticAggregateInfoTest.java +++ b/main/api/src/test/java/org/elasticsoftware/akces/aggregate/AgenticAggregateInfoTest.java @@ -17,7 +17,10 @@ package org.elasticsoftware.akces.aggregate; -import org.elasticsoftware.akces.annotations.AgenticAggregateInfo; +import org.elasticsoftware.akces.annotations.*; +import org.elasticsoftware.akces.commands.Command; +import org.elasticsoftware.akces.events.DomainEvent; +import org.elasticsoftware.akces.events.ErrorEvent; import org.junit.jupiter.api.Test; import org.springframework.stereotype.Component; @@ -117,4 +120,138 @@ void requiredAttributesShouldBePresent() { assertThat(hasValue).as("value() attribute must be present").isTrue(); assertThat(hasStateClass).as("stateClass() attribute must be present").isTrue(); } + + // ------------------------------------------------------------------------- + // Test fixtures for new agentic annotation properties + // ------------------------------------------------------------------------- + + /** Stub command for agent-handled command tests. */ + @CommandInfo(type = "TestAgentCommand", version = 1) + record TestAgentCommand(@AggregateIdentifier String id) implements Command { + @Override + public String getAggregateId() { + return id; + } + } + + /** Stub external event for agent-handled event tests. */ + @DomainEventInfo(type = "TestExternalEvent", version = 1) + record TestExternalEvent(@AggregateIdentifier String id) implements DomainEvent { + @Override + public String getAggregateId() { + return id; + } + } + + /** Stub error event for agent-produced error tests. */ + @DomainEventInfo(type = "TestAgentError", version = 1) + record TestAgentError(@AggregateIdentifier String id) implements ErrorEvent { + @Override + public String getAggregateId() { + return id; + } + } + + /** Aggregate with all three new annotation properties populated. */ + @AgenticAggregateInfo( + value = "FullAgentic", + stateClass = TestState.class, + agentHandledCommands = {TestAgentCommand.class}, + agentHandledEvents = {TestExternalEvent.class}, + agentProducedErrors = {TestAgentError.class} + ) + static class FullAgenticAggregate implements AgenticAggregate { + @Override + public Class getStateClass() { + return TestState.class; + } + } + + // ------------------------------------------------------------------------- + // Tests for agentHandledCommands + // ------------------------------------------------------------------------- + + @Test + void agentHandledCommandsShouldDefaultToEmpty() { + AgenticAggregateInfo info = DefaultAgenticAggregate.class.getAnnotation(AgenticAggregateInfo.class); + assertThat(info).isNotNull(); + assertThat(info.agentHandledCommands()).isEmpty(); + } + + @Test + void agentHandledCommandsShouldReturnConfiguredClasses() { + AgenticAggregateInfo info = FullAgenticAggregate.class.getAnnotation(AgenticAggregateInfo.class); + assertThat(info).isNotNull(); + assertThat(info.agentHandledCommands()).containsExactly(TestAgentCommand.class); + } + + @Test + void agentHandledCommandsShouldOnlyAcceptCommandImplementations() { + // Verify that the declared Command class actually implements Command + AgenticAggregateInfo info = FullAgenticAggregate.class.getAnnotation(AgenticAggregateInfo.class); + assertThat(info).isNotNull(); + for (Class commandClass : info.agentHandledCommands()) { + assertThat(Command.class.isAssignableFrom(commandClass)) + .as("Agent-handled command %s must implement Command", commandClass.getName()) + .isTrue(); + } + } + + // ------------------------------------------------------------------------- + // Tests for agentHandledEvents + // ------------------------------------------------------------------------- + + @Test + void agentHandledEventsShouldDefaultToEmpty() { + AgenticAggregateInfo info = DefaultAgenticAggregate.class.getAnnotation(AgenticAggregateInfo.class); + assertThat(info).isNotNull(); + assertThat(info.agentHandledEvents()).isEmpty(); + } + + @Test + void agentHandledEventsShouldReturnConfiguredClasses() { + AgenticAggregateInfo info = FullAgenticAggregate.class.getAnnotation(AgenticAggregateInfo.class); + assertThat(info).isNotNull(); + assertThat(info.agentHandledEvents()).containsExactly(TestExternalEvent.class); + } + + @Test + void agentHandledEventsShouldOnlyAcceptDomainEventImplementations() { + AgenticAggregateInfo info = FullAgenticAggregate.class.getAnnotation(AgenticAggregateInfo.class); + assertThat(info).isNotNull(); + for (Class eventClass : info.agentHandledEvents()) { + assertThat(DomainEvent.class.isAssignableFrom(eventClass)) + .as("Agent-handled event %s must implement DomainEvent", eventClass.getName()) + .isTrue(); + } + } + + // ------------------------------------------------------------------------- + // Tests for agentProducedErrors + // ------------------------------------------------------------------------- + + @Test + void agentProducedErrorsShouldDefaultToEmpty() { + AgenticAggregateInfo info = DefaultAgenticAggregate.class.getAnnotation(AgenticAggregateInfo.class); + assertThat(info).isNotNull(); + assertThat(info.agentProducedErrors()).isEmpty(); + } + + @Test + void agentProducedErrorsShouldReturnConfiguredClasses() { + AgenticAggregateInfo info = FullAgenticAggregate.class.getAnnotation(AgenticAggregateInfo.class); + assertThat(info).isNotNull(); + assertThat(info.agentProducedErrors()).containsExactly(TestAgentError.class); + } + + @Test + void agentProducedErrorsShouldOnlyAcceptErrorEventImplementations() { + AgenticAggregateInfo info = FullAgenticAggregate.class.getAnnotation(AgenticAggregateInfo.class); + assertThat(info).isNotNull(); + for (Class errorClass : info.agentProducedErrors()) { + assertThat(ErrorEvent.class.isAssignableFrom(errorClass)) + .as("Agent-produced error %s must implement ErrorEvent", errorClass.getName()) + .isTrue(); + } + } }