diff --git a/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/AgenticAggregateRuntime.java b/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/AgenticAggregateRuntime.java index 97149be8..7bf0f636 100644 --- a/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/AgenticAggregateRuntime.java +++ b/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/AgenticAggregateRuntime.java @@ -18,10 +18,13 @@ package org.elasticsoftware.akces.agentic; import com.embabel.agent.core.AgentPlatform; +import org.elasticsoftware.akces.agentic.commands.AssignTaskCommand; +import org.elasticsoftware.akces.agentic.events.AgentTaskAssignedEvent; import org.elasticsoftware.akces.agentic.events.MemoryRevokedEvent; import org.elasticsoftware.akces.agentic.events.MemoryStoredEvent; import org.elasticsoftware.akces.aggregate.AgenticAggregateMemory; import org.elasticsoftware.akces.aggregate.AggregateRuntime; +import org.elasticsoftware.akces.aggregate.CommandType; import org.elasticsoftware.akces.aggregate.DomainEventType; import org.elasticsoftware.akces.aggregate.MemoryAwareState; import org.elasticsoftware.akces.protocol.AggregateStateRecord; @@ -47,12 +50,23 @@ * */ public interface AgenticAggregateRuntime extends AggregateRuntime { + + /** Built-in domain event type for {@link MemoryStoredEvent}. */ DomainEventType MEMORY_STORED_TYPE = new DomainEventType<>( "MemoryStored", 1, MemoryStoredEvent.class, false, false, false, false); + /** Built-in domain event type for {@link MemoryRevokedEvent}. */ DomainEventType MEMORY_REVOKED_TYPE = new DomainEventType<>( "MemoryRevoked", 1, MemoryRevokedEvent.class, false, false, false, false); + /** Built-in command type for {@link AssignTaskCommand}. */ + CommandType ASSIGN_TASK_COMMAND_TYPE = new CommandType<>( + "AssignTask", 1, AssignTaskCommand.class, false, false, false); + + /** Built-in domain event type for {@link AgentTaskAssignedEvent}. */ + DomainEventType AGENT_TASK_ASSIGNED_TYPE = new DomainEventType<>( + "AgentTaskAssigned", 1, AgentTaskAssignedEvent.class, false, false, false, false); + /** * Returns the Embabel {@link AgentPlatform} used to create and run agent processes. * diff --git a/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/beans/AgenticAggregateRuntimeFactory.java b/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/beans/AgenticAggregateRuntimeFactory.java index d3a90693..d990d0f9 100644 --- a/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/beans/AgenticAggregateRuntimeFactory.java +++ b/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/beans/AgenticAggregateRuntimeFactory.java @@ -45,6 +45,8 @@ import java.util.Collections; import java.util.List; +import static org.elasticsoftware.akces.agentic.AgenticAggregateRuntime.AGENT_TASK_ASSIGNED_TYPE; +import static org.elasticsoftware.akces.agentic.AgenticAggregateRuntime.ASSIGN_TASK_COMMAND_TYPE; import static org.elasticsoftware.akces.agentic.AgenticAggregateRuntime.MEMORY_REVOKED_TYPE; import static org.elasticsoftware.akces.agentic.AgenticAggregateRuntime.MEMORY_STORED_TYPE; @@ -267,15 +269,26 @@ private KafkaAggregateRuntime createRuntime(AgenticAggregateInfo agenticInfo, } }); - // Register built-in event-sourcing handlers for memory management. - // These handle the framework-owned MemoryStored and MemoryRevoked events, - // which every AgenticAggregate state must process via MemoryAwareState. + // Register built-in event-sourcing handlers for memory management and task assignment. + // These handle the framework-owned MemoryStored, MemoryRevoked, and AgentTaskAssigned + // events, using a unified single-dispatch handler. runtimeBuilder - .addEventSourcingHandler(MEMORY_STORED_TYPE, KafkaAgenticAggregateRuntime::handleMemoryEvent) + .addEventSourcingHandler(MEMORY_STORED_TYPE, KafkaAgenticAggregateRuntime::handleBuiltInEvent) .addDomainEvent(MEMORY_STORED_TYPE); runtimeBuilder - .addEventSourcingHandler(MEMORY_REVOKED_TYPE, KafkaAgenticAggregateRuntime::handleMemoryEvent) + .addEventSourcingHandler(MEMORY_REVOKED_TYPE, KafkaAgenticAggregateRuntime::handleBuiltInEvent) .addDomainEvent(MEMORY_REVOKED_TYPE); + runtimeBuilder + .addEventSourcingHandler(AGENT_TASK_ASSIGNED_TYPE, KafkaAgenticAggregateRuntime::handleBuiltInEvent) + .addDomainEvent(AGENT_TASK_ASSIGNED_TYPE); + + // Register built-in AssignTask command handler using a single-dispatch handler on + // KafkaAgenticAggregateRuntime. The handler resolves the agent from the platform, + // creates an AgentProcess, and emits AgentTaskAssignedEvent. + runtimeBuilder + .addCommandHandler(ASSIGN_TASK_COMMAND_TYPE, + KafkaAgenticAggregateRuntime.builtInCommandHandler(agentPlatform, agenticInfo.value())) + .addCommand(ASSIGN_TASK_COMMAND_TYPE); // Collect agent-produced error types for registration and inclusion in adapters. List> agentProducedErrorTypes = diff --git a/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/commands/AssignTaskCommand.java b/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/commands/AssignTaskCommand.java new file mode 100644 index 00000000..32a8f4d4 --- /dev/null +++ b/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/commands/AssignTaskCommand.java @@ -0,0 +1,65 @@ +/* + * Copyright 2022 - 2026 The Original Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.elasticsoftware.akces.agentic.commands; + +import jakarta.annotation.Nonnull; +import jakarta.validation.constraints.NotNull; +import org.elasticsoftware.akces.aggregate.RequestingParty; +import org.elasticsoftware.akces.annotations.AggregateIdentifier; +import org.elasticsoftware.akces.annotations.CommandInfo; +import org.elasticsoftware.akces.commands.Command; + +import java.util.Map; + +/** + * Built-in framework command that assigns a task to an + * {@link org.elasticsoftware.akces.aggregate.AgenticAggregate} for AI-assisted processing. + * + *

When processed by the built-in command handler, this command triggers the creation + * of an Embabel {@code AgentProcess} and emits an + * {@link org.elasticsoftware.akces.agentic.events.AgentTaskAssignedEvent} containing the + * process identifier. + * + *

This is the first framework-owned {@link Command} implementation. Unlike + * application-defined commands, it is automatically registered for every + * {@link org.elasticsoftware.akces.aggregate.AgenticAggregate}. + * + * @param agenticAggregateId the identifier of the target agentic aggregate + * @param taskDescription a human-readable description of what the agent should do + * @param requestingParty the entity (agent or human) requesting this task + * @param taskMetadata optional key-value metadata for additional context + * (e.g. correlationId, priority, deadline); may be {@code null} + */ +@CommandInfo(type = "AssignTask", version = 1, + description = "Assigns a task to an agentic aggregate for AI-assisted processing") +public record AssignTaskCommand( + @AggregateIdentifier @NotNull String agenticAggregateId, + @NotNull String taskDescription, + @NotNull RequestingParty requestingParty, + Map taskMetadata +) implements Command { + + /** + * {@inheritDoc} + */ + @Override + @Nonnull + public String getAggregateId() { + return agenticAggregateId; + } +} diff --git a/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/events/AgentTaskAssignedEvent.java b/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/events/AgentTaskAssignedEvent.java new file mode 100644 index 00000000..e7e12b33 --- /dev/null +++ b/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/events/AgentTaskAssignedEvent.java @@ -0,0 +1,71 @@ +/* + * Copyright 2022 - 2026 The Original Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.elasticsoftware.akces.agentic.events; + +import jakarta.annotation.Nonnull; +import jakarta.validation.constraints.NotNull; +import org.elasticsoftware.akces.aggregate.RequestingParty; +import org.elasticsoftware.akces.annotations.AggregateIdentifier; +import org.elasticsoftware.akces.annotations.DomainEventInfo; +import org.elasticsoftware.akces.events.DomainEvent; + +import java.time.Instant; +import java.util.Map; + +/** + * Domain event emitted when a task has been successfully assigned to an + * {@link org.elasticsoftware.akces.aggregate.AgenticAggregate} and an Embabel + * {@code AgentProcess} has been created. + * + *

This event is produced by the built-in command handler when processing an + * {@link org.elasticsoftware.akces.agentic.commands.AssignTaskCommand}. It contains + * the Embabel {@code AgentProcess} identifier, which links the Akces domain to the + * Embabel runtime. + * + *

The built-in event-sourcing handler processes this event by updating the aggregate + * state's {@link org.elasticsoftware.akces.aggregate.TaskAwareState} with a new + * {@link org.elasticsoftware.akces.aggregate.AssignedTask}. + * + * @param agenticAggregateId the identifier of the agentic aggregate + * @param agentProcessId the Embabel {@code AgentProcess.getId()} value + * @param taskDescription the task description from the originating command + * @param requestingParty the entity that requested the task assignment + * @param taskMetadata optional key-value metadata from the originating command; + * may be {@code null} + * @param assignedAt the instant at which the task was assigned + */ +@DomainEventInfo(type = "AgentTaskAssigned", version = 1, + description = "Emitted when a task has been assigned to an agentic aggregate and an AgentProcess has been created") +public record AgentTaskAssignedEvent( + @AggregateIdentifier String agenticAggregateId, + @NotNull String agentProcessId, + @NotNull String taskDescription, + @NotNull RequestingParty requestingParty, + Map taskMetadata, + @NotNull Instant assignedAt +) implements DomainEvent { + + /** + * {@inheritDoc} + */ + @Override + @Nonnull + public String getAggregateId() { + return agenticAggregateId; + } +} diff --git a/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/runtime/AkcesAgenticAggregateController.java b/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/runtime/AkcesAgenticAggregateController.java index ca928674..9f172663 100644 --- a/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/runtime/AkcesAgenticAggregateController.java +++ b/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/runtime/AkcesAgenticAggregateController.java @@ -122,7 +122,14 @@ public class AkcesAgenticAggregateController extends Thread @SuppressWarnings("unchecked") private static final List> BUILTIN_EVENT_TYPES = List.of( AgenticAggregateRuntime.MEMORY_STORED_TYPE, - AgenticAggregateRuntime.MEMORY_REVOKED_TYPE + AgenticAggregateRuntime.MEMORY_REVOKED_TYPE, + AgenticAggregateRuntime.AGENT_TASK_ASSIGNED_TYPE + ); + + /** Built-in command types provided by the agentic framework. */ + @SuppressWarnings("unchecked") + private static final List> BUILTIN_COMMAND_TYPES = List.of( + AgenticAggregateRuntime.ASSIGN_TASK_COMMAND_TYPE ); private final ConsumerFactory schemaConsumerFactory; @@ -153,6 +160,10 @@ public class AkcesAgenticAggregateController extends Thread * subclasses or future extensions that may need environment properties. */ private Environment environment; + /** Whether to force-register built-in schemas when they are incompatible with existing ones. + * Configured via the {@code akces.aggregate.schemas.forceRegister} environment property. */ + private boolean forceRegisterOnIncompatible = false; + /** * Creates a new {@code AkcesAgenticAggregateController}. * @@ -268,10 +279,16 @@ public void run() { } /** - * Registers the built-in {@link org.elasticsoftware.akces.agentic.events.MemoryStoredEvent} - * and {@link org.elasticsoftware.akces.agentic.events.MemoryRevokedEvent} schemas with the - * schema registry. These events are produced internally by the Embabel layer when the - * agent stores or revokes memories. + * Registers the built-in agentic schemas with the schema registry: the memory events + * ({@link org.elasticsoftware.akces.agentic.events.MemoryStoredEvent}, + * {@link org.elasticsoftware.akces.agentic.events.MemoryRevokedEvent}), + * the {@link org.elasticsoftware.akces.agentic.events.AgentTaskAssignedEvent}, and + * the {@link org.elasticsoftware.akces.agentic.commands.AssignTaskCommand}. + * + *

If a schema is incompatible with the existing registered schema, the behaviour + * depends on the {@code akces.aggregate.schemas.forceRegister} environment property: + * when {@code true}, the schema is force-registered; otherwise the incompatibility is + * propagated as a fatal error. */ private void registerBuiltinSchemas() { logger.info("Registering built-in agentic schemas for {}Aggregate", @@ -280,21 +297,50 @@ private void registerBuiltinSchemas() { try { aggregateRuntime.registerAndValidate(eventType, schemaRegistry); } catch (IncompatibleSchemaException e) { - logger.warn("Built-in event schema {} is incompatible — attempting force-register", - eventType.typeName(), e); - try { - aggregateRuntime.registerAndValidate(eventType, schemaRegistry, true); - } catch (Exception ex) { - logger.error("Failed to force-register built-in event schema {}", - eventType.typeName(), ex); - throw new RuntimeException("Failed to register built-in event schema: " - + eventType.typeName(), ex); + if (forceRegisterOnIncompatible) { + logger.warn("Built-in event schema {} is incompatible — force-registering (forceRegister=true)", + eventType.typeName(), e); + try { + aggregateRuntime.registerAndValidate(eventType, schemaRegistry, true); + } catch (Exception ex) { + logger.error("Failed to force-register built-in event schema {}", + eventType.typeName(), ex); + throw new RuntimeException("Failed to register built-in event schema: " + + eventType.typeName(), ex); + } + } else { + throw new RuntimeException("Built-in event schema " + eventType.typeName() + + " is incompatible. Set akces.aggregate.schemas.forceRegister=true to override.", e); } } catch (Exception e) { throw new RuntimeException("Failed to register built-in event schema: " + eventType.typeName(), e); } } + for (CommandType commandType : BUILTIN_COMMAND_TYPES) { + try { + aggregateRuntime.registerAndValidate(commandType, schemaRegistry); + } catch (IncompatibleSchemaException e) { + if (forceRegisterOnIncompatible) { + logger.warn("Built-in command schema {} is incompatible — force-registering (forceRegister=true)", + commandType.typeName(), e); + try { + aggregateRuntime.registerAndValidate(commandType, schemaRegistry, true); + } catch (Exception ex) { + logger.error("Failed to force-register built-in command schema {}", + commandType.typeName(), ex); + throw new RuntimeException("Failed to register built-in command schema: " + + commandType.typeName(), ex); + } + } else { + throw new RuntimeException("Built-in command schema " + commandType.typeName() + + " is incompatible. Set akces.aggregate.schemas.forceRegister=true to override.", e); + } + } catch (Exception e) { + throw new RuntimeException("Failed to register built-in command schema: " + + commandType.typeName(), e); + } + } } /** @@ -521,6 +567,7 @@ public void setApplicationContext(ApplicationContext ctx) { @Override public void setEnvironment(Environment env) { this.environment = env; + this.forceRegisterOnIncompatible = env.getProperty("akces.aggregate.schemas.forceRegister", Boolean.class, false); } // ------------------------------------------------------------------------- diff --git a/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/runtime/KafkaAgenticAggregateRuntime.java b/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/runtime/KafkaAgenticAggregateRuntime.java index 4b627d00..1b05fbbf 100644 --- a/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/runtime/KafkaAgenticAggregateRuntime.java +++ b/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/runtime/KafkaAgenticAggregateRuntime.java @@ -17,10 +17,15 @@ package org.elasticsoftware.akces.agentic.runtime; +import com.embabel.agent.core.Agent; import com.embabel.agent.core.AgentPlatform; +import com.embabel.agent.core.AgentProcess; +import com.embabel.agent.core.ProcessOptions; import jakarta.annotation.Nullable; import org.apache.kafka.common.errors.SerializationException; import org.elasticsoftware.akces.agentic.AgenticAggregateRuntime; +import org.elasticsoftware.akces.agentic.commands.AssignTaskCommand; +import org.elasticsoftware.akces.agentic.events.AgentTaskAssignedEvent; import org.elasticsoftware.akces.agentic.events.MemoryRevokedEvent; import org.elasticsoftware.akces.agentic.events.MemoryStoredEvent; import org.elasticsoftware.akces.events.DomainEvent; @@ -33,15 +38,21 @@ import org.elasticsoftware.akces.protocol.ProtocolRecord; import org.elasticsoftware.akces.schemas.SchemaException; import org.elasticsoftware.akces.schemas.SchemaRegistry; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import tools.jackson.databind.ObjectMapper; import java.io.IOException; +import java.time.Instant; import java.util.Collection; +import java.util.LinkedHashMap; import java.util.List; +import java.util.Map; import java.util.Objects; import java.util.function.BiConsumer; import java.util.function.Consumer; import java.util.function.Supplier; +import java.util.stream.Stream; /** * Kafka-backed implementation of {@link AgenticAggregateRuntime}. @@ -52,6 +63,9 @@ */ public class KafkaAgenticAggregateRuntime implements AgenticAggregateRuntime { + private static final Logger logger = + LoggerFactory.getLogger(KafkaAgenticAggregateRuntime.class); + private final AggregateRuntime delegate; private final ObjectMapper objectMapper; private final Class stateClass; @@ -229,7 +243,7 @@ public boolean shouldHandlePIIData() { } // ------------------------------------------------------------------------- - // Built-in EventSourcingHandler implementations for memory management + // Built-in EventSourcingHandler implementations // ------------------------------------------------------------------------- /** @@ -284,27 +298,154 @@ public static AggregateState onMemoryRevoked(MemoryRevokedEvent event, Aggregate } /** - * Single-dispatch event-sourcing handler that routes {@link MemoryStoredEvent} and - * {@link MemoryRevokedEvent} to the appropriate typed handler. + * Built-in event-sourcing handler for {@link AgentTaskAssignedEvent}. + * + *

Creates an {@link AssignedTask} from the event and appends it to the state's + * assigned tasks list. The state must implement {@link TaskAwareState}; otherwise an + * {@link IllegalStateException} is thrown. + * + * @param event the {@code AgentTaskAssignedEvent} to apply + * @param state the current aggregate state + * @return a new state instance with the assigned task added + * @throws IllegalStateException if {@code state} does not implement {@link TaskAwareState} + */ + @SuppressWarnings("unchecked") + public static AggregateState onAgentTaskAssigned(AgentTaskAssignedEvent event, AggregateState state) { + if (!(state instanceof TaskAwareState tas)) { + throw new IllegalStateException( + "Aggregate state " + state.getClass().getName() + + " does not implement TaskAwareState"); + } + AssignedTask task = new AssignedTask( + event.agentProcessId(), + event.taskDescription(), + event.requestingParty(), + event.taskMetadata(), + event.assignedAt()); + return (AggregateState) tas.withAssignedTask(task); + } + + /** + * Single-dispatch event-sourcing handler for all built-in agentic domain events. + * + *

Routes {@link MemoryStoredEvent}, {@link MemoryRevokedEvent}, and + * {@link AgentTaskAssignedEvent} to the appropriate typed handler. * *

Intended to be used as a method reference - * ({@code KafkaAgenticAggregateRuntime::handleMemoryEvent}) so that no anonymous adapter + * ({@code KafkaAgenticAggregateRuntime::handleBuiltInEvent}) so that no anonymous adapter * class is required at the registration site. * - * @param event the memory domain event to apply; must be a {@code MemoryStoredEvent} or - * {@code MemoryRevokedEvent} + * @param event the built-in domain event to apply * @param state the current aggregate state * @return the updated aggregate state - * @throws IllegalArgumentException if {@code event} is not a recognised memory event type + * @throws IllegalArgumentException if {@code event} is not a recognised built-in event type */ - public static AggregateState handleMemoryEvent(DomainEvent event, AggregateState state) { + public static AggregateState handleBuiltInEvent(DomainEvent event, AggregateState state) { if (event instanceof MemoryStoredEvent stored) { return onMemoryStored(stored, state); } else if (event instanceof MemoryRevokedEvent revoked) { return onMemoryRevoked(revoked, state); + } else if (event instanceof AgentTaskAssignedEvent assigned) { + return onAgentTaskAssigned(assigned, state); } else { throw new IllegalArgumentException( - "Unsupported memory event type: " + event.getClass().getName()); + "Unsupported built-in event type: " + event.getClass().getName()); + } + } + + // ------------------------------------------------------------------------- + // Built-in CommandHandler for task assignment + // ------------------------------------------------------------------------- + + /** + * Creates a {@link CommandHandlerFunction} for built-in agentic commands. + * + *

The returned function handles {@link AssignTaskCommand} by resolving the appropriate + * {@link Agent} from the {@link AgentPlatform}, creating an {@link AgentProcess}, + * and emitting an {@link AgentTaskAssignedEvent} with the process ID. + * + *

This is a factory method rather than a static method reference because it needs + * runtime access to the {@link AgentPlatform} and aggregate name for agent resolution. + * + * @param agentPlatform the Embabel platform used to create agent processes + * @param aggregateName the name of the aggregate (used for agent resolution) + * @return a command handler function for built-in agentic commands + */ + public static CommandHandlerFunction builtInCommandHandler( + AgentPlatform agentPlatform, String aggregateName) { + return (command, state) -> { + if (command instanceof AssignTaskCommand assignTask) { + return handleAssignTask(assignTask, state, agentPlatform, aggregateName); + } else { + throw new IllegalArgumentException( + "Unsupported built-in command type: " + command.getClass().getName()); + } + }; + } + + /** + * Handles the {@link AssignTaskCommand} by resolving the agent, creating an + * {@link AgentProcess}, and emitting an {@link AgentTaskAssignedEvent}. + */ + private static Stream handleAssignTask(AssignTaskCommand command, AggregateState state, + AgentPlatform agentPlatform, String aggregateName) { + logger.debug("Processing AssignTask command for aggregate {}, taskDescription='{}'", + aggregateName, command.taskDescription()); + + Map bindings = new LinkedHashMap<>(); + bindings.put("command", command); + bindings.put("state", state); + bindings.put("agenticAggregateId", command.agenticAggregateId()); + bindings.put("taskDescription", command.taskDescription()); + bindings.put("requestingParty", command.requestingParty()); + if (command.taskMetadata() != null) { + bindings.put("taskMetadata", command.taskMetadata()); + } + + Agent agent = resolveAgentByName(agentPlatform, aggregateName); + + AgentProcess agentProcess = + agentPlatform.createAgentProcess(agent, ProcessOptions.DEFAULT, bindings); + + String processId = agentProcess.getId(); + + logger.debug("Created AgentProcess with id={} for AssignTask on aggregate {}", + processId, aggregateName); + + AgentTaskAssignedEvent event = new AgentTaskAssignedEvent( + command.agenticAggregateId(), + processId, + command.taskDescription(), + command.requestingParty(), + command.taskMetadata(), + Instant.now()); + + return Stream.of(event); + } + + /** + * Resolves the {@link Agent} for the aggregate from the platform's registered agents. + * + *

Looks for an agent matching either the exact aggregate name or the + * {@code {aggregateName}Agent} convention. + * + * @param agentPlatform the platform containing registered agents + * @param aggregateName the aggregate name to resolve an agent for + * @return the resolved {@link Agent}; never {@code null} + * @throws IllegalStateException if no matching agent is found + */ + private static Agent resolveAgentByName(AgentPlatform agentPlatform, String aggregateName) { + String agentBeanName = aggregateName + "Agent"; + for (Agent candidate : agentPlatform.agents()) { + String candidateName = candidate.getName(); + if (aggregateName.equals(candidateName) || agentBeanName.equals(candidateName)) { + return candidate; + } } + throw new IllegalStateException( + "No Agent found with name '" + aggregateName + "' or '" + agentBeanName + + "' in the AgentPlatform for built-in command handling. " + + "The implementing application must provide an Agent named '" + + agentBeanName + "'."); } } diff --git a/main/agentic/src/test/java/org/elasticsoftware/akces/agentic/runtime/AssignTaskCommandHandlerTest.java b/main/agentic/src/test/java/org/elasticsoftware/akces/agentic/runtime/AssignTaskCommandHandlerTest.java new file mode 100644 index 00000000..44de1cfd --- /dev/null +++ b/main/agentic/src/test/java/org/elasticsoftware/akces/agentic/runtime/AssignTaskCommandHandlerTest.java @@ -0,0 +1,173 @@ +/* + * Copyright 2022 - 2026 The Original Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.elasticsoftware.akces.agentic.runtime; + +import com.embabel.agent.core.Agent; +import com.embabel.agent.core.AgentPlatform; +import com.embabel.agent.core.AgentProcess; +import com.embabel.agent.core.ProcessOptions; +import org.elasticsoftware.akces.agentic.commands.AssignTaskCommand; +import org.elasticsoftware.akces.agentic.events.AgentTaskAssignedEvent; +import org.elasticsoftware.akces.aggregate.*; +import org.elasticsoftware.akces.commands.Command; +import org.elasticsoftware.akces.events.DomainEvent; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +import java.util.List; +import java.util.Map; +import java.util.stream.Stream; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.*; + +/** + * Unit tests for {@link KafkaAgenticAggregateRuntime#builtInCommandHandler(AgentPlatform, String)}, + * verifying that it creates an Embabel {@link AgentProcess} and emits an + * {@link AgentTaskAssignedEvent} with the correct process ID. + */ +@ExtendWith(MockitoExtension.class) +class AssignTaskCommandHandlerTest { + + /** Simple test state implementing AggregateState. */ + record TestState(String id) implements AggregateState { + @Override + public String getAggregateId() { + return id; + } + } + + @Mock + private AgentPlatform agentPlatform; + + @Mock + private AgentProcess agentProcess; + + @Mock + private Agent agent; + + private CommandHandlerFunction handler; + + @BeforeEach + void setUp() { + handler = KafkaAgenticAggregateRuntime.builtInCommandHandler(agentPlatform, "TestAggregate"); + } + + private void setUpAgentResolution() { + when(agent.getName()).thenReturn("TestAggregateAgent"); + when(agentPlatform.agents()).thenReturn(List.of(agent)); + } + + @Test + void applyShouldCreateAgentProcessAndEmitEvent() { + setUpAgentResolution(); + var party = new HumanRequestingParty("user-1", "analyst"); + var command = new AssignTaskCommand("agg-1", "Analyze data", party, Map.of("key", "value")); + var state = new TestState("agg-1"); + + when(agentPlatform.createAgentProcess(eq(agent), eq(ProcessOptions.DEFAULT), any())) + .thenReturn(agentProcess); + when(agentProcess.getId()).thenReturn("embabel-proc-42"); + + Stream result = handler.apply(command, state); + List events = result.toList(); + + assertThat(events).hasSize(1); + assertThat(events.getFirst()).isInstanceOf(AgentTaskAssignedEvent.class); + + var event = (AgentTaskAssignedEvent) events.getFirst(); + assertThat(event.agenticAggregateId()).isEqualTo("agg-1"); + assertThat(event.agentProcessId()).isEqualTo("embabel-proc-42"); + assertThat(event.taskDescription()).isEqualTo("Analyze data"); + assertThat(event.requestingParty()).isEqualTo(party); + assertThat(event.taskMetadata()).isEqualTo(Map.of("key", "value")); + assertThat(event.assignedAt()).isNotNull(); + + verify(agentPlatform).createAgentProcess(eq(agent), eq(ProcessOptions.DEFAULT), any()); + verify(agentProcess).getId(); + } + + @Test + void applyShouldPropagateAgentRequestingParty() { + setUpAgentResolution(); + var party = new AgentRequestingParty("agent-99", "Orchestrator", "supervisor"); + var command = new AssignTaskCommand("agg-1", "Process task", party, null); + var state = new TestState("agg-1"); + + when(agentPlatform.createAgentProcess(eq(agent), eq(ProcessOptions.DEFAULT), any())) + .thenReturn(agentProcess); + when(agentProcess.getId()).thenReturn("proc-abc"); + + List events = handler.apply(command, state).toList(); + + var event = (AgentTaskAssignedEvent) events.getFirst(); + assertThat(event.requestingParty()).isInstanceOf(AgentRequestingParty.class); + assertThat(event.requestingParty().role()).isEqualTo("supervisor"); + } + + @Test + void applyShouldHandleNullMetadata() { + setUpAgentResolution(); + var party = new HumanRequestingParty("user-1", "admin"); + var command = new AssignTaskCommand("agg-1", "Simple task", party, null); + var state = new TestState("agg-1"); + + when(agentPlatform.createAgentProcess(eq(agent), eq(ProcessOptions.DEFAULT), any())) + .thenReturn(agentProcess); + when(agentProcess.getId()).thenReturn("proc-xyz"); + + List events = handler.apply(command, state).toList(); + + var event = (AgentTaskAssignedEvent) events.getFirst(); + assertThat(event.taskMetadata()).isNull(); + } + + @Test + void applyShouldThrowWhenNoAgentFound() { + when(agentPlatform.agents()).thenReturn(List.of()); + var noAgentHandler = KafkaAgenticAggregateRuntime.builtInCommandHandler(agentPlatform, "Unknown"); + var party = new HumanRequestingParty("user-1", "analyst"); + var command = new AssignTaskCommand("agg-1", "task", party, null); + var state = new TestState("agg-1"); + + assertThatThrownBy(() -> noAgentHandler.apply(command, state).toList()) + .isInstanceOf(IllegalStateException.class) + .hasMessageContaining("No Agent found"); + } + + @Test + void applyShouldThrowForUnknownCommandType() { + Command unknownCommand = new Command() { + @Override + public String getAggregateId() { + return "agg-1"; + } + }; + var state = new TestState("agg-1"); + + assertThatThrownBy(() -> handler.apply(unknownCommand, state).toList()) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("Unsupported built-in command type"); + } +} diff --git a/main/agentic/src/test/java/org/elasticsoftware/akces/agentic/runtime/AssignTaskTypeConstantsTest.java b/main/agentic/src/test/java/org/elasticsoftware/akces/agentic/runtime/AssignTaskTypeConstantsTest.java new file mode 100644 index 00000000..c5670244 --- /dev/null +++ b/main/agentic/src/test/java/org/elasticsoftware/akces/agentic/runtime/AssignTaskTypeConstantsTest.java @@ -0,0 +1,53 @@ +/* + * Copyright 2022 - 2026 The Original Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.elasticsoftware.akces.agentic.runtime; + +import org.elasticsoftware.akces.agentic.AgenticAggregateRuntime; +import org.elasticsoftware.akces.agentic.commands.AssignTaskCommand; +import org.elasticsoftware.akces.agentic.events.AgentTaskAssignedEvent; +import org.elasticsoftware.akces.aggregate.CommandType; +import org.elasticsoftware.akces.aggregate.DomainEventType; +import org.junit.jupiter.api.Test; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * Tests for the built-in type registration constants on {@link AgenticAggregateRuntime}, + * verifying the {@code ASSIGN_TASK_COMMAND_TYPE} and {@code AGENT_TASK_ASSIGNED_TYPE} + * constants are correctly configured. + */ +class AssignTaskTypeConstantsTest { + + @Test + void assignTaskCommandTypeShouldBeConfiguredCorrectly() { + CommandType type = AgenticAggregateRuntime.ASSIGN_TASK_COMMAND_TYPE; + + assertThat(type.typeName()).isEqualTo("AssignTask"); + assertThat(type.version()).isEqualTo(1); + assertThat(type.typeClass()).isEqualTo(AssignTaskCommand.class); + } + + @Test + void agentTaskAssignedTypeShouldBeConfiguredCorrectly() { + DomainEventType type = AgenticAggregateRuntime.AGENT_TASK_ASSIGNED_TYPE; + + assertThat(type.typeName()).isEqualTo("AgentTaskAssigned"); + assertThat(type.version()).isEqualTo(1); + assertThat(type.typeClass()).isEqualTo(AgentTaskAssignedEvent.class); + } +} diff --git a/main/agentic/src/test/java/org/elasticsoftware/akces/agentic/runtime/MemoryEventSourcingTest.java b/main/agentic/src/test/java/org/elasticsoftware/akces/agentic/runtime/MemoryEventSourcingTest.java index 371b3a5b..20777a1d 100644 --- a/main/agentic/src/test/java/org/elasticsoftware/akces/agentic/runtime/MemoryEventSourcingTest.java +++ b/main/agentic/src/test/java/org/elasticsoftware/akces/agentic/runtime/MemoryEventSourcingTest.java @@ -38,7 +38,7 @@ * {@link MemoryRevokedEvent} sequences. * *

Per framework testing guidelines, the event-sourcing handler methods - * ({@code onMemoryStored}, {@code onMemoryRevoked}, {@code handleMemoryEvent}) are + * ({@code onMemoryStored}, {@code onMemoryRevoked}, {@code handleBuiltInEvent}) are * tested through their actual invocations rather than testing the event records directly. */ class MemoryEventSourcingTest { @@ -150,35 +150,35 @@ void onMemoryRevokedShouldThrowWhenStateIsNotMemoryAware() { } // ------------------------------------------------------------------------- - // handleMemoryEvent dispatch tests + // handleMemoryEvent dispatch tests (via handleBuiltInEvent) // ------------------------------------------------------------------------- @Test - void handleMemoryEventShouldDispatchMemoryStoredEvent() { + void handleBuiltInEventShouldDispatchMemoryStoredEvent() { Instant now = Instant.now(); var event = new MemoryStoredEvent("agg-1", "mem-1", "sub", "fact", "cite", "reason", now); var state = new TestMemoryState("agg-1", List.of()); - AggregateState result = KafkaAgenticAggregateRuntime.handleMemoryEvent(event, state); + AggregateState result = KafkaAgenticAggregateRuntime.handleBuiltInEvent(event, state); assertThat(result).isInstanceOf(TestMemoryState.class); assertThat(((TestMemoryState) result).getMemories()).hasSize(1); } @Test - void handleMemoryEventShouldDispatchMemoryRevokedEvent() { + void handleBuiltInEventShouldDispatchMemoryRevokedEvent() { var m1 = new AgenticAggregateMemory("m1", "s", "f", "c", "r", Instant.now()); var state = new TestMemoryState("agg-1", List.of(m1)); var event = new MemoryRevokedEvent("agg-1", "m1", "evicted", Instant.now()); - AggregateState result = KafkaAgenticAggregateRuntime.handleMemoryEvent(event, state); + AggregateState result = KafkaAgenticAggregateRuntime.handleBuiltInEvent(event, state); assertThat(result).isInstanceOf(TestMemoryState.class); assertThat(((TestMemoryState) result).getMemories()).isEmpty(); } @Test - void handleMemoryEventShouldThrowForUnknownEventType() { + void handleBuiltInEventShouldThrowForUnknownEventType() { var unknownEvent = new DomainEvent() { @Override public String getAggregateId() { @@ -187,9 +187,9 @@ public String getAggregateId() { }; var state = new TestMemoryState("agg-1", List.of()); - assertThatThrownBy(() -> KafkaAgenticAggregateRuntime.handleMemoryEvent(unknownEvent, state)) + assertThatThrownBy(() -> KafkaAgenticAggregateRuntime.handleBuiltInEvent(unknownEvent, state)) .isInstanceOf(IllegalArgumentException.class) - .hasMessageContaining("Unsupported memory event type"); + .hasMessageContaining("Unsupported built-in event type"); } // ------------------------------------------------------------------------- @@ -215,7 +215,7 @@ void shouldReconstructStateFromEventSequence() { // Replay events to reconstruct state for (DomainEvent event : events) { - state = KafkaAgenticAggregateRuntime.handleMemoryEvent(event, state); + state = KafkaAgenticAggregateRuntime.handleBuiltInEvent(event, state); } var finalState = (TestMemoryState) state; @@ -229,9 +229,9 @@ void shouldHandleStoreAndRevokeOfSameMemoryInSequence() { AggregateState state = new TestMemoryState("agg-1", List.of()); // Store then immediately revoke - state = KafkaAgenticAggregateRuntime.handleMemoryEvent( + state = KafkaAgenticAggregateRuntime.handleBuiltInEvent( new MemoryStoredEvent("agg-1", "m1", "s", "f", "c", "r", Instant.now()), state); - state = KafkaAgenticAggregateRuntime.handleMemoryEvent( + state = KafkaAgenticAggregateRuntime.handleBuiltInEvent( new MemoryRevokedEvent("agg-1", "m1", "undo", Instant.now()), state); assertThat(((TestMemoryState) state).getMemories()).isEmpty(); @@ -242,7 +242,7 @@ void shouldPreserveInsertionOrderAfterMultipleStoredEvents() { AggregateState state = new TestMemoryState("agg-1", List.of()); for (int i = 1; i <= 5; i++) { - state = KafkaAgenticAggregateRuntime.handleMemoryEvent( + state = KafkaAgenticAggregateRuntime.handleBuiltInEvent( new MemoryStoredEvent("agg-1", "m" + i, "s" + i, "f" + i, "c" + i, "r" + i, Instant.now().plusSeconds(i)), state); diff --git a/main/agentic/src/test/java/org/elasticsoftware/akces/agentic/runtime/TaskEventSourcingTest.java b/main/agentic/src/test/java/org/elasticsoftware/akces/agentic/runtime/TaskEventSourcingTest.java new file mode 100644 index 00000000..e9d8eb12 --- /dev/null +++ b/main/agentic/src/test/java/org/elasticsoftware/akces/agentic/runtime/TaskEventSourcingTest.java @@ -0,0 +1,190 @@ +/* + * Copyright 2022 - 2026 The Original Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.elasticsoftware.akces.agentic.runtime; + +import org.elasticsoftware.akces.agentic.events.AgentTaskAssignedEvent; +import org.elasticsoftware.akces.aggregate.*; +import org.elasticsoftware.akces.events.DomainEvent; +import org.junit.jupiter.api.Test; + +import java.time.Instant; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** + * Tests for the built-in event-sourcing handler in {@link KafkaAgenticAggregateRuntime} + * that processes {@link AgentTaskAssignedEvent} by updating {@link TaskAwareState}. + * + *

Follows the same pattern as {@link MemoryEventSourcingTest}: event-sourcing handler + * methods are tested through their actual invocations. + */ +class TaskEventSourcingTest { + + /** Concrete {@link TaskAwareState} implementation for test assertions. */ + record TestTaskState( + String id, + List assignedTasks + ) implements AggregateState, TaskAwareState { + + @Override + public String getAggregateId() { + return id; + } + + @Override + public List getAssignedTasks() { + return assignedTasks; + } + + @Override + public TaskAwareState withAssignedTask(AssignedTask task) { + var updated = new ArrayList<>(assignedTasks); + updated.add(task); + return new TestTaskState(id, List.copyOf(updated)); + } + + @Override + public TaskAwareState withoutAssignedTask(String agentProcessId) { + var updated = assignedTasks.stream() + .filter(t -> !t.agentProcessId().equals(agentProcessId)) + .toList(); + return new TestTaskState(id, updated); + } + } + + /** A non-TaskAwareState state for testing error paths. */ + record PlainState(String id) implements AggregateState { + @Override + public String getAggregateId() { + return id; + } + } + + // ------------------------------------------------------------------------- + // onAgentTaskAssigned tests + // ------------------------------------------------------------------------- + + @Test + void onAgentTaskAssignedShouldAppendTaskToState() { + Instant now = Instant.parse("2026-04-10T12:00:00Z"); + var party = new HumanRequestingParty("user-1", "analyst"); + var metadata = Map.of("correlationId", "corr-123"); + var event = new AgentTaskAssignedEvent("agg-1", "proc-1", "Analyze data", + party, metadata, now); + var initialState = new TestTaskState("agg-1", List.of()); + + AggregateState result = KafkaAgenticAggregateRuntime.onAgentTaskAssigned(event, initialState); + + assertThat(result).isInstanceOf(TestTaskState.class); + var state = (TestTaskState) result; + assertThat(state.getAssignedTasks()).hasSize(1); + AssignedTask assigned = state.getAssignedTasks().getFirst(); + assertThat(assigned.agentProcessId()).isEqualTo("proc-1"); + assertThat(assigned.taskDescription()).isEqualTo("Analyze data"); + assertThat(assigned.requestingParty()).isEqualTo(party); + assertThat(assigned.taskMetadata()).isEqualTo(metadata); + assertThat(assigned.assignedAt()).isEqualTo(now); + } + + @Test + void onAgentTaskAssignedShouldThrowWhenStateIsNotTaskAware() { + var event = new AgentTaskAssignedEvent("agg-1", "proc-1", "task", + new HumanRequestingParty("u", "r"), null, Instant.now()); + var plainState = new PlainState("agg-1"); + + assertThatThrownBy(() -> KafkaAgenticAggregateRuntime.onAgentTaskAssigned(event, plainState)) + .isInstanceOf(IllegalStateException.class) + .hasMessageContaining("does not implement TaskAwareState"); + } + + @Test + void onAgentTaskAssignedShouldPreserveExistingTasks() { + Instant t1 = Instant.parse("2026-01-01T00:00:00Z"); + Instant t2 = Instant.parse("2026-01-02T00:00:00Z"); + var party = new AgentRequestingParty("agent-1", "TestAgent", "supervisor"); + var existing = new AssignedTask("proc-old", "Old task", party, null, t1); + var state = new TestTaskState("agg-1", List.of(existing)); + + var event = new AgentTaskAssignedEvent("agg-1", "proc-new", "New task", + party, Map.of("priority", "high"), t2); + + AggregateState result = KafkaAgenticAggregateRuntime.onAgentTaskAssigned(event, state); + + var newState = (TestTaskState) result; + assertThat(newState.getAssignedTasks()).hasSize(2); + assertThat(newState.getAssignedTasks().get(0).agentProcessId()).isEqualTo("proc-old"); + assertThat(newState.getAssignedTasks().get(1).agentProcessId()).isEqualTo("proc-new"); + } + + // ------------------------------------------------------------------------- + // handleBuiltInEvent dispatch tests (AgentTaskAssigned path) + // ------------------------------------------------------------------------- + + @Test + void handleBuiltInEventShouldDispatchAgentTaskAssignedEvent() { + Instant now = Instant.now(); + var party = new HumanRequestingParty("user-1", "analyst"); + var event = new AgentTaskAssignedEvent("agg-1", "proc-1", "task", party, null, now); + var state = new TestTaskState("agg-1", List.of()); + + AggregateState result = KafkaAgenticAggregateRuntime.handleBuiltInEvent(event, state); + + assertThat(result).isInstanceOf(TestTaskState.class); + assertThat(((TestTaskState) result).getAssignedTasks()).hasSize(1); + } + + @Test + void handleBuiltInEventShouldThrowForUnknownEventType() { + var unknownEvent = new DomainEvent() { + @Override + public String getAggregateId() { + return "agg-1"; + } + }; + var state = new TestTaskState("agg-1", List.of()); + + assertThatThrownBy(() -> KafkaAgenticAggregateRuntime.handleBuiltInEvent(unknownEvent, state)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("Unsupported built-in event type"); + } + + // ------------------------------------------------------------------------- + // State reconstruction from event sequences + // ------------------------------------------------------------------------- + + @Test + void shouldReconstructStateFromMultipleTaskAssignedEvents() { + AggregateState state = new TestTaskState("agg-1", List.of()); + var party = new HumanRequestingParty("user-1", "analyst"); + + for (int i = 1; i <= 3; i++) { + var event = new AgentTaskAssignedEvent("agg-1", "proc-" + i, "Task " + i, + party, null, Instant.now().plusSeconds(i)); + state = KafkaAgenticAggregateRuntime.handleBuiltInEvent(event, state); + } + + var finalState = (TestTaskState) state; + assertThat(finalState.getAssignedTasks()).hasSize(3); + assertThat(finalState.getAssignedTasks()).extracting(AssignedTask::agentProcessId) + .containsExactly("proc-1", "proc-2", "proc-3"); + } +} diff --git a/main/api/src/main/java/org/elasticsoftware/akces/aggregate/AgentRequestingParty.java b/main/api/src/main/java/org/elasticsoftware/akces/aggregate/AgentRequestingParty.java new file mode 100644 index 00000000..f65bbc59 --- /dev/null +++ b/main/api/src/main/java/org/elasticsoftware/akces/aggregate/AgentRequestingParty.java @@ -0,0 +1,34 @@ +/* + * Copyright 2022 - 2026 The Original Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.elasticsoftware.akces.aggregate; + +/** + * A {@link RequestingParty} implementation representing an AI agent that requested + * a task assignment. + * + * @param agentId the unique identifier of the requesting agent + * @param agentName the display name of the requesting agent + * @param role the role of the requesting agent in the system + * (e.g. "orchestrator", "supervisor", "analyst") + */ +public record AgentRequestingParty( + String agentId, + String agentName, + String role +) implements RequestingParty { +} diff --git a/main/api/src/main/java/org/elasticsoftware/akces/aggregate/AssignedTask.java b/main/api/src/main/java/org/elasticsoftware/akces/aggregate/AssignedTask.java new file mode 100644 index 00000000..538642f7 --- /dev/null +++ b/main/api/src/main/java/org/elasticsoftware/akces/aggregate/AssignedTask.java @@ -0,0 +1,45 @@ +/* + * Copyright 2022 - 2026 The Original Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.elasticsoftware.akces.aggregate; + +import java.time.Instant; +import java.util.Map; + +/** + * An immutable record representing a task that has been assigned to an + * {@link AgenticAggregate} for AI-assisted processing. + * + *

Instances are created by the built-in event-sourcing handler when an + * {@code AgentTaskAssignedEvent} is applied, and are stored in aggregate states + * that implement {@link TaskAwareState}. + * + * @param agentProcessId the Embabel {@code AgentProcess} identifier created for this task + * @param taskDescription a human-readable description of what the agent should do + * @param requestingParty the entity (agent or human) that requested this task + * @param taskMetadata optional key-value metadata for additional context + * (e.g. correlationId, priority, deadline) + * @param assignedAt the instant at which the task was assigned + */ +public record AssignedTask( + String agentProcessId, + String taskDescription, + RequestingParty requestingParty, + Map taskMetadata, + Instant assignedAt +) { +} diff --git a/main/api/src/main/java/org/elasticsoftware/akces/aggregate/HumanRequestingParty.java b/main/api/src/main/java/org/elasticsoftware/akces/aggregate/HumanRequestingParty.java new file mode 100644 index 00000000..e55a065e --- /dev/null +++ b/main/api/src/main/java/org/elasticsoftware/akces/aggregate/HumanRequestingParty.java @@ -0,0 +1,35 @@ +/* + * Copyright 2022 - 2026 The Original Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.elasticsoftware.akces.aggregate; + +/** + * A {@link RequestingParty} implementation representing a human user that requested + * a task assignment. + * + *

For GDPR reasons, this record does not carry any personally identifiable + * information beyond a system-level user identifier and role. + * + * @param userId the unique identifier of the human user + * @param role the role of the human user in the system + * (e.g. "administrator", "analyst", "end-user") + */ +public record HumanRequestingParty( + String userId, + String role +) implements RequestingParty { +} diff --git a/main/api/src/main/java/org/elasticsoftware/akces/aggregate/RequestingParty.java b/main/api/src/main/java/org/elasticsoftware/akces/aggregate/RequestingParty.java new file mode 100644 index 00000000..d2999349 --- /dev/null +++ b/main/api/src/main/java/org/elasticsoftware/akces/aggregate/RequestingParty.java @@ -0,0 +1,56 @@ +/* + * Copyright 2022 - 2026 The Original Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.elasticsoftware.akces.aggregate; + +import com.fasterxml.jackson.annotation.JsonSubTypes; +import com.fasterxml.jackson.annotation.JsonTypeInfo; + +/** + * Represents the entity that requested a task assignment to an + * {@link AgenticAggregate} — either another AI agent or a human user. + * + *

This is a sealed interface with two permitted implementations: + *

    + *
  • {@link AgentRequestingParty} — when the requesting party is another AI agent
  • + *
  • {@link HumanRequestingParty} — when the requesting party is a human user
  • + *
+ * + *

Each variant carries a {@link #role()} describing the requesting party's role in the + * system (e.g. "administrator", "analyst", "orchestrator"). + * + *

Jackson polymorphic serialization is configured via {@link JsonTypeInfo} and + * {@link JsonSubTypes}, using a {@code "type"} discriminator property with values + * {@code "agent"} and {@code "human"}. + */ +@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type") +@JsonSubTypes({ + @JsonSubTypes.Type(value = AgentRequestingParty.class, name = "agent"), + @JsonSubTypes.Type(value = HumanRequestingParty.class, name = "human") +}) +public sealed interface RequestingParty permits AgentRequestingParty, HumanRequestingParty { + + /** + * The role of this requesting party in the system. + * + *

Examples: {@code "administrator"}, {@code "analyst"}, {@code "orchestrator"}, + * {@code "supervisor"}. + * + * @return the role; never {@code null} + */ + String role(); +} diff --git a/main/api/src/main/java/org/elasticsoftware/akces/aggregate/TaskAwareState.java b/main/api/src/main/java/org/elasticsoftware/akces/aggregate/TaskAwareState.java new file mode 100644 index 00000000..8d5281dd --- /dev/null +++ b/main/api/src/main/java/org/elasticsoftware/akces/aggregate/TaskAwareState.java @@ -0,0 +1,61 @@ +/* + * Copyright 2022 - 2026 The Original Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.elasticsoftware.akces.aggregate; + +import java.util.List; + +/** + * State interface for agentic aggregates that track assigned tasks. + * + *

Mirrors the {@link MemoryAwareState} pattern: the framework's built-in event-sourcing + * handler for {@code AgentTaskAssignedEvent} checks whether the aggregate state implements + * this interface, and if so, calls {@link #withAssignedTask(AssignedTask)} to update the + * state. If the state does not implement this interface, an {@link IllegalStateException} + * is thrown. + * + *

States may implement both {@link MemoryAwareState} and {@code TaskAwareState} when + * an agentic aggregate needs to track both memories and assigned tasks. + * + *

All mutation methods return new instances — implementations must be immutable. + */ +public interface TaskAwareState { + + /** + * Returns the list of currently assigned tasks. + * + * @return an unmodifiable list of assigned tasks; never {@code null} + */ + List getAssignedTasks(); + + /** + * Returns a new state instance with the given task appended to the assigned tasks list. + * + * @param task the task to add; must not be {@code null} + * @return a new state instance with the task added + */ + TaskAwareState withAssignedTask(AssignedTask task); + + /** + * Returns a new state instance with the task identified by the given + * {@code agentProcessId} removed from the assigned tasks list. + * + * @param agentProcessId the Embabel AgentProcess ID of the task to remove + * @return a new state instance with the matching task removed + */ + TaskAwareState withoutAssignedTask(String agentProcessId); +} diff --git a/main/api/src/test/java/org/elasticsoftware/akces/aggregate/RequestingPartySerializationTest.java b/main/api/src/test/java/org/elasticsoftware/akces/aggregate/RequestingPartySerializationTest.java new file mode 100644 index 00000000..4db0a372 --- /dev/null +++ b/main/api/src/test/java/org/elasticsoftware/akces/aggregate/RequestingPartySerializationTest.java @@ -0,0 +1,177 @@ +/* + * Copyright 2022 - 2026 The Original Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.elasticsoftware.akces.aggregate; + +import com.fasterxml.jackson.annotation.JsonSubTypes; +import com.fasterxml.jackson.annotation.JsonTypeInfo; +import org.junit.jupiter.api.Test; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * Tests for the {@link RequestingParty} sealed interface and its implementations + * ({@link AgentRequestingParty} and {@link HumanRequestingParty}), verifying + * record construction, equality, and Jackson annotation configuration. + * + *

Since the api module only has {@code jackson-annotations} (not {@code jackson-databind}), + * serialization round-trips are verified in the agentic module tests where a full Jackson + * {@code ObjectMapper} is available. + */ +class RequestingPartySerializationTest { + + // ------------------------------------------------------------------------- + // AgentRequestingParty tests + // ------------------------------------------------------------------------- + + @Test + void agentRequestingPartyShouldPopulateAllFields() { + var agent = new AgentRequestingParty("agent-1", "Orchestrator", "supervisor"); + + assertThat(agent.agentId()).isEqualTo("agent-1"); + assertThat(agent.agentName()).isEqualTo("Orchestrator"); + assertThat(agent.role()).isEqualTo("supervisor"); + } + + @Test + void agentRequestingPartyShouldImplementRequestingParty() { + var agent = new AgentRequestingParty("agent-1", "Orchestrator", "supervisor"); + + assertThat(agent).isInstanceOf(RequestingParty.class); + assertThat(((RequestingParty) agent).role()).isEqualTo("supervisor"); + } + + // ------------------------------------------------------------------------- + // HumanRequestingParty tests + // ------------------------------------------------------------------------- + + @Test + void humanRequestingPartyShouldPopulateAllFields() { + var human = new HumanRequestingParty("user-1", "administrator"); + + assertThat(human.userId()).isEqualTo("user-1"); + assertThat(human.role()).isEqualTo("administrator"); + } + + @Test + void humanRequestingPartyShouldImplementRequestingParty() { + var human = new HumanRequestingParty("user-1", "administrator"); + + assertThat(human).isInstanceOf(RequestingParty.class); + assertThat(((RequestingParty) human).role()).isEqualTo("administrator"); + } + + // ------------------------------------------------------------------------- + // Equality tests + // ------------------------------------------------------------------------- + + @Test + void agentRequestingPartyEqualityShouldHold() { + var a = new AgentRequestingParty("id-1", "Name", "role"); + var b = new AgentRequestingParty("id-1", "Name", "role"); + + assertThat(a).isEqualTo(b); + assertThat(a.hashCode()).isEqualTo(b.hashCode()); + } + + @Test + void humanRequestingPartyEqualityShouldHold() { + var a = new HumanRequestingParty("id-1", "role"); + var b = new HumanRequestingParty("id-1", "role"); + + assertThat(a).isEqualTo(b); + assertThat(a.hashCode()).isEqualTo(b.hashCode()); + } + + @Test + void agentAndHumanShouldNotBeEqual() { + var agent = new AgentRequestingParty("id-1", "Name", "role"); + var human = new HumanRequestingParty("id-1", "role"); + + assertThat(agent).isNotEqualTo(human); + } + + @Test + void agentRequestingPartyInequalityWhenIdDiffers() { + var a = new AgentRequestingParty("id-1", "Name", "role"); + var b = new AgentRequestingParty("id-2", "Name", "role"); + + assertThat(a).isNotEqualTo(b); + } + + @Test + void humanRequestingPartyInequalityWhenIdDiffers() { + var a = new HumanRequestingParty("id-1", "role"); + var b = new HumanRequestingParty("id-2", "role"); + + assertThat(a).isNotEqualTo(b); + } + + // ------------------------------------------------------------------------- + // Jackson annotation tests + // ------------------------------------------------------------------------- + + @Test + void requestingPartyShouldHaveJsonTypeInfoAnnotation() { + JsonTypeInfo typeInfo = RequestingParty.class.getAnnotation(JsonTypeInfo.class); + + assertThat(typeInfo).isNotNull(); + assertThat(typeInfo.use()).isEqualTo(JsonTypeInfo.Id.NAME); + assertThat(typeInfo.property()).isEqualTo("type"); + } + + @Test + void requestingPartyShouldHaveJsonSubTypesAnnotation() { + JsonSubTypes subTypes = RequestingParty.class.getAnnotation(JsonSubTypes.class); + + assertThat(subTypes).isNotNull(); + assertThat(subTypes.value()).hasSize(2); + + JsonSubTypes.Type[] types = subTypes.value(); + assertThat(types[0].value()).isEqualTo(AgentRequestingParty.class); + assertThat(types[0].name()).isEqualTo("agent"); + assertThat(types[1].value()).isEqualTo(HumanRequestingParty.class); + assertThat(types[1].name()).isEqualTo("human"); + } + + // ------------------------------------------------------------------------- + // Sealed interface tests + // ------------------------------------------------------------------------- + + @Test + void requestingPartyShouldBeSealed() { + assertThat(RequestingParty.class.isSealed()).isTrue(); + assertThat(RequestingParty.class.getPermittedSubclasses()) + .containsExactlyInAnyOrder(AgentRequestingParty.class, HumanRequestingParty.class); + } + + @Test + void patternMatchingShouldWorkWithSwitch() { + RequestingParty agent = new AgentRequestingParty("a-1", "TestAgent", "supervisor"); + RequestingParty human = new HumanRequestingParty("u-1", "analyst"); + + assertThat(describe(agent)).isEqualTo("Agent: TestAgent (supervisor)"); + assertThat(describe(human)).isEqualTo("Human: u-1 (analyst)"); + } + + private static String describe(RequestingParty party) { + return switch (party) { + case AgentRequestingParty a -> "Agent: " + a.agentName() + " (" + a.role() + ")"; + case HumanRequestingParty h -> "Human: " + h.userId() + " (" + h.role() + ")"; + }; + } +} diff --git a/main/api/src/test/java/org/elasticsoftware/akces/aggregate/TaskAwareStateTest.java b/main/api/src/test/java/org/elasticsoftware/akces/aggregate/TaskAwareStateTest.java new file mode 100644 index 00000000..b60ea053 --- /dev/null +++ b/main/api/src/test/java/org/elasticsoftware/akces/aggregate/TaskAwareStateTest.java @@ -0,0 +1,194 @@ +/* + * Copyright 2022 - 2026 The Original Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.elasticsoftware.akces.aggregate; + +import org.junit.jupiter.api.Test; + +import java.time.Instant; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * Tests for the {@link TaskAwareState} interface contract and the {@link AssignedTask} record, + * following the same pattern as {@link MemoryAwareStateTest}. Per framework testing guidelines, + * interfaces and records are tested through concrete implementations rather than directly. + */ +class TaskAwareStateTest { + + /** Concrete {@link TaskAwareState} implementation for testing. */ + record TestTaskState( + String id, + List assignedTasks + ) implements AggregateState, TaskAwareState { + + @Override + public String getAggregateId() { + return id; + } + + @Override + public List getAssignedTasks() { + return assignedTasks; + } + + @Override + public TaskAwareState withAssignedTask(AssignedTask task) { + var updated = new ArrayList<>(assignedTasks); + updated.add(task); + return new TestTaskState(id, List.copyOf(updated)); + } + + @Override + public TaskAwareState withoutAssignedTask(String agentProcessId) { + var updated = assignedTasks.stream() + .filter(t -> !t.agentProcessId().equals(agentProcessId)) + .toList(); + return new TestTaskState(id, updated); + } + } + + // ------------------------------------------------------------------------- + // AssignedTask record tests + // ------------------------------------------------------------------------- + + @Test + void assignedTaskShouldPopulateAllFields() { + Instant now = Instant.now(); + var party = new HumanRequestingParty("user-1", "analyst"); + var metadata = Map.of("correlationId", "corr-123"); + + var task = new AssignedTask("proc-1", "Analyze data", party, metadata, now); + + assertThat(task.agentProcessId()).isEqualTo("proc-1"); + assertThat(task.taskDescription()).isEqualTo("Analyze data"); + assertThat(task.requestingParty()).isEqualTo(party); + assertThat(task.taskMetadata()).isEqualTo(metadata); + assertThat(task.assignedAt()).isEqualTo(now); + } + + @Test + void assignedTaskEqualityShouldHold() { + Instant now = Instant.parse("2026-04-10T12:00:00Z"); + var party = new AgentRequestingParty("agent-1", "TestAgent", "supervisor"); + var a = new AssignedTask("proc-1", "Do task", party, Map.of(), now); + var b = new AssignedTask("proc-1", "Do task", party, Map.of(), now); + + assertThat(a).isEqualTo(b); + assertThat(a.hashCode()).isEqualTo(b.hashCode()); + } + + @Test + void assignedTaskInequalityWhenProcessIdDiffers() { + Instant now = Instant.now(); + var party = new HumanRequestingParty("user-1", "analyst"); + var a = new AssignedTask("proc-1", "task", party, null, now); + var b = new AssignedTask("proc-2", "task", party, null, now); + + assertThat(a).isNotEqualTo(b); + } + + @Test + void assignedTaskShouldAllowNullMetadata() { + Instant now = Instant.now(); + var party = new HumanRequestingParty("user-1", "analyst"); + + var task = new AssignedTask("proc-1", "task", party, null, now); + + assertThat(task.taskMetadata()).isNull(); + } + + // ------------------------------------------------------------------------- + // TaskAwareState contract tests + // ------------------------------------------------------------------------- + + @Test + void emptyStateShouldReturnEmptyTaskList() { + var state = new TestTaskState("agg-1", List.of()); + + assertThat(state.getAssignedTasks()).isEmpty(); + } + + @Test + void withAssignedTaskShouldAppendToEnd() { + Instant now = Instant.now(); + var party = new HumanRequestingParty("user-1", "analyst"); + var task1 = new AssignedTask("proc-1", "Task 1", party, null, now); + var task2 = new AssignedTask("proc-2", "Task 2", party, null, now.plusSeconds(1)); + + TaskAwareState state = new TestTaskState("agg-1", List.of()); + state = state.withAssignedTask(task1); + state = state.withAssignedTask(task2); + + assertThat(state.getAssignedTasks()).hasSize(2); + assertThat(state.getAssignedTasks().get(0).agentProcessId()).isEqualTo("proc-1"); + assertThat(state.getAssignedTasks().get(1).agentProcessId()).isEqualTo("proc-2"); + } + + @Test + void withoutAssignedTaskShouldRemoveByProcessId() { + Instant now = Instant.now(); + var party = new AgentRequestingParty("agent-1", "Agent", "supervisor"); + var task1 = new AssignedTask("proc-1", "Task 1", party, null, now); + var task2 = new AssignedTask("proc-2", "Task 2", party, null, now); + var task3 = new AssignedTask("proc-3", "Task 3", party, null, now); + + TaskAwareState state = new TestTaskState("agg-1", List.of(task1, task2, task3)); + state = state.withoutAssignedTask("proc-2"); + + assertThat(state.getAssignedTasks()).hasSize(2); + assertThat(state.getAssignedTasks()).extracting(AssignedTask::agentProcessId) + .containsExactly("proc-1", "proc-3"); + } + + @Test + void withoutNonExistentProcessIdShouldReturnEquivalentState() { + Instant now = Instant.now(); + var party = new HumanRequestingParty("user-1", "analyst"); + var task = new AssignedTask("proc-1", "Task 1", party, null, now); + + TaskAwareState state = new TestTaskState("agg-1", List.of(task)); + TaskAwareState unchanged = state.withoutAssignedTask("non-existent"); + + assertThat(unchanged.getAssignedTasks()).isEqualTo(state.getAssignedTasks()); + } + + @Test + void taskRoundTripThroughTaskAwareState() { + Instant now = Instant.now(); + var party = new HumanRequestingParty("user-42", "manager"); + var metadata = Map.of("priority", "high", "deadline", "2026-05-01"); + var task = new AssignedTask("proc-42", "Urgent analysis", party, metadata, now); + + var emptyState = new TestTaskState("agg-1", List.of()); + var stateWithTask = (TestTaskState) emptyState.withAssignedTask(task); + + assertThat(stateWithTask.getAssignedTasks()).hasSize(1); + assertThat(stateWithTask.getAssignedTasks().getFirst()).isEqualTo(task); + + // Verify all fields survived the round-trip + AssignedTask retrieved = stateWithTask.getAssignedTasks().getFirst(); + assertThat(retrieved.agentProcessId()).isEqualTo("proc-42"); + assertThat(retrieved.taskDescription()).isEqualTo("Urgent analysis"); + assertThat(retrieved.requestingParty()).isEqualTo(party); + assertThat(retrieved.taskMetadata()).isEqualTo(metadata); + assertThat(retrieved.assignedAt()).isEqualTo(now); + } +} diff --git a/plans/assign-task/assign-task-plan.md b/plans/assign-task/assign-task-plan.md new file mode 100644 index 00000000..f1f000d7 --- /dev/null +++ b/plans/assign-task/assign-task-plan.md @@ -0,0 +1,425 @@ +# AssignTask Support for AgenticAggregate + +## Overview + +This plan adds built-in task assignment support to the Akces Framework's agentic aggregates. When an `AssignTask` command is sent to an `AgenticAggregate`, the framework handles it with a built-in command handler that creates an Embabel `AgentProcess` and emits an `AgentTaskAssigned` event containing the process ID and requesting party information. + +This feature introduces: +1. A **`RequestingParty`** concept — a sealed interface representing who is requesting the task (an Agent or a Human), each with a `String role`. +2. A **built-in `AssignTaskCommand`** — the first framework-owned command, automatically registered for every `AgenticAggregate`. +3. A **built-in `AgentTaskAssignedEvent`** — emitted when the task is accepted, containing the Embabel `AgentProcess.getId()` value. +4. **Built-in command and event-sourcing handlers** — registered automatically by the framework, following the same pattern as the existing `MemoryStored`/`MemoryRevoked` built-in handlers. + +> **Relationship to other plans**: This plan builds on the [Embabel AgentPlatform Integration Plan](../embabel-integration-plan.md) and is a prerequisite for the [Incremental Agent Task Processing Plan](../agenttasks.md). The `AgentTaskAssigned` event creates the link between a requesting party and an active Embabel `AgentProcess`, which the incremental tick model will later use to advance processes across poll loop iterations. + +--- + +## Phase 1: Core Types — `RequestingParty` (api module) + +### What + +Introduce the `RequestingParty` concept as a sealed interface in the `api` module. This represents the entity that requested the task assignment — either another AI agent or a human user. + +### Module + +`main/api` — package `org.elasticsoftware.akces.aggregate` + +### New Files + +| File | Description | +|------|-------------| +| `RequestingParty.java` | Sealed interface with `String role()` method, Jackson `@JsonTypeInfo` / `@JsonSubTypes` for polymorphic serialization | +| `AgentRequestingParty.java` | Record implementing `RequestingParty` for agent requestors — fields: `String agentId`, `String agentName`, `String role` | +| `HumanRequestingParty.java` | Record implementing `RequestingParty` for human requestors — fields: `String userId`, `String displayName`, `String role` | + +### Design Decisions + +**Sealed interface vs. single record with enum discriminator:** + +A sealed interface with `AgentRequestingParty` and `HumanRequestingParty` permits is preferred because: +- It leverages Java 25 sealed types and pattern matching (`switch` expressions with exhaustiveness checking) +- It allows each variant to carry distinct fields (agents have `agentId`/`agentName`, humans have `userId`/`displayName`) +- It aligns with the framework's preference for immutable records +- Jackson 3.x supports sealed type serialization via `@JsonTypeInfo` + +**Location in `api` module:** + +`RequestingParty` is placed in the `api` module (not `agentic`) because: +- It is a core domain concept that may be referenced by application-level aggregates, query models, and database models +- It follows the pattern of `AgenticAggregateMemory` and `MemoryAwareState`, which are also in `api` +- The `agentic` module depends on `api`, not the other way around + +### Serialization + +```java +@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type") +@JsonSubTypes({ + @JsonSubTypes.Type(value = AgentRequestingParty.class, name = "agent"), + @JsonSubTypes.Type(value = HumanRequestingParty.class, name = "human") +}) +public sealed interface RequestingParty permits AgentRequestingParty, HumanRequestingParty { + /** + * The role of this requesting party in the system (e.g., "administrator", "analyst", "orchestrator"). + */ + String role(); +} +``` + +```java +public record AgentRequestingParty( + String agentId, + String agentName, + String role +) implements RequestingParty {} +``` + +```java +public record HumanRequestingParty( + String userId, + String displayName, + String role +) implements RequestingParty {} +``` + +### Dependencies + +None — uses only Jackson annotations already available in the `api` module. + +--- + +## Phase 2: Built-in Command and Event (agentic module) + +### What + +Create the `AssignTaskCommand` and `AgentTaskAssignedEvent` as framework-owned types in the `agentic` module, following the established pattern of `MemoryStoredEvent` and `MemoryRevokedEvent`. + +### Module + +`main/agentic` — new package `org.elasticsoftware.akces.agentic.commands` for the command, existing package `org.elasticsoftware.akces.agentic.events` for the event. + +### New Files + +| File | Package | Description | +|------|---------|-------------| +| `AssignTaskCommand.java` | `o.e.a.agentic.commands` | Built-in command — fields: `agenticAggregateId` (aggregate identifier), `taskDescription` (what the agent should do), `requestingParty` (who requested it), `taskMetadata` (optional Map of additional context) | +| `AgentTaskAssignedEvent.java` | `o.e.a.agentic.events` | Built-in event — fields: `agenticAggregateId`, `agentProcessId` (from `AgentProcess.getId()`), `taskDescription`, `requestingParty`, `taskMetadata`, `assignedAt` (Instant) | + +### AssignTaskCommand + +```java +@CommandInfo(type = "AssignTask", version = 1, description = "Assigns a task to an agentic aggregate for AI-assisted processing") +public record AssignTaskCommand( + @AggregateIdentifier @NotNull String agenticAggregateId, + @NotNull String taskDescription, + @NotNull RequestingParty requestingParty, + Map taskMetadata +) implements Command { + @Override + @Nonnull + public String getAggregateId() { + return agenticAggregateId; + } +} +``` + +### AgentTaskAssignedEvent + +```java +@DomainEventInfo(type = "AgentTaskAssigned", version = 1, description = "Emitted when a task has been assigned to an agentic aggregate and an AgentProcess has been created") +public record AgentTaskAssignedEvent( + @AggregateIdentifier String agenticAggregateId, + @NotNull String agentProcessId, + @NotNull String taskDescription, + @NotNull RequestingParty requestingParty, + Map taskMetadata, + @NotNull Instant assignedAt +) implements DomainEvent { + @Override + @Nonnull + public String getAggregateId() { + return agenticAggregateId; + } +} +``` + +### Design Decisions + +**First built-in command:** + +This is the first framework-owned `Command` implementation. Until now, all commands were application-defined. The `AssignTask` command is a framework concern because it directly interacts with the Embabel runtime to create an `AgentProcess`, which is an infrastructure operation. + +**`taskMetadata` field:** + +A `Map` provides extensibility for passing additional context to the agent process without changing the command schema. Examples: `correlationId`, `priority`, `deadline`, `sourceSystem`. + +**Command module location:** + +A new `commands` package is created in the `agentic` module, mirroring the existing `events` package. This keeps the layout symmetric and predictable. + +### Type Registration Constants + +In `AgenticAggregateRuntime` (or a dedicated constants class), add: + +```java +CommandType ASSIGN_TASK_COMMAND_TYPE = new CommandType<>( + "AssignTask", 1, AssignTaskCommand.class, false, false, false); + +DomainEventType AGENT_TASK_ASSIGNED_TYPE = new DomainEventType<>( + "AgentTaskAssigned", 1, AgentTaskAssignedEvent.class, false, false, false, false); +``` + +--- + +## Phase 3: Built-in Command Handler and Event-Sourcing Handler (agentic module) + +### What + +Implement the built-in handler that processes `AssignTaskCommand` and emits `AgentTaskAssignedEvent`, plus the event-sourcing handler that updates the aggregate state when the event is applied. + +### Module + +`main/agentic` — package `org.elasticsoftware.akces.agentic.runtime` + +### Changes to Existing Files + +#### `KafkaAgenticAggregateRuntime` + +Add two new static methods following the existing `handleMemoryEvent` pattern: + +1. **`handleAssignTask(Command command, AggregateState state)`** — Built-in command handler: + - Casts command to `AssignTaskCommand` + - Creates an `AgentProcess` via `agentPlatform.createAgentProcess(agent, ProcessOptions.DEFAULT, bindings)` with the task description and requesting party in the bindings + - Retrieves the process ID via `agentProcess.getId()` + - Returns a `Stream` containing a single `AgentTaskAssignedEvent` with all relevant fields + + > **Note:** Unlike the memory handlers (which are event-sourcing handlers and are `static`), this is a command handler that needs access to the `AgentPlatform` and `Agent` instances. This means the handler **cannot** be a static method reference. Instead, it will be implemented as a non-static method or as a dedicated adapter class. + +2. **`handleAgentTaskAssignedEvent(DomainEvent event, AggregateState state)`** — Built-in event-sourcing handler: + - Pattern matches on `AgentTaskAssignedEvent` + - Updates the state to track the assigned task (see Phase 4 for state interface) + - Returns the updated state + +#### `AgenticAggregateRuntimeFactory` + +Register the built-in command and event types, following the same pattern as memory events: + +```java +// Register built-in AssignTask command handler +runtimeBuilder + .addCommandHandler(ASSIGN_TASK_COMMAND_TYPE, assignTaskHandler) + .addCommand(ASSIGN_TASK_COMMAND_TYPE); + +// Register built-in AgentTaskAssigned event-sourcing handler +runtimeBuilder + .addEventSourcingHandler(AGENT_TASK_ASSIGNED_TYPE, KafkaAgenticAggregateRuntime::handleTaskEvent) + .addDomainEvent(AGENT_TASK_ASSIGNED_TYPE); +``` + +#### `AkcesAgenticAggregateController` + +Register the `AssignTask` and `AgentTaskAssigned` schemas in the schema registry, following the pattern used for `MemoryStored` and `MemoryRevoked`. + +### Handler Design — Command Handler Adapter + +Because the `AssignTask` handler needs runtime access to `AgentPlatform` and `Agent`, it cannot use the simple `static` method reference pattern of memory handlers. Two options: + +**Option A: Dedicated `AssignTaskCommandHandler` class** (recommended) + +A new `CommandHandlerFunction` implementation in `o.e.a.agentic.runtime`: + +```java +public class AssignTaskCommandHandler + implements CommandHandlerFunction { + + private final AgentPlatform agentPlatform; + private final Agent agent; + private final AgenticAggregate aggregate; + + @Override + public Stream apply(AssignTaskCommand command, S state) { + // 1. Build bindings from command metadata + // 2. Create AgentProcess + // 3. Emit AgentTaskAssignedEvent with process ID + } + + @Override + public boolean isCreate() { + return false; + } +} +``` + +**Option B: Lambda registered inline in the factory** + +Less clean, but avoids a new class. Not recommended due to complexity of captured variables. + +### Dependencies + +- Phase 1 (RequestingParty types for the command and event fields) +- Phase 2 (AssignTaskCommand and AgentTaskAssignedEvent for handler signatures) + +--- + +## Phase 4: State Interface for Task Tracking (api module) + +### What + +Extend the state contracts to support tracking assigned tasks, so the event-sourcing handler for `AgentTaskAssignedEvent` can update the aggregate state. + +### Module + +`main/api` — package `org.elasticsoftware.akces.aggregate` + +### New Files + +| File | Description | +|------|-------------| +| `AssignedTask.java` | Record representing an assigned task: `String agentProcessId`, `String taskDescription`, `RequestingParty requestingParty`, `Map taskMetadata`, `Instant assignedAt` | +| `TaskAwareState.java` | Interface for states that track assigned tasks: `List getAssignedTasks()`, `TaskAwareState withAssignedTask(AssignedTask task)`, `TaskAwareState withoutAssignedTask(String agentProcessId)` | + +### Design Decisions + +**Separate interface vs. extending `MemoryAwareState`:** + +A separate `TaskAwareState` interface is preferred because: +- Not all agentic aggregates need to track both memories and assigned tasks +- It follows the Interface Segregation Principle +- States can implement both `MemoryAwareState` and `TaskAwareState` when needed +- The framework can check `instanceof TaskAwareState` independently + +**Relationship to `MemoryAwareState`:** + +The `TaskAwareState` pattern mirrors `MemoryAwareState`: +- Immutable update methods (`withAssignedTask`, `withoutAssignedTask`) +- The framework's built-in handler checks for the interface and fails with `IllegalStateException` if not implemented + +### Event-Sourcing Handler + +The built-in event-sourcing handler in `KafkaAgenticAggregateRuntime`: + +```java +public static AggregateState onAgentTaskAssigned(AgentTaskAssignedEvent event, AggregateState state) { + if (!(state instanceof TaskAwareState tas)) { + throw new IllegalStateException( + "Aggregate state " + state.getClass().getName() + + " does not implement TaskAwareState"); + } + AssignedTask task = new AssignedTask( + event.agentProcessId(), + event.taskDescription(), + event.requestingParty(), + event.taskMetadata(), + event.assignedAt()); + return (AggregateState) tas.withAssignedTask(task); +} +``` + +--- + +## Phase 5: Tests + +### What + +Add comprehensive tests for all new components across both modules. + +### Module + +`main/api` (for Phase 1 & 4 types) and `main/agentic` (for Phase 2 & 3 handlers) + +### Test Categories + +#### Unit Tests (api module) + +| Test Class | What is Tested | +|------------|----------------| +| `RequestingPartySerializationTest` | Jackson serialization/deserialization of `AgentRequestingParty` and `HumanRequestingParty`, including polymorphic type handling | +| `AssignedTaskTest` | Record equality, construction, immutability | +| `TaskAwareStateTest` | Default behavior of `TaskAwareState` methods with a test implementation | + +#### Unit Tests (agentic module) + +| Test Class | What is Tested | +|------------|----------------| +| `AssignTaskCommandTest` | Command construction, aggregate ID extraction, validation annotations | +| `AgentTaskAssignedEventTest` | Event construction, aggregate ID extraction | +| `AssignTaskCommandHandlerTest` | Handler creates AgentProcess, emits correct event with process ID, handles errors | +| `AgentTaskAssignedEventSourcingTest` | Event-sourcing handler correctly updates TaskAwareState | + +#### Integration Tests (agentic module) + +| Test Class | What is Tested | +|------------|----------------| +| `AssignTaskIntegrationTest` | Full flow: send AssignTaskCommand → handler creates AgentProcess → AgentTaskAssignedEvent emitted → state updated with assigned task | + +### Test Strategy + +- **Mock `AgentPlatform` and `AgentProcess`** for unit tests of the command handler — verify `getId()` is called and the returned ID is used in the event +- **Use existing test infrastructure** (Mockito, AssertJ, TestNG/JUnit 5) — no new test dependencies +- **Follow existing test patterns** from `AgentProcessResultTranslatorTest` and other agentic module tests + +--- + +## Summary of All New and Changed Files + +### New Files + +| Module | Package | File | Phase | +|--------|---------|------|-------| +| api | `o.e.a.aggregate` | `RequestingParty.java` | 1 | +| api | `o.e.a.aggregate` | `AgentRequestingParty.java` | 1 | +| api | `o.e.a.aggregate` | `HumanRequestingParty.java` | 1 | +| api | `o.e.a.aggregate` | `AssignedTask.java` | 4 | +| api | `o.e.a.aggregate` | `TaskAwareState.java` | 4 | +| agentic | `o.e.a.agentic.commands` | `AssignTaskCommand.java` | 2 | +| agentic | `o.e.a.agentic.events` | `AgentTaskAssignedEvent.java` | 2 | +| agentic | `o.e.a.agentic.runtime` | `AssignTaskCommandHandler.java` | 3 | + +### Changed Files + +| Module | File | Phase | Change | +|--------|------|-------|--------| +| agentic | `AgenticAggregateRuntime.java` | 2 | Add `ASSIGN_TASK_COMMAND_TYPE` and `AGENT_TASK_ASSIGNED_TYPE` constants | +| agentic | `KafkaAgenticAggregateRuntime.java` | 3 | Add `onAgentTaskAssigned` and `handleTaskEvent` static methods | +| agentic | `AgenticAggregateRuntimeFactory.java` | 3 | Register built-in command handler and event-sourcing handler | +| agentic | `AkcesAgenticAggregateController.java` | 3 | Register schemas for `AssignTask` and `AgentTaskAssigned` | + +### Test Files + +| Module | Package | File | Phase | +|--------|---------|------|-------| +| api | test | `RequestingPartySerializationTest.java` | 1 | +| api | test | `AssignedTaskTest.java` | 4 | +| api | test | `TaskAwareStateTest.java` | 4 | +| agentic | test | `AssignTaskCommandTest.java` | 2 | +| agentic | test | `AgentTaskAssignedEventTest.java` | 2 | +| agentic | test | `AssignTaskCommandHandlerTest.java` | 3 | +| agentic | test | `AgentTaskAssignedEventSourcingTest.java` | 3 | +| agentic | test | `AssignTaskIntegrationTest.java` | 5 | + +--- + +## Open Questions + +1. **Should `AssignTaskCommand` trigger the actual Embabel `AgentProcess` creation, or should it only record the assignment and defer process creation to a later stage?** The current design creates the process immediately and captures the ID. An alternative is to only emit the event and defer process creation to when the task is actually started (e.g., via a separate `StartTask` command). + +2. **Should there be a `TaskCompleted` / `TaskFailed` event?** The current scope only covers assignment. Task completion tracking would naturally follow and could be planned as a separate feature. + +3. **Should `taskMetadata` be `Map` or `Map`?** `String` values are simpler and schema-safe. `Object` values are more flexible but require careful serialization handling. + +4. **Relationship to `AgentTask` from the incremental tick plan:** The `AssignedTask` record in this plan represents the *persisted* task assignment in aggregate state. The `AgentTask` record from the [agenttasks plan](../agenttasks.md) represents an *in-memory* reference to an active process. These are complementary — `AssignedTask` is the source of truth, `AgentTask` is the runtime handle. + +--- + +## Phase Dependencies + +``` +Phase 1 (RequestingParty) + ↓ +Phase 2 (Command & Event) ← Phase 4 (State Interfaces) + ↓ ↓ +Phase 3 (Handlers & Registration) + ↓ +Phase 5 (Tests) +``` + +Phases 1 and 4 can be implemented in parallel since they are both in the `api` module and have no mutual dependencies. Phases 2 and 3 depend on Phase 1. Phase 3 also depends on Phase 4. Phase 5 depends on all other phases.