From 5f03f3441595cfde7f3ca637eccf5b1b93463b7c Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Wed, 8 Apr 2026 16:39:36 +0000 Subject: [PATCH 1/7] Initial plan From f84ca025fedcc44b39e216c5a23e2ba307e5442b Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Wed, 8 Apr 2026 17:06:21 +0000 Subject: [PATCH 2/7] =?UTF-8?q?feat:=20Phase=201=20+=20Annotation=20Extens?= =?UTF-8?q?ions=20=E2=80=94=20AgentPlatform=20injection=20&=20metadata=20f?= =?UTF-8?q?oundation?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Extend @AgenticAggregateInfo with agentHandledCommands, agentHandledEvents, agentProducedErrors - Add AgentPlatform getAgentPlatform() to AgenticAggregateRuntime interface - Store/expose AgentPlatform in KafkaAgenticAggregateRuntime (new constructor param) - Create AgentProcessResultTranslator utility (drain DomainEvents from Blackboard) - Create AgenticCommandHandlerFunctionAdapter (CommandHandlerFunction via Embabel Agent) - Create AgenticEventHandlerFunctionAdapter (EventHandlerFunction via Embabel Agent) - Update AgenticAggregateRuntimeFactory: resolve AgentPlatform (fail-fast), resolve Agent by '{aggregateName}Agent' bean convention, register adapters for agentHandledCommands/Events, register DomainEventType(error=true) for agentProducedErrors - Update KafkaAgenticAggregateRuntimeTest for new constructor + add getAgentPlatform test - Add 9 new annotation tests for the 3 new @AgenticAggregateInfo properties Agent-Logs-Url: https://github.com/elasticsoftwarefoundation/akces-framework/sessions/88dfa45b-d88a-42e2-9027-be4b23fea52d Co-authored-by: jwijgerd <914840+jwijgerd@users.noreply.github.com> --- .../agentic/AgenticAggregateRuntime.java | 28 +- .../beans/AgenticAggregateRuntimeFactory.java | 292 +++++++++++++++++- .../runtime/AgentProcessResultTranslator.java | 78 +++++ .../AgenticCommandHandlerFunctionAdapter.java | 197 ++++++++++++ .../AgenticEventHandlerFunctionAdapter.java | 196 ++++++++++++ .../runtime/KafkaAgenticAggregateRuntime.java | 22 +- .../KafkaAgenticAggregateRuntimeTest.java | 20 +- .../annotations/AgenticAggregateInfo.java | 49 +++ .../aggregate/AgenticAggregateInfoTest.java | 139 ++++++++- 9 files changed, 998 insertions(+), 23 deletions(-) create mode 100644 main/agentic/src/main/java/org/elasticsoftware/akces/agentic/runtime/AgentProcessResultTranslator.java create mode 100644 main/agentic/src/main/java/org/elasticsoftware/akces/agentic/runtime/AgenticCommandHandlerFunctionAdapter.java create mode 100644 main/agentic/src/main/java/org/elasticsoftware/akces/agentic/runtime/AgenticEventHandlerFunctionAdapter.java diff --git a/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/AgenticAggregateRuntime.java b/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/AgenticAggregateRuntime.java index fa6e3854..97149be8 100644 --- a/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/AgenticAggregateRuntime.java +++ b/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/AgenticAggregateRuntime.java @@ -17,6 +17,7 @@ package org.elasticsoftware.akces.agentic; +import com.embabel.agent.core.AgentPlatform; import org.elasticsoftware.akces.agentic.events.MemoryRevokedEvent; import org.elasticsoftware.akces.agentic.events.MemoryStoredEvent; import org.elasticsoftware.akces.aggregate.AgenticAggregateMemory; @@ -31,13 +32,19 @@ /** * Extended runtime interface for {@link org.elasticsoftware.akces.aggregate.AgenticAggregate}s. * - *

Extends {@link AggregateRuntime} with memory-specific operations. Analogous to how - * {@link org.elasticsoftware.akces.aggregate.AgenticAggregate} extends + *

Extends {@link AggregateRuntime} with memory-specific and agent-platform operations. + * Analogous to how {@link org.elasticsoftware.akces.aggregate.AgenticAggregate} extends * {@link org.elasticsoftware.akces.aggregate.Aggregate} to add memory awareness. * - *

The key addition is {@link #getMemories(AggregateStateRecord)}, which allows the partition - * to derive current memory state directly from a loaded state record — avoiding a separate - * in-memory deque that must be rebuilt from the event log after restarts. + *

Key additions over the base interface: + *

*/ public interface AgenticAggregateRuntime extends AggregateRuntime { DomainEventType MEMORY_STORED_TYPE = new DomainEventType<>( @@ -45,6 +52,17 @@ public interface AgenticAggregateRuntime extends AggregateRuntime { DomainEventType MEMORY_REVOKED_TYPE = new DomainEventType<>( "MemoryRevoked", 1, MemoryRevokedEvent.class, false, false, false, false); + + /** + * Returns the Embabel {@link AgentPlatform} used to create and run agent processes. + * + *

This platform is the entry point for GOAP-based planning, LLM reasoning, and tool use + * during agent-handled command and event processing. + * + * @return the {@link AgentPlatform}; never {@code null} + */ + AgentPlatform getAgentPlatform(); + /** * Returns the memories from the given aggregate state record. * diff --git a/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/beans/AgenticAggregateRuntimeFactory.java b/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/beans/AgenticAggregateRuntimeFactory.java index a0a2eeaa..38452964 100644 --- a/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/beans/AgenticAggregateRuntimeFactory.java +++ b/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/beans/AgenticAggregateRuntimeFactory.java @@ -17,19 +17,36 @@ package org.elasticsoftware.akces.agentic.beans; +import com.embabel.agent.core.Agent; +import com.embabel.agent.core.AgentPlatform; import org.elasticsoftware.akces.agentic.AgenticAggregateRuntime; +import org.elasticsoftware.akces.agentic.runtime.AgenticCommandHandlerFunctionAdapter; +import org.elasticsoftware.akces.agentic.runtime.AgenticEventHandlerFunctionAdapter; import org.elasticsoftware.akces.agentic.runtime.KafkaAgenticAggregateRuntime; import org.elasticsoftware.akces.aggregate.*; import org.elasticsoftware.akces.aggregate.AgenticAggregate; import org.elasticsoftware.akces.annotations.AgenticAggregateInfo; import org.elasticsoftware.akces.annotations.AggregateStateInfo; +import org.elasticsoftware.akces.annotations.CommandInfo; +import org.elasticsoftware.akces.annotations.DomainEventInfo; +import org.elasticsoftware.akces.commands.Command; +import org.elasticsoftware.akces.errors.AggregateNotFoundErrorEvent; +import org.elasticsoftware.akces.errors.CommandExecutionErrorEvent; +import org.elasticsoftware.akces.events.DomainEvent; +import org.elasticsoftware.akces.events.ErrorEvent; import org.elasticsoftware.akces.kafka.KafkaAggregateRuntime; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.beans.BeansException; import org.springframework.beans.factory.FactoryBean; import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContextAware; import tools.jackson.databind.ObjectMapper; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + import static org.elasticsoftware.akces.agentic.AgenticAggregateRuntime.MEMORY_REVOKED_TYPE; import static org.elasticsoftware.akces.agentic.AgenticAggregateRuntime.MEMORY_STORED_TYPE; @@ -43,12 +60,51 @@ * the agentic module: agentic aggregates are never GDPR-keyed, never indexed, and * never have PII-annotated state.

* + *

In addition to wiring standard command, event, and event-sourcing handler function + * adapters, this factory also processes the three agentic annotation properties: + *

+ * + *

Resolves the {@link AgentPlatform} from the {@link ApplicationContext} and fails + * fast if it is not available — agentic aggregates cannot start without a configured + * Embabel agent platform.

+ * + *

When {@code agentHandledCommands} or {@code agentHandledEvents} are declared, the + * factory looks up an {@link Agent} bean named {@code {aggregateName}Agent} from the + * {@link ApplicationContext}. This {@link Agent} is provided by the implementing + * application and must be registered as a Spring bean before this factory is invoked. + * A fatal error is raised if the bean cannot be found.

+ * *

Produces a {@link KafkaAgenticAggregateRuntime} wrapping the internally built - * {@link KafkaAggregateRuntime}, adding memory-aware operations.

+ * {@link KafkaAggregateRuntime}, adding memory-aware and agent-platform operations.

* * @param the aggregate state type */ -public class AgenticAggregateRuntimeFactory implements FactoryBean, ApplicationContextAware { +public class AgenticAggregateRuntimeFactory + implements FactoryBean, ApplicationContextAware { + + private static final Logger logger = + LoggerFactory.getLogger(AgenticAggregateRuntimeFactory.class); + + /** System error types added to every non-create command handler (including agent-handled). */ + private static final List> COMMAND_HANDLER_SYSTEM_ERRORS = List.of( + new DomainEventType<>("AggregateNotFoundError", 1, + AggregateNotFoundErrorEvent.class, false, false, true, false), + new DomainEventType<>("CommandExecutionError", 1, + CommandExecutionErrorEvent.class, false, false, true, false) + ); + + /** System error types added to every non-create event handler (including agent-handled). */ + private static final List> EVENT_HANDLER_SYSTEM_ERRORS = List.of( + new DomainEventType<>("AggregateNotFoundError", 1, + AggregateNotFoundErrorEvent.class, false, false, true, false) + ); private ApplicationContext applicationContext; private final ObjectMapper objectMapper; @@ -72,13 +128,19 @@ public void setApplicationContext(ApplicationContext applicationContext) throws @Override public AgenticAggregateRuntime getObject() { - AgenticAggregateInfo agenticInfo = aggregate.getClass().getAnnotation(AgenticAggregateInfo.class); + AgenticAggregateInfo agenticInfo = + aggregate.getClass().getAnnotation(AgenticAggregateInfo.class); if (agenticInfo == null) { throw new IllegalStateException( "Class implementing AgenticAggregate must be annotated with @AgenticAggregateInfo"); } - KafkaAggregateRuntime kafkaRuntime = createRuntime(agenticInfo, aggregate); - return new KafkaAgenticAggregateRuntime(kafkaRuntime, objectMapper, agenticInfo.stateClass()); + + // Resolve AgentPlatform — mandatory for all agentic aggregates. + AgentPlatform agentPlatform = resolveAgentPlatform(agenticInfo.value()); + + KafkaAggregateRuntime kafkaRuntime = createRuntime(agenticInfo, aggregate, agentPlatform); + return new KafkaAgenticAggregateRuntime( + kafkaRuntime, objectMapper, agenticInfo.stateClass(), agentPlatform); } @Override @@ -86,9 +148,52 @@ public Class getObjectType() { return AgenticAggregateRuntime.class; } + /** + * Resolves the {@link AgentPlatform} from the {@link ApplicationContext}. + * + * @param aggregateName the aggregate name (for error messages) + * @return the resolved {@link AgentPlatform}; never {@code null} + * @throws IllegalStateException if no {@link AgentPlatform} bean is available + */ + private AgentPlatform resolveAgentPlatform(String aggregateName) { + try { + return applicationContext.getBean(AgentPlatform.class); + } catch (BeansException e) { + throw new IllegalStateException( + "AgentPlatform is required for agentic aggregate '" + aggregateName + + "' but was not found in the ApplicationContext. " + + "Please ensure embabel-agent-starter is configured.", e); + } + } + + /** + * Resolves the {@link Agent} for this aggregate from the {@link ApplicationContext}. + * + *

Looks for a Spring bean of type {@link Agent} named {@code {aggregateName}Agent}. + * The implementing application is responsible for registering this bean. If no such + * bean is found, a fatal error is raised. + * + * @param aggregateName the aggregate name used to derive the agent bean name + * @return the resolved {@link Agent}; never {@code null} + * @throws IllegalStateException if no matching {@link Agent} bean is found + */ + private Agent resolveAgent(String aggregateName) { + String agentBeanName = aggregateName + "Agent"; + try { + return applicationContext.getBean(agentBeanName, Agent.class); + } catch (BeansException e) { + throw new IllegalStateException( + "No Agent bean found with name '" + agentBeanName + "' for agentic aggregate '" + + aggregateName + "'. " + + "The implementing application must provide a Spring bean of type " + + "com.embabel.agent.core.Agent named '" + agentBeanName + "'.", e); + } + } + @SuppressWarnings("unchecked") private KafkaAggregateRuntime createRuntime(AgenticAggregateInfo agenticInfo, - AgenticAggregate aggregate) { + AgenticAggregate aggregate, + AgentPlatform agentPlatform) { KafkaAggregateRuntime.Builder runtimeBuilder = new KafkaAggregateRuntime.Builder(); AggregateStateInfo stateInfo = agenticInfo.stateClass().getAnnotation(AggregateStateInfo.class); @@ -110,7 +215,7 @@ private KafkaAggregateRuntime createRuntime(AgenticAggregateInfo agenticInfo, .setAggregateClass((Class>) aggregate.getClass()) .setObjectMapper(objectMapper); - // CommandHandlerFunctions + // CommandHandlerFunctions (deterministic, from @CommandHandler methods) applicationContext.getBeansOfType(CommandHandlerFunction.class).values().stream() .filter(adapter -> adapter.getAggregate().equals(aggregate)) .forEach(adapter -> { @@ -165,7 +270,8 @@ private KafkaAggregateRuntime createRuntime(AgenticAggregateInfo agenticInfo, if (adapter.getInputType() instanceof AggregateStateType stateType) { runtimeBuilder.addStateUpcastingHandler(stateType, adapter); } else if (adapter.getInputType() instanceof DomainEventType eventType) { - runtimeBuilder.addEventUpcastingHandler(eventType, adapter).addDomainEvent(eventType); + runtimeBuilder.addEventUpcastingHandler(eventType, adapter) + .addDomainEvent(eventType); } }); @@ -179,6 +285,176 @@ private KafkaAggregateRuntime createRuntime(AgenticAggregateInfo agenticInfo, .addEventSourcingHandler(MEMORY_REVOKED_TYPE, KafkaAgenticAggregateRuntime::handleMemoryEvent) .addDomainEvent(MEMORY_REVOKED_TYPE); + // Collect agent-produced error types for registration and inclusion in adapters. + List> agentProducedErrorTypes = + buildAgentProducedErrorTypes(agenticInfo.agentProducedErrors()); + agentProducedErrorTypes.forEach(runtimeBuilder::addDomainEvent); + + // Lazy Agent resolution: only needed when agent-handled commands or events exist. + boolean hasAgentHandlers = agenticInfo.agentHandledCommands().length > 0 + || agenticInfo.agentHandledEvents().length > 0; + Agent agent = hasAgentHandlers ? resolveAgent(agenticInfo.value()) : null; + + // Process agentHandledCommands — register AgenticCommandHandlerFunctionAdapter + for (Class commandClass : agenticInfo.agentHandledCommands()) { + processAgentHandledCommand( + commandClass, aggregate, agentPlatform, agent, + agentProducedErrorTypes, runtimeBuilder); + } + + // Process agentHandledEvents — register AgenticEventHandlerFunctionAdapter + for (Class eventClass : agenticInfo.agentHandledEvents()) { + processAgentHandledEvent( + eventClass, aggregate, agentPlatform, agent, + agentProducedErrorTypes, runtimeBuilder); + } + return runtimeBuilder.validateAndBuild(); } + + /** + * Builds the list of {@link DomainEventType} entries for all classes declared in + * {@code agentProducedErrors}. + * + * @param errorClasses the error event classes from the annotation + * @return a list of {@code DomainEventType(error=true)} entries + * @throws IllegalArgumentException if any class is not annotated with {@code @DomainEventInfo} + * or does not implement {@link ErrorEvent} + */ + private List> buildAgentProducedErrorTypes( + Class[] errorClasses) { + List> result = new ArrayList<>(errorClasses.length); + for (Class errorClass : errorClasses) { + DomainEventInfo info = errorClass.getAnnotation(DomainEventInfo.class); + if (info == null) { + throw new IllegalArgumentException( + "Agent-produced error class " + errorClass.getName() + + " must be annotated with @DomainEventInfo"); + } + result.add(new DomainEventType<>( + info.type(), info.version(), errorClass, + false, // create + false, // external + true, // error + false)); // piiData + } + return Collections.unmodifiableList(result); + } + + /** + * Validates and registers an agent-handled command, creating an + * {@link AgenticCommandHandlerFunctionAdapter} and registering it with the runtime builder. + * + * @param commandClass the command class to register + * @param aggregate the owning aggregate instance + * @param agentPlatform the Embabel platform + * @param agent the deployed {@link Agent} for this aggregate + * @param agentProducedErrors the error types the agent may produce + * @param runtimeBuilder the runtime builder to register with + */ + @SuppressWarnings({"unchecked", "rawtypes"}) + private void processAgentHandledCommand( + Class commandClass, + AgenticAggregate aggregate, + AgentPlatform agentPlatform, + Agent agent, + List> agentProducedErrors, + KafkaAggregateRuntime.Builder runtimeBuilder) { + + CommandInfo commandInfo = commandClass.getAnnotation(CommandInfo.class); + if (commandInfo == null) { + throw new IllegalArgumentException( + "Agent-handled command class " + commandClass.getName() + + " must be annotated with @CommandInfo"); + } + + // Agent-handled commands always have create=false (design constraint). + CommandType commandType = new CommandType<>( + commandInfo.type(), commandInfo.version(), commandClass, + false, // create — agent-handled commands cannot create state + false, // external + false); // piiData + + // Error types: system errors + agent-produced errors + List> errorTypes = new ArrayList<>(COMMAND_HANDLER_SYSTEM_ERRORS); + errorTypes.addAll(agentProducedErrors); + + AgenticCommandHandlerFunctionAdapter adapter = new AgenticCommandHandlerFunctionAdapter<>( + (AgenticAggregate) aggregate, + commandType, + agentPlatform, + agent, + (List) List.of(), // producedDomainEventTypes: empty (events registered via ESH) + (List) errorTypes, + Collections::emptyList); // aggregateServicesSupplier: wired by controller in later phase + + runtimeBuilder.addCommandHandler(commandType, adapter).addCommand(commandType); + errorTypes.forEach(runtimeBuilder::addDomainEvent); + + logger.debug("Registered agent-handled command {} v{} for aggregate {}", + commandInfo.type(), commandInfo.version(), + aggregate.getClass().getSimpleName()); + } + + /** + * Validates and registers an agent-handled external event, creating an + * {@link AgenticEventHandlerFunctionAdapter} and registering it with the runtime builder. + * + * @param eventClass the external domain event class to register + * @param aggregate the owning aggregate instance + * @param agentPlatform the Embabel platform + * @param agent the deployed {@link Agent} for this aggregate + * @param agentProducedErrors the error types the agent may produce + * @param runtimeBuilder the runtime builder to register with + */ + @SuppressWarnings({"unchecked", "rawtypes"}) + private void processAgentHandledEvent( + Class eventClass, + AgenticAggregate aggregate, + AgentPlatform agentPlatform, + Agent agent, + List> agentProducedErrors, + KafkaAggregateRuntime.Builder runtimeBuilder) { + + if (ErrorEvent.class.isAssignableFrom(eventClass)) { + throw new IllegalArgumentException( + "Agent-handled event class " + eventClass.getName() + + " must not implement ErrorEvent. " + + "Error events should be declared in agentProducedErrors instead."); + } + + DomainEventInfo eventInfo = eventClass.getAnnotation(DomainEventInfo.class); + if (eventInfo == null) { + throw new IllegalArgumentException( + "Agent-handled event class " + eventClass.getName() + + " must be annotated with @DomainEventInfo"); + } + + DomainEventType eventType = new DomainEventType<>( + eventInfo.type(), eventInfo.version(), eventClass, + false, // create + true, // external — agent-handled events are always external + false, // error + false); // piiData + + // Error types: system errors + agent-produced errors + List> errorTypes = new ArrayList<>(EVENT_HANDLER_SYSTEM_ERRORS); + errorTypes.addAll(agentProducedErrors); + + AgenticEventHandlerFunctionAdapter adapter = new AgenticEventHandlerFunctionAdapter<>( + (AgenticAggregate) aggregate, + eventType, + agentPlatform, + agent, + (List) List.of(), // producedDomainEventTypes: empty (events registered via ESH) + (List) errorTypes, + Collections::emptyList); // aggregateServicesSupplier: wired by controller in later phase + + runtimeBuilder.addExternalEventHandler(eventType, adapter).addDomainEvent(eventType); + errorTypes.forEach(runtimeBuilder::addDomainEvent); + + logger.debug("Registered agent-handled external event {} v{} for aggregate {}", + eventInfo.type(), eventInfo.version(), + aggregate.getClass().getSimpleName()); + } } diff --git a/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/runtime/AgentProcessResultTranslator.java b/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/runtime/AgentProcessResultTranslator.java new file mode 100644 index 00000000..55e33527 --- /dev/null +++ b/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/runtime/AgentProcessResultTranslator.java @@ -0,0 +1,78 @@ +/* + * Copyright 2022 - 2026 The Original Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.elasticsoftware.akces.agentic.runtime; + +import com.embabel.agent.core.Blackboard; +import org.elasticsoftware.akces.events.DomainEvent; +import org.elasticsoftware.akces.events.ErrorEvent; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; + +/** + * Utility class that translates the results of an Embabel {@code AgentProcess} back + * into Akces {@link DomainEvent} instances. + * + *

After each agent tick (or after the process reaches an end state), call + * {@link #collectEvents(Blackboard)} to drain {@link DomainEvent} objects from the + * blackboard. Events are marked as hidden on the blackboard after collection, so + * subsequent calls to this method will not return the same events again — this + * supports both tick-to-completion and future incremental-tick processing patterns. + * + *

Undeclared {@link ErrorEvent} instances are accepted at runtime. A warning is + * logged so operators can detect unexpected error types without causing the transaction + * to abort. + */ +public final class AgentProcessResultTranslator { + + private static final Logger logger = LoggerFactory.getLogger(AgentProcessResultTranslator.class); + + private AgentProcessResultTranslator() { + // utility class + } + + /** + * Collects all {@link DomainEvent} objects currently visible on the blackboard + * and removes them from visible scope (via {@link Blackboard#hide(Object)}) so that + * they are not collected again on the next call. + * + *

Any {@link ErrorEvent} instances found are logged at {@code WARN} level so + * that operators can detect unexpected error types emitted by the agent. + * + * @param blackboard the agent process blackboard to drain events from + * @return an unmodifiable list of collected domain events in the order they + * were encountered on the blackboard; never {@code null}, may be empty + */ + public static List collectEvents(Blackboard blackboard) { + List events = blackboard.getObjects().stream() + .filter(o -> o instanceof DomainEvent) + .map(o -> (DomainEvent) o) + .toList(); + + for (DomainEvent event : events) { + if (event instanceof ErrorEvent) { + logger.warn("Agent produced error event of type {}: {}", + event.getClass().getName(), event); + } + blackboard.hide(event); + } + + return events; + } +} diff --git a/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/runtime/AgenticCommandHandlerFunctionAdapter.java b/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/runtime/AgenticCommandHandlerFunctionAdapter.java new file mode 100644 index 00000000..5597da71 --- /dev/null +++ b/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/runtime/AgenticCommandHandlerFunctionAdapter.java @@ -0,0 +1,197 @@ +/* + * Copyright 2022 - 2026 The Original Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.elasticsoftware.akces.agentic.runtime; + +import com.embabel.agent.core.Agent; +import com.embabel.agent.core.AgentPlatform; +import com.embabel.agent.core.AgentProcess; +import com.embabel.agent.core.ProcessOptions; +import jakarta.annotation.Nonnull; +import org.elasticsoftware.akces.aggregate.*; +import org.elasticsoftware.akces.commands.Command; +import org.elasticsoftware.akces.control.AggregateServiceRecord; +import org.elasticsoftware.akces.events.DomainEvent; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collection; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.function.Supplier; +import java.util.stream.Stream; + +/** + * A {@link CommandHandlerFunction} implementation that routes an agent-handled command + * through the Embabel AI agent framework instead of a deterministic + * {@code @CommandHandler} method. + * + *

When {@link #apply(Command, AggregateState)} is called the adapter: + *

    + *
  1. Assembles a bindings {@link Map} containing the command, current aggregate state, + * memories, aggregate service records, and condition flags.
  2. + *
  3. Creates an {@link AgentProcess} via + * {@link AgentPlatform#createAgentProcess(Agent, ProcessOptions, Map)} using the + * provided {@link Agent} and the assembled bindings.
  4. + *
  5. Calls {@link AgentProcess#tick()} in a loop until the process reaches an end + * state (completed, failed, terminated, or killed).
  6. + *
  7. Collects {@link DomainEvent} objects placed on the agent's blackboard via + * {@link AgentProcessResultTranslator#collectEvents} and returns them as a + * {@link Stream}.
  8. + *
+ * + *

{@link #isCreate()} always returns {@code false} — agent-handled commands cannot + * create aggregate state. Every agentic aggregate must have a separate deterministic + * {@code @CommandHandler(create = true)} method. + * + *

The returned stream of events is fed back into the standard + * {@code KafkaAggregateRuntime.handleCommand()} flow unchanged, where each event is + * applied through the registered {@code @EventSourcingHandler} methods. + * + * @param the aggregate state type; must implement both {@link AggregateState} and + * {@link MemoryAwareState} + * @param the command type handled by this adapter + * @param the domain event type produced by this adapter + */ +public class AgenticCommandHandlerFunctionAdapter + implements CommandHandlerFunction { + + private static final Logger logger = + LoggerFactory.getLogger(AgenticCommandHandlerFunctionAdapter.class); + + private final AgenticAggregate aggregate; + private final CommandType commandType; + private final AgentPlatform agentPlatform; + private final Agent agent; + private final List> producedDomainEventTypes; + private final List> errorEventTypes; + private final Supplier> aggregateServicesSupplier; + + /** + * Creates a new {@code AgenticCommandHandlerFunctionAdapter}. + * + * @param aggregate the owning agentic aggregate instance + * @param commandType the command type this adapter handles + * @param agentPlatform the Embabel platform used to create agent processes + * @param agent the deployed {@link Agent} for this aggregate, + * provided by the implementing application + * @param producedDomainEventTypes domain event types this adapter may produce + * @param errorEventTypes error event types this adapter may produce + * @param aggregateServicesSupplier supplier of all known {@link AggregateServiceRecord}s; + * used to populate the blackboard for service discovery + */ + public AgenticCommandHandlerFunctionAdapter( + AgenticAggregate aggregate, + CommandType commandType, + AgentPlatform agentPlatform, + Agent agent, + List> producedDomainEventTypes, + List> errorEventTypes, + Supplier> aggregateServicesSupplier) { + this.aggregate = aggregate; + this.commandType = commandType; + this.agentPlatform = agentPlatform; + this.agent = agent; + this.producedDomainEventTypes = List.copyOf(producedDomainEventTypes); + this.errorEventTypes = List.copyOf(errorEventTypes); + this.aggregateServicesSupplier = aggregateServicesSupplier; + } + + /** + * Processes the command through the Embabel AI agent framework. + * + *

Populates the agent blackboard with: + *

    + *
  • {@code "command"} — the command being processed
  • + *
  • {@code "state"} — the current aggregate state
  • + *
  • {@code "agenticAggregateId"} — the aggregate identifier
  • + *
  • {@code "memories"} — the list of current memories from state
  • + *
  • {@code "aggregateServices"} — all known aggregate service records
  • + *
  • {@code "isCommandProcessing"} (condition) — {@code true}
  • + *
  • {@code "isExternalEvent"} (condition) — {@code false}
  • + *
  • {@code "hasMemories"} (condition) — whether any memories are present
  • + *
+ * + * @param command the command to process; never {@code null} + * @param state the current aggregate state + * @return a stream of domain events produced by the agent; may be empty + */ + @Nonnull + @Override + @SuppressWarnings("unchecked") + public Stream apply(@Nonnull C command, S state) { + logger.debug("Processing agent-handled command {} for aggregate {}", + commandType.typeName(), aggregate.getClass().getSimpleName()); + + Map bindings = new LinkedHashMap<>(); + bindings.put("command", command); + bindings.put("state", state); + bindings.put("agenticAggregateId", state.getAggregateId()); + bindings.put("memories", state.getMemories()); + bindings.put("aggregateServices", aggregateServicesSupplier.get()); + bindings.put("isCommandProcessing", true); + bindings.put("isExternalEvent", false); + bindings.put("hasMemories", !state.getMemories().isEmpty()); + + AgentProcess agentProcess = + agentPlatform.createAgentProcess(agent, ProcessOptions.DEFAULT, bindings); + + while (!agentProcess.getFinished()) { + agentProcess.tick(); + } + + logger.debug("Agent process completed with status {} for command {} on aggregate {}", + agentProcess.getStatus(), commandType.typeName(), aggregate.getClass().getSimpleName()); + + return (Stream) AgentProcessResultTranslator + .collectEvents(agentProcess.getBlackboard()) + .stream(); + } + + /** + * {@inheritDoc} + * + *

Always returns {@code false} — agent-handled commands cannot create aggregate + * state. A deterministic {@code @CommandHandler(create = true)} must exist on the + * aggregate class to handle creation. + */ + @Override + public boolean isCreate() { + return false; + } + + @Override + public CommandType getCommandType() { + return commandType; + } + + @Override + public Aggregate getAggregate() { + return aggregate; + } + + @Override + public List> getProducedDomainEventTypes() { + return producedDomainEventTypes; + } + + @Override + public List> getErrorEventTypes() { + return errorEventTypes; + } +} diff --git a/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/runtime/AgenticEventHandlerFunctionAdapter.java b/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/runtime/AgenticEventHandlerFunctionAdapter.java new file mode 100644 index 00000000..cb7b1083 --- /dev/null +++ b/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/runtime/AgenticEventHandlerFunctionAdapter.java @@ -0,0 +1,196 @@ +/* + * Copyright 2022 - 2026 The Original Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.elasticsoftware.akces.agentic.runtime; + +import com.embabel.agent.core.Agent; +import com.embabel.agent.core.AgentPlatform; +import com.embabel.agent.core.AgentProcess; +import com.embabel.agent.core.ProcessOptions; +import jakarta.annotation.Nonnull; +import org.elasticsoftware.akces.aggregate.*; +import org.elasticsoftware.akces.commands.Command; +import org.elasticsoftware.akces.control.AggregateServiceRecord; +import org.elasticsoftware.akces.events.DomainEvent; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collection; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.function.Supplier; +import java.util.stream.Stream; + +/** + * An {@link EventHandlerFunction} implementation that routes an agent-handled external + * domain event through the Embabel AI agent framework instead of a deterministic + * {@code @EventHandler} method. + * + *

When {@link #apply(DomainEvent, AggregateState)} is called the adapter: + *

    + *
  1. Assembles a bindings {@link Map} containing the event, current aggregate state, + * memories, aggregate service records, and condition flags.
  2. + *
  3. Creates an {@link AgentProcess} via + * {@link AgentPlatform#createAgentProcess(Agent, ProcessOptions, Map)} using the + * provided {@link Agent} and the assembled bindings.
  4. + *
  5. Calls {@link AgentProcess#tick()} in a loop until the process reaches an end + * state (completed, failed, terminated, or killed).
  6. + *
  7. Collects {@link DomainEvent} objects placed on the agent's blackboard via + * {@link AgentProcessResultTranslator#collectEvents} and returns them as a + * {@link Stream}.
  8. + *
+ * + *

{@link #isCreate()} always returns {@code false} — agent-handled events cannot + * create aggregate state. Every agentic aggregate must have a separate deterministic + * {@code @CommandHandler(create = true)} method. + * + *

The returned stream of events is fed back into the standard + * {@code KafkaAggregateRuntime.handleEvent()} flow unchanged, where each event is + * applied through the registered {@code @EventSourcingHandler} methods. + * + * @param the aggregate state type; must implement both {@link AggregateState} + * and {@link MemoryAwareState} + * @param the external domain event type handled by this adapter + * @param the domain event type produced by this adapter + */ +public class AgenticEventHandlerFunctionAdapter + implements EventHandlerFunction { + + private static final Logger logger = + LoggerFactory.getLogger(AgenticEventHandlerFunctionAdapter.class); + + private final AgenticAggregate aggregate; + private final DomainEventType eventType; + private final AgentPlatform agentPlatform; + private final Agent agent; + private final List> producedDomainEventTypes; + private final List> errorEventTypes; + private final Supplier> aggregateServicesSupplier; + + /** + * Creates a new {@code AgenticEventHandlerFunctionAdapter}. + * + * @param aggregate the owning agentic aggregate instance + * @param eventType the external domain event type this adapter handles + * @param agentPlatform the Embabel platform used to create agent processes + * @param agent the deployed {@link Agent} for this aggregate, + * provided by the implementing application + * @param producedDomainEventTypes domain event types this adapter may produce + * @param errorEventTypes error event types this adapter may produce + * @param aggregateServicesSupplier supplier of all known {@link AggregateServiceRecord}s; + * used to populate the blackboard for service discovery + */ + public AgenticEventHandlerFunctionAdapter( + AgenticAggregate aggregate, + DomainEventType eventType, + AgentPlatform agentPlatform, + Agent agent, + List> producedDomainEventTypes, + List> errorEventTypes, + Supplier> aggregateServicesSupplier) { + this.aggregate = aggregate; + this.eventType = eventType; + this.agentPlatform = agentPlatform; + this.agent = agent; + this.producedDomainEventTypes = List.copyOf(producedDomainEventTypes); + this.errorEventTypes = List.copyOf(errorEventTypes); + this.aggregateServicesSupplier = aggregateServicesSupplier; + } + + /** + * Processes the external domain event through the Embabel AI agent framework. + * + *

Populates the agent blackboard with: + *

    + *
  • {@code "event"} — the external domain event being processed
  • + *
  • {@code "state"} — the current aggregate state
  • + *
  • {@code "agenticAggregateId"} — the aggregate identifier
  • + *
  • {@code "memories"} — the list of current memories from state
  • + *
  • {@code "aggregateServices"} — all known aggregate service records
  • + *
  • {@code "isCommandProcessing"} (condition) — {@code false}
  • + *
  • {@code "isExternalEvent"} (condition) — {@code true}
  • + *
  • {@code "hasMemories"} (condition) — whether any memories are present
  • + *
+ * + * @param event the external domain event to process; never {@code null} + * @param state the current aggregate state + * @return a stream of domain events produced by the agent; may be empty + */ + @Nonnull + @Override + @SuppressWarnings("unchecked") + public Stream apply(@Nonnull InputEvent event, S state) { + logger.debug("Processing agent-handled external event {} for aggregate {}", + eventType.typeName(), aggregate.getClass().getSimpleName()); + + Map bindings = new LinkedHashMap<>(); + bindings.put("event", event); + bindings.put("state", state); + bindings.put("agenticAggregateId", state.getAggregateId()); + bindings.put("memories", state.getMemories()); + bindings.put("aggregateServices", aggregateServicesSupplier.get()); + bindings.put("isCommandProcessing", false); + bindings.put("isExternalEvent", true); + bindings.put("hasMemories", !state.getMemories().isEmpty()); + + AgentProcess agentProcess = + agentPlatform.createAgentProcess(agent, ProcessOptions.DEFAULT, bindings); + + while (!agentProcess.getFinished()) { + agentProcess.tick(); + } + + logger.debug("Agent process completed with status {} for event {} on aggregate {}", + agentProcess.getStatus(), eventType.typeName(), aggregate.getClass().getSimpleName()); + + return (Stream) AgentProcessResultTranslator + .collectEvents(agentProcess.getBlackboard()) + .stream(); + } + + /** + * {@inheritDoc} + * + *

Always returns {@code false} — agent-handled external events cannot create + * aggregate state. + */ + @Override + public boolean isCreate() { + return false; + } + + @Override + public DomainEventType getEventType() { + return eventType; + } + + @Override + public Aggregate getAggregate() { + return aggregate; + } + + @Override + public List> getProducedDomainEventTypes() { + return producedDomainEventTypes; + } + + @Override + public List> getErrorEventTypes() { + return errorEventTypes; + } +} diff --git a/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/runtime/KafkaAgenticAggregateRuntime.java b/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/runtime/KafkaAgenticAggregateRuntime.java index 8d75f3de..dd58d15a 100644 --- a/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/runtime/KafkaAgenticAggregateRuntime.java +++ b/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/runtime/KafkaAgenticAggregateRuntime.java @@ -17,6 +17,7 @@ package org.elasticsoftware.akces.agentic.runtime; +import com.embabel.agent.core.AgentPlatform; import org.apache.kafka.common.errors.SerializationException; import org.elasticsoftware.akces.agentic.AgenticAggregateRuntime; import org.elasticsoftware.akces.agentic.events.MemoryRevokedEvent; @@ -52,26 +53,39 @@ public class KafkaAgenticAggregateRuntime implements AgenticAggregateRuntime { private final AggregateRuntime delegate; private final ObjectMapper objectMapper; private final Class stateClass; + private final AgentPlatform agentPlatform; /** * Creates a new {@code KafkaAgenticAggregateRuntime}. * - * @param delegate the underlying aggregate runtime to delegate to - * @param objectMapper Jackson object mapper for state deserialization - * @param stateClass the concrete state class used by this aggregate + * @param delegate the underlying aggregate runtime to delegate to + * @param objectMapper Jackson object mapper for state deserialization + * @param stateClass the concrete state class used by this aggregate + * @param agentPlatform the Embabel {@link AgentPlatform} used for AI-assisted processing; + * must not be {@code null} */ public KafkaAgenticAggregateRuntime(AggregateRuntime delegate, ObjectMapper objectMapper, - Class stateClass) { + Class stateClass, + AgentPlatform agentPlatform) { this.delegate = delegate; this.objectMapper = objectMapper; this.stateClass = stateClass; + this.agentPlatform = agentPlatform; } // ------------------------------------------------------------------------- // AgenticAggregateRuntime extension // ------------------------------------------------------------------------- + /** + * {@inheritDoc} + */ + @Override + public AgentPlatform getAgentPlatform() { + return agentPlatform; + } + /** * {@inheritDoc} * diff --git a/main/agentic/src/test/java/org/elasticsoftware/akces/agentic/runtime/KafkaAgenticAggregateRuntimeTest.java b/main/agentic/src/test/java/org/elasticsoftware/akces/agentic/runtime/KafkaAgenticAggregateRuntimeTest.java index 30d1003f..bdd24a47 100644 --- a/main/agentic/src/test/java/org/elasticsoftware/akces/agentic/runtime/KafkaAgenticAggregateRuntimeTest.java +++ b/main/agentic/src/test/java/org/elasticsoftware/akces/agentic/runtime/KafkaAgenticAggregateRuntimeTest.java @@ -17,6 +17,7 @@ package org.elasticsoftware.akces.agentic.runtime; +import com.embabel.agent.core.AgentPlatform; import org.elasticsoftware.akces.agentic.AgenticAggregateRuntime; import org.elasticsoftware.akces.aggregate.*; import org.elasticsoftware.akces.protocol.AggregateStateRecord; @@ -40,10 +41,11 @@ /** * Unit tests for {@link KafkaAgenticAggregateRuntime}, verifying the delegation pattern, - * memory extraction from state records, and the built-in domain event type constants. + * memory extraction from state records, AgentPlatform exposure, and the built-in domain + * event type constants. * - *

Uses Mockito to mock the underlying {@link AggregateRuntime} delegate, avoiding any - * Kafka or Spring dependencies. + *

Uses Mockito to mock the underlying {@link AggregateRuntime} delegate and the + * {@link com.embabel.agent.core.AgentPlatform}, avoiding any Kafka or Spring dependencies. */ @ExtendWith(MockitoExtension.class) class KafkaAgenticAggregateRuntimeTest { @@ -91,13 +93,16 @@ public String getAggregateId() { @Mock private AggregateRuntime delegate; + @Mock + private AgentPlatform agentPlatform; + private ObjectMapper objectMapper; private KafkaAgenticAggregateRuntime runtime; @BeforeEach void setUp() { objectMapper = JsonMapper.builder().build(); - runtime = new KafkaAgenticAggregateRuntime(delegate, objectMapper, TestMemoryState.class); + runtime = new KafkaAgenticAggregateRuntime(delegate, objectMapper, TestMemoryState.class, agentPlatform); } // ------------------------------------------------------------------------- @@ -133,7 +138,7 @@ void getMemoriesShouldReturnMemoriesFromMemoryAwareState() throws IOException { @Test void getMemoriesShouldReturnEmptyListForNonMemoryAwareState() throws IOException { - var plainRuntime = new KafkaAgenticAggregateRuntime(delegate, objectMapper, PlainState.class); + var plainRuntime = new KafkaAgenticAggregateRuntime(delegate, objectMapper, PlainState.class, agentPlatform); var state = new PlainState("agg-1"); byte[] payload = objectMapper.writeValueAsBytes(state); @@ -211,6 +216,11 @@ void getAllCommandTypesShouldDelegateToUnderlyingRuntime() { verify(delegate).getAllCommandTypes(); } + @Test + void getAgentPlatformShouldReturnInjectedPlatform() { + assertThat(runtime.getAgentPlatform()).isSameAs(agentPlatform); + } + // ------------------------------------------------------------------------- // Built-in DomainEventType constants // ------------------------------------------------------------------------- diff --git a/main/api/src/main/java/org/elasticsoftware/akces/annotations/AgenticAggregateInfo.java b/main/api/src/main/java/org/elasticsoftware/akces/annotations/AgenticAggregateInfo.java index fb3ea7c7..b06ab1db 100644 --- a/main/api/src/main/java/org/elasticsoftware/akces/annotations/AgenticAggregateInfo.java +++ b/main/api/src/main/java/org/elasticsoftware/akces/annotations/AgenticAggregateInfo.java @@ -18,6 +18,9 @@ package org.elasticsoftware.akces.annotations; import org.elasticsoftware.akces.aggregate.AggregateState; +import org.elasticsoftware.akces.commands.Command; +import org.elasticsoftware.akces.events.DomainEvent; +import org.elasticsoftware.akces.events.ErrorEvent; import org.springframework.core.annotation.AliasFor; import org.springframework.stereotype.Component; @@ -26,6 +29,24 @@ import java.lang.annotation.RetentionPolicy; import java.lang.annotation.Target; +/** + * Marks a class as an agentic aggregate, a special type of aggregate that integrates with + * the Embabel AI agent framework for AI-assisted command and event processing. + * + *

In addition to the standard aggregate capabilities, an agentic aggregate may declare: + *

    + *
  • {@link #agentHandledCommands} — commands processed entirely by the AI agent (no + * {@code @CommandHandler} method required)
  • + *
  • {@link #agentHandledEvents} — external domain events processed by the AI agent (no + * {@code @EventHandler} method required)
  • + *
  • {@link #agentProducedErrors} — error event types the AI agent may emit during + * processing (registered for schema validation and service discovery)
  • + *
+ * + *

Every {@code AgenticAggregate} must also have at least one deterministic + * {@code @CommandHandler(create = true)} method so that the aggregate can be created + * before the AI agent handles subsequent commands. + */ @Retention(RetentionPolicy.RUNTIME) @Target({ElementType.TYPE}) @Component @@ -42,4 +63,32 @@ * exceeds this limit the oldest entries are evicted to make room for new ones. */ int maxMemories() default 100; + + /** + * Command classes to be processed by the AI agent instead of a deterministic + * {@code @CommandHandler} method. Each class must implement {@link Command} and be + * annotated with {@code @CommandInfo}. + * + *

Agent-handled commands cannot create aggregate state — every agentic aggregate must + * have a separate deterministic {@code @CommandHandler(create = true)} method. + */ + Class[] agentHandledCommands() default {}; + + /** + * External domain event classes to be processed by the AI agent instead of a + * deterministic {@code @EventHandler} method. Each class must implement + * {@link DomainEvent} and be annotated with {@code @DomainEventInfo}. + */ + Class[] agentHandledEvents() default {}; + + /** + * Error event classes that the AI agent may produce during command or event processing. + * Each class must implement {@link ErrorEvent} and be annotated with + * {@code @DomainEventInfo}. + * + *

These types are registered as {@code DomainEventType(error=true)} for JSON schema + * validation and service discovery. At runtime, undeclared {@link ErrorEvent} instances + * emitted by the agent are accepted with a warning rather than rejected. + */ + Class[] agentProducedErrors() default {}; } diff --git a/main/api/src/test/java/org/elasticsoftware/akces/aggregate/AgenticAggregateInfoTest.java b/main/api/src/test/java/org/elasticsoftware/akces/aggregate/AgenticAggregateInfoTest.java index fa513f68..ed0db235 100644 --- a/main/api/src/test/java/org/elasticsoftware/akces/aggregate/AgenticAggregateInfoTest.java +++ b/main/api/src/test/java/org/elasticsoftware/akces/aggregate/AgenticAggregateInfoTest.java @@ -17,7 +17,10 @@ package org.elasticsoftware.akces.aggregate; -import org.elasticsoftware.akces.annotations.AgenticAggregateInfo; +import org.elasticsoftware.akces.annotations.*; +import org.elasticsoftware.akces.commands.Command; +import org.elasticsoftware.akces.events.DomainEvent; +import org.elasticsoftware.akces.events.ErrorEvent; import org.junit.jupiter.api.Test; import org.springframework.stereotype.Component; @@ -117,4 +120,138 @@ void requiredAttributesShouldBePresent() { assertThat(hasValue).as("value() attribute must be present").isTrue(); assertThat(hasStateClass).as("stateClass() attribute must be present").isTrue(); } + + // ------------------------------------------------------------------------- + // Test fixtures for new agentic annotation properties + // ------------------------------------------------------------------------- + + /** Stub command for agent-handled command tests. */ + @CommandInfo(type = "TestAgentCommand", version = 1) + record TestAgentCommand(@AggregateIdentifier String id) implements Command { + @Override + public String getAggregateId() { + return id; + } + } + + /** Stub external event for agent-handled event tests. */ + @DomainEventInfo(type = "TestExternalEvent", version = 1) + record TestExternalEvent(@AggregateIdentifier String id) implements DomainEvent { + @Override + public String getAggregateId() { + return id; + } + } + + /** Stub error event for agent-produced error tests. */ + @DomainEventInfo(type = "TestAgentError", version = 1) + record TestAgentError(@AggregateIdentifier String id) implements ErrorEvent { + @Override + public String getAggregateId() { + return id; + } + } + + /** Aggregate with all three new annotation properties populated. */ + @AgenticAggregateInfo( + value = "FullAgentic", + stateClass = TestState.class, + agentHandledCommands = {TestAgentCommand.class}, + agentHandledEvents = {TestExternalEvent.class}, + agentProducedErrors = {TestAgentError.class} + ) + static class FullAgenticAggregate implements AgenticAggregate { + @Override + public Class getStateClass() { + return TestState.class; + } + } + + // ------------------------------------------------------------------------- + // Tests for agentHandledCommands + // ------------------------------------------------------------------------- + + @Test + void agentHandledCommandsShouldDefaultToEmpty() { + AgenticAggregateInfo info = DefaultAgenticAggregate.class.getAnnotation(AgenticAggregateInfo.class); + assertThat(info).isNotNull(); + assertThat(info.agentHandledCommands()).isEmpty(); + } + + @Test + void agentHandledCommandsShouldReturnConfiguredClasses() { + AgenticAggregateInfo info = FullAgenticAggregate.class.getAnnotation(AgenticAggregateInfo.class); + assertThat(info).isNotNull(); + assertThat(info.agentHandledCommands()).containsExactly(TestAgentCommand.class); + } + + @Test + void agentHandledCommandsShouldOnlyAcceptCommandImplementations() { + // Verify that the declared Command class actually implements Command + AgenticAggregateInfo info = FullAgenticAggregate.class.getAnnotation(AgenticAggregateInfo.class); + assertThat(info).isNotNull(); + for (Class commandClass : info.agentHandledCommands()) { + assertThat(Command.class.isAssignableFrom(commandClass)) + .as("Agent-handled command %s must implement Command", commandClass.getName()) + .isTrue(); + } + } + + // ------------------------------------------------------------------------- + // Tests for agentHandledEvents + // ------------------------------------------------------------------------- + + @Test + void agentHandledEventsShouldDefaultToEmpty() { + AgenticAggregateInfo info = DefaultAgenticAggregate.class.getAnnotation(AgenticAggregateInfo.class); + assertThat(info).isNotNull(); + assertThat(info.agentHandledEvents()).isEmpty(); + } + + @Test + void agentHandledEventsShouldReturnConfiguredClasses() { + AgenticAggregateInfo info = FullAgenticAggregate.class.getAnnotation(AgenticAggregateInfo.class); + assertThat(info).isNotNull(); + assertThat(info.agentHandledEvents()).containsExactly(TestExternalEvent.class); + } + + @Test + void agentHandledEventsShouldOnlyAcceptDomainEventImplementations() { + AgenticAggregateInfo info = FullAgenticAggregate.class.getAnnotation(AgenticAggregateInfo.class); + assertThat(info).isNotNull(); + for (Class eventClass : info.agentHandledEvents()) { + assertThat(DomainEvent.class.isAssignableFrom(eventClass)) + .as("Agent-handled event %s must implement DomainEvent", eventClass.getName()) + .isTrue(); + } + } + + // ------------------------------------------------------------------------- + // Tests for agentProducedErrors + // ------------------------------------------------------------------------- + + @Test + void agentProducedErrorsShouldDefaultToEmpty() { + AgenticAggregateInfo info = DefaultAgenticAggregate.class.getAnnotation(AgenticAggregateInfo.class); + assertThat(info).isNotNull(); + assertThat(info.agentProducedErrors()).isEmpty(); + } + + @Test + void agentProducedErrorsShouldReturnConfiguredClasses() { + AgenticAggregateInfo info = FullAgenticAggregate.class.getAnnotation(AgenticAggregateInfo.class); + assertThat(info).isNotNull(); + assertThat(info.agentProducedErrors()).containsExactly(TestAgentError.class); + } + + @Test + void agentProducedErrorsShouldOnlyAcceptErrorEventImplementations() { + AgenticAggregateInfo info = FullAgenticAggregate.class.getAnnotation(AgenticAggregateInfo.class); + assertThat(info).isNotNull(); + for (Class errorClass : info.agentProducedErrors()) { + assertThat(ErrorEvent.class.isAssignableFrom(errorClass)) + .as("Agent-produced error %s must implement ErrorEvent", errorClass.getName()) + .isTrue(); + } + } } From 8b2e702eb0e6a64c2393bd5599bba11e51641eb2 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Wed, 8 Apr 2026 17:10:27 +0000 Subject: [PATCH 3/7] =?UTF-8?q?fix:=20address=20code=20review=20=E2=80=94?= =?UTF-8?q?=20expand=20ESH=20comment,=20add=20TODO=20on=20aggregateService?= =?UTF-8?q?sSupplier,=20clarify=20tick=20loop=20intent?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Agent-Logs-Url: https://github.com/elasticsoftwarefoundation/akces-framework/sessions/88dfa45b-d88a-42e2-9027-be4b23fea52d Co-authored-by: jwijgerd <914840+jwijgerd@users.noreply.github.com> --- .../agentic/beans/AgenticAggregateRuntimeFactory.java | 10 ++++++---- .../runtime/AgenticCommandHandlerFunctionAdapter.java | 4 ++++ .../runtime/AgenticEventHandlerFunctionAdapter.java | 4 ++++ 3 files changed, 14 insertions(+), 4 deletions(-) diff --git a/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/beans/AgenticAggregateRuntimeFactory.java b/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/beans/AgenticAggregateRuntimeFactory.java index 38452964..1ef80c91 100644 --- a/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/beans/AgenticAggregateRuntimeFactory.java +++ b/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/beans/AgenticAggregateRuntimeFactory.java @@ -384,9 +384,10 @@ private void processAgentHandledCommand( commandType, agentPlatform, agent, - (List) List.of(), // producedDomainEventTypes: empty (events registered via ESH) + (List) List.of(), // producedDomainEventTypes: empty — events are registered via EventSourcingHandler adapters (List) errorTypes, - Collections::emptyList); // aggregateServicesSupplier: wired by controller in later phase + // TODO: wire aggregateServicesSupplier from AkcesAgenticAggregateController (Phase 3) + Collections::emptyList); runtimeBuilder.addCommandHandler(commandType, adapter).addCommand(commandType); errorTypes.forEach(runtimeBuilder::addDomainEvent); @@ -446,9 +447,10 @@ private void processAgentHandledEvent( eventType, agentPlatform, agent, - (List) List.of(), // producedDomainEventTypes: empty (events registered via ESH) + (List) List.of(), // producedDomainEventTypes: empty — events are registered via EventSourcingHandler adapters (List) errorTypes, - Collections::emptyList); // aggregateServicesSupplier: wired by controller in later phase + // TODO: wire aggregateServicesSupplier from AkcesAgenticAggregateController (Phase 3) + Collections::emptyList); runtimeBuilder.addExternalEventHandler(eventType, adapter).addDomainEvent(eventType); errorTypes.forEach(runtimeBuilder::addDomainEvent); diff --git a/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/runtime/AgenticCommandHandlerFunctionAdapter.java b/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/runtime/AgenticCommandHandlerFunctionAdapter.java index 5597da71..ee57be5b 100644 --- a/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/runtime/AgenticCommandHandlerFunctionAdapter.java +++ b/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/runtime/AgenticCommandHandlerFunctionAdapter.java @@ -151,6 +151,10 @@ public Stream apply(@Nonnull C command, S state) { AgentProcess agentProcess = agentPlatform.createAgentProcess(agent, ProcessOptions.DEFAULT, bindings); + // Tick to completion. Phase 1 runs the full agent process synchronously. + // Timeout configuration and incremental tick support are deferred to a later phase + // (see plans/agenttasks.md). The transaction timeout is controlled externally via + // the 'akces.agentic.transaction-timeout-ms' property on the producer factory. while (!agentProcess.getFinished()) { agentProcess.tick(); } diff --git a/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/runtime/AgenticEventHandlerFunctionAdapter.java b/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/runtime/AgenticEventHandlerFunctionAdapter.java index cb7b1083..db09bc95 100644 --- a/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/runtime/AgenticEventHandlerFunctionAdapter.java +++ b/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/runtime/AgenticEventHandlerFunctionAdapter.java @@ -151,6 +151,10 @@ public Stream apply(@Nonnull InputEvent event, S state) { AgentProcess agentProcess = agentPlatform.createAgentProcess(agent, ProcessOptions.DEFAULT, bindings); + // Tick to completion. Phase 1 runs the full agent process synchronously. + // Timeout configuration and incremental tick support are deferred to a later phase + // (see plans/agenttasks.md). The transaction timeout is controlled externally via + // the 'akces.agentic.transaction-timeout-ms' property on the producer factory. while (!agentProcess.getFinished()) { agentProcess.tick(); } From 138a4752002a600ed87a86d947aad0904824b023 Mon Sep 17 00:00:00 2001 From: Joost van de Wijgerd Date: Thu, 9 Apr 2026 07:18:57 +0200 Subject: [PATCH 4/7] Apply suggestions from code review Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- .../beans/AgenticAggregateRuntimeFactory.java | 14 +++----- .../AgenticCommandHandlerFunctionAdapter.java | 5 ++- .../AgenticEventHandlerFunctionAdapter.java | 36 ++++++++++++++----- 3 files changed, 34 insertions(+), 21 deletions(-) diff --git a/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/beans/AgenticAggregateRuntimeFactory.java b/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/beans/AgenticAggregateRuntimeFactory.java index 1ef80c91..55fc8f60 100644 --- a/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/beans/AgenticAggregateRuntimeFactory.java +++ b/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/beans/AgenticAggregateRuntimeFactory.java @@ -93,18 +93,12 @@ public class AgenticAggregateRuntimeFactory LoggerFactory.getLogger(AgenticAggregateRuntimeFactory.class); /** System error types added to every non-create command handler (including agent-handled). */ - private static final List> COMMAND_HANDLER_SYSTEM_ERRORS = List.of( - new DomainEventType<>("AggregateNotFoundError", 1, - AggregateNotFoundErrorEvent.class, false, false, true, false), - new DomainEventType<>("CommandExecutionError", 1, - CommandExecutionErrorEvent.class, false, false, true, false) - ); + private static final List> COMMAND_HANDLER_SYSTEM_ERRORS = + AgenticAggregateBeanFactoryPostProcessor.COMMAND_HANDLER_SYSTEM_ERRORS; /** System error types added to every non-create event handler (including agent-handled). */ - private static final List> EVENT_HANDLER_SYSTEM_ERRORS = List.of( - new DomainEventType<>("AggregateNotFoundError", 1, - AggregateNotFoundErrorEvent.class, false, false, true, false) - ); + private static final List> EVENT_HANDLER_SYSTEM_ERRORS = + AgenticAggregateBeanFactoryPostProcessor.EVENT_HANDLER_SYSTEM_ERRORS; private ApplicationContext applicationContext; private final ObjectMapper objectMapper; diff --git a/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/runtime/AgenticCommandHandlerFunctionAdapter.java b/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/runtime/AgenticCommandHandlerFunctionAdapter.java index ee57be5b..4ac4d2a2 100644 --- a/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/runtime/AgenticCommandHandlerFunctionAdapter.java +++ b/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/runtime/AgenticCommandHandlerFunctionAdapter.java @@ -63,12 +63,11 @@ * {@code KafkaAggregateRuntime.handleCommand()} flow unchanged, where each event is * applied through the registered {@code @EventSourcingHandler} methods. * - * @param the aggregate state type; must implement both {@link AggregateState} and - * {@link MemoryAwareState} + * @param the aggregate state type; must implement {@link AggregateState} * @param the command type handled by this adapter * @param the domain event type produced by this adapter */ -public class AgenticCommandHandlerFunctionAdapter +public class AgenticCommandHandlerFunctionAdapter implements CommandHandlerFunction { private static final Logger logger = diff --git a/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/runtime/AgenticEventHandlerFunctionAdapter.java b/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/runtime/AgenticEventHandlerFunctionAdapter.java index db09bc95..d1916ffd 100644 --- a/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/runtime/AgenticEventHandlerFunctionAdapter.java +++ b/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/runtime/AgenticEventHandlerFunctionAdapter.java @@ -63,12 +63,11 @@ * {@code KafkaAggregateRuntime.handleEvent()} flow unchanged, where each event is * applied through the registered {@code @EventSourcingHandler} methods. * - * @param the aggregate state type; must implement both {@link AggregateState} - * and {@link MemoryAwareState} + * @param the aggregate state type; must implement {@link AggregateState} * @param the external domain event type handled by this adapter * @param the domain event type produced by this adapter */ -public class AgenticEventHandlerFunctionAdapter +public class AgenticEventHandlerFunctionAdapter implements EventHandlerFunction { private static final Logger logger = @@ -151,14 +150,35 @@ public Stream apply(@Nonnull InputEvent event, S state) { AgentProcess agentProcess = agentPlatform.createAgentProcess(agent, ProcessOptions.DEFAULT, bindings); - // Tick to completion. Phase 1 runs the full agent process synchronously. - // Timeout configuration and incremental tick support are deferred to a later phase - // (see plans/agenttasks.md). The transaction timeout is controlled externally via - // the 'akces.agentic.transaction-timeout-ms' property on the producer factory. - while (!agentProcess.getFinished()) { + // Tick to completion with defensive limits so a stuck agent process cannot + // block external event handling indefinitely. + final long maxTicks = 10_000L; + final long timeoutNanos = java.util.concurrent.TimeUnit.SECONDS.toNanos(30); + final long deadlineNanos = System.nanoTime() + timeoutNanos; + long tickCount = 0L; + + while (!agentProcess.getFinished() + && tickCount < maxTicks + && System.nanoTime() < deadlineNanos) { agentProcess.tick(); + tickCount++; } + if (!agentProcess.getFinished()) { + logger.error( + "Agent process did not finish within safety limits for event {} on aggregate {}. " + + "aggregateId={}, tickCount={}, maxTicks={}, timeoutSeconds={}, status={}", + eventType.typeName(), + aggregate.getClass().getSimpleName(), + state.getAggregateId(), + tickCount, + maxTicks, + java.util.concurrent.TimeUnit.NANOSECONDS.toSeconds(timeoutNanos), + agentProcess.getStatus()); + throw new IllegalStateException( + "Agent process exceeded execution limits for event " + eventType.typeName() + + " on aggregate " + aggregate.getClass().getSimpleName()); + } logger.debug("Agent process completed with status {} for event {} on aggregate {}", agentProcess.getStatus(), eventType.typeName(), aggregate.getClass().getSimpleName()); From 68f371b8f515b31615e91ddb84ed70f3be07d27a Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Thu, 9 Apr 2026 05:29:07 +0000 Subject: [PATCH 5/7] =?UTF-8?q?fix:=20address=20review=20comments=20?= =?UTF-8?q?=E2=80=94=20null=20guards=20in=20constructor,=20unknown=20Error?= =?UTF-8?q?Event=20handling,=20MemoryAwareState=20guard?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - KafkaAgenticAggregateRuntime: add Objects.requireNonNull for all 4 constructor params - AgentProcessResultTranslator: accept registeredEventTypes param; exclude unknown ErrorEvents with WARN log instead of passing them to processDomainEvent() where they would NPE - Both adapter apply() methods: use instanceof MemoryAwareState guard instead of requiring S extends MemoryAwareState bound; pass getAllRegisteredEventTypes() to translator - Fix @AgenticAggregateInfo.agentProducedErrors() Javadoc to accurately describe exclusion - Add AgentProcessResultTranslatorTest (8 tests) covering all event/error scenarios - Add 4 constructor null-check tests to KafkaAgenticAggregateRuntimeTest Agent-Logs-Url: https://github.com/elasticsoftwarefoundation/akces-framework/sessions/d56d40b0-ba4b-4381-8b08-568d5b4098e2 Co-authored-by: jwijgerd <914840+jwijgerd@users.noreply.github.com> --- .../runtime/AgentProcessResultTranslator.java | 79 +++++-- .../AgenticCommandHandlerFunctionAdapter.java | 24 ++- .../AgenticEventHandlerFunctionAdapter.java | 24 ++- .../runtime/KafkaAgenticAggregateRuntime.java | 9 +- .../AgentProcessResultTranslatorTest.java | 194 ++++++++++++++++++ .../KafkaAgenticAggregateRuntimeTest.java | 33 +++ .../annotations/AgenticAggregateInfo.java | 9 +- 7 files changed, 338 insertions(+), 34 deletions(-) create mode 100644 main/agentic/src/test/java/org/elasticsoftware/akces/agentic/runtime/AgentProcessResultTranslatorTest.java diff --git a/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/runtime/AgentProcessResultTranslator.java b/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/runtime/AgentProcessResultTranslator.java index 55e33527..a227788b 100644 --- a/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/runtime/AgentProcessResultTranslator.java +++ b/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/runtime/AgentProcessResultTranslator.java @@ -18,26 +18,36 @@ package org.elasticsoftware.akces.agentic.runtime; import com.embabel.agent.core.Blackboard; +import org.elasticsoftware.akces.aggregate.DomainEventType; import org.elasticsoftware.akces.events.DomainEvent; import org.elasticsoftware.akces.events.ErrorEvent; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.ArrayList; +import java.util.Collection; import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; /** * Utility class that translates the results of an Embabel {@code AgentProcess} back * into Akces {@link DomainEvent} instances. * *

After each agent tick (or after the process reaches an end state), call - * {@link #collectEvents(Blackboard)} to drain {@link DomainEvent} objects from the - * blackboard. Events are marked as hidden on the blackboard after collection, so - * subsequent calls to this method will not return the same events again — this + * {@link #collectEvents(Blackboard, Collection)} to drain {@link DomainEvent} objects + * from the blackboard. Events are marked as hidden on the blackboard after collection, + * so subsequent calls to this method will not return the same events again — this * supports both tick-to-completion and future incremental-tick processing patterns. * - *

Undeclared {@link ErrorEvent} instances are accepted at runtime. A warning is - * logged so operators can detect unexpected error types without causing the transaction - * to abort. + *

Unknown {@link ErrorEvent} types (not declared in {@code agentProducedErrors} and + * therefore not registered as {@link DomainEventType}s in the runtime) are logged at + * {@code WARN} level and excluded from the returned list. This prevents + * {@code processDomainEvent()} from encountering a {@code null} type look-up and + * NPE-ing, while still allowing the transaction to commit with a meaningful result. + * Standard (non-error) {@link DomainEvent} types are always included and must be + * registered; passing an unregistered non-error event to the runtime will result in + * a {@code NullPointerException} in {@code processDomainEvent()}. */ public final class AgentProcessResultTranslator { @@ -48,31 +58,56 @@ private AgentProcessResultTranslator() { } /** - * Collects all {@link DomainEvent} objects currently visible on the blackboard - * and removes them from visible scope (via {@link Blackboard#hide(Object)}) so that - * they are not collected again on the next call. + * Collects all {@link DomainEvent} objects currently visible on the blackboard, + * removes them from visible scope (via {@link Blackboard#hide(Object)}), and returns + * the subset that can safely be passed to the runtime's {@code processDomainEvent()} + * method. * - *

Any {@link ErrorEvent} instances found are logged at {@code WARN} level so - * that operators can detect unexpected error types emitted by the agent. + *

For every collected event: + *

    + *
  • Non-error {@link DomainEvent}s are always included and passed through as-is.
  • + *
  • {@link ErrorEvent}s whose class is present in {@code registeredEventTypes} are + * included.
  • + *
  • {@link ErrorEvent}s whose class is not present in + * {@code registeredEventTypes} are logged at {@code WARN} level and excluded. + * Excluding them prevents a {@code NullPointerException} inside + * {@code KafkaAggregateRuntime.processDomainEvent()} that would otherwise occur + * when the runtime looks up the unregistered type. The transaction still commits + * normally; only the unknown error event is silently dropped.
  • + *
* - * @param blackboard the agent process blackboard to drain events from - * @return an unmodifiable list of collected domain events in the order they - * were encountered on the blackboard; never {@code null}, may be empty + * @param blackboard the agent process blackboard to drain events from + * @param registeredEventTypes all {@link DomainEventType}s registered with the runtime; + * used to verify that agent-produced {@link ErrorEvent}s are + * known before passing them downstream + * @return an unmodifiable list of domain events that are safe to pass to the runtime; + * never {@code null}, may be empty */ - public static List collectEvents(Blackboard blackboard) { - List events = blackboard.getObjects().stream() + public static List collectEvents(Blackboard blackboard, + Collection> registeredEventTypes) { + Set> registeredClasses = registeredEventTypes.stream() + .map(DomainEventType::typeClass) + .collect(Collectors.toSet()); + + List allEvents = blackboard.getObjects().stream() .filter(o -> o instanceof DomainEvent) .map(o -> (DomainEvent) o) .toList(); - for (DomainEvent event : events) { - if (event instanceof ErrorEvent) { - logger.warn("Agent produced error event of type {}: {}", + List result = new ArrayList<>(allEvents.size()); + for (DomainEvent event : allEvents) { + blackboard.hide(event); + if (event instanceof ErrorEvent && !registeredClasses.contains(event.getClass())) { + logger.warn( + "Agent produced undeclared error event of type '{}' which is not registered " + + "as a DomainEventType. The event will be excluded from processing to " + + "prevent a runtime failure. Declare it in agentProducedErrors to enable " + + "serialization and service discovery. Event: {}", event.getClass().getName(), event); + } else { + result.add(event); } - blackboard.hide(event); } - - return events; + return List.copyOf(result); } } diff --git a/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/runtime/AgenticCommandHandlerFunctionAdapter.java b/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/runtime/AgenticCommandHandlerFunctionAdapter.java index 4ac4d2a2..53c1473c 100644 --- a/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/runtime/AgenticCommandHandlerFunctionAdapter.java +++ b/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/runtime/AgenticCommandHandlerFunctionAdapter.java @@ -29,6 +29,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.ArrayList; import java.util.Collection; import java.util.LinkedHashMap; import java.util.List; @@ -141,11 +142,14 @@ public Stream apply(@Nonnull C command, S state) { bindings.put("command", command); bindings.put("state", state); bindings.put("agenticAggregateId", state.getAggregateId()); - bindings.put("memories", state.getMemories()); + List memories = state instanceof MemoryAwareState mas + ? mas.getMemories() + : List.of(); + bindings.put("memories", memories); bindings.put("aggregateServices", aggregateServicesSupplier.get()); bindings.put("isCommandProcessing", true); bindings.put("isExternalEvent", false); - bindings.put("hasMemories", !state.getMemories().isEmpty()); + bindings.put("hasMemories", !memories.isEmpty()); AgentProcess agentProcess = agentPlatform.createAgentProcess(agent, ProcessOptions.DEFAULT, bindings); @@ -162,7 +166,7 @@ public Stream apply(@Nonnull C command, S state) { agentProcess.getStatus(), commandType.typeName(), aggregate.getClass().getSimpleName()); return (Stream) AgentProcessResultTranslator - .collectEvents(agentProcess.getBlackboard()) + .collectEvents(agentProcess.getBlackboard(), getAllRegisteredEventTypes()) .stream(); } @@ -197,4 +201,18 @@ public List> getProducedDomainEventTypes() { public List> getErrorEventTypes() { return errorEventTypes; } + + /** + * Returns all domain event types this adapter may produce (both state-changing and error types). + * Used by {@link AgentProcessResultTranslator} to filter out unknown {@link org.elasticsoftware.akces.events.ErrorEvent} + * instances that are not registered with the runtime. + * + * @return combined list of produced and error domain event types + */ + private List> getAllRegisteredEventTypes() { + List> all = new ArrayList<>(producedDomainEventTypes.size() + errorEventTypes.size()); + all.addAll(producedDomainEventTypes); + all.addAll(errorEventTypes); + return all; + } } diff --git a/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/runtime/AgenticEventHandlerFunctionAdapter.java b/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/runtime/AgenticEventHandlerFunctionAdapter.java index d1916ffd..fd0b35c0 100644 --- a/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/runtime/AgenticEventHandlerFunctionAdapter.java +++ b/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/runtime/AgenticEventHandlerFunctionAdapter.java @@ -29,6 +29,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.ArrayList; import java.util.Collection; import java.util.LinkedHashMap; import java.util.List; @@ -141,11 +142,14 @@ public Stream apply(@Nonnull InputEvent event, S state) { bindings.put("event", event); bindings.put("state", state); bindings.put("agenticAggregateId", state.getAggregateId()); - bindings.put("memories", state.getMemories()); + List memories = state instanceof MemoryAwareState mas + ? mas.getMemories() + : List.of(); + bindings.put("memories", memories); bindings.put("aggregateServices", aggregateServicesSupplier.get()); bindings.put("isCommandProcessing", false); bindings.put("isExternalEvent", true); - bindings.put("hasMemories", !state.getMemories().isEmpty()); + bindings.put("hasMemories", !memories.isEmpty()); AgentProcess agentProcess = agentPlatform.createAgentProcess(agent, ProcessOptions.DEFAULT, bindings); @@ -183,7 +187,7 @@ public Stream apply(@Nonnull InputEvent event, S state) { agentProcess.getStatus(), eventType.typeName(), aggregate.getClass().getSimpleName()); return (Stream) AgentProcessResultTranslator - .collectEvents(agentProcess.getBlackboard()) + .collectEvents(agentProcess.getBlackboard(), getAllRegisteredEventTypes()) .stream(); } @@ -217,4 +221,18 @@ public List> getProducedDomainEventTypes() { public List> getErrorEventTypes() { return errorEventTypes; } + + /** + * Returns all domain event types this adapter may produce (both state-changing and error types). + * Used by {@link AgentProcessResultTranslator} to filter out unknown {@link org.elasticsoftware.akces.events.ErrorEvent} + * instances that are not registered with the runtime. + * + * @return combined list of produced and error domain event types + */ + private List> getAllRegisteredEventTypes() { + List> all = new ArrayList<>(producedDomainEventTypes.size() + errorEventTypes.size()); + all.addAll(producedDomainEventTypes); + all.addAll(errorEventTypes); + return all; + } } diff --git a/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/runtime/KafkaAgenticAggregateRuntime.java b/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/runtime/KafkaAgenticAggregateRuntime.java index dd58d15a..3597ea8b 100644 --- a/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/runtime/KafkaAgenticAggregateRuntime.java +++ b/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/runtime/KafkaAgenticAggregateRuntime.java @@ -37,6 +37,7 @@ import java.io.IOException; import java.util.Collection; import java.util.List; +import java.util.Objects; import java.util.function.BiConsumer; import java.util.function.Consumer; import java.util.function.Supplier; @@ -68,10 +69,10 @@ public KafkaAgenticAggregateRuntime(AggregateRuntime delegate, ObjectMapper objectMapper, Class stateClass, AgentPlatform agentPlatform) { - this.delegate = delegate; - this.objectMapper = objectMapper; - this.stateClass = stateClass; - this.agentPlatform = agentPlatform; + this.delegate = Objects.requireNonNull(delegate, "delegate must not be null"); + this.objectMapper = Objects.requireNonNull(objectMapper, "objectMapper must not be null"); + this.stateClass = Objects.requireNonNull(stateClass, "stateClass must not be null"); + this.agentPlatform = Objects.requireNonNull(agentPlatform, "agentPlatform must not be null"); } // ------------------------------------------------------------------------- diff --git a/main/agentic/src/test/java/org/elasticsoftware/akces/agentic/runtime/AgentProcessResultTranslatorTest.java b/main/agentic/src/test/java/org/elasticsoftware/akces/agentic/runtime/AgentProcessResultTranslatorTest.java new file mode 100644 index 00000000..116a8d66 --- /dev/null +++ b/main/agentic/src/test/java/org/elasticsoftware/akces/agentic/runtime/AgentProcessResultTranslatorTest.java @@ -0,0 +1,194 @@ +/* + * Copyright 2022 - 2026 The Original Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.elasticsoftware.akces.agentic.runtime; + +import com.embabel.agent.core.Blackboard; +import com.embabel.agent.core.support.InMemoryBlackboard; +import org.elasticsoftware.akces.aggregate.DomainEventType; +import org.elasticsoftware.akces.annotations.AggregateIdentifier; +import org.elasticsoftware.akces.annotations.DomainEventInfo; +import org.elasticsoftware.akces.events.DomainEvent; +import org.elasticsoftware.akces.events.ErrorEvent; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.util.List; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * Unit tests for {@link AgentProcessResultTranslator}, verifying event collection, + * blackboard draining, and handling of registered vs. unregistered error events. + */ +class AgentProcessResultTranslatorTest { + + // ------------------------------------------------------------------------- + // Test fixtures + // ------------------------------------------------------------------------- + + @DomainEventInfo(type = "TestStateChanged", version = 1) + record TestStateChangedEvent(@AggregateIdentifier String id) implements DomainEvent { + @Override + public String getAggregateId() { + return id; + } + } + + @DomainEventInfo(type = "TestRegisteredError", version = 1) + record TestRegisteredErrorEvent(@AggregateIdentifier String id) implements ErrorEvent { + @Override + public String getAggregateId() { + return id; + } + } + + @DomainEventInfo(type = "TestUnregisteredError", version = 1) + record TestUnregisteredErrorEvent(@AggregateIdentifier String id) implements ErrorEvent { + @Override + public String getAggregateId() { + return id; + } + } + + private static final DomainEventType STATE_CHANGED_TYPE = + new DomainEventType<>("TestStateChanged", 1, TestStateChangedEvent.class, + false, false, false, false); + + private static final DomainEventType REGISTERED_ERROR_TYPE = + new DomainEventType<>("TestRegisteredError", 1, TestRegisteredErrorEvent.class, + false, false, true, false); + + private Blackboard blackboard; + + @BeforeEach + void setUp() { + blackboard = new InMemoryBlackboard(); + } + + // ------------------------------------------------------------------------- + // Empty blackboard + // ------------------------------------------------------------------------- + + @Test + void collectEventsShouldReturnEmptyListForEmptyBlackboard() { + List events = AgentProcessResultTranslator.collectEvents(blackboard, List.of()); + assertThat(events).isEmpty(); + } + + // ------------------------------------------------------------------------- + // Non-error domain events + // ------------------------------------------------------------------------- + + @Test + void collectEventsShouldIncludeRegisteredStateChangingEvent() { + TestStateChangedEvent event = new TestStateChangedEvent("agg-1"); + blackboard.addObject(event); + + List events = AgentProcessResultTranslator.collectEvents( + blackboard, List.of(STATE_CHANGED_TYPE)); + + assertThat(events).containsExactly(event); + } + + @Test + void collectEventsShouldHideEventsAfterCollection() { + TestStateChangedEvent event = new TestStateChangedEvent("agg-1"); + blackboard.addObject(event); + + AgentProcessResultTranslator.collectEvents(blackboard, List.of(STATE_CHANGED_TYPE)); + + // Second call should return nothing — events were hidden on first call + List second = AgentProcessResultTranslator.collectEvents( + blackboard, List.of(STATE_CHANGED_TYPE)); + assertThat(second).isEmpty(); + } + + @Test + void collectEventsShouldNotIncludeNonDomainEventObjects() { + blackboard.addObject("not an event"); + blackboard.addObject(42); + blackboard.addObject(new TestStateChangedEvent("agg-1")); + + List events = AgentProcessResultTranslator.collectEvents( + blackboard, List.of(STATE_CHANGED_TYPE)); + + assertThat(events).hasSize(1); + assertThat(events.getFirst()).isInstanceOf(TestStateChangedEvent.class); + } + + // ------------------------------------------------------------------------- + // Registered error events — must be included + // ------------------------------------------------------------------------- + + @Test + void collectEventsShouldIncludeRegisteredErrorEvent() { + TestRegisteredErrorEvent error = new TestRegisteredErrorEvent("agg-1"); + blackboard.addObject(error); + + List events = AgentProcessResultTranslator.collectEvents( + blackboard, List.of(REGISTERED_ERROR_TYPE)); + + assertThat(events).containsExactly(error); + } + + // ------------------------------------------------------------------------- + // Unregistered error events — must be excluded with warning + // ------------------------------------------------------------------------- + + @Test + void collectEventsShouldExcludeUnregisteredErrorEvent() { + TestUnregisteredErrorEvent unknownError = new TestUnregisteredErrorEvent("agg-1"); + blackboard.addObject(unknownError); + + // Only STATE_CHANGED_TYPE and REGISTERED_ERROR_TYPE are registered — not the unknown one + List events = AgentProcessResultTranslator.collectEvents( + blackboard, List.of(STATE_CHANGED_TYPE, REGISTERED_ERROR_TYPE)); + + assertThat(events).isEmpty(); + } + + @Test + void collectEventsShouldExcludeUnregisteredErrorAndKeepRegisteredEvents() { + TestStateChangedEvent stateEvent = new TestStateChangedEvent("agg-1"); + TestRegisteredErrorEvent registeredError = new TestRegisteredErrorEvent("agg-1"); + TestUnregisteredErrorEvent unknownError = new TestUnregisteredErrorEvent("agg-1"); + blackboard.addObject(stateEvent); + blackboard.addObject(registeredError); + blackboard.addObject(unknownError); + + List events = AgentProcessResultTranslator.collectEvents( + blackboard, List.of(STATE_CHANGED_TYPE, REGISTERED_ERROR_TYPE)); + + assertThat(events).containsExactlyInAnyOrder(stateEvent, registeredError); + assertThat(events).doesNotContain(unknownError); + } + + @Test + void collectEventsShouldHideUnregisteredErrorEventSoItIsNotReturnedAgain() { + TestUnregisteredErrorEvent unknownError = new TestUnregisteredErrorEvent("agg-1"); + blackboard.addObject(unknownError); + + AgentProcessResultTranslator.collectEvents( + blackboard, List.of(STATE_CHANGED_TYPE, REGISTERED_ERROR_TYPE)); + + // The unknown error was hidden — second call also returns nothing + List second = AgentProcessResultTranslator.collectEvents( + blackboard, List.of(STATE_CHANGED_TYPE, REGISTERED_ERROR_TYPE)); + assertThat(second).isEmpty(); + } +} diff --git a/main/agentic/src/test/java/org/elasticsoftware/akces/agentic/runtime/KafkaAgenticAggregateRuntimeTest.java b/main/agentic/src/test/java/org/elasticsoftware/akces/agentic/runtime/KafkaAgenticAggregateRuntimeTest.java index bdd24a47..f3b15b88 100644 --- a/main/agentic/src/test/java/org/elasticsoftware/akces/agentic/runtime/KafkaAgenticAggregateRuntimeTest.java +++ b/main/agentic/src/test/java/org/elasticsoftware/akces/agentic/runtime/KafkaAgenticAggregateRuntimeTest.java @@ -37,6 +37,7 @@ import java.util.List; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatNullPointerException; import static org.mockito.Mockito.*; /** @@ -221,6 +222,38 @@ void getAgentPlatformShouldReturnInjectedPlatform() { assertThat(runtime.getAgentPlatform()).isSameAs(agentPlatform); } + @Test + void constructorShouldRejectNullDelegate() { + assertThatNullPointerException() + .isThrownBy(() -> new KafkaAgenticAggregateRuntime( + null, objectMapper, TestMemoryState.class, agentPlatform)) + .withMessageContaining("delegate"); + } + + @Test + void constructorShouldRejectNullObjectMapper() { + assertThatNullPointerException() + .isThrownBy(() -> new KafkaAgenticAggregateRuntime( + delegate, null, TestMemoryState.class, agentPlatform)) + .withMessageContaining("objectMapper"); + } + + @Test + void constructorShouldRejectNullStateClass() { + assertThatNullPointerException() + .isThrownBy(() -> new KafkaAgenticAggregateRuntime( + delegate, objectMapper, null, agentPlatform)) + .withMessageContaining("stateClass"); + } + + @Test + void constructorShouldRejectNullAgentPlatform() { + assertThatNullPointerException() + .isThrownBy(() -> new KafkaAgenticAggregateRuntime( + delegate, objectMapper, TestMemoryState.class, null)) + .withMessageContaining("agentPlatform"); + } + // ------------------------------------------------------------------------- // Built-in DomainEventType constants // ------------------------------------------------------------------------- diff --git a/main/api/src/main/java/org/elasticsoftware/akces/annotations/AgenticAggregateInfo.java b/main/api/src/main/java/org/elasticsoftware/akces/annotations/AgenticAggregateInfo.java index b06ab1db..e941312f 100644 --- a/main/api/src/main/java/org/elasticsoftware/akces/annotations/AgenticAggregateInfo.java +++ b/main/api/src/main/java/org/elasticsoftware/akces/annotations/AgenticAggregateInfo.java @@ -87,8 +87,13 @@ * {@code @DomainEventInfo}. * *

These types are registered as {@code DomainEventType(error=true)} for JSON schema - * validation and service discovery. At runtime, undeclared {@link ErrorEvent} instances - * emitted by the agent are accepted with a warning rather than rejected. + * validation and service discovery. Agent-produced error events that are not + * declared here are not registered with the runtime and will be silently excluded from + * processing — they will not be serialized to the Kafka domain-events topic. A warning + * is logged for each excluded event so that operators can detect the misconfiguration. + * + *

To ensure an error event produced by the agent is persisted and visible to + * downstream consumers, it must be declared here. */ Class[] agentProducedErrors() default {}; } From 77c35e57d59361a88c7e849041c59ca80ab31a75 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Thu, 9 Apr 2026 05:41:20 +0000 Subject: [PATCH 6/7] fix: add defensive tick limit to AgenticCommandHandlerFunctionAdapter matching EventHandlerFunctionAdapter Agent-Logs-Url: https://github.com/elasticsoftwarefoundation/akces-framework/sessions/2d9121bb-7262-4cfc-b4fd-a41735f4aa01 Co-authored-by: jwijgerd <914840+jwijgerd@users.noreply.github.com> --- .../AgenticCommandHandlerFunctionAdapter.java | 32 ++++++++++++++++--- 1 file changed, 27 insertions(+), 5 deletions(-) diff --git a/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/runtime/AgenticCommandHandlerFunctionAdapter.java b/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/runtime/AgenticCommandHandlerFunctionAdapter.java index 53c1473c..718f90cf 100644 --- a/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/runtime/AgenticCommandHandlerFunctionAdapter.java +++ b/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/runtime/AgenticCommandHandlerFunctionAdapter.java @@ -154,12 +154,34 @@ public Stream apply(@Nonnull C command, S state) { AgentProcess agentProcess = agentPlatform.createAgentProcess(agent, ProcessOptions.DEFAULT, bindings); - // Tick to completion. Phase 1 runs the full agent process synchronously. - // Timeout configuration and incremental tick support are deferred to a later phase - // (see plans/agenttasks.md). The transaction timeout is controlled externally via - // the 'akces.agentic.transaction-timeout-ms' property on the producer factory. - while (!agentProcess.getFinished()) { + // Tick to completion with defensive limits so a stuck agent process cannot + // block command handling indefinitely. + final long maxTicks = 10_000L; + final long timeoutNanos = java.util.concurrent.TimeUnit.SECONDS.toNanos(30); + final long deadlineNanos = System.nanoTime() + timeoutNanos; + long tickCount = 0L; + + while (!agentProcess.getFinished() + && tickCount < maxTicks + && System.nanoTime() < deadlineNanos) { agentProcess.tick(); + tickCount++; + } + + if (!agentProcess.getFinished()) { + logger.error( + "Agent process did not finish within safety limits for command {} on aggregate {}. " + + "aggregateId={}, tickCount={}, maxTicks={}, timeoutSeconds={}, status={}", + commandType.typeName(), + aggregate.getClass().getSimpleName(), + state.getAggregateId(), + tickCount, + maxTicks, + java.util.concurrent.TimeUnit.NANOSECONDS.toSeconds(timeoutNanos), + agentProcess.getStatus()); + throw new IllegalStateException( + "Agent process exceeded execution limits for command " + commandType.typeName() + + " on aggregate " + aggregate.getClass().getSimpleName()); } logger.debug("Agent process completed with status {} for command {} on aggregate {}", From 2c96ce24b3bd10df22ef642d0e77970ea379c238 Mon Sep 17 00:00:00 2001 From: Joost van de Wijgerd Date: Thu, 9 Apr 2026 07:42:57 +0200 Subject: [PATCH 7/7] Apply suggestions from code review Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- .../akces/agentic/beans/AgenticAggregateRuntimeFactory.java | 2 -- .../agentic/runtime/AgenticEventHandlerFunctionAdapter.java | 1 - 2 files changed, 3 deletions(-) diff --git a/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/beans/AgenticAggregateRuntimeFactory.java b/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/beans/AgenticAggregateRuntimeFactory.java index 55fc8f60..d3a90693 100644 --- a/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/beans/AgenticAggregateRuntimeFactory.java +++ b/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/beans/AgenticAggregateRuntimeFactory.java @@ -30,8 +30,6 @@ import org.elasticsoftware.akces.annotations.CommandInfo; import org.elasticsoftware.akces.annotations.DomainEventInfo; import org.elasticsoftware.akces.commands.Command; -import org.elasticsoftware.akces.errors.AggregateNotFoundErrorEvent; -import org.elasticsoftware.akces.errors.CommandExecutionErrorEvent; import org.elasticsoftware.akces.events.DomainEvent; import org.elasticsoftware.akces.events.ErrorEvent; import org.elasticsoftware.akces.kafka.KafkaAggregateRuntime; diff --git a/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/runtime/AgenticEventHandlerFunctionAdapter.java b/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/runtime/AgenticEventHandlerFunctionAdapter.java index fd0b35c0..5a07ec43 100644 --- a/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/runtime/AgenticEventHandlerFunctionAdapter.java +++ b/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/runtime/AgenticEventHandlerFunctionAdapter.java @@ -23,7 +23,6 @@ import com.embabel.agent.core.ProcessOptions; import jakarta.annotation.Nonnull; import org.elasticsoftware.akces.aggregate.*; -import org.elasticsoftware.akces.commands.Command; import org.elasticsoftware.akces.control.AggregateServiceRecord; import org.elasticsoftware.akces.events.DomainEvent; import org.slf4j.Logger;