Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,13 @@
package org.elasticsoftware.akces.agentic;

import com.embabel.agent.core.AgentPlatform;
import org.elasticsoftware.akces.agentic.commands.AssignTaskCommand;
import org.elasticsoftware.akces.agentic.events.AgentTaskAssignedEvent;
import org.elasticsoftware.akces.agentic.events.MemoryRevokedEvent;
import org.elasticsoftware.akces.agentic.events.MemoryStoredEvent;
import org.elasticsoftware.akces.aggregate.AgenticAggregateMemory;
import org.elasticsoftware.akces.aggregate.AggregateRuntime;
import org.elasticsoftware.akces.aggregate.CommandType;
import org.elasticsoftware.akces.aggregate.DomainEventType;
import org.elasticsoftware.akces.aggregate.MemoryAwareState;
import org.elasticsoftware.akces.protocol.AggregateStateRecord;
Expand All @@ -47,12 +50,23 @@
* </ul>
*/
public interface AgenticAggregateRuntime extends AggregateRuntime {

/** Built-in domain event type for {@link MemoryStoredEvent}. */
DomainEventType<MemoryStoredEvent> MEMORY_STORED_TYPE = new DomainEventType<>(
"MemoryStored", 1, MemoryStoredEvent.class, false, false, false, false);

/** Built-in domain event type for {@link MemoryRevokedEvent}. */
DomainEventType<MemoryRevokedEvent> MEMORY_REVOKED_TYPE = new DomainEventType<>(
"MemoryRevoked", 1, MemoryRevokedEvent.class, false, false, false, false);

/** Built-in command type for {@link AssignTaskCommand}. */
CommandType<AssignTaskCommand> ASSIGN_TASK_COMMAND_TYPE = new CommandType<>(
"AssignTask", 1, AssignTaskCommand.class, false, false, false);

/** Built-in domain event type for {@link AgentTaskAssignedEvent}. */
DomainEventType<AgentTaskAssignedEvent> AGENT_TASK_ASSIGNED_TYPE = new DomainEventType<>(
"AgentTaskAssigned", 1, AgentTaskAssignedEvent.class, false, false, false, false);

/**
* Returns the Embabel {@link AgentPlatform} used to create and run agent processes.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@
import java.util.Collections;
import java.util.List;

import static org.elasticsoftware.akces.agentic.AgenticAggregateRuntime.AGENT_TASK_ASSIGNED_TYPE;
import static org.elasticsoftware.akces.agentic.AgenticAggregateRuntime.ASSIGN_TASK_COMMAND_TYPE;
import static org.elasticsoftware.akces.agentic.AgenticAggregateRuntime.MEMORY_REVOKED_TYPE;
import static org.elasticsoftware.akces.agentic.AgenticAggregateRuntime.MEMORY_STORED_TYPE;

Expand Down Expand Up @@ -267,15 +269,26 @@ private KafkaAggregateRuntime createRuntime(AgenticAggregateInfo agenticInfo,
}
});

// Register built-in event-sourcing handlers for memory management.
// These handle the framework-owned MemoryStored and MemoryRevoked events,
// which every AgenticAggregate state must process via MemoryAwareState.
// Register built-in event-sourcing handlers for memory management and task assignment.
// These handle the framework-owned MemoryStored, MemoryRevoked, and AgentTaskAssigned
// events, using a unified single-dispatch handler.
runtimeBuilder
.addEventSourcingHandler(MEMORY_STORED_TYPE, KafkaAgenticAggregateRuntime::handleMemoryEvent)
.addEventSourcingHandler(MEMORY_STORED_TYPE, KafkaAgenticAggregateRuntime::handleBuiltInEvent)
.addDomainEvent(MEMORY_STORED_TYPE);
runtimeBuilder
.addEventSourcingHandler(MEMORY_REVOKED_TYPE, KafkaAgenticAggregateRuntime::handleMemoryEvent)
.addEventSourcingHandler(MEMORY_REVOKED_TYPE, KafkaAgenticAggregateRuntime::handleBuiltInEvent)
.addDomainEvent(MEMORY_REVOKED_TYPE);
runtimeBuilder
.addEventSourcingHandler(AGENT_TASK_ASSIGNED_TYPE, KafkaAgenticAggregateRuntime::handleBuiltInEvent)
.addDomainEvent(AGENT_TASK_ASSIGNED_TYPE);

// Register built-in AssignTask command handler using a single-dispatch handler on
// KafkaAgenticAggregateRuntime. The handler resolves the agent from the platform,
// creates an AgentProcess, and emits AgentTaskAssignedEvent.
runtimeBuilder
.addCommandHandler(ASSIGN_TASK_COMMAND_TYPE,
KafkaAgenticAggregateRuntime.builtInCommandHandler(agentPlatform, agenticInfo.value()))
.addCommand(ASSIGN_TASK_COMMAND_TYPE);

// Collect agent-produced error types for registration and inclusion in adapters.
List<DomainEventType<?>> agentProducedErrorTypes =
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Copyright 2022 - 2026 The Original Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.elasticsoftware.akces.agentic.commands;

import jakarta.annotation.Nonnull;
import jakarta.validation.constraints.NotNull;
import org.elasticsoftware.akces.aggregate.RequestingParty;
import org.elasticsoftware.akces.annotations.AggregateIdentifier;
import org.elasticsoftware.akces.annotations.CommandInfo;
import org.elasticsoftware.akces.commands.Command;

import java.util.Map;

/**
* Built-in framework command that assigns a task to an
* {@link org.elasticsoftware.akces.aggregate.AgenticAggregate} for AI-assisted processing.
*
* <p>When processed by the built-in command handler, this command triggers the creation
* of an Embabel {@code AgentProcess} and emits an
* {@link org.elasticsoftware.akces.agentic.events.AgentTaskAssignedEvent} containing the
* process identifier.
*
* <p>This is the first framework-owned {@link Command} implementation. Unlike
* application-defined commands, it is automatically registered for every
* {@link org.elasticsoftware.akces.aggregate.AgenticAggregate}.
*
* @param agenticAggregateId the identifier of the target agentic aggregate
* @param taskDescription a human-readable description of what the agent should do
* @param requestingParty the entity (agent or human) requesting this task
* @param taskMetadata optional key-value metadata for additional context
* (e.g. correlationId, priority, deadline); may be {@code null}
*/
@CommandInfo(type = "AssignTask", version = 1,
description = "Assigns a task to an agentic aggregate for AI-assisted processing")
public record AssignTaskCommand(
@AggregateIdentifier @NotNull String agenticAggregateId,
@NotNull String taskDescription,
@NotNull RequestingParty requestingParty,
Map<String, String> taskMetadata
) implements Command {

/**
* {@inheritDoc}
*/
@Override
@Nonnull
public String getAggregateId() {
return agenticAggregateId;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Copyright 2022 - 2026 The Original Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.elasticsoftware.akces.agentic.events;

import jakarta.annotation.Nonnull;
import jakarta.validation.constraints.NotNull;
import org.elasticsoftware.akces.aggregate.RequestingParty;
import org.elasticsoftware.akces.annotations.AggregateIdentifier;
import org.elasticsoftware.akces.annotations.DomainEventInfo;
import org.elasticsoftware.akces.events.DomainEvent;

import java.time.Instant;
import java.util.Map;

/**
* Domain event emitted when a task has been successfully assigned to an
* {@link org.elasticsoftware.akces.aggregate.AgenticAggregate} and an Embabel
* {@code AgentProcess} has been created.
*
* <p>This event is produced by the built-in command handler when processing an
* {@link org.elasticsoftware.akces.agentic.commands.AssignTaskCommand}. It contains
* the Embabel {@code AgentProcess} identifier, which links the Akces domain to the
* Embabel runtime.
*
* <p>The built-in event-sourcing handler processes this event by updating the aggregate
* state's {@link org.elasticsoftware.akces.aggregate.TaskAwareState} with a new
* {@link org.elasticsoftware.akces.aggregate.AssignedTask}.
*
* @param agenticAggregateId the identifier of the agentic aggregate
* @param agentProcessId the Embabel {@code AgentProcess.getId()} value
* @param taskDescription the task description from the originating command
* @param requestingParty the entity that requested the task assignment
* @param taskMetadata optional key-value metadata from the originating command;
* may be {@code null}
* @param assignedAt the instant at which the task was assigned
*/
@DomainEventInfo(type = "AgentTaskAssigned", version = 1,
description = "Emitted when a task has been assigned to an agentic aggregate and an AgentProcess has been created")
public record AgentTaskAssignedEvent(
@AggregateIdentifier String agenticAggregateId,
@NotNull String agentProcessId,
@NotNull String taskDescription,
@NotNull RequestingParty requestingParty,
Map<String, String> taskMetadata,
@NotNull Instant assignedAt
) implements DomainEvent {

/**
* {@inheritDoc}
*/
@Override
@Nonnull
public String getAggregateId() {
return agenticAggregateId;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,14 @@ public class AkcesAgenticAggregateController extends Thread
@SuppressWarnings("unchecked")
private static final List<DomainEventType<?>> BUILTIN_EVENT_TYPES = List.of(
AgenticAggregateRuntime.MEMORY_STORED_TYPE,
AgenticAggregateRuntime.MEMORY_REVOKED_TYPE
AgenticAggregateRuntime.MEMORY_REVOKED_TYPE,
AgenticAggregateRuntime.AGENT_TASK_ASSIGNED_TYPE
);

/** Built-in command types provided by the agentic framework. */
@SuppressWarnings("unchecked")
private static final List<CommandType<?>> BUILTIN_COMMAND_TYPES = List.of(
AgenticAggregateRuntime.ASSIGN_TASK_COMMAND_TYPE
);

private final ConsumerFactory<String, SchemaRecord> schemaConsumerFactory;
Expand Down Expand Up @@ -153,6 +160,10 @@ public class AkcesAgenticAggregateController extends Thread
* subclasses or future extensions that may need environment properties. */
private Environment environment;

/** Whether to force-register built-in schemas when they are incompatible with existing ones.
* Configured via the {@code akces.aggregate.schemas.forceRegister} environment property. */
private boolean forceRegisterOnIncompatible = false;

/**
* Creates a new {@code AkcesAgenticAggregateController}.
*
Expand Down Expand Up @@ -268,10 +279,16 @@ public void run() {
}

/**
* Registers the built-in {@link org.elasticsoftware.akces.agentic.events.MemoryStoredEvent}
* and {@link org.elasticsoftware.akces.agentic.events.MemoryRevokedEvent} schemas with the
* schema registry. These events are produced internally by the Embabel layer when the
* agent stores or revokes memories.
* Registers the built-in agentic schemas with the schema registry: the memory events
* ({@link org.elasticsoftware.akces.agentic.events.MemoryStoredEvent},
* {@link org.elasticsoftware.akces.agentic.events.MemoryRevokedEvent}),
* the {@link org.elasticsoftware.akces.agentic.events.AgentTaskAssignedEvent}, and
* the {@link org.elasticsoftware.akces.agentic.commands.AssignTaskCommand}.
*
* <p>If a schema is incompatible with the existing registered schema, the behaviour
* depends on the {@code akces.aggregate.schemas.forceRegister} environment property:
* when {@code true}, the schema is force-registered; otherwise the incompatibility is
* propagated as a fatal error.
*/
private void registerBuiltinSchemas() {
logger.info("Registering built-in agentic schemas for {}Aggregate",
Expand All @@ -280,21 +297,50 @@ private void registerBuiltinSchemas() {
try {
aggregateRuntime.registerAndValidate(eventType, schemaRegistry);
} catch (IncompatibleSchemaException e) {
logger.warn("Built-in event schema {} is incompatible — attempting force-register",
eventType.typeName(), e);
try {
aggregateRuntime.registerAndValidate(eventType, schemaRegistry, true);
} catch (Exception ex) {
logger.error("Failed to force-register built-in event schema {}",
eventType.typeName(), ex);
throw new RuntimeException("Failed to register built-in event schema: "
+ eventType.typeName(), ex);
if (forceRegisterOnIncompatible) {
logger.warn("Built-in event schema {} is incompatible — force-registering (forceRegister=true)",
eventType.typeName(), e);
try {
aggregateRuntime.registerAndValidate(eventType, schemaRegistry, true);
} catch (Exception ex) {
logger.error("Failed to force-register built-in event schema {}",
eventType.typeName(), ex);
throw new RuntimeException("Failed to register built-in event schema: "
+ eventType.typeName(), ex);
}
} else {
throw new RuntimeException("Built-in event schema " + eventType.typeName()
+ " is incompatible. Set akces.aggregate.schemas.forceRegister=true to override.", e);
}
} catch (Exception e) {
throw new RuntimeException("Failed to register built-in event schema: "
+ eventType.typeName(), e);
}
}
for (CommandType<?> commandType : BUILTIN_COMMAND_TYPES) {
try {
aggregateRuntime.registerAndValidate(commandType, schemaRegistry);
} catch (IncompatibleSchemaException e) {
if (forceRegisterOnIncompatible) {
logger.warn("Built-in command schema {} is incompatible — force-registering (forceRegister=true)",
commandType.typeName(), e);
try {
aggregateRuntime.registerAndValidate(commandType, schemaRegistry, true);
} catch (Exception ex) {
logger.error("Failed to force-register built-in command schema {}",
commandType.typeName(), ex);
throw new RuntimeException("Failed to register built-in command schema: "
+ commandType.typeName(), ex);
}
} else {
throw new RuntimeException("Built-in command schema " + commandType.typeName()
+ " is incompatible. Set akces.aggregate.schemas.forceRegister=true to override.", e);
}
} catch (Exception e) {
throw new RuntimeException("Failed to register built-in command schema: "
+ commandType.typeName(), e);
}
}
}

/**
Expand Down Expand Up @@ -521,6 +567,7 @@ public void setApplicationContext(ApplicationContext ctx) {
@Override
public void setEnvironment(Environment env) {
this.environment = env;
this.forceRegisterOnIncompatible = env.getProperty("akces.aggregate.schemas.forceRegister", Boolean.class, false);
}

// -------------------------------------------------------------------------
Expand Down
Loading
Loading