diff --git a/FRAMEWORK_OVERVIEW.md b/FRAMEWORK_OVERVIEW.md index 53872648..00b66582 100644 --- a/FRAMEWORK_OVERVIEW.md +++ b/FRAMEWORK_OVERVIEW.md @@ -84,7 +84,7 @@ Spring AI or Embabel-powered chatbot). It differs from a regular aggregate in th | Replicas | Multiple (Kafka-partitioned) | Always **1** (singleton) | | Partition count | Configurable | Fixed at **1** per topic | | State class | Any `AggregateState` | Must also implement `MemoryAwareState` | -| Built-in commands | None | `StoreMemoryCommand`, `ForgetMemoryCommand` | +| Built-in commands | None | None (memory commands handled by the Embabel layer) | | Built-in events | None | `MemoryStoredEvent`, `MemoryRevokedEvent` | | Memory system | N/A | Sliding-window memory (configurable capacity) | | External events | Via `@EventBridgeHandler` | Listens to **all** partitions directly | @@ -151,25 +151,24 @@ public record MyAssistantState( } ``` -### Built-in memory commands and events +### Built-in memory events -The framework automatically handles: +The framework automatically registers and handles the following built-in events. These are +produced internally by the Embabel layer when the agent stores or revokes memories: -| Command / Event | Type | Description | -|-----------------|------|-------------| -| `StoreMemoryCommand` | Command | Asks the aggregate to store a new memory entry | -| `ForgetMemoryCommand` | Command | Asks the aggregate to remove an existing memory entry | +| Event | Type | Description | +|-------|------|-------------| | `MemoryStoredEvent` | DomainEvent | Emitted when a memory entry is stored | -| `MemoryRevokedEvent` | DomainEvent | Emitted when a memory entry is removed (either by `ForgetMemoryCommand` or sliding-window eviction) | +| `MemoryRevokedEvent` | DomainEvent | Emitted when a memory entry is removed (by the Embabel agent layer) | -You do **not** need to implement command handlers for these; the +You do **not** need to implement event-sourcing handlers for these; the `KafkaAgenticAggregateRuntime` registers them as built-ins. ### Sliding-window memory When the number of stored memories exceeds `maxMemories`, the oldest entry is automatically evicted -and a `MemoryRevokedEvent` is emitted. This keeps memory consumption bounded while preserving the -most recent context. +and a `MemoryRevokedEvent` is emitted by the Embabel agent layer. This keeps memory consumption +bounded while preserving the most recent context. ### Listening to external events diff --git a/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/beans/AgenticAggregateBeanFactoryPostProcessor.java b/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/beans/AgenticAggregateBeanFactoryPostProcessor.java index dd5ecb0e..77b92201 100644 --- a/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/beans/AgenticAggregateBeanFactoryPostProcessor.java +++ b/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/beans/AgenticAggregateBeanFactoryPostProcessor.java @@ -184,7 +184,6 @@ public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) .addConstructorArgReference("agenticServiceConsumerFactory") .addConstructorArgReference("agenticServiceProducerFactory") .addConstructorArgReference("agenticServiceAggregateStateRepositoryFactory") - .addConstructorArgValue(agenticInfo.maxMemories()) .setInitMethodName("start") .setDestroyMethodName("close") .getBeanDefinition()); diff --git a/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/commands/ForgetMemoryCommand.java b/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/commands/ForgetMemoryCommand.java deleted file mode 100644 index a157385a..00000000 --- a/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/commands/ForgetMemoryCommand.java +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Copyright 2022 - 2026 The Original Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.elasticsoftware.akces.agentic.commands; - -import jakarta.annotation.Nonnull; -import org.elasticsoftware.akces.annotations.AggregateIdentifier; -import org.elasticsoftware.akces.annotations.CommandInfo; -import org.elasticsoftware.akces.commands.Command; - -/** - * Command to forget (revoke) a previously stored memory entry for an - * {@link org.elasticsoftware.akces.aggregate.AgenticAggregate}. - * - *

When handled, this command produces a - * {@link org.elasticsoftware.akces.agentic.events.MemoryRevokedEvent} removing the memory - * identified by {@code memoryId} from the aggregate's memory store. - * - * @param agenticAggregateId the unique identifier of the target AgenticAggregate instance - * @param memoryId the UUID of the memory entry to forget - * @param reason the reason for forgetting this memory entry - */ -@CommandInfo(type = "ForgetMemory", version = 1) -public record ForgetMemoryCommand( - @AggregateIdentifier String agenticAggregateId, - String memoryId, - String reason -) implements Command { - - /** - * Returns the aggregate identifier for routing. - * - * @return the {@code agenticAggregateId} - */ - @Override - @Nonnull - public String getAggregateId() { - return agenticAggregateId; - } -} diff --git a/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/commands/StoreMemoryCommand.java b/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/commands/StoreMemoryCommand.java deleted file mode 100644 index b078cae5..00000000 --- a/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/commands/StoreMemoryCommand.java +++ /dev/null @@ -1,60 +0,0 @@ -/* - * Copyright 2022 - 2026 The Original Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.elasticsoftware.akces.agentic.commands; - -import jakarta.annotation.Nonnull; -import org.elasticsoftware.akces.annotations.AggregateIdentifier; -import org.elasticsoftware.akces.annotations.CommandInfo; -import org.elasticsoftware.akces.commands.Command; - -/** - * Command to store a new memory entry for an {@link org.elasticsoftware.akces.aggregate.AgenticAggregate}. - * - *

When handled, this command produces a - * {@link org.elasticsoftware.akces.agentic.events.MemoryStoredEvent} and, if the - * {@link org.elasticsoftware.akces.annotations.AgenticAggregateInfo#maxMemories()} sliding-window - * capacity is reached, a subsequent - * {@link org.elasticsoftware.akces.agentic.events.MemoryRevokedEvent} is automatically emitted - * to evict the oldest memory entry. - * - * @param agenticAggregateId the unique identifier of the target AgenticAggregate instance - * @param subject a short (1–2 word) topic label for the memory being stored - * @param fact the fact to remember (max 200 characters) - * @param citations the source of the fact (e.g., a file path and line number) - * @param reason the reason for storing this fact - */ -@CommandInfo(type = "StoreMemory", version = 1) -public record StoreMemoryCommand( - @AggregateIdentifier String agenticAggregateId, - String subject, - String fact, - String citations, - String reason -) implements Command { - - /** - * Returns the aggregate identifier for routing. - * - * @return the {@code agenticAggregateId} - */ - @Override - @Nonnull - public String getAggregateId() { - return agenticAggregateId; - } -} diff --git a/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/events/MemoryRevokedEvent.java b/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/events/MemoryRevokedEvent.java index f3f3c03b..c3599c94 100644 --- a/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/events/MemoryRevokedEvent.java +++ b/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/events/MemoryRevokedEvent.java @@ -28,10 +28,9 @@ * Domain event produced when a memory entry has been revoked (forgotten) from an * {@link org.elasticsoftware.akces.aggregate.AgenticAggregate}. * - *

This event is produced either explicitly via a - * {@link org.elasticsoftware.akces.agentic.commands.ForgetMemoryCommand}, or automatically - * by the {@link org.elasticsoftware.akces.agentic.runtime.AgenticAggregatePartition} as part - * of the sliding-window eviction mechanism when + *

This event is produced internally by the Embabel layer (via the agent's memory + * management tools) when a memory is explicitly revoked, or as part of the + * sliding-window eviction mechanism when * {@link org.elasticsoftware.akces.annotations.AgenticAggregateInfo#maxMemories()} is exceeded. * * @param agenticAggregateId the unique identifier of the AgenticAggregate instance diff --git a/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/runtime/AgenticAggregatePartition.java b/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/runtime/AgenticAggregatePartition.java index aff73cce..7a70f9ab 100644 --- a/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/runtime/AgenticAggregatePartition.java +++ b/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/runtime/AgenticAggregatePartition.java @@ -30,8 +30,6 @@ import org.apache.kafka.common.errors.InterruptException; import org.apache.kafka.common.errors.WakeupException; import org.elasticsoftware.akces.agentic.AgenticAggregateRuntime; -import org.elasticsoftware.akces.agentic.commands.ForgetMemoryCommand; -import org.elasticsoftware.akces.aggregate.AgenticAggregateMemory; import org.elasticsoftware.akces.aggregate.DomainEventType; import org.elasticsoftware.akces.aggregate.IndexParams; import org.elasticsoftware.akces.commands.Command; @@ -81,11 +79,6 @@ *

  • Always hard-assigns partition 0 — no Kafka consumer-group rebalancing * is needed because every agentic aggregate runs as a single-partition service.
  • *
  • Processes commands in strict FIFO order; there is no prioritisation queue.
  • - *
  • Enforces a sliding-window memory capacity: whenever the number of stored - * memories exceeds {@code maxMemories} after a command is processed, a - * {@code ForgetMemoryCommand} is issued through the normal aggregate command path to - * evict the oldest entry. Memory state is derived directly from the loaded aggregate - * state, avoiding any separate in-memory deque reconstruction after restarts.
  • *
  • Supports external domain-event subscriptions analogous to * {@link org.elasticsoftware.akces.kafka.AggregatePartition}.
  • * @@ -110,7 +103,6 @@ public class AgenticAggregatePartition implements Runnable, AutoCloseable, Comma private final ProducerFactory producerFactory; private final AgenticAggregateRuntime runtime; private final AggregateStateRepository stateRepository; - private final int maxMemories; private final Collection> externalDomainEventTypes; private final AkcesRegistry ackesRegistry; private final TopicPartition commandPartition; @@ -130,7 +122,6 @@ public class AgenticAggregatePartition implements Runnable, AutoCloseable, Comma * @param producerFactory factory for creating transactional Kafka producers * @param runtime the agentic aggregate runtime that processes commands and domain events * @param stateRepositoryFactory factory for creating the aggregate-state repository - * @param maxMemories maximum number of memories allowed before sliding-window eviction * @param externalDomainEventTypes the collection of external domain event types to subscribe to * @param ackesRegistry registry for resolving command types, topics, and partitions */ @@ -139,14 +130,12 @@ public AgenticAggregatePartition( ProducerFactory producerFactory, AgenticAggregateRuntime runtime, AggregateStateRepositoryFactory stateRepositoryFactory, - int maxMemories, Collection> externalDomainEventTypes, AkcesRegistry ackesRegistry) { this.consumerFactory = consumerFactory; this.producerFactory = producerFactory; this.runtime = runtime; this.stateRepository = stateRepositoryFactory.create(runtime, AGENTIC_PARTITION); - this.maxMemories = maxMemories; this.externalDomainEventTypes = externalDomainEventTypes; this.ackesRegistry = ackesRegistry; this.commandPartition = new TopicPartition(runtime.getName() + COMMANDS_SUFFIX, AGENTIC_PARTITION); @@ -379,9 +368,8 @@ private void processRecords(ConsumerRecords allRecords) } /** - * Handles a single command record by delegating to the {@link AgenticAggregateRuntime}, - * then enforcing the sliding-window memory eviction policy. A - * {@link CommandResponseRecord} is sent when {@code replyToTopicPartition} is set. + * Handles a single command record by delegating to the {@link AgenticAggregateRuntime}. + * A {@link CommandResponseRecord} is sent when {@code replyToTopicPartition} is set. * * @param commandRecord the incoming command record to process */ @@ -404,11 +392,6 @@ private void handleCommand(CommandRecord commandRecord) { this::index, () -> stateRepository.get(commandRecord.aggregateId())); - // Enforce sliding-window memory limit after every command. - // This is a no-op when the count is within the allowed window. - enforceMemorySlidingWindow(commandRecord.aggregateId(), - commandRecord.tenantId(), commandRecord.correlationId()); - // Send a response so that AkcesClient.send() can complete its CompletionStage if (responseRecords != null) { CommandResponseRecord crr = new CommandResponseRecord( @@ -451,65 +434,6 @@ private void handleExternalEvent(DomainEventRecord eventRecord) { } } - /** - * Enforces the sliding-window memory limit for the given aggregate by querying the - * current state directly. - * - *

    If the number of memories in the loaded state exceeds {@code maxMemories}, a - * {@link ForgetMemoryCommand} is issued through the normal aggregate command path for - * each excess entry (oldest first) until the count is within the allowed window. - * This avoids rebuilding a separate in-memory deque after restarts. - * - *

    A safety bound of {@code 2 * (maxMemories + 1)} iterations guards against a pathological - * case where eviction commands fail to reduce the memory count (e.g., the ForgetMemory - * handler is not registered), preventing an infinite loop. - * - * @param aggregateId the aggregate whose memory count should be checked - * @param tenantId tenant identifier propagated from the triggering command - * @param correlationId correlation identifier propagated from the triggering command - */ - private void enforceMemorySlidingWindow(String aggregateId, String tenantId, - String correlationId) { - try { - // Safety bound: evict at most 2 * (maxMemories + 1) times to prevent an infinite loop - // if the ForgetMemory handler does not actually reduce the memory count. - int maxEvictions = 2 * (maxMemories + 1); - for (int evictions = 0; evictions < maxEvictions; evictions++) { - AggregateStateRecord stateRecord = stateRepository.get(aggregateId); - List memories = runtime.getMemories(stateRecord); - if (memories.size() <= maxMemories) { - return; - } - // Evict the oldest memory (first in the list, ordered by storedAt) - AgenticAggregateMemory oldest = memories.get(0); - logger.debug("Sliding-window eviction: revoking memory {} for aggregate {}", - oldest.memoryId(), aggregateId); - ForgetMemoryCommand forgetCmd = new ForgetMemoryCommand( - aggregateId, oldest.memoryId(), "sliding window eviction"); - CommandRecord evictionCommandRecord = new CommandRecord( - tenantId, - "ForgetMemory", - 1, - runtime.serialize(forgetCmd), - PayloadEncoding.JSON, - aggregateId, - correlationId, - null); // no reply-to for system eviction commands - runtime.handleCommandRecord( - evictionCommandRecord, - this::send, - this::index, - () -> stateRepository.get(aggregateId)); - } - logger.warn("Sliding-window eviction for aggregate {} did not converge within {} iterations; " + - "check that the ForgetMemory command handler is correctly registered", - aggregateId, maxEvictions); - } catch (IOException e) { - logger.error("Error during sliding-window memory eviction for aggregate {}", - aggregateId, e); - } - } - /** * Sends a {@link ProtocolRecord} to the appropriate Kafka topic. * diff --git a/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/runtime/AkcesAgenticAggregateController.java b/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/runtime/AkcesAgenticAggregateController.java index 8b3ec603..6dfe17a2 100644 --- a/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/runtime/AkcesAgenticAggregateController.java +++ b/main/agentic/src/main/java/org/elasticsoftware/akces/agentic/runtime/AkcesAgenticAggregateController.java @@ -26,10 +26,6 @@ import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.TopicPartition; import org.elasticsoftware.akces.agentic.AgenticAggregateRuntime; -import org.elasticsoftware.akces.agentic.commands.ForgetMemoryCommand; -import org.elasticsoftware.akces.agentic.commands.StoreMemoryCommand; -import org.elasticsoftware.akces.agentic.events.MemoryRevokedEvent; -import org.elasticsoftware.akces.agentic.events.MemoryStoredEvent; import org.elasticsoftware.akces.aggregate.CommandType; import org.elasticsoftware.akces.aggregate.DomainEventType; import org.elasticsoftware.akces.control.AggregateServiceCommandType; @@ -82,9 +78,9 @@ *

    Responsibilities: *

      *
    1. Initialise a {@link SchemaRegistry} backed by Kafka.
    2. - *
    3. Register and validate schemas for the built-in memory commands - * ({@link StoreMemoryCommand}, {@link ForgetMemoryCommand}) and events - * ({@link MemoryStoredEvent}, {@link MemoryRevokedEvent}).
    4. + *
    5. Register and validate schemas for the built-in memory events + * ({@link org.elasticsoftware.akces.agentic.events.MemoryStoredEvent}, + * {@link org.elasticsoftware.akces.agentic.events.MemoryRevokedEvent}).
    6. *
    7. Register and validate schemas for all commands and events declared by the * wrapped {@link AgenticAggregateRuntime}.
    8. *
    9. Publish an {@link AggregateServiceRecord} to the {@code Akces-Control} topic so @@ -119,13 +115,6 @@ public class AkcesAgenticAggregateController extends Thread /** Poll timeout (ms) used when reading the {@code Akces-Control} topic during initialisation. */ private static final long CONTROL_TOPIC_POLL_TIMEOUT_MS = 100L; - /** Built-in command types provided by the agentic framework. */ - @SuppressWarnings("unchecked") - private static final List> BUILTIN_COMMAND_TYPES = List.of( - new CommandType<>("StoreMemory", 1, StoreMemoryCommand.class, false, false, false), - new CommandType<>("ForgetMemory", 1, ForgetMemoryCommand.class, false, false, false) - ); - /** Built-in domain-event types provided by the agentic framework. */ @SuppressWarnings("unchecked") private static final List> BUILTIN_EVENT_TYPES = List.of( @@ -142,7 +131,6 @@ public class AkcesAgenticAggregateController extends Thread private final ConsumerFactory consumerFactory; private final ProducerFactory producerFactory; private final AggregateStateRepositoryFactory stateRepositoryFactory; - private final int maxMemories; private final ExecutorService partitionExecutor; private final CountDownLatch shutdownLatch = new CountDownLatch(1); private final CountDownLatch doneLatch = new CountDownLatch(1); @@ -174,7 +162,6 @@ public class AkcesAgenticAggregateController extends Thread * @param consumerFactory factory for creating the Kafka protocol consumer * @param producerFactory factory for creating the Kafka protocol producer * @param stateRepositoryFactory factory for creating the aggregate-state repository - * @param maxMemories maximum number of memories per aggregate before sliding-window eviction */ public AkcesAgenticAggregateController( ConsumerFactory schemaConsumerFactory, @@ -185,8 +172,7 @@ public AkcesAgenticAggregateController( AgenticAggregateRuntime aggregateRuntime, ConsumerFactory consumerFactory, ProducerFactory producerFactory, - AggregateStateRepositoryFactory stateRepositoryFactory, - int maxMemories) { + AggregateStateRepositoryFactory stateRepositoryFactory) { super(aggregateRuntime.getName() + "-AkcesAgenticController"); this.schemaConsumerFactory = schemaConsumerFactory; this.schemaProducerFactory = schemaProducerFactory; @@ -197,7 +183,6 @@ public AkcesAgenticAggregateController( this.consumerFactory = consumerFactory; this.producerFactory = producerFactory; this.stateRepositoryFactory = stateRepositoryFactory; - this.maxMemories = maxMemories; this.partitionExecutor = Executors.newSingleThreadExecutor( r -> new Thread(r, aggregateRuntime.getName() + "-AgenticPartitionThread")); } @@ -251,7 +236,6 @@ public void run() { producerFactory, aggregateRuntime, stateRepositoryFactory, - maxMemories, aggregateRuntime.getExternalDomainEventTypes(), this); this.partition = localPartition; @@ -281,32 +265,14 @@ public void run() { } /** - * Registers the built-in {@link StoreMemoryCommand}, {@link ForgetMemoryCommand}, - * {@link MemoryStoredEvent}, and {@link MemoryRevokedEvent} schemas with the - * schema registry. + * Registers the built-in {@link org.elasticsoftware.akces.agentic.events.MemoryStoredEvent} + * and {@link org.elasticsoftware.akces.agentic.events.MemoryRevokedEvent} schemas with the + * schema registry. These events are produced internally by the Embabel layer when the + * agent stores or revokes memories. */ private void registerBuiltinSchemas() { logger.info("Registering built-in agentic schemas for {}Aggregate", aggregateRuntime.getName()); - for (CommandType commandType : BUILTIN_COMMAND_TYPES) { - try { - aggregateRuntime.registerAndValidate(commandType, schemaRegistry); - } catch (IncompatibleSchemaException e) { - logger.warn("Built-in command schema {} is incompatible — attempting force-register", - commandType.typeName(), e); - try { - aggregateRuntime.registerAndValidate(commandType, schemaRegistry, true); - } catch (Exception ex) { - logger.error("Failed to force-register built-in command schema {}", - commandType.typeName(), ex); - throw new RuntimeException("Failed to register built-in command schema: " - + commandType.typeName(), ex); - } - } catch (Exception e) { - throw new RuntimeException("Failed to register built-in command schema: " - + commandType.typeName(), e); - } - } for (DomainEventType eventType : BUILTIN_EVENT_TYPES) { try { aggregateRuntime.registerAndValidate(eventType, schemaRegistry); @@ -416,22 +382,14 @@ private void loadAggregateServicesFromControlTopic() { /** * Publishes an {@link AggregateServiceRecord} to the {@code Akces-Control} topic so that * clients can discover this service. - * - *

      The record includes both the built-in agentic command types - * ({@link StoreMemoryCommand}, {@link ForgetMemoryCommand}) and any additional command - * types declared by the wrapped aggregate. */ private void publishControlRecord() { String transactionalId = aggregateRuntime.getName() + "-" + HostUtils.getHostName() + "-agentic-control"; try (Producer controlProducer = controlProducerFactory.createProducer(transactionalId)) { - // Combine built-in + aggregate command types for the service record + // Collect aggregate command types for the service record List allCommands = new ArrayList<>(); - BUILTIN_COMMAND_TYPES.forEach(ct -> - allCommands.add(new AggregateServiceCommandType( - ct.typeName(), ct.version(), ct.create(), - "commands." + ct.typeName()))); aggregateRuntime.getLocalCommandTypes().forEach(ct -> allCommands.add(new AggregateServiceCommandType( ct.typeName(), ct.version(), ct.create(), @@ -564,8 +522,7 @@ public void setEnvironment(Environment env) { /** * Resolves the {@link CommandType} for the given command class. * - *

      Built-in agentic commands ({@link StoreMemoryCommand}, {@link ForgetMemoryCommand}) - * are returned directly. All other commands are resolved from the underlying + *

      Command types are resolved from the underlying * {@link AgenticAggregateRuntime}'s local command types. * * @param commandClass the command class to resolve @@ -575,13 +532,6 @@ public void setEnvironment(Environment env) { @Override @Nonnull public CommandType resolveType(@Nonnull Class commandClass) { - // Check built-in types first - for (CommandType builtIn : BUILTIN_COMMAND_TYPES) { - if (builtIn.typeClass().equals(commandClass)) { - return builtIn; - } - } - // Delegate to aggregate runtime var commandInfo = commandClass.getAnnotation( org.elasticsoftware.akces.annotations.CommandInfo.class); if (commandInfo != null) { diff --git a/main/agentic/src/test/java/org/elasticsoftware/akces/agentic/runtime/MemorySlidingWindowTest.java b/main/agentic/src/test/java/org/elasticsoftware/akces/agentic/runtime/MemorySlidingWindowTest.java index 0dfd4fb5..ed9e5256 100644 --- a/main/agentic/src/test/java/org/elasticsoftware/akces/agentic/runtime/MemorySlidingWindowTest.java +++ b/main/agentic/src/test/java/org/elasticsoftware/akces/agentic/runtime/MemorySlidingWindowTest.java @@ -32,15 +32,13 @@ import static org.assertj.core.api.Assertions.assertThat; /** - * Tests the sliding-window memory eviction logic that occurs when the memory list exceeds - * the configured {@code maxMemories} limit. The eviction algorithm is exercised by - * simulating the sequence of events that would be produced by - * {@link AgenticAggregatePartition#enforceMemorySlidingWindow}. + * Tests the sliding-window memory eviction logic in the state transition layer. + * The eviction algorithm is exercised by simulating the sequence of events that are + * produced by the Embabel layer during the agent's memory management process. * - *

      The partition enforces the limit by issuing {@link org.elasticsoftware.akces.agentic.commands.ForgetMemoryCommand} - * for the oldest entry (first in the list) until the count drops to the allowed window. - * Each {@code ForgetMemoryCommand} produces a {@link MemoryRevokedEvent} that updates - * the state through the built-in event-sourcing handler in + *

      Memory revocation is handled by the Embabel agent (via its memory management tools), + * which produces {@link MemoryRevokedEvent} directly. Each such event updates the + * state through the built-in event-sourcing handler in * {@link KafkaAgenticAggregateRuntime}. */ class MemorySlidingWindowTest { @@ -184,13 +182,13 @@ void multipleOverLimitShouldEvictMultipleOldestMemories() { } // ------------------------------------------------------------------------- - // ForgetMemoryCommand removes specific memory by ID + // Revoke memory by ID removes specific memory // ------------------------------------------------------------------------- @Test void forgetMemoryByIdShouldRemoveSpecificMemory() { AggregateState state = storeMemories(5); - // Simulate ForgetMemoryCommand for m3 + // Simulate revoking memory m3 via MemoryRevokedEvent (produced by Embabel layer) MemoryRevokedEvent revokeEvent = new MemoryRevokedEvent( "agg-1", "m3", "no longer relevant", Instant.now()); state = KafkaAgenticAggregateRuntime.onMemoryRevoked(revokeEvent, state);