Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 10 additions & 11 deletions FRAMEWORK_OVERVIEW.md
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ Spring AI or Embabel-powered chatbot). It differs from a regular aggregate in th
| Replicas | Multiple (Kafka-partitioned) | Always **1** (singleton) |
| Partition count | Configurable | Fixed at **1** per topic |
| State class | Any `AggregateState` | Must also implement `MemoryAwareState` |
| Built-in commands | None | `StoreMemoryCommand`, `ForgetMemoryCommand` |
| Built-in commands | None | None (memory commands handled by the Embabel layer) |
| Built-in events | None | `MemoryStoredEvent`, `MemoryRevokedEvent` |
| Memory system | N/A | Sliding-window memory (configurable capacity) |
| External events | Via `@EventBridgeHandler` | Listens to **all** partitions directly |
Expand Down Expand Up @@ -151,25 +151,24 @@ public record MyAssistantState(
}
```

### Built-in memory commands and events
### Built-in memory events

The framework automatically handles:
The framework automatically registers and handles the following built-in events. These are
produced internally by the Embabel layer when the agent stores or revokes memories:

| Command / Event | Type | Description |
|-----------------|------|-------------|
| `StoreMemoryCommand` | Command | Asks the aggregate to store a new memory entry |
| `ForgetMemoryCommand` | Command | Asks the aggregate to remove an existing memory entry |
| Event | Type | Description |
|-------|------|-------------|
| `MemoryStoredEvent` | DomainEvent | Emitted when a memory entry is stored |
| `MemoryRevokedEvent` | DomainEvent | Emitted when a memory entry is removed (either by `ForgetMemoryCommand` or sliding-window eviction) |
| `MemoryRevokedEvent` | DomainEvent | Emitted when a memory entry is removed (by the Embabel agent layer) |

You do **not** need to implement command handlers for these; the
You do **not** need to implement event-sourcing handlers for these; the
`KafkaAgenticAggregateRuntime` registers them as built-ins.

### Sliding-window memory

When the number of stored memories exceeds `maxMemories`, the oldest entry is automatically evicted
and a `MemoryRevokedEvent` is emitted. This keeps memory consumption bounded while preserving the
most recent context.
and a `MemoryRevokedEvent` is emitted by the Embabel agent layer. This keeps memory consumption
bounded while preserving the most recent context.

### Listening to external events

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,6 @@ public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory)
.addConstructorArgReference("agenticServiceConsumerFactory")
.addConstructorArgReference("agenticServiceProducerFactory")
.addConstructorArgReference("agenticServiceAggregateStateRepositoryFactory")
.addConstructorArgValue(agenticInfo.maxMemories())
.setInitMethodName("start")
.setDestroyMethodName("close")
.getBeanDefinition());
Expand Down

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,9 @@
* Domain event produced when a memory entry has been revoked (forgotten) from an
* {@link org.elasticsoftware.akces.aggregate.AgenticAggregate}.
*
* <p>This event is produced either explicitly via a
* {@link org.elasticsoftware.akces.agentic.commands.ForgetMemoryCommand}, or automatically
* by the {@link org.elasticsoftware.akces.agentic.runtime.AgenticAggregatePartition} as part
* of the sliding-window eviction mechanism when
* <p>This event is produced internally by the Embabel layer (via the agent's memory
* management tools) when a memory is explicitly revoked, or as part of the
* sliding-window eviction mechanism when
* {@link org.elasticsoftware.akces.annotations.AgenticAggregateInfo#maxMemories()} is exceeded.
*
* @param agenticAggregateId the unique identifier of the AgenticAggregate instance
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,6 @@
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.errors.WakeupException;
import org.elasticsoftware.akces.agentic.AgenticAggregateRuntime;
import org.elasticsoftware.akces.agentic.commands.ForgetMemoryCommand;
import org.elasticsoftware.akces.aggregate.AgenticAggregateMemory;
import org.elasticsoftware.akces.aggregate.DomainEventType;
import org.elasticsoftware.akces.aggregate.IndexParams;
import org.elasticsoftware.akces.commands.Command;
Expand Down Expand Up @@ -81,11 +79,6 @@
* <li>Always hard-assigns <strong>partition 0</strong> — no Kafka consumer-group rebalancing
* is needed because every agentic aggregate runs as a single-partition service.</li>
* <li>Processes commands in strict FIFO order; there is no prioritisation queue.</li>
* <li>Enforces a <em>sliding-window</em> memory capacity: whenever the number of stored
* memories exceeds {@code maxMemories} after a command is processed, a
* {@code ForgetMemoryCommand} is issued through the normal aggregate command path to
* evict the oldest entry. Memory state is derived directly from the loaded aggregate
* state, avoiding any separate in-memory deque reconstruction after restarts.</li>
* <li>Supports external domain-event subscriptions analogous to
* {@link org.elasticsoftware.akces.kafka.AggregatePartition}.</li>
* </ul>
Expand All @@ -110,7 +103,6 @@ public class AgenticAggregatePartition implements Runnable, AutoCloseable, Comma
private final ProducerFactory<String, ProtocolRecord> producerFactory;
private final AgenticAggregateRuntime runtime;
private final AggregateStateRepository stateRepository;
private final int maxMemories;
private final Collection<DomainEventType<?>> externalDomainEventTypes;
private final AkcesRegistry ackesRegistry;
private final TopicPartition commandPartition;
Expand All @@ -130,7 +122,6 @@ public class AgenticAggregatePartition implements Runnable, AutoCloseable, Comma
* @param producerFactory factory for creating transactional Kafka producers
* @param runtime the agentic aggregate runtime that processes commands and domain events
* @param stateRepositoryFactory factory for creating the aggregate-state repository
* @param maxMemories maximum number of memories allowed before sliding-window eviction
* @param externalDomainEventTypes the collection of external domain event types to subscribe to
* @param ackesRegistry registry for resolving command types, topics, and partitions
*/
Expand All @@ -139,14 +130,12 @@ public AgenticAggregatePartition(
ProducerFactory<String, ProtocolRecord> producerFactory,
AgenticAggregateRuntime runtime,
AggregateStateRepositoryFactory stateRepositoryFactory,
int maxMemories,
Collection<DomainEventType<?>> externalDomainEventTypes,
AkcesRegistry ackesRegistry) {
this.consumerFactory = consumerFactory;
this.producerFactory = producerFactory;
this.runtime = runtime;
this.stateRepository = stateRepositoryFactory.create(runtime, AGENTIC_PARTITION);
this.maxMemories = maxMemories;
this.externalDomainEventTypes = externalDomainEventTypes;
this.ackesRegistry = ackesRegistry;
this.commandPartition = new TopicPartition(runtime.getName() + COMMANDS_SUFFIX, AGENTIC_PARTITION);
Expand Down Expand Up @@ -379,9 +368,8 @@ private void processRecords(ConsumerRecords<String, ProtocolRecord> allRecords)
}

/**
* Handles a single command record by delegating to the {@link AgenticAggregateRuntime},
* then enforcing the sliding-window memory eviction policy. A
* {@link CommandResponseRecord} is sent when {@code replyToTopicPartition} is set.
* Handles a single command record by delegating to the {@link AgenticAggregateRuntime}.
* A {@link CommandResponseRecord} is sent when {@code replyToTopicPartition} is set.
*
* @param commandRecord the incoming command record to process
*/
Expand All @@ -404,11 +392,6 @@ private void handleCommand(CommandRecord commandRecord) {
this::index,
() -> stateRepository.get(commandRecord.aggregateId()));

// Enforce sliding-window memory limit after every command.
// This is a no-op when the count is within the allowed window.
enforceMemorySlidingWindow(commandRecord.aggregateId(),
commandRecord.tenantId(), commandRecord.correlationId());

// Send a response so that AkcesClient.send() can complete its CompletionStage
if (responseRecords != null) {
CommandResponseRecord crr = new CommandResponseRecord(
Expand Down Expand Up @@ -451,65 +434,6 @@ private void handleExternalEvent(DomainEventRecord eventRecord) {
}
}

/**
* Enforces the sliding-window memory limit for the given aggregate by querying the
* current state directly.
*
* <p>If the number of memories in the loaded state exceeds {@code maxMemories}, a
* {@link ForgetMemoryCommand} is issued through the normal aggregate command path for
* each excess entry (oldest first) until the count is within the allowed window.
* This avoids rebuilding a separate in-memory deque after restarts.
*
* <p>A safety bound of {@code 2 * (maxMemories + 1)} iterations guards against a pathological
* case where eviction commands fail to reduce the memory count (e.g., the ForgetMemory
* handler is not registered), preventing an infinite loop.
*
* @param aggregateId the aggregate whose memory count should be checked
* @param tenantId tenant identifier propagated from the triggering command
* @param correlationId correlation identifier propagated from the triggering command
*/
private void enforceMemorySlidingWindow(String aggregateId, String tenantId,
String correlationId) {
try {
// Safety bound: evict at most 2 * (maxMemories + 1) times to prevent an infinite loop
// if the ForgetMemory handler does not actually reduce the memory count.
int maxEvictions = 2 * (maxMemories + 1);
for (int evictions = 0; evictions < maxEvictions; evictions++) {
AggregateStateRecord stateRecord = stateRepository.get(aggregateId);
List<AgenticAggregateMemory> memories = runtime.getMemories(stateRecord);
if (memories.size() <= maxMemories) {
return;
}
// Evict the oldest memory (first in the list, ordered by storedAt)
AgenticAggregateMemory oldest = memories.get(0);
logger.debug("Sliding-window eviction: revoking memory {} for aggregate {}",
oldest.memoryId(), aggregateId);
ForgetMemoryCommand forgetCmd = new ForgetMemoryCommand(
aggregateId, oldest.memoryId(), "sliding window eviction");
CommandRecord evictionCommandRecord = new CommandRecord(
tenantId,
"ForgetMemory",
1,
runtime.serialize(forgetCmd),
PayloadEncoding.JSON,
aggregateId,
correlationId,
null); // no reply-to for system eviction commands
runtime.handleCommandRecord(
evictionCommandRecord,
this::send,
this::index,
() -> stateRepository.get(aggregateId));
}
logger.warn("Sliding-window eviction for aggregate {} did not converge within {} iterations; " +
"check that the ForgetMemory command handler is correctly registered",
aggregateId, maxEvictions);
} catch (IOException e) {
logger.error("Error during sliding-window memory eviction for aggregate {}",
aggregateId, e);
}
}

/**
* Sends a {@link ProtocolRecord} to the appropriate Kafka topic.
*
Expand Down
Loading
Loading