diff --git a/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/beans/AgenticAggregateRuntimeFactory.java b/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/beans/AgenticAggregateRuntimeFactory.java index d990d0f9..aa587bf4 100644 --- a/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/beans/AgenticAggregateRuntimeFactory.java +++ b/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/beans/AgenticAggregateRuntimeFactory.java @@ -17,7 +17,6 @@ package org.elasticsoftware.akces.agentic.beans; -import com.embabel.agent.core.Agent; import com.embabel.agent.core.AgentPlatform; import org.elasticsoftware.akces.agentic.AgenticAggregateRuntime; import org.elasticsoftware.akces.agentic.runtime.AgenticCommandHandlerFunctionAdapter; @@ -76,10 +75,9 @@ * Embabel agent platform.

* *

When {@code agentHandledCommands} or {@code agentHandledEvents} are declared, the - * factory looks up an {@link Agent} bean named {@code {aggregateName}Agent} from the - * {@link ApplicationContext}. This {@link Agent} is provided by the implementing - * application and must be registered as a Spring bean before this factory is invoked. - * A fatal error is raised if the bean cannot be found.

+ * factory creates adapter instances that resolve the matching agent at runtime from + * the {@link AgentPlatform} by aggregate name. No eager bean lookup is performed at + * startup — agent resolution is deferred to command/event processing time.

* *

Produces a {@link KafkaAgenticAggregateRuntime} wrapping the internally built * {@link KafkaAggregateRuntime}, adding memory-aware and agent-platform operations.

@@ -160,30 +158,6 @@ private AgentPlatform resolveAgentPlatform(String aggregateName) { } } - /** - * Resolves the {@link Agent} for this aggregate from the {@link ApplicationContext}. - * - *

Looks for a Spring bean of type {@link Agent} named {@code {aggregateName}Agent}. - * The implementing application is responsible for registering this bean. If no such - * bean is found, a fatal error is raised. - * - * @param aggregateName the aggregate name used to derive the agent bean name - * @return the resolved {@link Agent}; never {@code null} - * @throws IllegalStateException if no matching {@link Agent} bean is found - */ - private Agent resolveAgent(String aggregateName) { - String agentBeanName = aggregateName + "Agent"; - try { - return applicationContext.getBean(agentBeanName, Agent.class); - } catch (BeansException e) { - throw new IllegalStateException( - "No Agent bean found with name '" + agentBeanName + "' for agentic aggregate '" - + aggregateName + "'. " - + "The implementing application must provide a Spring bean of type " - + "com.embabel.agent.core.Agent named '" + agentBeanName + "'.", e); - } - } - @SuppressWarnings("unchecked") private KafkaAggregateRuntime createRuntime(AgenticAggregateInfo agenticInfo, AgenticAggregate aggregate, @@ -295,22 +269,17 @@ private KafkaAggregateRuntime createRuntime(AgenticAggregateInfo agenticInfo, buildAgentProducedErrorTypes(agenticInfo.agentProducedErrors()); agentProducedErrorTypes.forEach(runtimeBuilder::addDomainEvent); - // Lazy Agent resolution: only needed when agent-handled commands or events exist. - boolean hasAgentHandlers = agenticInfo.agentHandledCommands().length > 0 - || agenticInfo.agentHandledEvents().length > 0; - Agent agent = hasAgentHandlers ? resolveAgent(agenticInfo.value()) : null; - // Process agentHandledCommands — register AgenticCommandHandlerFunctionAdapter for (Class commandClass : agenticInfo.agentHandledCommands()) { processAgentHandledCommand( - commandClass, aggregate, agentPlatform, agent, + commandClass, aggregate, agenticInfo.value(), agentPlatform, agentProducedErrorTypes, runtimeBuilder); } // Process agentHandledEvents — register AgenticEventHandlerFunctionAdapter for (Class eventClass : agenticInfo.agentHandledEvents()) { processAgentHandledEvent( - eventClass, aggregate, agentPlatform, agent, + eventClass, aggregate, agenticInfo.value(), agentPlatform, agentProducedErrorTypes, runtimeBuilder); } @@ -352,8 +321,8 @@ private List> buildAgentProducedErrorTypes( * * @param commandClass the command class to register * @param aggregate the owning aggregate instance + * @param aggregateName the aggregate name used for agent inference at runtime * @param agentPlatform the Embabel platform - * @param agent the deployed {@link Agent} for this aggregate * @param agentProducedErrors the error types the agent may produce * @param runtimeBuilder the runtime builder to register with */ @@ -361,8 +330,8 @@ private List> buildAgentProducedErrorTypes( private void processAgentHandledCommand( Class commandClass, AgenticAggregate aggregate, + String aggregateName, AgentPlatform agentPlatform, - Agent agent, List> agentProducedErrors, KafkaAggregateRuntime.Builder runtimeBuilder) { @@ -386,9 +355,9 @@ private void processAgentHandledCommand( AgenticCommandHandlerFunctionAdapter adapter = new AgenticCommandHandlerFunctionAdapter<>( (AgenticAggregate) aggregate, + aggregateName, commandType, agentPlatform, - agent, (List) List.of(), // producedDomainEventTypes: empty — events are registered via EventSourcingHandler adapters (List) errorTypes, // TODO: wire aggregateServicesSupplier from AkcesAgenticAggregateController (Phase 3) @@ -408,8 +377,8 @@ private void processAgentHandledCommand( * * @param eventClass the external domain event class to register * @param aggregate the owning aggregate instance + * @param aggregateName the aggregate name used for agent inference at runtime * @param agentPlatform the Embabel platform - * @param agent the deployed {@link Agent} for this aggregate * @param agentProducedErrors the error types the agent may produce * @param runtimeBuilder the runtime builder to register with */ @@ -417,8 +386,8 @@ private void processAgentHandledCommand( private void processAgentHandledEvent( Class eventClass, AgenticAggregate aggregate, + String aggregateName, AgentPlatform agentPlatform, - Agent agent, List> agentProducedErrors, KafkaAggregateRuntime.Builder runtimeBuilder) { @@ -449,9 +418,9 @@ private void processAgentHandledEvent( AgenticEventHandlerFunctionAdapter adapter = new AgenticEventHandlerFunctionAdapter<>( (AgenticAggregate) aggregate, + aggregateName, eventType, agentPlatform, - agent, (List) List.of(), // producedDomainEventTypes: empty — events are registered via EventSourcingHandler adapters (List) errorTypes, // TODO: wire aggregateServicesSupplier from AkcesAgenticAggregateController (Phase 3) diff --git a/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/embabel/DefaultAgent.java b/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/embabel/DefaultAgent.java new file mode 100644 index 00000000..7d2c1618 --- /dev/null +++ b/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/embabel/DefaultAgent.java @@ -0,0 +1,39 @@ +/* + * Copyright 2022 - 2026 The Original Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.elasticsoftware.akces.agentic.embabel; + +import com.embabel.agent.api.annotation.Agent; +import com.embabel.agent.api.common.PlannerType; + +/** + * The default Embabel agent that is always present on the {@link com.embabel.agent.core.AgentPlatform}. + * This agent is used as a fallback when no other agent matches the aggregate name during + * agent resolution in the agentic command and event handler adapters. + * + *

Uses the {@link PlannerType#UTILITY UTILITY} planner for flexible goal-based planning. + * Actions and conditions will be added as the agentic framework evolves. + */ +@Agent(name = DefaultAgent.AGENT_NAME, description = "Default Akces agentic aggregate agent", planner = PlannerType.UTILITY) +public class DefaultAgent { + + /** + * The well-known agent name used to locate this default agent on the + * {@link com.embabel.agent.core.AgentPlatform} during fallback resolution. + */ + public static final String AGENT_NAME = "DefaultAgent"; +} diff --git a/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/runtime/AgenticCommandHandlerFunctionAdapter.java b/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/runtime/AgenticCommandHandlerFunctionAdapter.java index 718f90cf..0e2dd539 100644 --- a/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/runtime/AgenticCommandHandlerFunctionAdapter.java +++ b/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/runtime/AgenticCommandHandlerFunctionAdapter.java @@ -22,6 +22,7 @@ import com.embabel.agent.core.AgentProcess; import com.embabel.agent.core.ProcessOptions; import jakarta.annotation.Nonnull; +import org.elasticsoftware.akces.agentic.embabel.DefaultAgent; import org.elasticsoftware.akces.aggregate.*; import org.elasticsoftware.akces.commands.Command; import org.elasticsoftware.akces.control.AggregateServiceRecord; @@ -46,11 +47,14 @@ *

    *
  1. Assembles a bindings {@link Map} containing the command, current aggregate state, * memories, aggregate service records, and condition flags.
  2. + *
  3. Attempts to resolve an {@link Agent} from the {@link AgentPlatform} by matching + * the aggregate name against deployed agent names (exact match or + * {@code {aggregateName}Agent} suffix match). If no match is found, the + * {@link DefaultAgent} is used as a fallback.
  4. *
  5. Creates an {@link AgentProcess} via - * {@link AgentPlatform#createAgentProcess(Agent, ProcessOptions, Map)} using the - * provided {@link Agent} and the assembled bindings.
  6. - *
  7. Calls {@link AgentProcess#tick()} in a loop until the process reaches an end - * state (completed, failed, terminated, or killed).
  8. + * {@link AgentPlatform#createAgentProcess(Agent, ProcessOptions, Map)} and calls + * {@link AgentProcess#tick()} in a loop until the process reaches an end state + * (completed, failed, terminated, or killed). *
  9. Collects {@link DomainEvent} objects placed on the agent's blackboard via * {@link AgentProcessResultTranslator#collectEvents} and returns them as a * {@link Stream}.
  10. @@ -75,9 +79,9 @@ public class AgenticCommandHandlerFunctionAdapter aggregate; + private final String aggregateName; private final CommandType commandType; private final AgentPlatform agentPlatform; - private final Agent agent; private final List> producedDomainEventTypes; private final List> errorEventTypes; private final Supplier> aggregateServicesSupplier; @@ -86,10 +90,10 @@ public class AgenticCommandHandlerFunctionAdapter aggregate, + String aggregateName, CommandType commandType, AgentPlatform agentPlatform, - Agent agent, List> producedDomainEventTypes, List> errorEventTypes, Supplier> aggregateServicesSupplier) { this.aggregate = aggregate; + this.aggregateName = aggregateName; this.commandType = commandType; this.agentPlatform = agentPlatform; - this.agent = agent; this.producedDomainEventTypes = List.copyOf(producedDomainEventTypes); this.errorEventTypes = List.copyOf(errorEventTypes); this.aggregateServicesSupplier = aggregateServicesSupplier; @@ -136,7 +140,7 @@ public AgenticCommandHandlerFunctionAdapter( @SuppressWarnings("unchecked") public Stream apply(@Nonnull C command, S state) { logger.debug("Processing agent-handled command {} for aggregate {}", - commandType.typeName(), aggregate.getClass().getSimpleName()); + commandType.typeName(), aggregateName); Map bindings = new LinkedHashMap<>(); bindings.put("command", command); @@ -151,41 +155,15 @@ public Stream apply(@Nonnull C command, S state) { bindings.put("isExternalEvent", false); bindings.put("hasMemories", !memories.isEmpty()); - AgentProcess agentProcess = - agentPlatform.createAgentProcess(agent, ProcessOptions.DEFAULT, bindings); - - // Tick to completion with defensive limits so a stuck agent process cannot - // block command handling indefinitely. - final long maxTicks = 10_000L; - final long timeoutNanos = java.util.concurrent.TimeUnit.SECONDS.toNanos(30); - final long deadlineNanos = System.nanoTime() + timeoutNanos; - long tickCount = 0L; - - while (!agentProcess.getFinished() - && tickCount < maxTicks - && System.nanoTime() < deadlineNanos) { - agentProcess.tick(); - tickCount++; - } - - if (!agentProcess.getFinished()) { - logger.error( - "Agent process did not finish within safety limits for command {} on aggregate {}. " + - "aggregateId={}, tickCount={}, maxTicks={}, timeoutSeconds={}, status={}", - commandType.typeName(), - aggregate.getClass().getSimpleName(), - state.getAggregateId(), - tickCount, - maxTicks, - java.util.concurrent.TimeUnit.NANOSECONDS.toSeconds(timeoutNanos), - agentProcess.getStatus()); - throw new IllegalStateException( - "Agent process exceeded execution limits for command " + commandType.typeName() - + " on aggregate " + aggregate.getClass().getSimpleName()); - } + Agent resolvedAgent = resolveAgentByName(agentPlatform, aggregateName); + logger.debug("Resolved agent '{}' for aggregate '{}'", + resolvedAgent.getName(), aggregateName); + AgentProcess agentProcess = agentPlatform.createAgentProcess( + resolvedAgent, ProcessOptions.DEFAULT, bindings); + tickToCompletion(agentProcess); logger.debug("Agent process completed with status {} for command {} on aggregate {}", - agentProcess.getStatus(), commandType.typeName(), aggregate.getClass().getSimpleName()); + agentProcess.getStatus(), commandType.typeName(), aggregateName); return (Stream) AgentProcessResultTranslator .collectEvents(agentProcess.getBlackboard(), getAllRegisteredEventTypes()) @@ -237,4 +215,111 @@ private List> getAllRegisteredEventTypes() { all.addAll(errorEventTypes); return all; } + + /** + * Resolves an {@link Agent} from the {@link AgentPlatform} by matching deployed agent + * names against the given aggregate name. If no match is found, the + * {@link DefaultAgent} is returned as a fallback. + * + *

    Matching rules (checked in order): + *

      + *
    1. Exact match: {@code agent.getName().equals(aggregateName)}
    2. + *
    3. Agent-suffix match: {@code agent.getName().equals(aggregateName + "Agent")}
    4. + *
    5. Default: the {@link DefaultAgent} identified by {@link DefaultAgent#AGENT_NAME}
    6. + *
    + * + * @param platform the agent platform containing deployed agents + * @param aggregateName the aggregate name to match against + * @return the matched or default agent; never {@code null} + * @throws IllegalStateException if the {@link DefaultAgent} is not deployed on the platform + */ + static Agent resolveAgentByName(AgentPlatform platform, String aggregateName) { + String suffixName = aggregateName + "Agent"; + Collection agents = platform.agents(); + Agent suffixMatch = null; + Agent defaultAgentMatch = null; + + for (Agent agent : agents) { + String agentName = agent.getName(); + if (agentName.equals(aggregateName)) { + return agent; + } + if (suffixMatch == null && agentName.equals(suffixName)) { + suffixMatch = agent; + } + if (defaultAgentMatch == null && agentName.equals(DefaultAgent.AGENT_NAME)) { + defaultAgentMatch = agent; + } + } + + if (suffixMatch != null) { + return suffixMatch; + } + + // No name match — fall back to the DefaultAgent + if (defaultAgentMatch != null) { + logger.info("No agent found matching aggregate name '{}'; using default agent '{}'", + aggregateName, defaultAgentMatch.getName()); + return defaultAgentMatch; + } + + throw new IllegalStateException( + "No DefaultAgent ('" + DefaultAgent.AGENT_NAME + "') is deployed on the AgentPlatform. " + + "At least the DefaultAgent must be available to handle aggregate '" + + aggregateName + "'."); + } + + /** + * Ticks an {@link AgentProcess} to completion with defensive limits so a stuck agent + * process cannot block command or event handling indefinitely. + * + *

    This overload preserves the existing call sites and attempts to resolve the + * aggregate identifier from SLF4J MDC using the {@code aggregateId} key. + * + * @param agentProcess the agent process to drive to completion + * @throws IllegalStateException if the process does not finish within the safety limits + */ + private void tickToCompletion(AgentProcess agentProcess) { + String aggregateId = org.slf4j.MDC.get("aggregateId"); + tickToCompletion(agentProcess, aggregateId != null ? aggregateId : ""); + } + + /** + * Ticks an {@link AgentProcess} to completion with defensive limits so a stuck agent + * process cannot block command or event handling indefinitely. + * + * @param agentProcess the agent process to drive to completion + * @param aggregateId the aggregate instance identifier for diagnostic logging + * @throws IllegalStateException if the process does not finish within the safety limits + */ + private void tickToCompletion(AgentProcess agentProcess, String aggregateId) { + final long maxTicks = 10_000L; + final long timeoutNanos = java.util.concurrent.TimeUnit.SECONDS.toNanos(30); + final long deadlineNanos = System.nanoTime() + timeoutNanos; + long tickCount = 0L; + + while (!agentProcess.getFinished() + && tickCount < maxTicks + && System.nanoTime() < deadlineNanos) { + agentProcess.tick(); + tickCount++; + } + + if (!agentProcess.getFinished()) { + logger.error( + "Agent process did not finish within safety limits for command {} on aggregate {} " + + "with aggregateId {}. tickCount={}, maxTicks={}, timeoutSeconds={}, status={}", + commandType.typeName(), + aggregateName, + aggregateId, + tickCount, + maxTicks, + java.util.concurrent.TimeUnit.NANOSECONDS.toSeconds(timeoutNanos), + agentProcess.getStatus()); + throw new IllegalStateException( + "Agent process exceeded execution limits for command " + commandType.typeName() + + " on aggregate " + aggregateName + + " with aggregateId " + aggregateId); + } + } } diff --git a/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/runtime/AgenticEventHandlerFunctionAdapter.java b/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/runtime/AgenticEventHandlerFunctionAdapter.java index 5a07ec43..c096c57a 100644 --- a/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/runtime/AgenticEventHandlerFunctionAdapter.java +++ b/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/runtime/AgenticEventHandlerFunctionAdapter.java @@ -22,6 +22,7 @@ import com.embabel.agent.core.AgentProcess; import com.embabel.agent.core.ProcessOptions; import jakarta.annotation.Nonnull; +import org.elasticsoftware.akces.agentic.embabel.DefaultAgent; import org.elasticsoftware.akces.aggregate.*; import org.elasticsoftware.akces.control.AggregateServiceRecord; import org.elasticsoftware.akces.events.DomainEvent; @@ -45,11 +46,14 @@ *

      *
    1. Assembles a bindings {@link Map} containing the event, current aggregate state, * memories, aggregate service records, and condition flags.
    2. + *
    3. Attempts to resolve an {@link Agent} from the {@link AgentPlatform} by matching + * the aggregate name against deployed agent names (exact match or + * {@code {aggregateName}Agent} suffix match). If no match is found, the + * {@link DefaultAgent} is used as a fallback.
    4. *
    5. Creates an {@link AgentProcess} via - * {@link AgentPlatform#createAgentProcess(Agent, ProcessOptions, Map)} using the - * provided {@link Agent} and the assembled bindings.
    6. - *
    7. Calls {@link AgentProcess#tick()} in a loop until the process reaches an end - * state (completed, failed, terminated, or killed).
    8. + * {@link AgentPlatform#createAgentProcess(Agent, ProcessOptions, Map)} and calls + * {@link AgentProcess#tick()} in a loop until the process reaches an end state + * (completed, failed, terminated, or killed). *
    9. Collects {@link DomainEvent} objects placed on the agent's blackboard via * {@link AgentProcessResultTranslator#collectEvents} and returns them as a * {@link Stream}.
    10. @@ -74,9 +78,9 @@ public class AgenticEventHandlerFunctionAdapter aggregate; + private final String aggregateName; private final DomainEventType eventType; private final AgentPlatform agentPlatform; - private final Agent agent; private final List> producedDomainEventTypes; private final List> errorEventTypes; private final Supplier> aggregateServicesSupplier; @@ -85,10 +89,10 @@ public class AgenticEventHandlerFunctionAdapter aggregate, + String aggregateName, DomainEventType eventType, AgentPlatform agentPlatform, - Agent agent, List> producedDomainEventTypes, List> errorEventTypes, Supplier> aggregateServicesSupplier) { this.aggregate = aggregate; + this.aggregateName = aggregateName; this.eventType = eventType; this.agentPlatform = agentPlatform; - this.agent = agent; this.producedDomainEventTypes = List.copyOf(producedDomainEventTypes); this.errorEventTypes = List.copyOf(errorEventTypes); this.aggregateServicesSupplier = aggregateServicesSupplier; @@ -131,11 +135,47 @@ public AgenticEventHandlerFunctionAdapter( * @return a stream of domain events produced by the agent; may be empty */ @Nonnull + private static Agent resolveAgentByName(AgentPlatform agentPlatform, String aggregateName) { + String suffixName = aggregateName + "Agent"; + Collection agents = agentPlatform.agents(); + Agent suffixMatch = null; + Agent defaultAgentMatch = null; + + for (Agent agent : agents) { + String agentName = agent.getName(); + if (agentName.equals(aggregateName)) { + return agent; + } + if (suffixMatch == null && agentName.equals(suffixName)) { + suffixMatch = agent; + } + if (defaultAgentMatch == null && agentName.equals(DefaultAgent.AGENT_NAME)) { + defaultAgentMatch = agent; + } + } + + if (suffixMatch != null) { + return suffixMatch; + } + + // No name match — fall back to the DefaultAgent + if (defaultAgentMatch != null) { + logger.info("No agent found matching aggregate name '{}'; using default agent '{}'", + aggregateName, defaultAgentMatch.getName()); + return defaultAgentMatch; + } + + throw new IllegalStateException( + "No DefaultAgent ('" + DefaultAgent.AGENT_NAME + "') is deployed on the AgentPlatform. " + + "At least the DefaultAgent must be available to handle aggregate '" + + aggregateName + "'."); + } + @Override @SuppressWarnings("unchecked") public Stream apply(@Nonnull InputEvent event, S state) { logger.debug("Processing agent-handled external event {} for aggregate {}", - eventType.typeName(), aggregate.getClass().getSimpleName()); + eventType.typeName(), aggregateName); Map bindings = new LinkedHashMap<>(); bindings.put("event", event); @@ -150,40 +190,15 @@ public Stream apply(@Nonnull InputEvent event, S state) { bindings.put("isExternalEvent", true); bindings.put("hasMemories", !memories.isEmpty()); - AgentProcess agentProcess = - agentPlatform.createAgentProcess(agent, ProcessOptions.DEFAULT, bindings); - - // Tick to completion with defensive limits so a stuck agent process cannot - // block external event handling indefinitely. - final long maxTicks = 10_000L; - final long timeoutNanos = java.util.concurrent.TimeUnit.SECONDS.toNanos(30); - final long deadlineNanos = System.nanoTime() + timeoutNanos; - long tickCount = 0L; + Agent resolvedAgent = resolveAgentByName(agentPlatform, aggregateName); + logger.debug("Resolved agent '{}' for aggregate '{}'", + resolvedAgent.getName(), aggregateName); + AgentProcess agentProcess = agentPlatform.createAgentProcess( + resolvedAgent, ProcessOptions.DEFAULT, bindings); + tickToCompletion(agentProcess); - while (!agentProcess.getFinished() - && tickCount < maxTicks - && System.nanoTime() < deadlineNanos) { - agentProcess.tick(); - tickCount++; - } - - if (!agentProcess.getFinished()) { - logger.error( - "Agent process did not finish within safety limits for event {} on aggregate {}. " + - "aggregateId={}, tickCount={}, maxTicks={}, timeoutSeconds={}, status={}", - eventType.typeName(), - aggregate.getClass().getSimpleName(), - state.getAggregateId(), - tickCount, - maxTicks, - java.util.concurrent.TimeUnit.NANOSECONDS.toSeconds(timeoutNanos), - agentProcess.getStatus()); - throw new IllegalStateException( - "Agent process exceeded execution limits for event " + eventType.typeName() - + " on aggregate " + aggregate.getClass().getSimpleName()); - } logger.debug("Agent process completed with status {} for event {} on aggregate {}", - agentProcess.getStatus(), eventType.typeName(), aggregate.getClass().getSimpleName()); + agentProcess.getStatus(), eventType.typeName(), aggregateName); return (Stream) AgentProcessResultTranslator .collectEvents(agentProcess.getBlackboard(), getAllRegisteredEventTypes()) @@ -234,4 +249,62 @@ private List> getAllRegisteredEventTypes() { all.addAll(errorEventTypes); return all; } + + /** + * Ticks an {@link AgentProcess} to completion with defensive limits so a stuck agent + * process cannot block event handling indefinitely. + * + * @param agentProcess the agent process to drive to completion + * @throws IllegalStateException if the process does not finish within the safety limits + */ + private void tickToCompletion(AgentProcess agentProcess) { + AgenticProcessTickHelper.tickToCompletion( + agentProcess, + logger, + "event", + eventType.typeName(), + aggregateName); + } +} + +final class AgenticProcessTickHelper { + + private static final long MAX_TICKS = 10_000L; + private static final long TIMEOUT_NANOS = java.util.concurrent.TimeUnit.SECONDS.toNanos(30); + + private AgenticProcessTickHelper() { + } + + static void tickToCompletion( + AgentProcess agentProcess, + Logger logger, + String handledTypeLabel, + String handledTypeName, + String aggregateName) { + final long deadlineNanos = System.nanoTime() + TIMEOUT_NANOS; + long tickCount = 0L; + + while (!agentProcess.getFinished() + && tickCount < MAX_TICKS + && System.nanoTime() < deadlineNanos) { + agentProcess.tick(); + tickCount++; + } + + if (!agentProcess.getFinished()) { + logger.error( + "Agent process did not finish within safety limits for {} {} on aggregate {}. " + + "tickCount={}, maxTicks={}, timeoutSeconds={}, status={}", + handledTypeLabel, + handledTypeName, + aggregateName, + tickCount, + MAX_TICKS, + java.util.concurrent.TimeUnit.NANOSECONDS.toSeconds(TIMEOUT_NANOS), + agentProcess.getStatus()); + throw new IllegalStateException( + "Agent process exceeded execution limits for " + handledTypeLabel + " " + handledTypeName + + " on aggregate " + aggregateName); + } + } } diff --git a/main/agentic/src/test/java/org/elasticsoftware/akces/agentic/runtime/AgenticCommandHandlerFunctionAdapterTest.java b/main/agentic/src/test/java/org/elasticsoftware/akces/agentic/runtime/AgenticCommandHandlerFunctionAdapterTest.java new file mode 100644 index 00000000..33e6dd76 --- /dev/null +++ b/main/agentic/src/test/java/org/elasticsoftware/akces/agentic/runtime/AgenticCommandHandlerFunctionAdapterTest.java @@ -0,0 +1,303 @@ +/* + * Copyright 2022 - 2026 The Original Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.elasticsoftware.akces.agentic.runtime; + +import com.embabel.agent.core.Agent; +import com.embabel.agent.core.AgentPlatform; +import com.embabel.agent.core.AgentProcess; +import com.embabel.agent.core.Blackboard; +import com.embabel.agent.core.ProcessOptions; +import org.elasticsoftware.akces.agentic.embabel.DefaultAgent; +import org.elasticsoftware.akces.aggregate.*; +import org.elasticsoftware.akces.annotations.CommandInfo; +import org.elasticsoftware.akces.annotations.DomainEventInfo; +import org.elasticsoftware.akces.commands.Command; +import org.elasticsoftware.akces.events.DomainEvent; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.stream.Stream; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.*; + +/** + * Unit tests for {@link AgenticCommandHandlerFunctionAdapter}, verifying the runtime + * agent resolution by aggregate name and the default agent fallback when no matching + * agent is found. + */ +@ExtendWith(MockitoExtension.class) +class AgenticCommandHandlerFunctionAdapterTest { + + @CommandInfo(type = "TestCommand", version = 1) + record TestCommand(String id) implements Command { + @Override + public String getAggregateId() { + return id; + } + } + + @DomainEventInfo(type = "TestEvent", version = 1) + record TestEvent(String id) implements DomainEvent { + @Override + public String getAggregateId() { + return id; + } + } + + record TestState(String id) implements AggregateState { + @Override + public String getAggregateId() { + return id; + } + } + + @Mock + private AgenticAggregate aggregate; + + @Mock + private AgentPlatform agentPlatform; + + @Mock + private AgentProcess agentProcess; + + @Mock + private Blackboard blackboard; + + private final CommandType commandType = + new CommandType<>("TestCommand", 1, TestCommand.class, false, false, false); + + @BeforeEach + void setUp() { + lenient().when(agentProcess.getBlackboard()).thenReturn(blackboard); + lenient().when(blackboard.get(any(String.class))).thenReturn(null); + } + + // ------------------------------------------------------------------------- + // Agent resolution by exact name + // ------------------------------------------------------------------------- + + @Test + void shouldResolveAgentByExactNameMatch() { + Agent agent = mock(Agent.class); + when(agent.getName()).thenReturn("TestAggregate"); + when(agentPlatform.agents()).thenReturn(List.of(agent)); + when(agentPlatform.createAgentProcess(eq(agent), eq(ProcessOptions.DEFAULT), any(Map.class))) + .thenReturn(agentProcess); + when(agentProcess.getFinished()).thenReturn(true); + when(agentProcess.getStatus()).thenReturn(null); + + var adapter = new AgenticCommandHandlerFunctionAdapter<>( + aggregate, "TestAggregate", commandType, agentPlatform, + List.of(), List.of(), Collections::emptyList); + + Stream result = adapter.apply(new TestCommand("agg-1"), new TestState("agg-1")); + + assertThat(result).isEmpty(); + verify(agentPlatform).createAgentProcess(eq(agent), eq(ProcessOptions.DEFAULT), any(Map.class)); + } + + // ------------------------------------------------------------------------- + // Agent resolution by suffix name + // ------------------------------------------------------------------------- + + @Test + void shouldResolveAgentBySuffixNameMatch() { + Agent agent = mock(Agent.class); + when(agent.getName()).thenReturn("TestAggregateAgent"); + when(agentPlatform.agents()).thenReturn(List.of(agent)); + when(agentPlatform.createAgentProcess(eq(agent), eq(ProcessOptions.DEFAULT), any(Map.class))) + .thenReturn(agentProcess); + when(agentProcess.getFinished()).thenReturn(true); + when(agentProcess.getStatus()).thenReturn(null); + + var adapter = new AgenticCommandHandlerFunctionAdapter<>( + aggregate, "TestAggregate", commandType, agentPlatform, + List.of(), List.of(), Collections::emptyList); + + Stream result = adapter.apply(new TestCommand("agg-1"), new TestState("agg-1")); + + assertThat(result).isEmpty(); + verify(agentPlatform).createAgentProcess(eq(agent), eq(ProcessOptions.DEFAULT), any(Map.class)); + } + + // ------------------------------------------------------------------------- + // Default agent fallback + // ------------------------------------------------------------------------- + + @Test + void shouldUseDefaultAgentWhenNoNameMatch() { + Agent defaultAgent = mock(Agent.class); + when(defaultAgent.getName()).thenReturn(DefaultAgent.AGENT_NAME); + when(agentPlatform.agents()).thenReturn(List.of(defaultAgent)); + when(agentPlatform.createAgentProcess(eq(defaultAgent), eq(ProcessOptions.DEFAULT), any(Map.class))) + .thenReturn(agentProcess); + when(agentProcess.getFinished()).thenReturn(true); + when(agentProcess.getStatus()).thenReturn(null); + + var adapter = new AgenticCommandHandlerFunctionAdapter<>( + aggregate, "UnknownAggregate", commandType, agentPlatform, + List.of(), List.of(), Collections::emptyList); + + Stream result = adapter.apply(new TestCommand("agg-1"), new TestState("agg-1")); + + assertThat(result).isEmpty(); + verify(agentPlatform).createAgentProcess(eq(defaultAgent), eq(ProcessOptions.DEFAULT), any(Map.class)); + } + + @Test + void shouldThrowWhenNoDefaultAgentDeployed() { + Agent otherAgent = mock(Agent.class); + when(otherAgent.getName()).thenReturn("SomeOtherAgent"); + when(agentPlatform.agents()).thenReturn(List.of(otherAgent)); + + var adapter = new AgenticCommandHandlerFunctionAdapter<>( + aggregate, "UnknownAggregate", commandType, agentPlatform, + List.of(), List.of(), Collections::emptyList); + + assertThatThrownBy(() -> adapter.apply(new TestCommand("agg-1"), new TestState("agg-1"))) + .isInstanceOf(IllegalStateException.class) + .hasMessageContaining("No DefaultAgent") + .hasMessageContaining("UnknownAggregate"); + } + + @Test + void shouldThrowWhenNoAgentsDeployed() { + when(agentPlatform.agents()).thenReturn(List.of()); + + var adapter = new AgenticCommandHandlerFunctionAdapter<>( + aggregate, "UnknownAggregate", commandType, agentPlatform, + List.of(), List.of(), Collections::emptyList); + + assertThatThrownBy(() -> adapter.apply(new TestCommand("agg-1"), new TestState("agg-1"))) + .isInstanceOf(IllegalStateException.class) + .hasMessageContaining("No DefaultAgent") + .hasMessageContaining("UnknownAggregate"); + } + + // ------------------------------------------------------------------------- + // Exact match takes precedence over suffix match + // ------------------------------------------------------------------------- + + @Test + void shouldPreferExactMatchOverSuffixMatch() { + Agent exactAgent = mock(Agent.class); + when(exactAgent.getName()).thenReturn("MyAggregate"); + + Agent suffixAgent = mock(Agent.class); + lenient().when(suffixAgent.getName()).thenReturn("MyAggregateAgent"); + + when(agentPlatform.agents()).thenReturn(List.of(suffixAgent, exactAgent)); + when(agentPlatform.createAgentProcess(eq(exactAgent), eq(ProcessOptions.DEFAULT), any(Map.class))) + .thenReturn(agentProcess); + when(agentProcess.getFinished()).thenReturn(true); + when(agentProcess.getStatus()).thenReturn(null); + + var adapter = new AgenticCommandHandlerFunctionAdapter<>( + aggregate, "MyAggregate", commandType, agentPlatform, + List.of(), List.of(), Collections::emptyList); + + adapter.apply(new TestCommand("agg-1"), new TestState("agg-1")); + + verify(agentPlatform).createAgentProcess(eq(exactAgent), eq(ProcessOptions.DEFAULT), any(Map.class)); + } + + // ------------------------------------------------------------------------- + // resolveAgentByName static helper + // ------------------------------------------------------------------------- + + @Test + void resolveAgentByNameShouldThrowWhenNoAgentsDeployed() { + when(agentPlatform.agents()).thenReturn(List.of()); + + assertThatThrownBy(() -> + AgenticCommandHandlerFunctionAdapter.resolveAgentByName(agentPlatform, "Test")) + .isInstanceOf(IllegalStateException.class) + .hasMessageContaining("No DefaultAgent"); + } + + @Test + void resolveAgentByNameShouldReturnDefaultAgentWhenNoNameMatches() { + Agent agent = mock(Agent.class); + when(agent.getName()).thenReturn(DefaultAgent.AGENT_NAME); + when(agentPlatform.agents()).thenReturn(List.of(agent)); + + assertThat(AgenticCommandHandlerFunctionAdapter.resolveAgentByName(agentPlatform, "Test")) + .isSameAs(agent); + } + + @Test + void resolveAgentByNameShouldFindExactMatch() { + Agent agent = mock(Agent.class); + when(agent.getName()).thenReturn("Wallet"); + when(agentPlatform.agents()).thenReturn(List.of(agent)); + + assertThat(AgenticCommandHandlerFunctionAdapter.resolveAgentByName(agentPlatform, "Wallet")) + .isSameAs(agent); + } + + @Test + void resolveAgentByNameShouldFindSuffixMatch() { + Agent agent = mock(Agent.class); + when(agent.getName()).thenReturn("WalletAgent"); + when(agentPlatform.agents()).thenReturn(List.of(agent)); + + assertThat(AgenticCommandHandlerFunctionAdapter.resolveAgentByName(agentPlatform, "Wallet")) + .isSameAs(agent); + } + + // ------------------------------------------------------------------------- + // Basic adapter properties + // ------------------------------------------------------------------------- + + @Test + void isCreateShouldAlwaysReturnFalse() { + var adapter = new AgenticCommandHandlerFunctionAdapter<>( + aggregate, "Test", commandType, agentPlatform, + List.of(), List.of(), Collections::emptyList); + + assertThat(adapter.isCreate()).isFalse(); + } + + @Test + void getCommandTypeShouldReturnConfiguredType() { + var adapter = new AgenticCommandHandlerFunctionAdapter<>( + aggregate, "Test", commandType, agentPlatform, + List.of(), List.of(), Collections::emptyList); + + assertThat(adapter.getCommandType()).isSameAs(commandType); + } + + @Test + void getAggregateShouldReturnConfiguredAggregate() { + var adapter = new AgenticCommandHandlerFunctionAdapter<>( + aggregate, "Test", commandType, agentPlatform, + List.of(), List.of(), Collections::emptyList); + + assertThat(adapter.getAggregate()).isSameAs(aggregate); + } +} diff --git a/main/agentic/src/test/java/org/elasticsoftware/akces/agentic/runtime/AgenticEventHandlerFunctionAdapterTest.java b/main/agentic/src/test/java/org/elasticsoftware/akces/agentic/runtime/AgenticEventHandlerFunctionAdapterTest.java new file mode 100644 index 00000000..4f177bf2 --- /dev/null +++ b/main/agentic/src/test/java/org/elasticsoftware/akces/agentic/runtime/AgenticEventHandlerFunctionAdapterTest.java @@ -0,0 +1,227 @@ +/* + * Copyright 2022 - 2026 The Original Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.elasticsoftware.akces.agentic.runtime; + +import com.embabel.agent.core.Agent; +import com.embabel.agent.core.AgentPlatform; +import com.embabel.agent.core.AgentProcess; +import com.embabel.agent.core.Blackboard; +import com.embabel.agent.core.ProcessOptions; +import org.elasticsoftware.akces.agentic.embabel.DefaultAgent; +import org.elasticsoftware.akces.aggregate.*; +import org.elasticsoftware.akces.annotations.DomainEventInfo; +import org.elasticsoftware.akces.events.DomainEvent; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.stream.Stream; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.*; + +/** + * Unit tests for {@link AgenticEventHandlerFunctionAdapter}, verifying the runtime + * agent resolution by aggregate name and the default agent fallback when no matching + * agent is found. + */ +@ExtendWith(MockitoExtension.class) +class AgenticEventHandlerFunctionAdapterTest { + + @DomainEventInfo(type = "ExternalEvent", version = 1) + record TestExternalEvent(String id) implements DomainEvent { + @Override + public String getAggregateId() { + return id; + } + } + + record TestState(String id) implements AggregateState { + @Override + public String getAggregateId() { + return id; + } + } + + @Mock + private AgenticAggregate aggregate; + + @Mock + private AgentPlatform agentPlatform; + + @Mock + private AgentProcess agentProcess; + + @Mock + private Blackboard blackboard; + + private final DomainEventType eventType = + new DomainEventType<>("ExternalEvent", 1, TestExternalEvent.class, false, true, false, false); + + @BeforeEach + void setUp() { + lenient().when(agentProcess.getBlackboard()).thenReturn(blackboard); + lenient().when(blackboard.get(any(String.class))).thenReturn(null); + } + + // ------------------------------------------------------------------------- + // Agent resolution by exact name + // ------------------------------------------------------------------------- + + @Test + void shouldResolveAgentByExactNameMatch() { + Agent agent = mock(Agent.class); + when(agent.getName()).thenReturn("TestAggregate"); + when(agentPlatform.agents()).thenReturn(List.of(agent)); + when(agentPlatform.createAgentProcess(eq(agent), eq(ProcessOptions.DEFAULT), any(Map.class))) + .thenReturn(agentProcess); + when(agentProcess.getFinished()).thenReturn(true); + when(agentProcess.getStatus()).thenReturn(null); + + var adapter = new AgenticEventHandlerFunctionAdapter<>( + aggregate, "TestAggregate", eventType, agentPlatform, + List.of(), List.of(), Collections::emptyList); + + Stream result = adapter.apply( + new TestExternalEvent("agg-1"), new TestState("agg-1")); + + assertThat(result).isEmpty(); + verify(agentPlatform).createAgentProcess(eq(agent), eq(ProcessOptions.DEFAULT), any(Map.class)); + } + + // ------------------------------------------------------------------------- + // Agent resolution by suffix name + // ------------------------------------------------------------------------- + + @Test + void shouldResolveAgentBySuffixNameMatch() { + Agent agent = mock(Agent.class); + when(agent.getName()).thenReturn("TestAggregateAgent"); + when(agentPlatform.agents()).thenReturn(List.of(agent)); + when(agentPlatform.createAgentProcess(eq(agent), eq(ProcessOptions.DEFAULT), any(Map.class))) + .thenReturn(agentProcess); + when(agentProcess.getFinished()).thenReturn(true); + when(agentProcess.getStatus()).thenReturn(null); + + var adapter = new AgenticEventHandlerFunctionAdapter<>( + aggregate, "TestAggregate", eventType, agentPlatform, + List.of(), List.of(), Collections::emptyList); + + Stream result = adapter.apply( + new TestExternalEvent("agg-1"), new TestState("agg-1")); + + assertThat(result).isEmpty(); + verify(agentPlatform).createAgentProcess(eq(agent), eq(ProcessOptions.DEFAULT), any(Map.class)); + } + + // ------------------------------------------------------------------------- + // Default agent fallback + // ------------------------------------------------------------------------- + + @Test + void shouldUseDefaultAgentWhenNoNameMatch() { + Agent defaultAgent = mock(Agent.class); + when(defaultAgent.getName()).thenReturn(DefaultAgent.AGENT_NAME); + when(agentPlatform.agents()).thenReturn(List.of(defaultAgent)); + when(agentPlatform.createAgentProcess(eq(defaultAgent), eq(ProcessOptions.DEFAULT), any(Map.class))) + .thenReturn(agentProcess); + when(agentProcess.getFinished()).thenReturn(true); + when(agentProcess.getStatus()).thenReturn(null); + + var adapter = new AgenticEventHandlerFunctionAdapter<>( + aggregate, "UnknownAggregate", eventType, agentPlatform, + List.of(), List.of(), Collections::emptyList); + + Stream result = adapter.apply( + new TestExternalEvent("agg-1"), new TestState("agg-1")); + + assertThat(result).isEmpty(); + verify(agentPlatform).createAgentProcess(eq(defaultAgent), eq(ProcessOptions.DEFAULT), any(Map.class)); + } + + @Test + void shouldThrowWhenNoDefaultAgentDeployed() { + Agent otherAgent = mock(Agent.class); + when(otherAgent.getName()).thenReturn("SomeOtherAgent"); + when(agentPlatform.agents()).thenReturn(List.of(otherAgent)); + + var adapter = new AgenticEventHandlerFunctionAdapter<>( + aggregate, "UnknownAggregate", eventType, agentPlatform, + List.of(), List.of(), Collections::emptyList); + + assertThatThrownBy(() -> adapter.apply( + new TestExternalEvent("agg-1"), new TestState("agg-1"))) + .isInstanceOf(IllegalStateException.class) + .hasMessageContaining("No DefaultAgent") + .hasMessageContaining("UnknownAggregate"); + } + + @Test + void shouldThrowWhenNoAgentsDeployed() { + when(agentPlatform.agents()).thenReturn(List.of()); + + var adapter = new AgenticEventHandlerFunctionAdapter<>( + aggregate, "UnknownAggregate", eventType, agentPlatform, + List.of(), List.of(), Collections::emptyList); + + assertThatThrownBy(() -> adapter.apply( + new TestExternalEvent("agg-1"), new TestState("agg-1"))) + .isInstanceOf(IllegalStateException.class) + .hasMessageContaining("No DefaultAgent") + .hasMessageContaining("UnknownAggregate"); + } + + // ------------------------------------------------------------------------- + // Basic adapter properties + // ------------------------------------------------------------------------- + + @Test + void isCreateShouldAlwaysReturnFalse() { + var adapter = new AgenticEventHandlerFunctionAdapter<>( + aggregate, "Test", eventType, agentPlatform, + List.of(), List.of(), Collections::emptyList); + + assertThat(adapter.isCreate()).isFalse(); + } + + @Test + void getEventTypeShouldReturnConfiguredType() { + var adapter = new AgenticEventHandlerFunctionAdapter<>( + aggregate, "Test", eventType, agentPlatform, + List.of(), List.of(), Collections::emptyList); + + assertThat(adapter.getEventType()).isSameAs(eventType); + } + + @Test + void getAggregateShouldReturnConfiguredAggregate() { + var adapter = new AgenticEventHandlerFunctionAdapter<>( + aggregate, "Test", eventType, agentPlatform, + List.of(), List.of(), Collections::emptyList); + + assertThat(adapter.getAggregate()).isSameAs(aggregate); + } +} diff --git a/plans/agent-resolution-refactoring.md b/plans/agent-resolution-refactoring.md new file mode 100644 index 00000000..bae40cad --- /dev/null +++ b/plans/agent-resolution-refactoring.md @@ -0,0 +1,89 @@ +# Agent Resolution Refactoring Plan + +## Summary + +Refactor `AgenticCommandHandlerFunctionAdapter` and `AgenticEventHandlerFunctionAdapter` to not +inject the `Agent` instance directly. Instead, infer the agent at runtime based on the +`AgenticAggregate` name by searching `agentPlatform.agents()`. If no matching agent is found, +fall back to using `Autonomy` via `agentPlatform.getPlatformServices().autonomy()` to let Embabel +decide on the best solution. + +## Motivation + +The current approach requires a 1:1 Spring bean naming convention (`{aggregateName}Agent`) and +fails fast at startup if the bean is missing. The new approach: + +1. **Decouples** agent registration from the Spring bean lifecycle — agents deployed to the + `AgentPlatform` at any time will be discovered automatically. +2. **Enables fallback** via Embabel's `Autonomy` when no agent is explicitly named after the + aggregate, letting the platform choose the best agent/goal combination. +3. **Simplifies** the `AgenticAggregateRuntimeFactory` by removing eager agent resolution. + +## Phase 1: Adapter Refactoring + +### 1.1 AgenticCommandHandlerFunctionAdapter + +- Replace the `Agent agent` constructor parameter and field with `String aggregateName`. +- In `apply()`, resolve the agent lazily: + 1. Search `agentPlatform.agents()` for an `Agent` whose `getName()` matches the aggregate name + (exact match or `{aggregateName}Agent` suffix match). + 2. If found → use existing `agentPlatform.createAgentProcess(agent, ProcessOptions.DEFAULT, bindings)` + with the manual tick loop. + 3. If not found → fall back to `agentPlatform.getPlatformServices().autonomy() + .chooseAndAccomplishGoal(GoalChoiceApprover.Companion.getAPPROVE_ALL(), agentPlatform, bindings)`. + Extract the `AgentProcess` from the returned `AgentProcessExecution`. +- Collect events from the blackboard identically in both paths. + +### 1.2 AgenticEventHandlerFunctionAdapter + +- Same changes as 1.1 but for the event handling path. +- The `event` binding (instead of `command`) is placed on the blackboard. + +### 1.3 Agent Resolution Helper + +- Extract the agent-name-matching logic into a private helper method shared by both adapters + (or a static utility) to avoid duplication. The method signature: + ```java + Optional resolveAgentByName(AgentPlatform platform, String aggregateName) + ``` + Matching rules: + 1. Exact match: `agent.getName().equals(aggregateName)` + 2. Agent-suffix match: `agent.getName().equals(aggregateName + "Agent")` + +## Phase 2: Factory Cleanup + +### 2.1 AgenticAggregateRuntimeFactory + +- Remove the `resolveAgent(String aggregateName)` method. +- Remove the `Agent agent` local variable and the lazy resolution logic in `createRuntime()`. +- Update `processAgentHandledCommand()` and `processAgentHandledEvent()` to pass the aggregate + name (String) instead of an `Agent` to the adapter constructors. + +## Phase 3: Testing + +### 3.1 Unit Tests + +- Add tests for `AgenticCommandHandlerFunctionAdapter`: + - Agent found by exact name match → process created via `createAgentProcess`. + - Agent found by suffix match → process created via `createAgentProcess`. + - No agent found → Autonomy fallback invoked. +- Add tests for `AgenticEventHandlerFunctionAdapter`: + - Same three scenarios. +- Verify that the Autonomy fallback correctly collects events from the resulting blackboard. + +### 3.2 Existing Test Validation + +- Run the full agentic module test suite (`mvn test -pl main/agentic`) to ensure no regressions. + +## Dependencies + +- Embabel `embabel-agent-api` 0.3.5 — `Autonomy`, `GoalChoiceApprover`, `AgentProcessExecution`. +- No new library dependencies required. + +## Risk Assessment + +- **Low risk**: The matching logic is straightforward and well-tested. +- **Medium risk**: The `Autonomy.chooseAndAccomplishGoal()` fallback runs the process to completion + synchronously. If Embabel's internal timeout/budget is not aligned with Kafka consumer polling + intervals, the consumer may be considered dead. Mitigation: use `ProcessOptions` with an + appropriate budget in a future iteration.