> EVENT_HANDLER_SYSTEM_ERRORS = List.of(
+ new DomainEventType<>("AggregateNotFoundError", 1,
+ AggregateNotFoundErrorEvent.class, false, false, true, false)
+ );
private ApplicationContext applicationContext;
private final ObjectMapper objectMapper;
@@ -72,13 +128,19 @@ public void setApplicationContext(ApplicationContext applicationContext) throws
@Override
public AgenticAggregateRuntime getObject() {
- AgenticAggregateInfo agenticInfo = aggregate.getClass().getAnnotation(AgenticAggregateInfo.class);
+ AgenticAggregateInfo agenticInfo =
+ aggregate.getClass().getAnnotation(AgenticAggregateInfo.class);
if (agenticInfo == null) {
throw new IllegalStateException(
"Class implementing AgenticAggregate must be annotated with @AgenticAggregateInfo");
}
- KafkaAggregateRuntime kafkaRuntime = createRuntime(agenticInfo, aggregate);
- return new KafkaAgenticAggregateRuntime(kafkaRuntime, objectMapper, agenticInfo.stateClass());
+
+ // Resolve AgentPlatform — mandatory for all agentic aggregates.
+ AgentPlatform agentPlatform = resolveAgentPlatform(agenticInfo.value());
+
+ KafkaAggregateRuntime kafkaRuntime = createRuntime(agenticInfo, aggregate, agentPlatform);
+ return new KafkaAgenticAggregateRuntime(
+ kafkaRuntime, objectMapper, agenticInfo.stateClass(), agentPlatform);
}
@Override
@@ -86,9 +148,52 @@ public Class> getObjectType() {
return AgenticAggregateRuntime.class;
}
+ /**
+ * Resolves the {@link AgentPlatform} from the {@link ApplicationContext}.
+ *
+ * @param aggregateName the aggregate name (for error messages)
+ * @return the resolved {@link AgentPlatform}; never {@code null}
+ * @throws IllegalStateException if no {@link AgentPlatform} bean is available
+ */
+ private AgentPlatform resolveAgentPlatform(String aggregateName) {
+ try {
+ return applicationContext.getBean(AgentPlatform.class);
+ } catch (BeansException e) {
+ throw new IllegalStateException(
+ "AgentPlatform is required for agentic aggregate '" + aggregateName
+ + "' but was not found in the ApplicationContext. "
+ + "Please ensure embabel-agent-starter is configured.", e);
+ }
+ }
+
+ /**
+ * Resolves the {@link Agent} for this aggregate from the {@link ApplicationContext}.
+ *
+ * Looks for a Spring bean of type {@link Agent} named {@code {aggregateName}Agent}.
+ * The implementing application is responsible for registering this bean. If no such
+ * bean is found, a fatal error is raised.
+ *
+ * @param aggregateName the aggregate name used to derive the agent bean name
+ * @return the resolved {@link Agent}; never {@code null}
+ * @throws IllegalStateException if no matching {@link Agent} bean is found
+ */
+ private Agent resolveAgent(String aggregateName) {
+ String agentBeanName = aggregateName + "Agent";
+ try {
+ return applicationContext.getBean(agentBeanName, Agent.class);
+ } catch (BeansException e) {
+ throw new IllegalStateException(
+ "No Agent bean found with name '" + agentBeanName + "' for agentic aggregate '"
+ + aggregateName + "'. "
+ + "The implementing application must provide a Spring bean of type "
+ + "com.embabel.agent.core.Agent named '" + agentBeanName + "'.", e);
+ }
+ }
+
@SuppressWarnings("unchecked")
private KafkaAggregateRuntime createRuntime(AgenticAggregateInfo agenticInfo,
- AgenticAggregate aggregate) {
+ AgenticAggregate aggregate,
+ AgentPlatform agentPlatform) {
KafkaAggregateRuntime.Builder runtimeBuilder = new KafkaAggregateRuntime.Builder();
AggregateStateInfo stateInfo = agenticInfo.stateClass().getAnnotation(AggregateStateInfo.class);
@@ -110,7 +215,7 @@ private KafkaAggregateRuntime createRuntime(AgenticAggregateInfo agenticInfo,
.setAggregateClass((Class extends Aggregate>>) aggregate.getClass())
.setObjectMapper(objectMapper);
- // CommandHandlerFunctions
+ // CommandHandlerFunctions (deterministic, from @CommandHandler methods)
applicationContext.getBeansOfType(CommandHandlerFunction.class).values().stream()
.filter(adapter -> adapter.getAggregate().equals(aggregate))
.forEach(adapter -> {
@@ -165,7 +270,8 @@ private KafkaAggregateRuntime createRuntime(AgenticAggregateInfo agenticInfo,
if (adapter.getInputType() instanceof AggregateStateType> stateType) {
runtimeBuilder.addStateUpcastingHandler(stateType, adapter);
} else if (adapter.getInputType() instanceof DomainEventType> eventType) {
- runtimeBuilder.addEventUpcastingHandler(eventType, adapter).addDomainEvent(eventType);
+ runtimeBuilder.addEventUpcastingHandler(eventType, adapter)
+ .addDomainEvent(eventType);
}
});
@@ -179,6 +285,176 @@ private KafkaAggregateRuntime createRuntime(AgenticAggregateInfo agenticInfo,
.addEventSourcingHandler(MEMORY_REVOKED_TYPE, KafkaAgenticAggregateRuntime::handleMemoryEvent)
.addDomainEvent(MEMORY_REVOKED_TYPE);
+ // Collect agent-produced error types for registration and inclusion in adapters.
+ List> agentProducedErrorTypes =
+ buildAgentProducedErrorTypes(agenticInfo.agentProducedErrors());
+ agentProducedErrorTypes.forEach(runtimeBuilder::addDomainEvent);
+
+ // Lazy Agent resolution: only needed when agent-handled commands or events exist.
+ boolean hasAgentHandlers = agenticInfo.agentHandledCommands().length > 0
+ || agenticInfo.agentHandledEvents().length > 0;
+ Agent agent = hasAgentHandlers ? resolveAgent(agenticInfo.value()) : null;
+
+ // Process agentHandledCommands — register AgenticCommandHandlerFunctionAdapter
+ for (Class extends Command> commandClass : agenticInfo.agentHandledCommands()) {
+ processAgentHandledCommand(
+ commandClass, aggregate, agentPlatform, agent,
+ agentProducedErrorTypes, runtimeBuilder);
+ }
+
+ // Process agentHandledEvents — register AgenticEventHandlerFunctionAdapter
+ for (Class extends DomainEvent> eventClass : agenticInfo.agentHandledEvents()) {
+ processAgentHandledEvent(
+ eventClass, aggregate, agentPlatform, agent,
+ agentProducedErrorTypes, runtimeBuilder);
+ }
+
return runtimeBuilder.validateAndBuild();
}
+
+ /**
+ * Builds the list of {@link DomainEventType} entries for all classes declared in
+ * {@code agentProducedErrors}.
+ *
+ * @param errorClasses the error event classes from the annotation
+ * @return a list of {@code DomainEventType(error=true)} entries
+ * @throws IllegalArgumentException if any class is not annotated with {@code @DomainEventInfo}
+ * or does not implement {@link ErrorEvent}
+ */
+ private List> buildAgentProducedErrorTypes(
+ Class extends ErrorEvent>[] errorClasses) {
+ List> result = new ArrayList<>(errorClasses.length);
+ for (Class extends ErrorEvent> errorClass : errorClasses) {
+ DomainEventInfo info = errorClass.getAnnotation(DomainEventInfo.class);
+ if (info == null) {
+ throw new IllegalArgumentException(
+ "Agent-produced error class " + errorClass.getName()
+ + " must be annotated with @DomainEventInfo");
+ }
+ result.add(new DomainEventType<>(
+ info.type(), info.version(), errorClass,
+ false, // create
+ false, // external
+ true, // error
+ false)); // piiData
+ }
+ return Collections.unmodifiableList(result);
+ }
+
+ /**
+ * Validates and registers an agent-handled command, creating an
+ * {@link AgenticCommandHandlerFunctionAdapter} and registering it with the runtime builder.
+ *
+ * @param commandClass the command class to register
+ * @param aggregate the owning aggregate instance
+ * @param agentPlatform the Embabel platform
+ * @param agent the deployed {@link Agent} for this aggregate
+ * @param agentProducedErrors the error types the agent may produce
+ * @param runtimeBuilder the runtime builder to register with
+ */
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ private void processAgentHandledCommand(
+ Class extends Command> commandClass,
+ AgenticAggregate aggregate,
+ AgentPlatform agentPlatform,
+ Agent agent,
+ List> agentProducedErrors,
+ KafkaAggregateRuntime.Builder runtimeBuilder) {
+
+ CommandInfo commandInfo = commandClass.getAnnotation(CommandInfo.class);
+ if (commandInfo == null) {
+ throw new IllegalArgumentException(
+ "Agent-handled command class " + commandClass.getName()
+ + " must be annotated with @CommandInfo");
+ }
+
+ // Agent-handled commands always have create=false (design constraint).
+ CommandType commandType = new CommandType<>(
+ commandInfo.type(), commandInfo.version(), commandClass,
+ false, // create — agent-handled commands cannot create state
+ false, // external
+ false); // piiData
+
+ // Error types: system errors + agent-produced errors
+ List> errorTypes = new ArrayList<>(COMMAND_HANDLER_SYSTEM_ERRORS);
+ errorTypes.addAll(agentProducedErrors);
+
+ AgenticCommandHandlerFunctionAdapter adapter = new AgenticCommandHandlerFunctionAdapter<>(
+ (AgenticAggregate) aggregate,
+ commandType,
+ agentPlatform,
+ agent,
+ (List) List.of(), // producedDomainEventTypes: empty (events registered via ESH)
+ (List) errorTypes,
+ Collections::emptyList); // aggregateServicesSupplier: wired by controller in later phase
+
+ runtimeBuilder.addCommandHandler(commandType, adapter).addCommand(commandType);
+ errorTypes.forEach(runtimeBuilder::addDomainEvent);
+
+ logger.debug("Registered agent-handled command {} v{} for aggregate {}",
+ commandInfo.type(), commandInfo.version(),
+ aggregate.getClass().getSimpleName());
+ }
+
+ /**
+ * Validates and registers an agent-handled external event, creating an
+ * {@link AgenticEventHandlerFunctionAdapter} and registering it with the runtime builder.
+ *
+ * @param eventClass the external domain event class to register
+ * @param aggregate the owning aggregate instance
+ * @param agentPlatform the Embabel platform
+ * @param agent the deployed {@link Agent} for this aggregate
+ * @param agentProducedErrors the error types the agent may produce
+ * @param runtimeBuilder the runtime builder to register with
+ */
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ private void processAgentHandledEvent(
+ Class extends DomainEvent> eventClass,
+ AgenticAggregate aggregate,
+ AgentPlatform agentPlatform,
+ Agent agent,
+ List> agentProducedErrors,
+ KafkaAggregateRuntime.Builder runtimeBuilder) {
+
+ if (ErrorEvent.class.isAssignableFrom(eventClass)) {
+ throw new IllegalArgumentException(
+ "Agent-handled event class " + eventClass.getName()
+ + " must not implement ErrorEvent. "
+ + "Error events should be declared in agentProducedErrors instead.");
+ }
+
+ DomainEventInfo eventInfo = eventClass.getAnnotation(DomainEventInfo.class);
+ if (eventInfo == null) {
+ throw new IllegalArgumentException(
+ "Agent-handled event class " + eventClass.getName()
+ + " must be annotated with @DomainEventInfo");
+ }
+
+ DomainEventType eventType = new DomainEventType<>(
+ eventInfo.type(), eventInfo.version(), eventClass,
+ false, // create
+ true, // external — agent-handled events are always external
+ false, // error
+ false); // piiData
+
+ // Error types: system errors + agent-produced errors
+ List> errorTypes = new ArrayList<>(EVENT_HANDLER_SYSTEM_ERRORS);
+ errorTypes.addAll(agentProducedErrors);
+
+ AgenticEventHandlerFunctionAdapter adapter = new AgenticEventHandlerFunctionAdapter<>(
+ (AgenticAggregate) aggregate,
+ eventType,
+ agentPlatform,
+ agent,
+ (List) List.of(), // producedDomainEventTypes: empty (events registered via ESH)
+ (List) errorTypes,
+ Collections::emptyList); // aggregateServicesSupplier: wired by controller in later phase
+
+ runtimeBuilder.addExternalEventHandler(eventType, adapter).addDomainEvent(eventType);
+ errorTypes.forEach(runtimeBuilder::addDomainEvent);
+
+ logger.debug("Registered agent-handled external event {} v{} for aggregate {}",
+ eventInfo.type(), eventInfo.version(),
+ aggregate.getClass().getSimpleName());
+ }
}
diff --git a/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/runtime/AgentProcessResultTranslator.java b/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/runtime/AgentProcessResultTranslator.java
new file mode 100644
index 00000000..55e33527
--- /dev/null
+++ b/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/runtime/AgentProcessResultTranslator.java
@@ -0,0 +1,78 @@
+/*
+ * Copyright 2022 - 2026 The Original Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.elasticsoftware.akces.agentic.runtime;
+
+import com.embabel.agent.core.Blackboard;
+import org.elasticsoftware.akces.events.DomainEvent;
+import org.elasticsoftware.akces.events.ErrorEvent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+/**
+ * Utility class that translates the results of an Embabel {@code AgentProcess} back
+ * into Akces {@link DomainEvent} instances.
+ *
+ * After each agent tick (or after the process reaches an end state), call
+ * {@link #collectEvents(Blackboard)} to drain {@link DomainEvent} objects from the
+ * blackboard. Events are marked as hidden on the blackboard after collection, so
+ * subsequent calls to this method will not return the same events again — this
+ * supports both tick-to-completion and future incremental-tick processing patterns.
+ *
+ *
Undeclared {@link ErrorEvent} instances are accepted at runtime. A warning is
+ * logged so operators can detect unexpected error types without causing the transaction
+ * to abort.
+ */
+public final class AgentProcessResultTranslator {
+
+ private static final Logger logger = LoggerFactory.getLogger(AgentProcessResultTranslator.class);
+
+ private AgentProcessResultTranslator() {
+ // utility class
+ }
+
+ /**
+ * Collects all {@link DomainEvent} objects currently visible on the blackboard
+ * and removes them from visible scope (via {@link Blackboard#hide(Object)}) so that
+ * they are not collected again on the next call.
+ *
+ *
Any {@link ErrorEvent} instances found are logged at {@code WARN} level so
+ * that operators can detect unexpected error types emitted by the agent.
+ *
+ * @param blackboard the agent process blackboard to drain events from
+ * @return an unmodifiable list of collected domain events in the order they
+ * were encountered on the blackboard; never {@code null}, may be empty
+ */
+ public static List collectEvents(Blackboard blackboard) {
+ List events = blackboard.getObjects().stream()
+ .filter(o -> o instanceof DomainEvent)
+ .map(o -> (DomainEvent) o)
+ .toList();
+
+ for (DomainEvent event : events) {
+ if (event instanceof ErrorEvent) {
+ logger.warn("Agent produced error event of type {}: {}",
+ event.getClass().getName(), event);
+ }
+ blackboard.hide(event);
+ }
+
+ return events;
+ }
+}
diff --git a/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/runtime/AgenticCommandHandlerFunctionAdapter.java b/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/runtime/AgenticCommandHandlerFunctionAdapter.java
new file mode 100644
index 00000000..5597da71
--- /dev/null
+++ b/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/runtime/AgenticCommandHandlerFunctionAdapter.java
@@ -0,0 +1,197 @@
+/*
+ * Copyright 2022 - 2026 The Original Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.elasticsoftware.akces.agentic.runtime;
+
+import com.embabel.agent.core.Agent;
+import com.embabel.agent.core.AgentPlatform;
+import com.embabel.agent.core.AgentProcess;
+import com.embabel.agent.core.ProcessOptions;
+import jakarta.annotation.Nonnull;
+import org.elasticsoftware.akces.aggregate.*;
+import org.elasticsoftware.akces.commands.Command;
+import org.elasticsoftware.akces.control.AggregateServiceRecord;
+import org.elasticsoftware.akces.events.DomainEvent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Supplier;
+import java.util.stream.Stream;
+
+/**
+ * A {@link CommandHandlerFunction} implementation that routes an agent-handled command
+ * through the Embabel AI agent framework instead of a deterministic
+ * {@code @CommandHandler} method.
+ *
+ * When {@link #apply(Command, AggregateState)} is called the adapter:
+ *
+ * - Assembles a bindings {@link Map} containing the command, current aggregate state,
+ * memories, aggregate service records, and condition flags.
+ * - Creates an {@link AgentProcess} via
+ * {@link AgentPlatform#createAgentProcess(Agent, ProcessOptions, Map)} using the
+ * provided {@link Agent} and the assembled bindings.
+ * - Calls {@link AgentProcess#tick()} in a loop until the process reaches an end
+ * state (completed, failed, terminated, or killed).
+ * - Collects {@link DomainEvent} objects placed on the agent's blackboard via
+ * {@link AgentProcessResultTranslator#collectEvents} and returns them as a
+ * {@link Stream}.
+ *
+ *
+ * {@link #isCreate()} always returns {@code false} — agent-handled commands cannot
+ * create aggregate state. Every agentic aggregate must have a separate deterministic
+ * {@code @CommandHandler(create = true)} method.
+ *
+ *
The returned stream of events is fed back into the standard
+ * {@code KafkaAggregateRuntime.handleCommand()} flow unchanged, where each event is
+ * applied through the registered {@code @EventSourcingHandler} methods.
+ *
+ * @param the aggregate state type; must implement both {@link AggregateState} and
+ * {@link MemoryAwareState}
+ * @param the command type handled by this adapter
+ * @param the domain event type produced by this adapter
+ */
+public class AgenticCommandHandlerFunctionAdapter
+ implements CommandHandlerFunction {
+
+ private static final Logger logger =
+ LoggerFactory.getLogger(AgenticCommandHandlerFunctionAdapter.class);
+
+ private final AgenticAggregate aggregate;
+ private final CommandType commandType;
+ private final AgentPlatform agentPlatform;
+ private final Agent agent;
+ private final List> producedDomainEventTypes;
+ private final List> errorEventTypes;
+ private final Supplier> aggregateServicesSupplier;
+
+ /**
+ * Creates a new {@code AgenticCommandHandlerFunctionAdapter}.
+ *
+ * @param aggregate the owning agentic aggregate instance
+ * @param commandType the command type this adapter handles
+ * @param agentPlatform the Embabel platform used to create agent processes
+ * @param agent the deployed {@link Agent} for this aggregate,
+ * provided by the implementing application
+ * @param producedDomainEventTypes domain event types this adapter may produce
+ * @param errorEventTypes error event types this adapter may produce
+ * @param aggregateServicesSupplier supplier of all known {@link AggregateServiceRecord}s;
+ * used to populate the blackboard for service discovery
+ */
+ public AgenticCommandHandlerFunctionAdapter(
+ AgenticAggregate aggregate,
+ CommandType commandType,
+ AgentPlatform agentPlatform,
+ Agent agent,
+ List> producedDomainEventTypes,
+ List> errorEventTypes,
+ Supplier> aggregateServicesSupplier) {
+ this.aggregate = aggregate;
+ this.commandType = commandType;
+ this.agentPlatform = agentPlatform;
+ this.agent = agent;
+ this.producedDomainEventTypes = List.copyOf(producedDomainEventTypes);
+ this.errorEventTypes = List.copyOf(errorEventTypes);
+ this.aggregateServicesSupplier = aggregateServicesSupplier;
+ }
+
+ /**
+ * Processes the command through the Embabel AI agent framework.
+ *
+ * Populates the agent blackboard with:
+ *
+ * - {@code "command"} — the command being processed
+ * - {@code "state"} — the current aggregate state
+ * - {@code "agenticAggregateId"} — the aggregate identifier
+ * - {@code "memories"} — the list of current memories from state
+ * - {@code "aggregateServices"} — all known aggregate service records
+ * - {@code "isCommandProcessing"} (condition) — {@code true}
+ * - {@code "isExternalEvent"} (condition) — {@code false}
+ * - {@code "hasMemories"} (condition) — whether any memories are present
+ *
+ *
+ * @param command the command to process; never {@code null}
+ * @param state the current aggregate state
+ * @return a stream of domain events produced by the agent; may be empty
+ */
+ @Nonnull
+ @Override
+ @SuppressWarnings("unchecked")
+ public Stream apply(@Nonnull C command, S state) {
+ logger.debug("Processing agent-handled command {} for aggregate {}",
+ commandType.typeName(), aggregate.getClass().getSimpleName());
+
+ Map bindings = new LinkedHashMap<>();
+ bindings.put("command", command);
+ bindings.put("state", state);
+ bindings.put("agenticAggregateId", state.getAggregateId());
+ bindings.put("memories", state.getMemories());
+ bindings.put("aggregateServices", aggregateServicesSupplier.get());
+ bindings.put("isCommandProcessing", true);
+ bindings.put("isExternalEvent", false);
+ bindings.put("hasMemories", !state.getMemories().isEmpty());
+
+ AgentProcess agentProcess =
+ agentPlatform.createAgentProcess(agent, ProcessOptions.DEFAULT, bindings);
+
+ while (!agentProcess.getFinished()) {
+ agentProcess.tick();
+ }
+
+ logger.debug("Agent process completed with status {} for command {} on aggregate {}",
+ agentProcess.getStatus(), commandType.typeName(), aggregate.getClass().getSimpleName());
+
+ return (Stream) AgentProcessResultTranslator
+ .collectEvents(agentProcess.getBlackboard())
+ .stream();
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * Always returns {@code false} — agent-handled commands cannot create aggregate
+ * state. A deterministic {@code @CommandHandler(create = true)} must exist on the
+ * aggregate class to handle creation.
+ */
+ @Override
+ public boolean isCreate() {
+ return false;
+ }
+
+ @Override
+ public CommandType getCommandType() {
+ return commandType;
+ }
+
+ @Override
+ public Aggregate getAggregate() {
+ return aggregate;
+ }
+
+ @Override
+ public List> getProducedDomainEventTypes() {
+ return producedDomainEventTypes;
+ }
+
+ @Override
+ public List> getErrorEventTypes() {
+ return errorEventTypes;
+ }
+}
diff --git a/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/runtime/AgenticEventHandlerFunctionAdapter.java b/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/runtime/AgenticEventHandlerFunctionAdapter.java
new file mode 100644
index 00000000..cb7b1083
--- /dev/null
+++ b/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/runtime/AgenticEventHandlerFunctionAdapter.java
@@ -0,0 +1,196 @@
+/*
+ * Copyright 2022 - 2026 The Original Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.elasticsoftware.akces.agentic.runtime;
+
+import com.embabel.agent.core.Agent;
+import com.embabel.agent.core.AgentPlatform;
+import com.embabel.agent.core.AgentProcess;
+import com.embabel.agent.core.ProcessOptions;
+import jakarta.annotation.Nonnull;
+import org.elasticsoftware.akces.aggregate.*;
+import org.elasticsoftware.akces.commands.Command;
+import org.elasticsoftware.akces.control.AggregateServiceRecord;
+import org.elasticsoftware.akces.events.DomainEvent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Supplier;
+import java.util.stream.Stream;
+
+/**
+ * An {@link EventHandlerFunction} implementation that routes an agent-handled external
+ * domain event through the Embabel AI agent framework instead of a deterministic
+ * {@code @EventHandler} method.
+ *
+ * When {@link #apply(DomainEvent, AggregateState)} is called the adapter:
+ *
+ * - Assembles a bindings {@link Map} containing the event, current aggregate state,
+ * memories, aggregate service records, and condition flags.
+ * - Creates an {@link AgentProcess} via
+ * {@link AgentPlatform#createAgentProcess(Agent, ProcessOptions, Map)} using the
+ * provided {@link Agent} and the assembled bindings.
+ * - Calls {@link AgentProcess#tick()} in a loop until the process reaches an end
+ * state (completed, failed, terminated, or killed).
+ * - Collects {@link DomainEvent} objects placed on the agent's blackboard via
+ * {@link AgentProcessResultTranslator#collectEvents} and returns them as a
+ * {@link Stream}.
+ *
+ *
+ * {@link #isCreate()} always returns {@code false} — agent-handled events cannot
+ * create aggregate state. Every agentic aggregate must have a separate deterministic
+ * {@code @CommandHandler(create = true)} method.
+ *
+ *
The returned stream of events is fed back into the standard
+ * {@code KafkaAggregateRuntime.handleEvent()} flow unchanged, where each event is
+ * applied through the registered {@code @EventSourcingHandler} methods.
+ *
+ * @param the aggregate state type; must implement both {@link AggregateState}
+ * and {@link MemoryAwareState}
+ * @param the external domain event type handled by this adapter
+ * @param the domain event type produced by this adapter
+ */
+public class AgenticEventHandlerFunctionAdapter
+ implements EventHandlerFunction {
+
+ private static final Logger logger =
+ LoggerFactory.getLogger(AgenticEventHandlerFunctionAdapter.class);
+
+ private final AgenticAggregate aggregate;
+ private final DomainEventType eventType;
+ private final AgentPlatform agentPlatform;
+ private final Agent agent;
+ private final List> producedDomainEventTypes;
+ private final List> errorEventTypes;
+ private final Supplier> aggregateServicesSupplier;
+
+ /**
+ * Creates a new {@code AgenticEventHandlerFunctionAdapter}.
+ *
+ * @param aggregate the owning agentic aggregate instance
+ * @param eventType the external domain event type this adapter handles
+ * @param agentPlatform the Embabel platform used to create agent processes
+ * @param agent the deployed {@link Agent} for this aggregate,
+ * provided by the implementing application
+ * @param producedDomainEventTypes domain event types this adapter may produce
+ * @param errorEventTypes error event types this adapter may produce
+ * @param aggregateServicesSupplier supplier of all known {@link AggregateServiceRecord}s;
+ * used to populate the blackboard for service discovery
+ */
+ public AgenticEventHandlerFunctionAdapter(
+ AgenticAggregate aggregate,
+ DomainEventType eventType,
+ AgentPlatform agentPlatform,
+ Agent agent,
+ List> producedDomainEventTypes,
+ List> errorEventTypes,
+ Supplier> aggregateServicesSupplier) {
+ this.aggregate = aggregate;
+ this.eventType = eventType;
+ this.agentPlatform = agentPlatform;
+ this.agent = agent;
+ this.producedDomainEventTypes = List.copyOf(producedDomainEventTypes);
+ this.errorEventTypes = List.copyOf(errorEventTypes);
+ this.aggregateServicesSupplier = aggregateServicesSupplier;
+ }
+
+ /**
+ * Processes the external domain event through the Embabel AI agent framework.
+ *
+ * Populates the agent blackboard with:
+ *
+ * - {@code "event"} — the external domain event being processed
+ * - {@code "state"} — the current aggregate state
+ * - {@code "agenticAggregateId"} — the aggregate identifier
+ * - {@code "memories"} — the list of current memories from state
+ * - {@code "aggregateServices"} — all known aggregate service records
+ * - {@code "isCommandProcessing"} (condition) — {@code false}
+ * - {@code "isExternalEvent"} (condition) — {@code true}
+ * - {@code "hasMemories"} (condition) — whether any memories are present
+ *
+ *
+ * @param event the external domain event to process; never {@code null}
+ * @param state the current aggregate state
+ * @return a stream of domain events produced by the agent; may be empty
+ */
+ @Nonnull
+ @Override
+ @SuppressWarnings("unchecked")
+ public Stream apply(@Nonnull InputEvent event, S state) {
+ logger.debug("Processing agent-handled external event {} for aggregate {}",
+ eventType.typeName(), aggregate.getClass().getSimpleName());
+
+ Map bindings = new LinkedHashMap<>();
+ bindings.put("event", event);
+ bindings.put("state", state);
+ bindings.put("agenticAggregateId", state.getAggregateId());
+ bindings.put("memories", state.getMemories());
+ bindings.put("aggregateServices", aggregateServicesSupplier.get());
+ bindings.put("isCommandProcessing", false);
+ bindings.put("isExternalEvent", true);
+ bindings.put("hasMemories", !state.getMemories().isEmpty());
+
+ AgentProcess agentProcess =
+ agentPlatform.createAgentProcess(agent, ProcessOptions.DEFAULT, bindings);
+
+ while (!agentProcess.getFinished()) {
+ agentProcess.tick();
+ }
+
+ logger.debug("Agent process completed with status {} for event {} on aggregate {}",
+ agentProcess.getStatus(), eventType.typeName(), aggregate.getClass().getSimpleName());
+
+ return (Stream) AgentProcessResultTranslator
+ .collectEvents(agentProcess.getBlackboard())
+ .stream();
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * Always returns {@code false} — agent-handled external events cannot create
+ * aggregate state.
+ */
+ @Override
+ public boolean isCreate() {
+ return false;
+ }
+
+ @Override
+ public DomainEventType getEventType() {
+ return eventType;
+ }
+
+ @Override
+ public Aggregate getAggregate() {
+ return aggregate;
+ }
+
+ @Override
+ public List> getProducedDomainEventTypes() {
+ return producedDomainEventTypes;
+ }
+
+ @Override
+ public List> getErrorEventTypes() {
+ return errorEventTypes;
+ }
+}
diff --git a/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/runtime/KafkaAgenticAggregateRuntime.java b/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/runtime/KafkaAgenticAggregateRuntime.java
index 8d75f3de..dd58d15a 100644
--- a/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/runtime/KafkaAgenticAggregateRuntime.java
+++ b/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/runtime/KafkaAgenticAggregateRuntime.java
@@ -17,6 +17,7 @@
package org.elasticsoftware.akces.agentic.runtime;
+import com.embabel.agent.core.AgentPlatform;
import org.apache.kafka.common.errors.SerializationException;
import org.elasticsoftware.akces.agentic.AgenticAggregateRuntime;
import org.elasticsoftware.akces.agentic.events.MemoryRevokedEvent;
@@ -52,26 +53,39 @@ public class KafkaAgenticAggregateRuntime implements AgenticAggregateRuntime {
private final AggregateRuntime delegate;
private final ObjectMapper objectMapper;
private final Class extends AggregateState> stateClass;
+ private final AgentPlatform agentPlatform;
/**
* Creates a new {@code KafkaAgenticAggregateRuntime}.
*
- * @param delegate the underlying aggregate runtime to delegate to
- * @param objectMapper Jackson object mapper for state deserialization
- * @param stateClass the concrete state class used by this aggregate
+ * @param delegate the underlying aggregate runtime to delegate to
+ * @param objectMapper Jackson object mapper for state deserialization
+ * @param stateClass the concrete state class used by this aggregate
+ * @param agentPlatform the Embabel {@link AgentPlatform} used for AI-assisted processing;
+ * must not be {@code null}
*/
public KafkaAgenticAggregateRuntime(AggregateRuntime delegate,
ObjectMapper objectMapper,
- Class extends AggregateState> stateClass) {
+ Class extends AggregateState> stateClass,
+ AgentPlatform agentPlatform) {
this.delegate = delegate;
this.objectMapper = objectMapper;
this.stateClass = stateClass;
+ this.agentPlatform = agentPlatform;
}
// -------------------------------------------------------------------------
// AgenticAggregateRuntime extension
// -------------------------------------------------------------------------
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public AgentPlatform getAgentPlatform() {
+ return agentPlatform;
+ }
+
/**
* {@inheritDoc}
*
diff --git a/main/agentic/src/test/java/org/elasticsoftware/akces/agentic/runtime/KafkaAgenticAggregateRuntimeTest.java b/main/agentic/src/test/java/org/elasticsoftware/akces/agentic/runtime/KafkaAgenticAggregateRuntimeTest.java
index 30d1003f..bdd24a47 100644
--- a/main/agentic/src/test/java/org/elasticsoftware/akces/agentic/runtime/KafkaAgenticAggregateRuntimeTest.java
+++ b/main/agentic/src/test/java/org/elasticsoftware/akces/agentic/runtime/KafkaAgenticAggregateRuntimeTest.java
@@ -17,6 +17,7 @@
package org.elasticsoftware.akces.agentic.runtime;
+import com.embabel.agent.core.AgentPlatform;
import org.elasticsoftware.akces.agentic.AgenticAggregateRuntime;
import org.elasticsoftware.akces.aggregate.*;
import org.elasticsoftware.akces.protocol.AggregateStateRecord;
@@ -40,10 +41,11 @@
/**
* Unit tests for {@link KafkaAgenticAggregateRuntime}, verifying the delegation pattern,
- * memory extraction from state records, and the built-in domain event type constants.
+ * memory extraction from state records, AgentPlatform exposure, and the built-in domain
+ * event type constants.
*
- * Uses Mockito to mock the underlying {@link AggregateRuntime} delegate, avoiding any
- * Kafka or Spring dependencies.
+ *
Uses Mockito to mock the underlying {@link AggregateRuntime} delegate and the
+ * {@link com.embabel.agent.core.AgentPlatform}, avoiding any Kafka or Spring dependencies.
*/
@ExtendWith(MockitoExtension.class)
class KafkaAgenticAggregateRuntimeTest {
@@ -91,13 +93,16 @@ public String getAggregateId() {
@Mock
private AggregateRuntime delegate;
+ @Mock
+ private AgentPlatform agentPlatform;
+
private ObjectMapper objectMapper;
private KafkaAgenticAggregateRuntime runtime;
@BeforeEach
void setUp() {
objectMapper = JsonMapper.builder().build();
- runtime = new KafkaAgenticAggregateRuntime(delegate, objectMapper, TestMemoryState.class);
+ runtime = new KafkaAgenticAggregateRuntime(delegate, objectMapper, TestMemoryState.class, agentPlatform);
}
// -------------------------------------------------------------------------
@@ -133,7 +138,7 @@ void getMemoriesShouldReturnMemoriesFromMemoryAwareState() throws IOException {
@Test
void getMemoriesShouldReturnEmptyListForNonMemoryAwareState() throws IOException {
- var plainRuntime = new KafkaAgenticAggregateRuntime(delegate, objectMapper, PlainState.class);
+ var plainRuntime = new KafkaAgenticAggregateRuntime(delegate, objectMapper, PlainState.class, agentPlatform);
var state = new PlainState("agg-1");
byte[] payload = objectMapper.writeValueAsBytes(state);
@@ -211,6 +216,11 @@ void getAllCommandTypesShouldDelegateToUnderlyingRuntime() {
verify(delegate).getAllCommandTypes();
}
+ @Test
+ void getAgentPlatformShouldReturnInjectedPlatform() {
+ assertThat(runtime.getAgentPlatform()).isSameAs(agentPlatform);
+ }
+
// -------------------------------------------------------------------------
// Built-in DomainEventType constants
// -------------------------------------------------------------------------
diff --git a/main/api/src/main/java/org/elasticsoftware/akces/annotations/AgenticAggregateInfo.java b/main/api/src/main/java/org/elasticsoftware/akces/annotations/AgenticAggregateInfo.java
index fb3ea7c7..b06ab1db 100644
--- a/main/api/src/main/java/org/elasticsoftware/akces/annotations/AgenticAggregateInfo.java
+++ b/main/api/src/main/java/org/elasticsoftware/akces/annotations/AgenticAggregateInfo.java
@@ -18,6 +18,9 @@
package org.elasticsoftware.akces.annotations;
import org.elasticsoftware.akces.aggregate.AggregateState;
+import org.elasticsoftware.akces.commands.Command;
+import org.elasticsoftware.akces.events.DomainEvent;
+import org.elasticsoftware.akces.events.ErrorEvent;
import org.springframework.core.annotation.AliasFor;
import org.springframework.stereotype.Component;
@@ -26,6 +29,24 @@
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
+/**
+ * Marks a class as an agentic aggregate, a special type of aggregate that integrates with
+ * the Embabel AI agent framework for AI-assisted command and event processing.
+ *
+ *
In addition to the standard aggregate capabilities, an agentic aggregate may declare:
+ *
+ * - {@link #agentHandledCommands} — commands processed entirely by the AI agent (no
+ * {@code @CommandHandler} method required)
+ * - {@link #agentHandledEvents} — external domain events processed by the AI agent (no
+ * {@code @EventHandler} method required)
+ * - {@link #agentProducedErrors} — error event types the AI agent may emit during
+ * processing (registered for schema validation and service discovery)
+ *
+ *
+ * Every {@code AgenticAggregate} must also have at least one deterministic
+ * {@code @CommandHandler(create = true)} method so that the aggregate can be created
+ * before the AI agent handles subsequent commands.
+ */
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE})
@Component
@@ -42,4 +63,32 @@
* exceeds this limit the oldest entries are evicted to make room for new ones.
*/
int maxMemories() default 100;
+
+ /**
+ * Command classes to be processed by the AI agent instead of a deterministic
+ * {@code @CommandHandler} method. Each class must implement {@link Command} and be
+ * annotated with {@code @CommandInfo}.
+ *
+ *
Agent-handled commands cannot create aggregate state — every agentic aggregate must
+ * have a separate deterministic {@code @CommandHandler(create = true)} method.
+ */
+ Class extends Command>[] agentHandledCommands() default {};
+
+ /**
+ * External domain event classes to be processed by the AI agent instead of a
+ * deterministic {@code @EventHandler} method. Each class must implement
+ * {@link DomainEvent} and be annotated with {@code @DomainEventInfo}.
+ */
+ Class extends DomainEvent>[] agentHandledEvents() default {};
+
+ /**
+ * Error event classes that the AI agent may produce during command or event processing.
+ * Each class must implement {@link ErrorEvent} and be annotated with
+ * {@code @DomainEventInfo}.
+ *
+ *
These types are registered as {@code DomainEventType(error=true)} for JSON schema
+ * validation and service discovery. At runtime, undeclared {@link ErrorEvent} instances
+ * emitted by the agent are accepted with a warning rather than rejected.
+ */
+ Class extends ErrorEvent>[] agentProducedErrors() default {};
}
diff --git a/main/api/src/test/java/org/elasticsoftware/akces/aggregate/AgenticAggregateInfoTest.java b/main/api/src/test/java/org/elasticsoftware/akces/aggregate/AgenticAggregateInfoTest.java
index fa513f68..ed0db235 100644
--- a/main/api/src/test/java/org/elasticsoftware/akces/aggregate/AgenticAggregateInfoTest.java
+++ b/main/api/src/test/java/org/elasticsoftware/akces/aggregate/AgenticAggregateInfoTest.java
@@ -17,7 +17,10 @@
package org.elasticsoftware.akces.aggregate;
-import org.elasticsoftware.akces.annotations.AgenticAggregateInfo;
+import org.elasticsoftware.akces.annotations.*;
+import org.elasticsoftware.akces.commands.Command;
+import org.elasticsoftware.akces.events.DomainEvent;
+import org.elasticsoftware.akces.events.ErrorEvent;
import org.junit.jupiter.api.Test;
import org.springframework.stereotype.Component;
@@ -117,4 +120,138 @@ void requiredAttributesShouldBePresent() {
assertThat(hasValue).as("value() attribute must be present").isTrue();
assertThat(hasStateClass).as("stateClass() attribute must be present").isTrue();
}
+
+ // -------------------------------------------------------------------------
+ // Test fixtures for new agentic annotation properties
+ // -------------------------------------------------------------------------
+
+ /** Stub command for agent-handled command tests. */
+ @CommandInfo(type = "TestAgentCommand", version = 1)
+ record TestAgentCommand(@AggregateIdentifier String id) implements Command {
+ @Override
+ public String getAggregateId() {
+ return id;
+ }
+ }
+
+ /** Stub external event for agent-handled event tests. */
+ @DomainEventInfo(type = "TestExternalEvent", version = 1)
+ record TestExternalEvent(@AggregateIdentifier String id) implements DomainEvent {
+ @Override
+ public String getAggregateId() {
+ return id;
+ }
+ }
+
+ /** Stub error event for agent-produced error tests. */
+ @DomainEventInfo(type = "TestAgentError", version = 1)
+ record TestAgentError(@AggregateIdentifier String id) implements ErrorEvent {
+ @Override
+ public String getAggregateId() {
+ return id;
+ }
+ }
+
+ /** Aggregate with all three new annotation properties populated. */
+ @AgenticAggregateInfo(
+ value = "FullAgentic",
+ stateClass = TestState.class,
+ agentHandledCommands = {TestAgentCommand.class},
+ agentHandledEvents = {TestExternalEvent.class},
+ agentProducedErrors = {TestAgentError.class}
+ )
+ static class FullAgenticAggregate implements AgenticAggregate {
+ @Override
+ public Class getStateClass() {
+ return TestState.class;
+ }
+ }
+
+ // -------------------------------------------------------------------------
+ // Tests for agentHandledCommands
+ // -------------------------------------------------------------------------
+
+ @Test
+ void agentHandledCommandsShouldDefaultToEmpty() {
+ AgenticAggregateInfo info = DefaultAgenticAggregate.class.getAnnotation(AgenticAggregateInfo.class);
+ assertThat(info).isNotNull();
+ assertThat(info.agentHandledCommands()).isEmpty();
+ }
+
+ @Test
+ void agentHandledCommandsShouldReturnConfiguredClasses() {
+ AgenticAggregateInfo info = FullAgenticAggregate.class.getAnnotation(AgenticAggregateInfo.class);
+ assertThat(info).isNotNull();
+ assertThat(info.agentHandledCommands()).containsExactly(TestAgentCommand.class);
+ }
+
+ @Test
+ void agentHandledCommandsShouldOnlyAcceptCommandImplementations() {
+ // Verify that the declared Command class actually implements Command
+ AgenticAggregateInfo info = FullAgenticAggregate.class.getAnnotation(AgenticAggregateInfo.class);
+ assertThat(info).isNotNull();
+ for (Class> commandClass : info.agentHandledCommands()) {
+ assertThat(Command.class.isAssignableFrom(commandClass))
+ .as("Agent-handled command %s must implement Command", commandClass.getName())
+ .isTrue();
+ }
+ }
+
+ // -------------------------------------------------------------------------
+ // Tests for agentHandledEvents
+ // -------------------------------------------------------------------------
+
+ @Test
+ void agentHandledEventsShouldDefaultToEmpty() {
+ AgenticAggregateInfo info = DefaultAgenticAggregate.class.getAnnotation(AgenticAggregateInfo.class);
+ assertThat(info).isNotNull();
+ assertThat(info.agentHandledEvents()).isEmpty();
+ }
+
+ @Test
+ void agentHandledEventsShouldReturnConfiguredClasses() {
+ AgenticAggregateInfo info = FullAgenticAggregate.class.getAnnotation(AgenticAggregateInfo.class);
+ assertThat(info).isNotNull();
+ assertThat(info.agentHandledEvents()).containsExactly(TestExternalEvent.class);
+ }
+
+ @Test
+ void agentHandledEventsShouldOnlyAcceptDomainEventImplementations() {
+ AgenticAggregateInfo info = FullAgenticAggregate.class.getAnnotation(AgenticAggregateInfo.class);
+ assertThat(info).isNotNull();
+ for (Class> eventClass : info.agentHandledEvents()) {
+ assertThat(DomainEvent.class.isAssignableFrom(eventClass))
+ .as("Agent-handled event %s must implement DomainEvent", eventClass.getName())
+ .isTrue();
+ }
+ }
+
+ // -------------------------------------------------------------------------
+ // Tests for agentProducedErrors
+ // -------------------------------------------------------------------------
+
+ @Test
+ void agentProducedErrorsShouldDefaultToEmpty() {
+ AgenticAggregateInfo info = DefaultAgenticAggregate.class.getAnnotation(AgenticAggregateInfo.class);
+ assertThat(info).isNotNull();
+ assertThat(info.agentProducedErrors()).isEmpty();
+ }
+
+ @Test
+ void agentProducedErrorsShouldReturnConfiguredClasses() {
+ AgenticAggregateInfo info = FullAgenticAggregate.class.getAnnotation(AgenticAggregateInfo.class);
+ assertThat(info).isNotNull();
+ assertThat(info.agentProducedErrors()).containsExactly(TestAgentError.class);
+ }
+
+ @Test
+ void agentProducedErrorsShouldOnlyAcceptErrorEventImplementations() {
+ AgenticAggregateInfo info = FullAgenticAggregate.class.getAnnotation(AgenticAggregateInfo.class);
+ assertThat(info).isNotNull();
+ for (Class> errorClass : info.agentProducedErrors()) {
+ assertThat(ErrorEvent.class.isAssignableFrom(errorClass))
+ .as("Agent-produced error %s must implement ErrorEvent", errorClass.getName())
+ .isTrue();
+ }
+ }
}
From 8b2e702eb0e6a64c2393bd5599bba11e51641eb2 Mon Sep 17 00:00:00 2001
From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com>
Date: Wed, 8 Apr 2026 17:10:27 +0000
Subject: [PATCH 3/7] =?UTF-8?q?fix:=20address=20code=20review=20=E2=80=94?=
=?UTF-8?q?=20expand=20ESH=20comment,=20add=20TODO=20on=20aggregateService?=
=?UTF-8?q?sSupplier,=20clarify=20tick=20loop=20intent?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
Agent-Logs-Url: https://github.com/elasticsoftwarefoundation/akces-framework/sessions/88dfa45b-d88a-42e2-9027-be4b23fea52d
Co-authored-by: jwijgerd <914840+jwijgerd@users.noreply.github.com>
---
.../agentic/beans/AgenticAggregateRuntimeFactory.java | 10 ++++++----
.../runtime/AgenticCommandHandlerFunctionAdapter.java | 4 ++++
.../runtime/AgenticEventHandlerFunctionAdapter.java | 4 ++++
3 files changed, 14 insertions(+), 4 deletions(-)
diff --git a/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/beans/AgenticAggregateRuntimeFactory.java b/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/beans/AgenticAggregateRuntimeFactory.java
index 38452964..1ef80c91 100644
--- a/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/beans/AgenticAggregateRuntimeFactory.java
+++ b/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/beans/AgenticAggregateRuntimeFactory.java
@@ -384,9 +384,10 @@ private void processAgentHandledCommand(
commandType,
agentPlatform,
agent,
- (List) List.of(), // producedDomainEventTypes: empty (events registered via ESH)
+ (List) List.of(), // producedDomainEventTypes: empty — events are registered via EventSourcingHandler adapters
(List) errorTypes,
- Collections::emptyList); // aggregateServicesSupplier: wired by controller in later phase
+ // TODO: wire aggregateServicesSupplier from AkcesAgenticAggregateController (Phase 3)
+ Collections::emptyList);
runtimeBuilder.addCommandHandler(commandType, adapter).addCommand(commandType);
errorTypes.forEach(runtimeBuilder::addDomainEvent);
@@ -446,9 +447,10 @@ private void processAgentHandledEvent(
eventType,
agentPlatform,
agent,
- (List) List.of(), // producedDomainEventTypes: empty (events registered via ESH)
+ (List) List.of(), // producedDomainEventTypes: empty — events are registered via EventSourcingHandler adapters
(List) errorTypes,
- Collections::emptyList); // aggregateServicesSupplier: wired by controller in later phase
+ // TODO: wire aggregateServicesSupplier from AkcesAgenticAggregateController (Phase 3)
+ Collections::emptyList);
runtimeBuilder.addExternalEventHandler(eventType, adapter).addDomainEvent(eventType);
errorTypes.forEach(runtimeBuilder::addDomainEvent);
diff --git a/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/runtime/AgenticCommandHandlerFunctionAdapter.java b/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/runtime/AgenticCommandHandlerFunctionAdapter.java
index 5597da71..ee57be5b 100644
--- a/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/runtime/AgenticCommandHandlerFunctionAdapter.java
+++ b/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/runtime/AgenticCommandHandlerFunctionAdapter.java
@@ -151,6 +151,10 @@ public Stream apply(@Nonnull C command, S state) {
AgentProcess agentProcess =
agentPlatform.createAgentProcess(agent, ProcessOptions.DEFAULT, bindings);
+ // Tick to completion. Phase 1 runs the full agent process synchronously.
+ // Timeout configuration and incremental tick support are deferred to a later phase
+ // (see plans/agenttasks.md). The transaction timeout is controlled externally via
+ // the 'akces.agentic.transaction-timeout-ms' property on the producer factory.
while (!agentProcess.getFinished()) {
agentProcess.tick();
}
diff --git a/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/runtime/AgenticEventHandlerFunctionAdapter.java b/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/runtime/AgenticEventHandlerFunctionAdapter.java
index cb7b1083..db09bc95 100644
--- a/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/runtime/AgenticEventHandlerFunctionAdapter.java
+++ b/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/runtime/AgenticEventHandlerFunctionAdapter.java
@@ -151,6 +151,10 @@ public Stream apply(@Nonnull InputEvent event, S state) {
AgentProcess agentProcess =
agentPlatform.createAgentProcess(agent, ProcessOptions.DEFAULT, bindings);
+ // Tick to completion. Phase 1 runs the full agent process synchronously.
+ // Timeout configuration and incremental tick support are deferred to a later phase
+ // (see plans/agenttasks.md). The transaction timeout is controlled externally via
+ // the 'akces.agentic.transaction-timeout-ms' property on the producer factory.
while (!agentProcess.getFinished()) {
agentProcess.tick();
}
From 138a4752002a600ed87a86d947aad0904824b023 Mon Sep 17 00:00:00 2001
From: Joost van de Wijgerd
Date: Thu, 9 Apr 2026 07:18:57 +0200
Subject: [PATCH 4/7] Apply suggestions from code review
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
---
.../beans/AgenticAggregateRuntimeFactory.java | 14 +++-----
.../AgenticCommandHandlerFunctionAdapter.java | 5 ++-
.../AgenticEventHandlerFunctionAdapter.java | 36 ++++++++++++++-----
3 files changed, 34 insertions(+), 21 deletions(-)
diff --git a/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/beans/AgenticAggregateRuntimeFactory.java b/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/beans/AgenticAggregateRuntimeFactory.java
index 1ef80c91..55fc8f60 100644
--- a/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/beans/AgenticAggregateRuntimeFactory.java
+++ b/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/beans/AgenticAggregateRuntimeFactory.java
@@ -93,18 +93,12 @@ public class AgenticAggregateRuntimeFactory
LoggerFactory.getLogger(AgenticAggregateRuntimeFactory.class);
/** System error types added to every non-create command handler (including agent-handled). */
- private static final List> COMMAND_HANDLER_SYSTEM_ERRORS = List.of(
- new DomainEventType<>("AggregateNotFoundError", 1,
- AggregateNotFoundErrorEvent.class, false, false, true, false),
- new DomainEventType<>("CommandExecutionError", 1,
- CommandExecutionErrorEvent.class, false, false, true, false)
- );
+ private static final List> COMMAND_HANDLER_SYSTEM_ERRORS =
+ AgenticAggregateBeanFactoryPostProcessor.COMMAND_HANDLER_SYSTEM_ERRORS;
/** System error types added to every non-create event handler (including agent-handled). */
- private static final List> EVENT_HANDLER_SYSTEM_ERRORS = List.of(
- new DomainEventType<>("AggregateNotFoundError", 1,
- AggregateNotFoundErrorEvent.class, false, false, true, false)
- );
+ private static final List> EVENT_HANDLER_SYSTEM_ERRORS =
+ AgenticAggregateBeanFactoryPostProcessor.EVENT_HANDLER_SYSTEM_ERRORS;
private ApplicationContext applicationContext;
private final ObjectMapper objectMapper;
diff --git a/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/runtime/AgenticCommandHandlerFunctionAdapter.java b/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/runtime/AgenticCommandHandlerFunctionAdapter.java
index ee57be5b..4ac4d2a2 100644
--- a/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/runtime/AgenticCommandHandlerFunctionAdapter.java
+++ b/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/runtime/AgenticCommandHandlerFunctionAdapter.java
@@ -63,12 +63,11 @@
* {@code KafkaAggregateRuntime.handleCommand()} flow unchanged, where each event is
* applied through the registered {@code @EventSourcingHandler} methods.
*
- * @param the aggregate state type; must implement both {@link AggregateState} and
- * {@link MemoryAwareState}
+ * @param the aggregate state type; must implement {@link AggregateState}
* @param the command type handled by this adapter
* @param the domain event type produced by this adapter
*/
-public class AgenticCommandHandlerFunctionAdapter
+public class AgenticCommandHandlerFunctionAdapter
implements CommandHandlerFunction {
private static final Logger logger =
diff --git a/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/runtime/AgenticEventHandlerFunctionAdapter.java b/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/runtime/AgenticEventHandlerFunctionAdapter.java
index db09bc95..d1916ffd 100644
--- a/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/runtime/AgenticEventHandlerFunctionAdapter.java
+++ b/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/runtime/AgenticEventHandlerFunctionAdapter.java
@@ -63,12 +63,11 @@
* {@code KafkaAggregateRuntime.handleEvent()} flow unchanged, where each event is
* applied through the registered {@code @EventSourcingHandler} methods.
*
- * @param the aggregate state type; must implement both {@link AggregateState}
- * and {@link MemoryAwareState}
+ * @param the aggregate state type; must implement {@link AggregateState}
* @param the external domain event type handled by this adapter
* @param the domain event type produced by this adapter
*/
-public class AgenticEventHandlerFunctionAdapter
+public class AgenticEventHandlerFunctionAdapter
implements EventHandlerFunction {
private static final Logger logger =
@@ -151,14 +150,35 @@ public Stream apply(@Nonnull InputEvent event, S state) {
AgentProcess agentProcess =
agentPlatform.createAgentProcess(agent, ProcessOptions.DEFAULT, bindings);
- // Tick to completion. Phase 1 runs the full agent process synchronously.
- // Timeout configuration and incremental tick support are deferred to a later phase
- // (see plans/agenttasks.md). The transaction timeout is controlled externally via
- // the 'akces.agentic.transaction-timeout-ms' property on the producer factory.
- while (!agentProcess.getFinished()) {
+ // Tick to completion with defensive limits so a stuck agent process cannot
+ // block external event handling indefinitely.
+ final long maxTicks = 10_000L;
+ final long timeoutNanos = java.util.concurrent.TimeUnit.SECONDS.toNanos(30);
+ final long deadlineNanos = System.nanoTime() + timeoutNanos;
+ long tickCount = 0L;
+
+ while (!agentProcess.getFinished()
+ && tickCount < maxTicks
+ && System.nanoTime() < deadlineNanos) {
agentProcess.tick();
+ tickCount++;
}
+ if (!agentProcess.getFinished()) {
+ logger.error(
+ "Agent process did not finish within safety limits for event {} on aggregate {}. " +
+ "aggregateId={}, tickCount={}, maxTicks={}, timeoutSeconds={}, status={}",
+ eventType.typeName(),
+ aggregate.getClass().getSimpleName(),
+ state.getAggregateId(),
+ tickCount,
+ maxTicks,
+ java.util.concurrent.TimeUnit.NANOSECONDS.toSeconds(timeoutNanos),
+ agentProcess.getStatus());
+ throw new IllegalStateException(
+ "Agent process exceeded execution limits for event " + eventType.typeName()
+ + " on aggregate " + aggregate.getClass().getSimpleName());
+ }
logger.debug("Agent process completed with status {} for event {} on aggregate {}",
agentProcess.getStatus(), eventType.typeName(), aggregate.getClass().getSimpleName());
From 68f371b8f515b31615e91ddb84ed70f3be07d27a Mon Sep 17 00:00:00 2001
From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com>
Date: Thu, 9 Apr 2026 05:29:07 +0000
Subject: [PATCH 5/7] =?UTF-8?q?fix:=20address=20review=20comments=20?=
=?UTF-8?q?=E2=80=94=20null=20guards=20in=20constructor,=20unknown=20Error?=
=?UTF-8?q?Event=20handling,=20MemoryAwareState=20guard?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
- KafkaAgenticAggregateRuntime: add Objects.requireNonNull for all 4 constructor params
- AgentProcessResultTranslator: accept registeredEventTypes param; exclude unknown ErrorEvents
with WARN log instead of passing them to processDomainEvent() where they would NPE
- Both adapter apply() methods: use instanceof MemoryAwareState guard instead of requiring
S extends MemoryAwareState bound; pass getAllRegisteredEventTypes() to translator
- Fix @AgenticAggregateInfo.agentProducedErrors() Javadoc to accurately describe exclusion
- Add AgentProcessResultTranslatorTest (8 tests) covering all event/error scenarios
- Add 4 constructor null-check tests to KafkaAgenticAggregateRuntimeTest
Agent-Logs-Url: https://github.com/elasticsoftwarefoundation/akces-framework/sessions/d56d40b0-ba4b-4381-8b08-568d5b4098e2
Co-authored-by: jwijgerd <914840+jwijgerd@users.noreply.github.com>
---
.../runtime/AgentProcessResultTranslator.java | 79 +++++--
.../AgenticCommandHandlerFunctionAdapter.java | 24 ++-
.../AgenticEventHandlerFunctionAdapter.java | 24 ++-
.../runtime/KafkaAgenticAggregateRuntime.java | 9 +-
.../AgentProcessResultTranslatorTest.java | 194 ++++++++++++++++++
.../KafkaAgenticAggregateRuntimeTest.java | 33 +++
.../annotations/AgenticAggregateInfo.java | 9 +-
7 files changed, 338 insertions(+), 34 deletions(-)
create mode 100644 main/agentic/src/test/java/org/elasticsoftware/akces/agentic/runtime/AgentProcessResultTranslatorTest.java
diff --git a/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/runtime/AgentProcessResultTranslator.java b/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/runtime/AgentProcessResultTranslator.java
index 55e33527..a227788b 100644
--- a/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/runtime/AgentProcessResultTranslator.java
+++ b/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/runtime/AgentProcessResultTranslator.java
@@ -18,26 +18,36 @@
package org.elasticsoftware.akces.agentic.runtime;
import com.embabel.agent.core.Blackboard;
+import org.elasticsoftware.akces.aggregate.DomainEventType;
import org.elasticsoftware.akces.events.DomainEvent;
import org.elasticsoftware.akces.events.ErrorEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.ArrayList;
+import java.util.Collection;
import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
/**
* Utility class that translates the results of an Embabel {@code AgentProcess} back
* into Akces {@link DomainEvent} instances.
*
* After each agent tick (or after the process reaches an end state), call
- * {@link #collectEvents(Blackboard)} to drain {@link DomainEvent} objects from the
- * blackboard. Events are marked as hidden on the blackboard after collection, so
- * subsequent calls to this method will not return the same events again — this
+ * {@link #collectEvents(Blackboard, Collection)} to drain {@link DomainEvent} objects
+ * from the blackboard. Events are marked as hidden on the blackboard after collection,
+ * so subsequent calls to this method will not return the same events again — this
* supports both tick-to-completion and future incremental-tick processing patterns.
*
- *
Undeclared {@link ErrorEvent} instances are accepted at runtime. A warning is
- * logged so operators can detect unexpected error types without causing the transaction
- * to abort.
+ *
Unknown {@link ErrorEvent} types (not declared in {@code agentProducedErrors} and
+ * therefore not registered as {@link DomainEventType}s in the runtime) are logged at
+ * {@code WARN} level and excluded from the returned list. This prevents
+ * {@code processDomainEvent()} from encountering a {@code null} type look-up and
+ * NPE-ing, while still allowing the transaction to commit with a meaningful result.
+ * Standard (non-error) {@link DomainEvent} types are always included and must be
+ * registered; passing an unregistered non-error event to the runtime will result in
+ * a {@code NullPointerException} in {@code processDomainEvent()}.
*/
public final class AgentProcessResultTranslator {
@@ -48,31 +58,56 @@ private AgentProcessResultTranslator() {
}
/**
- * Collects all {@link DomainEvent} objects currently visible on the blackboard
- * and removes them from visible scope (via {@link Blackboard#hide(Object)}) so that
- * they are not collected again on the next call.
+ * Collects all {@link DomainEvent} objects currently visible on the blackboard,
+ * removes them from visible scope (via {@link Blackboard#hide(Object)}), and returns
+ * the subset that can safely be passed to the runtime's {@code processDomainEvent()}
+ * method.
*
- *
Any {@link ErrorEvent} instances found are logged at {@code WARN} level so
- * that operators can detect unexpected error types emitted by the agent.
+ *
For every collected event:
+ *
+ * - Non-error {@link DomainEvent}s are always included and passed through as-is.
+ * - {@link ErrorEvent}s whose class is present in {@code registeredEventTypes} are
+ * included.
+ * - {@link ErrorEvent}s whose class is not present in
+ * {@code registeredEventTypes} are logged at {@code WARN} level and excluded.
+ * Excluding them prevents a {@code NullPointerException} inside
+ * {@code KafkaAggregateRuntime.processDomainEvent()} that would otherwise occur
+ * when the runtime looks up the unregistered type. The transaction still commits
+ * normally; only the unknown error event is silently dropped.
+ *
*
- * @param blackboard the agent process blackboard to drain events from
- * @return an unmodifiable list of collected domain events in the order they
- * were encountered on the blackboard; never {@code null}, may be empty
+ * @param blackboard the agent process blackboard to drain events from
+ * @param registeredEventTypes all {@link DomainEventType}s registered with the runtime;
+ * used to verify that agent-produced {@link ErrorEvent}s are
+ * known before passing them downstream
+ * @return an unmodifiable list of domain events that are safe to pass to the runtime;
+ * never {@code null}, may be empty
*/
- public static List collectEvents(Blackboard blackboard) {
- List events = blackboard.getObjects().stream()
+ public static List collectEvents(Blackboard blackboard,
+ Collection> registeredEventTypes) {
+ Set> registeredClasses = registeredEventTypes.stream()
+ .map(DomainEventType::typeClass)
+ .collect(Collectors.toSet());
+
+ List allEvents = blackboard.getObjects().stream()
.filter(o -> o instanceof DomainEvent)
.map(o -> (DomainEvent) o)
.toList();
- for (DomainEvent event : events) {
- if (event instanceof ErrorEvent) {
- logger.warn("Agent produced error event of type {}: {}",
+ List result = new ArrayList<>(allEvents.size());
+ for (DomainEvent event : allEvents) {
+ blackboard.hide(event);
+ if (event instanceof ErrorEvent && !registeredClasses.contains(event.getClass())) {
+ logger.warn(
+ "Agent produced undeclared error event of type '{}' which is not registered " +
+ "as a DomainEventType. The event will be excluded from processing to " +
+ "prevent a runtime failure. Declare it in agentProducedErrors to enable " +
+ "serialization and service discovery. Event: {}",
event.getClass().getName(), event);
+ } else {
+ result.add(event);
}
- blackboard.hide(event);
}
-
- return events;
+ return List.copyOf(result);
}
}
diff --git a/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/runtime/AgenticCommandHandlerFunctionAdapter.java b/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/runtime/AgenticCommandHandlerFunctionAdapter.java
index 4ac4d2a2..53c1473c 100644
--- a/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/runtime/AgenticCommandHandlerFunctionAdapter.java
+++ b/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/runtime/AgenticCommandHandlerFunctionAdapter.java
@@ -29,6 +29,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.List;
@@ -141,11 +142,14 @@ public Stream apply(@Nonnull C command, S state) {
bindings.put("command", command);
bindings.put("state", state);
bindings.put("agenticAggregateId", state.getAggregateId());
- bindings.put("memories", state.getMemories());
+ List memories = state instanceof MemoryAwareState mas
+ ? mas.getMemories()
+ : List.of();
+ bindings.put("memories", memories);
bindings.put("aggregateServices", aggregateServicesSupplier.get());
bindings.put("isCommandProcessing", true);
bindings.put("isExternalEvent", false);
- bindings.put("hasMemories", !state.getMemories().isEmpty());
+ bindings.put("hasMemories", !memories.isEmpty());
AgentProcess agentProcess =
agentPlatform.createAgentProcess(agent, ProcessOptions.DEFAULT, bindings);
@@ -162,7 +166,7 @@ public Stream apply(@Nonnull C command, S state) {
agentProcess.getStatus(), commandType.typeName(), aggregate.getClass().getSimpleName());
return (Stream) AgentProcessResultTranslator
- .collectEvents(agentProcess.getBlackboard())
+ .collectEvents(agentProcess.getBlackboard(), getAllRegisteredEventTypes())
.stream();
}
@@ -197,4 +201,18 @@ public List> getProducedDomainEventTypes() {
public List> getErrorEventTypes() {
return errorEventTypes;
}
+
+ /**
+ * Returns all domain event types this adapter may produce (both state-changing and error types).
+ * Used by {@link AgentProcessResultTranslator} to filter out unknown {@link org.elasticsoftware.akces.events.ErrorEvent}
+ * instances that are not registered with the runtime.
+ *
+ * @return combined list of produced and error domain event types
+ */
+ private List> getAllRegisteredEventTypes() {
+ List> all = new ArrayList<>(producedDomainEventTypes.size() + errorEventTypes.size());
+ all.addAll(producedDomainEventTypes);
+ all.addAll(errorEventTypes);
+ return all;
+ }
}
diff --git a/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/runtime/AgenticEventHandlerFunctionAdapter.java b/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/runtime/AgenticEventHandlerFunctionAdapter.java
index d1916ffd..fd0b35c0 100644
--- a/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/runtime/AgenticEventHandlerFunctionAdapter.java
+++ b/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/runtime/AgenticEventHandlerFunctionAdapter.java
@@ -29,6 +29,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.List;
@@ -141,11 +142,14 @@ public Stream apply(@Nonnull InputEvent event, S state) {
bindings.put("event", event);
bindings.put("state", state);
bindings.put("agenticAggregateId", state.getAggregateId());
- bindings.put("memories", state.getMemories());
+ List memories = state instanceof MemoryAwareState mas
+ ? mas.getMemories()
+ : List.of();
+ bindings.put("memories", memories);
bindings.put("aggregateServices", aggregateServicesSupplier.get());
bindings.put("isCommandProcessing", false);
bindings.put("isExternalEvent", true);
- bindings.put("hasMemories", !state.getMemories().isEmpty());
+ bindings.put("hasMemories", !memories.isEmpty());
AgentProcess agentProcess =
agentPlatform.createAgentProcess(agent, ProcessOptions.DEFAULT, bindings);
@@ -183,7 +187,7 @@ public Stream apply(@Nonnull InputEvent event, S state) {
agentProcess.getStatus(), eventType.typeName(), aggregate.getClass().getSimpleName());
return (Stream) AgentProcessResultTranslator
- .collectEvents(agentProcess.getBlackboard())
+ .collectEvents(agentProcess.getBlackboard(), getAllRegisteredEventTypes())
.stream();
}
@@ -217,4 +221,18 @@ public List> getProducedDomainEventTypes() {
public List> getErrorEventTypes() {
return errorEventTypes;
}
+
+ /**
+ * Returns all domain event types this adapter may produce (both state-changing and error types).
+ * Used by {@link AgentProcessResultTranslator} to filter out unknown {@link org.elasticsoftware.akces.events.ErrorEvent}
+ * instances that are not registered with the runtime.
+ *
+ * @return combined list of produced and error domain event types
+ */
+ private List> getAllRegisteredEventTypes() {
+ List> all = new ArrayList<>(producedDomainEventTypes.size() + errorEventTypes.size());
+ all.addAll(producedDomainEventTypes);
+ all.addAll(errorEventTypes);
+ return all;
+ }
}
diff --git a/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/runtime/KafkaAgenticAggregateRuntime.java b/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/runtime/KafkaAgenticAggregateRuntime.java
index dd58d15a..3597ea8b 100644
--- a/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/runtime/KafkaAgenticAggregateRuntime.java
+++ b/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/runtime/KafkaAgenticAggregateRuntime.java
@@ -37,6 +37,7 @@
import java.io.IOException;
import java.util.Collection;
import java.util.List;
+import java.util.Objects;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Supplier;
@@ -68,10 +69,10 @@ public KafkaAgenticAggregateRuntime(AggregateRuntime delegate,
ObjectMapper objectMapper,
Class extends AggregateState> stateClass,
AgentPlatform agentPlatform) {
- this.delegate = delegate;
- this.objectMapper = objectMapper;
- this.stateClass = stateClass;
- this.agentPlatform = agentPlatform;
+ this.delegate = Objects.requireNonNull(delegate, "delegate must not be null");
+ this.objectMapper = Objects.requireNonNull(objectMapper, "objectMapper must not be null");
+ this.stateClass = Objects.requireNonNull(stateClass, "stateClass must not be null");
+ this.agentPlatform = Objects.requireNonNull(agentPlatform, "agentPlatform must not be null");
}
// -------------------------------------------------------------------------
diff --git a/main/agentic/src/test/java/org/elasticsoftware/akces/agentic/runtime/AgentProcessResultTranslatorTest.java b/main/agentic/src/test/java/org/elasticsoftware/akces/agentic/runtime/AgentProcessResultTranslatorTest.java
new file mode 100644
index 00000000..116a8d66
--- /dev/null
+++ b/main/agentic/src/test/java/org/elasticsoftware/akces/agentic/runtime/AgentProcessResultTranslatorTest.java
@@ -0,0 +1,194 @@
+/*
+ * Copyright 2022 - 2026 The Original Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.elasticsoftware.akces.agentic.runtime;
+
+import com.embabel.agent.core.Blackboard;
+import com.embabel.agent.core.support.InMemoryBlackboard;
+import org.elasticsoftware.akces.aggregate.DomainEventType;
+import org.elasticsoftware.akces.annotations.AggregateIdentifier;
+import org.elasticsoftware.akces.annotations.DomainEventInfo;
+import org.elasticsoftware.akces.events.DomainEvent;
+import org.elasticsoftware.akces.events.ErrorEvent;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Unit tests for {@link AgentProcessResultTranslator}, verifying event collection,
+ * blackboard draining, and handling of registered vs. unregistered error events.
+ */
+class AgentProcessResultTranslatorTest {
+
+ // -------------------------------------------------------------------------
+ // Test fixtures
+ // -------------------------------------------------------------------------
+
+ @DomainEventInfo(type = "TestStateChanged", version = 1)
+ record TestStateChangedEvent(@AggregateIdentifier String id) implements DomainEvent {
+ @Override
+ public String getAggregateId() {
+ return id;
+ }
+ }
+
+ @DomainEventInfo(type = "TestRegisteredError", version = 1)
+ record TestRegisteredErrorEvent(@AggregateIdentifier String id) implements ErrorEvent {
+ @Override
+ public String getAggregateId() {
+ return id;
+ }
+ }
+
+ @DomainEventInfo(type = "TestUnregisteredError", version = 1)
+ record TestUnregisteredErrorEvent(@AggregateIdentifier String id) implements ErrorEvent {
+ @Override
+ public String getAggregateId() {
+ return id;
+ }
+ }
+
+ private static final DomainEventType STATE_CHANGED_TYPE =
+ new DomainEventType<>("TestStateChanged", 1, TestStateChangedEvent.class,
+ false, false, false, false);
+
+ private static final DomainEventType REGISTERED_ERROR_TYPE =
+ new DomainEventType<>("TestRegisteredError", 1, TestRegisteredErrorEvent.class,
+ false, false, true, false);
+
+ private Blackboard blackboard;
+
+ @BeforeEach
+ void setUp() {
+ blackboard = new InMemoryBlackboard();
+ }
+
+ // -------------------------------------------------------------------------
+ // Empty blackboard
+ // -------------------------------------------------------------------------
+
+ @Test
+ void collectEventsShouldReturnEmptyListForEmptyBlackboard() {
+ List events = AgentProcessResultTranslator.collectEvents(blackboard, List.of());
+ assertThat(events).isEmpty();
+ }
+
+ // -------------------------------------------------------------------------
+ // Non-error domain events
+ // -------------------------------------------------------------------------
+
+ @Test
+ void collectEventsShouldIncludeRegisteredStateChangingEvent() {
+ TestStateChangedEvent event = new TestStateChangedEvent("agg-1");
+ blackboard.addObject(event);
+
+ List events = AgentProcessResultTranslator.collectEvents(
+ blackboard, List.of(STATE_CHANGED_TYPE));
+
+ assertThat(events).containsExactly(event);
+ }
+
+ @Test
+ void collectEventsShouldHideEventsAfterCollection() {
+ TestStateChangedEvent event = new TestStateChangedEvent("agg-1");
+ blackboard.addObject(event);
+
+ AgentProcessResultTranslator.collectEvents(blackboard, List.of(STATE_CHANGED_TYPE));
+
+ // Second call should return nothing — events were hidden on first call
+ List second = AgentProcessResultTranslator.collectEvents(
+ blackboard, List.of(STATE_CHANGED_TYPE));
+ assertThat(second).isEmpty();
+ }
+
+ @Test
+ void collectEventsShouldNotIncludeNonDomainEventObjects() {
+ blackboard.addObject("not an event");
+ blackboard.addObject(42);
+ blackboard.addObject(new TestStateChangedEvent("agg-1"));
+
+ List events = AgentProcessResultTranslator.collectEvents(
+ blackboard, List.of(STATE_CHANGED_TYPE));
+
+ assertThat(events).hasSize(1);
+ assertThat(events.getFirst()).isInstanceOf(TestStateChangedEvent.class);
+ }
+
+ // -------------------------------------------------------------------------
+ // Registered error events — must be included
+ // -------------------------------------------------------------------------
+
+ @Test
+ void collectEventsShouldIncludeRegisteredErrorEvent() {
+ TestRegisteredErrorEvent error = new TestRegisteredErrorEvent("agg-1");
+ blackboard.addObject(error);
+
+ List events = AgentProcessResultTranslator.collectEvents(
+ blackboard, List.of(REGISTERED_ERROR_TYPE));
+
+ assertThat(events).containsExactly(error);
+ }
+
+ // -------------------------------------------------------------------------
+ // Unregistered error events — must be excluded with warning
+ // -------------------------------------------------------------------------
+
+ @Test
+ void collectEventsShouldExcludeUnregisteredErrorEvent() {
+ TestUnregisteredErrorEvent unknownError = new TestUnregisteredErrorEvent("agg-1");
+ blackboard.addObject(unknownError);
+
+ // Only STATE_CHANGED_TYPE and REGISTERED_ERROR_TYPE are registered — not the unknown one
+ List events = AgentProcessResultTranslator.collectEvents(
+ blackboard, List.of(STATE_CHANGED_TYPE, REGISTERED_ERROR_TYPE));
+
+ assertThat(events).isEmpty();
+ }
+
+ @Test
+ void collectEventsShouldExcludeUnregisteredErrorAndKeepRegisteredEvents() {
+ TestStateChangedEvent stateEvent = new TestStateChangedEvent("agg-1");
+ TestRegisteredErrorEvent registeredError = new TestRegisteredErrorEvent("agg-1");
+ TestUnregisteredErrorEvent unknownError = new TestUnregisteredErrorEvent("agg-1");
+ blackboard.addObject(stateEvent);
+ blackboard.addObject(registeredError);
+ blackboard.addObject(unknownError);
+
+ List events = AgentProcessResultTranslator.collectEvents(
+ blackboard, List.of(STATE_CHANGED_TYPE, REGISTERED_ERROR_TYPE));
+
+ assertThat(events).containsExactlyInAnyOrder(stateEvent, registeredError);
+ assertThat(events).doesNotContain(unknownError);
+ }
+
+ @Test
+ void collectEventsShouldHideUnregisteredErrorEventSoItIsNotReturnedAgain() {
+ TestUnregisteredErrorEvent unknownError = new TestUnregisteredErrorEvent("agg-1");
+ blackboard.addObject(unknownError);
+
+ AgentProcessResultTranslator.collectEvents(
+ blackboard, List.of(STATE_CHANGED_TYPE, REGISTERED_ERROR_TYPE));
+
+ // The unknown error was hidden — second call also returns nothing
+ List second = AgentProcessResultTranslator.collectEvents(
+ blackboard, List.of(STATE_CHANGED_TYPE, REGISTERED_ERROR_TYPE));
+ assertThat(second).isEmpty();
+ }
+}
diff --git a/main/agentic/src/test/java/org/elasticsoftware/akces/agentic/runtime/KafkaAgenticAggregateRuntimeTest.java b/main/agentic/src/test/java/org/elasticsoftware/akces/agentic/runtime/KafkaAgenticAggregateRuntimeTest.java
index bdd24a47..f3b15b88 100644
--- a/main/agentic/src/test/java/org/elasticsoftware/akces/agentic/runtime/KafkaAgenticAggregateRuntimeTest.java
+++ b/main/agentic/src/test/java/org/elasticsoftware/akces/agentic/runtime/KafkaAgenticAggregateRuntimeTest.java
@@ -37,6 +37,7 @@
import java.util.List;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatNullPointerException;
import static org.mockito.Mockito.*;
/**
@@ -221,6 +222,38 @@ void getAgentPlatformShouldReturnInjectedPlatform() {
assertThat(runtime.getAgentPlatform()).isSameAs(agentPlatform);
}
+ @Test
+ void constructorShouldRejectNullDelegate() {
+ assertThatNullPointerException()
+ .isThrownBy(() -> new KafkaAgenticAggregateRuntime(
+ null, objectMapper, TestMemoryState.class, agentPlatform))
+ .withMessageContaining("delegate");
+ }
+
+ @Test
+ void constructorShouldRejectNullObjectMapper() {
+ assertThatNullPointerException()
+ .isThrownBy(() -> new KafkaAgenticAggregateRuntime(
+ delegate, null, TestMemoryState.class, agentPlatform))
+ .withMessageContaining("objectMapper");
+ }
+
+ @Test
+ void constructorShouldRejectNullStateClass() {
+ assertThatNullPointerException()
+ .isThrownBy(() -> new KafkaAgenticAggregateRuntime(
+ delegate, objectMapper, null, agentPlatform))
+ .withMessageContaining("stateClass");
+ }
+
+ @Test
+ void constructorShouldRejectNullAgentPlatform() {
+ assertThatNullPointerException()
+ .isThrownBy(() -> new KafkaAgenticAggregateRuntime(
+ delegate, objectMapper, TestMemoryState.class, null))
+ .withMessageContaining("agentPlatform");
+ }
+
// -------------------------------------------------------------------------
// Built-in DomainEventType constants
// -------------------------------------------------------------------------
diff --git a/main/api/src/main/java/org/elasticsoftware/akces/annotations/AgenticAggregateInfo.java b/main/api/src/main/java/org/elasticsoftware/akces/annotations/AgenticAggregateInfo.java
index b06ab1db..e941312f 100644
--- a/main/api/src/main/java/org/elasticsoftware/akces/annotations/AgenticAggregateInfo.java
+++ b/main/api/src/main/java/org/elasticsoftware/akces/annotations/AgenticAggregateInfo.java
@@ -87,8 +87,13 @@
* {@code @DomainEventInfo}.
*
* These types are registered as {@code DomainEventType(error=true)} for JSON schema
- * validation and service discovery. At runtime, undeclared {@link ErrorEvent} instances
- * emitted by the agent are accepted with a warning rather than rejected.
+ * validation and service discovery. Agent-produced error events that are not
+ * declared here are not registered with the runtime and will be silently excluded from
+ * processing — they will not be serialized to the Kafka domain-events topic. A warning
+ * is logged for each excluded event so that operators can detect the misconfiguration.
+ *
+ *
To ensure an error event produced by the agent is persisted and visible to
+ * downstream consumers, it must be declared here.
*/
Class extends ErrorEvent>[] agentProducedErrors() default {};
}
From 77c35e57d59361a88c7e849041c59ca80ab31a75 Mon Sep 17 00:00:00 2001
From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com>
Date: Thu, 9 Apr 2026 05:41:20 +0000
Subject: [PATCH 6/7] fix: add defensive tick limit to
AgenticCommandHandlerFunctionAdapter matching EventHandlerFunctionAdapter
Agent-Logs-Url: https://github.com/elasticsoftwarefoundation/akces-framework/sessions/2d9121bb-7262-4cfc-b4fd-a41735f4aa01
Co-authored-by: jwijgerd <914840+jwijgerd@users.noreply.github.com>
---
.../AgenticCommandHandlerFunctionAdapter.java | 32 ++++++++++++++++---
1 file changed, 27 insertions(+), 5 deletions(-)
diff --git a/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/runtime/AgenticCommandHandlerFunctionAdapter.java b/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/runtime/AgenticCommandHandlerFunctionAdapter.java
index 53c1473c..718f90cf 100644
--- a/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/runtime/AgenticCommandHandlerFunctionAdapter.java
+++ b/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/runtime/AgenticCommandHandlerFunctionAdapter.java
@@ -154,12 +154,34 @@ public Stream apply(@Nonnull C command, S state) {
AgentProcess agentProcess =
agentPlatform.createAgentProcess(agent, ProcessOptions.DEFAULT, bindings);
- // Tick to completion. Phase 1 runs the full agent process synchronously.
- // Timeout configuration and incremental tick support are deferred to a later phase
- // (see plans/agenttasks.md). The transaction timeout is controlled externally via
- // the 'akces.agentic.transaction-timeout-ms' property on the producer factory.
- while (!agentProcess.getFinished()) {
+ // Tick to completion with defensive limits so a stuck agent process cannot
+ // block command handling indefinitely.
+ final long maxTicks = 10_000L;
+ final long timeoutNanos = java.util.concurrent.TimeUnit.SECONDS.toNanos(30);
+ final long deadlineNanos = System.nanoTime() + timeoutNanos;
+ long tickCount = 0L;
+
+ while (!agentProcess.getFinished()
+ && tickCount < maxTicks
+ && System.nanoTime() < deadlineNanos) {
agentProcess.tick();
+ tickCount++;
+ }
+
+ if (!agentProcess.getFinished()) {
+ logger.error(
+ "Agent process did not finish within safety limits for command {} on aggregate {}. " +
+ "aggregateId={}, tickCount={}, maxTicks={}, timeoutSeconds={}, status={}",
+ commandType.typeName(),
+ aggregate.getClass().getSimpleName(),
+ state.getAggregateId(),
+ tickCount,
+ maxTicks,
+ java.util.concurrent.TimeUnit.NANOSECONDS.toSeconds(timeoutNanos),
+ agentProcess.getStatus());
+ throw new IllegalStateException(
+ "Agent process exceeded execution limits for command " + commandType.typeName()
+ + " on aggregate " + aggregate.getClass().getSimpleName());
}
logger.debug("Agent process completed with status {} for command {} on aggregate {}",
From 2c96ce24b3bd10df22ef642d0e77970ea379c238 Mon Sep 17 00:00:00 2001
From: Joost van de Wijgerd
Date: Thu, 9 Apr 2026 07:42:57 +0200
Subject: [PATCH 7/7] Apply suggestions from code review
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
---
.../akces/agentic/beans/AgenticAggregateRuntimeFactory.java | 2 --
.../agentic/runtime/AgenticEventHandlerFunctionAdapter.java | 1 -
2 files changed, 3 deletions(-)
diff --git a/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/beans/AgenticAggregateRuntimeFactory.java b/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/beans/AgenticAggregateRuntimeFactory.java
index 55fc8f60..d3a90693 100644
--- a/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/beans/AgenticAggregateRuntimeFactory.java
+++ b/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/beans/AgenticAggregateRuntimeFactory.java
@@ -30,8 +30,6 @@
import org.elasticsoftware.akces.annotations.CommandInfo;
import org.elasticsoftware.akces.annotations.DomainEventInfo;
import org.elasticsoftware.akces.commands.Command;
-import org.elasticsoftware.akces.errors.AggregateNotFoundErrorEvent;
-import org.elasticsoftware.akces.errors.CommandExecutionErrorEvent;
import org.elasticsoftware.akces.events.DomainEvent;
import org.elasticsoftware.akces.events.ErrorEvent;
import org.elasticsoftware.akces.kafka.KafkaAggregateRuntime;
diff --git a/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/runtime/AgenticEventHandlerFunctionAdapter.java b/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/runtime/AgenticEventHandlerFunctionAdapter.java
index fd0b35c0..5a07ec43 100644
--- a/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/runtime/AgenticEventHandlerFunctionAdapter.java
+++ b/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/runtime/AgenticEventHandlerFunctionAdapter.java
@@ -23,7 +23,6 @@
import com.embabel.agent.core.ProcessOptions;
import jakarta.annotation.Nonnull;
import org.elasticsoftware.akces.aggregate.*;
-import org.elasticsoftware.akces.commands.Command;
import org.elasticsoftware.akces.control.AggregateServiceRecord;
import org.elasticsoftware.akces.events.DomainEvent;
import org.slf4j.Logger;