Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
import org.elasticsoftware.akces.agentic.AgenticAggregateRuntime;
import org.elasticsoftware.akces.aggregate.CommandType;
import org.elasticsoftware.akces.aggregate.DomainEventType;
import org.elasticsoftware.akces.annotations.CommandInfo;
import org.elasticsoftware.akces.annotations.DomainEventInfo;
import org.elasticsoftware.akces.control.AggregateServiceCommandType;
import org.elasticsoftware.akces.control.AggregateServiceDomainEventType;
import org.elasticsoftware.akces.control.AggregateServiceRecord;
Expand Down Expand Up @@ -68,6 +70,7 @@
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import static org.elasticsoftware.akces.kafka.KafkaAggregateRuntime.normalizeDescription;
import static org.elasticsoftware.akces.kafka.PartitionUtils.COMMANDS_SUFFIX;
import static org.elasticsoftware.akces.kafka.PartitionUtils.DOMAINEVENTS_SUFFIX;

Expand Down Expand Up @@ -393,26 +396,30 @@ private void publishControlRecord() {
aggregateRuntime.getLocalCommandTypes().forEach(ct ->
allCommands.add(new AggregateServiceCommandType(
ct.typeName(), ct.version(), ct.create(),
"commands." + ct.typeName())));
"commands." + ct.typeName(),
normalizeDescription(ct.typeClass().getAnnotation(CommandInfo.class).description()))));

// Combine built-in + aggregate event types
List<AggregateServiceDomainEventType> allEvents = new ArrayList<>();
BUILTIN_EVENT_TYPES.forEach(et ->
allEvents.add(new AggregateServiceDomainEventType(
et.typeName(), et.version(), et.create(), et.external(),
"domainevents." + et.typeName())));
"domainevents." + et.typeName(),
normalizeDescription(et.typeClass().getAnnotation(DomainEventInfo.class).description()))));
aggregateRuntime.getProducedDomainEventTypes().forEach(et ->
allEvents.add(new AggregateServiceDomainEventType(
et.typeName(), et.version(), et.create(), et.external(),
"domainevents." + et.typeName())));
"domainevents." + et.typeName(),
normalizeDescription(et.typeClass().getAnnotation(DomainEventInfo.class).description()))));

// Consumed external events — mirrors the pattern in AkcesAggregateController so
// that service discovery correctly reflects the external event dependencies.
List<AggregateServiceDomainEventType> consumedEvents =
aggregateRuntime.getExternalDomainEventTypes().stream()
.map(et -> new AggregateServiceDomainEventType(
et.typeName(), et.version(), et.create(), et.external(),
"domainevents." + et.typeName()))
"domainevents." + et.typeName(),
normalizeDescription(et.typeClass().getAnnotation(DomainEventInfo.class).description())))
.toList();

AggregateServiceRecord serviceRecord = new AggregateServiceRecord(
Expand All @@ -422,7 +429,8 @@ private void publishControlRecord() {
AggregateServiceType.AGENTIC,
allCommands,
allEvents,
consumedEvents
consumedEvents,
aggregateRuntime.getDescription()
);

controlProducer.beginTransaction();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.elasticsoftware.akces.agentic.runtime;

import com.embabel.agent.core.AgentPlatform;
import jakarta.annotation.Nullable;
import org.apache.kafka.common.errors.SerializationException;
import org.elasticsoftware.akces.agentic.AgenticAggregateRuntime;
import org.elasticsoftware.akces.agentic.events.MemoryRevokedEvent;
Expand Down Expand Up @@ -115,6 +116,12 @@ public String getName() {
return delegate.getName();
}

@Override
@Nullable
public String getDescription() {
return delegate.getDescription();
}

@Override
public Class<? extends Aggregate<?>> getAggregateClass() {
return delegate.getAggregateClass();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -289,9 +289,10 @@ public static void prepareExternalServices(String bootstrapServers) {
"Account-Commands",
"Account-DomainEvents",
AggregateServiceType.STANDARD,
List.of(new AggregateServiceCommandType("CreateAccount", 1, true, "commands.CreateAccount")),
List.of(new AggregateServiceDomainEventType("AccountCreated", 1, true, false, "domainevents.AccountCreated")),
List.of());
List.of(new AggregateServiceCommandType("CreateAccount", 1, true, "commands.CreateAccount", null)),
List.of(new AggregateServiceDomainEventType("AccountCreated", 1, true, false, "domainevents.AccountCreated", null)),
List.of(),
null);
controlProducer.beginTransaction();
for (int partition = 0; partition < 3; partition++) {
controlProducer.send(new ProducerRecord<>("Akces-Control", partition, "Account", aggregateServiceRecord));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.elasticsoftware.akces.aggregate.DomainEventType;
import org.elasticsoftware.akces.aggregate.SchemaType;
import org.elasticsoftware.akces.annotations.CommandInfo;
import org.elasticsoftware.akces.annotations.DomainEventInfo;
import org.elasticsoftware.akces.commands.Command;
import org.elasticsoftware.akces.control.*;
import org.elasticsoftware.akces.gdpr.GDPRContextRepositoryFactory;
Expand Down Expand Up @@ -75,6 +76,7 @@
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.elasticsoftware.akces.AkcesControllerState.*;
import static org.elasticsoftware.akces.gdpr.GDPRAnnotationUtils.hasPIIDataAnnotation;
import static org.elasticsoftware.akces.kafka.KafkaAggregateRuntime.normalizeDescription;
import static org.elasticsoftware.akces.kafka.PartitionUtils.*;
import static org.elasticsoftware.akces.util.KafkaUtils.createCompactedTopic;
import static org.elasticsoftware.akces.util.KafkaUtils.getIndexTopicName;
Expand Down Expand Up @@ -463,21 +465,25 @@ private void publishControlRecord(int partitions) {
commandType.typeName(),
commandType.version(),
commandType.create(),
"commands." + commandType.typeName())).toList(),
"commands." + commandType.typeName(),
normalizeDescription(commandType.typeClass().getAnnotation(CommandInfo.class).description()))).toList(),
aggregateRuntime.getProducedDomainEventTypes().stream().map(domainEventType ->
new AggregateServiceDomainEventType(
domainEventType.typeName(),
domainEventType.version(),
domainEventType.create(),
domainEventType.external(),
"domainevents." + domainEventType.typeName())).toList(),
"domainevents." + domainEventType.typeName(),
normalizeDescription(domainEventType.typeClass().getAnnotation(DomainEventInfo.class).description()))).toList(),
aggregateRuntime.getExternalDomainEventTypes().stream().map(externalDomainEventType ->
new AggregateServiceDomainEventType(
externalDomainEventType.typeName(),
externalDomainEventType.version(),
externalDomainEventType.create(),
externalDomainEventType.external(),
"domainevents." + externalDomainEventType.typeName())).toList());
"domainevents." + externalDomainEventType.typeName(),
normalizeDescription(externalDomainEventType.typeClass().getAnnotation(DomainEventInfo.class).description()))).toList(),
aggregateRuntime.getDescription());
controlProducer.beginTransaction();
for (int partition = 0; partition < partitions; partition++) {
controlProducer.send(new ProducerRecord<>("Akces-Control", partition, aggregateRuntime.getName(), aggregateServiceRecord));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.elasticsoftware.akces.aggregate;

import jakarta.annotation.Nullable;
import org.apache.kafka.common.errors.SerializationException;
import org.elasticsoftware.akces.commands.Command;
import org.elasticsoftware.akces.commands.CommandBus;
Expand All @@ -37,6 +38,15 @@ public interface AggregateRuntime {

String getName();

/**
* Returns the human-readable description of this aggregate, as specified in
* the {@code @AggregateInfo} or {@code @AgenticAggregateInfo} annotation.
*
* @return the description, or {@code null} if none was specified
*/
@Nullable
String getDescription();

Class<? extends Aggregate<?>> getAggregateClass();

void handleCommandRecord(CommandRecord commandRecord,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import tools.jackson.core.JacksonException;
import tools.jackson.databind.JsonNode;
import tools.jackson.databind.ObjectMapper;
import org.elasticsoftware.akces.annotations.AgenticAggregateInfo;
import org.elasticsoftware.akces.annotations.AggregateInfo;
import org.elasticsoftware.akces.schemas.JsonSchema;
import jakarta.annotation.Nullable;
import org.apache.kafka.common.errors.SerializationException;
Expand Down Expand Up @@ -107,6 +109,31 @@ public String getName() {
return stateType.typeName();
}

@Override
@Nullable
public String getDescription() {
AggregateInfo aggregateInfo = aggregateClass.getAnnotation(AggregateInfo.class);
if (aggregateInfo != null) {
return normalizeDescription(aggregateInfo.description());
}
AgenticAggregateInfo agenticInfo = aggregateClass.getAnnotation(AgenticAggregateInfo.class);
if (agenticInfo != null) {
return normalizeDescription(agenticInfo.description());
}
return null;
}

/**
* Normalizes a description string by returning {@code null} for blank values.
*
* @param description the raw description from the annotation
* @return the description if non-blank, or {@code null}
*/
@Nullable
public static String normalizeDescription(@Nullable String description) {
return description == null || description.isBlank() ? null : description;
}

@Override
public Class<? extends Aggregate<?>> getAggregateClass() {
return aggregateClass;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,9 +186,10 @@ public static void prepareExternalServices(String bootstrapServers) {
"Account" + COMMANDS_SUFFIX,
"Account" + DOMAINEVENTS_SUFFIX,
AggregateServiceType.STANDARD,
List.of(new AggregateServiceCommandType("CreateAccount", 1, true, "commands.CreateAccount")),
List.of(new AggregateServiceDomainEventType("AccountCreated", 1, true, false, "domainevents.AccountCreated")),
List.of());
List.of(new AggregateServiceCommandType("CreateAccount", 1, true, "commands.CreateAccount", null)),
List.of(new AggregateServiceDomainEventType("AccountCreated", 1, true, false, "domainevents.AccountCreated", null)),
List.of(),
null);
controlProducer.beginTransaction();
for (int partition = 0; partition < 3; partition++) {
controlProducer.send(new ProducerRecord<>("Akces-Control", partition, "Account", aggregateServiceRecord));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,10 @@ public void testSerialization() {
"Account-Commands",
"Account-DomainEvents",
AggregateServiceType.STANDARD,
List.of(new AggregateServiceCommandType("CreateAccount", 1, true, "commands.CreateAccount")),
List.of(new AggregateServiceDomainEventType("AccountCreated", 1, true, false, "domainevents.AccountCreated")),
Collections.emptyList());
List.of(new AggregateServiceCommandType("CreateAccount", 1, true, "commands.CreateAccount", null)),
List.of(new AggregateServiceDomainEventType("AccountCreated", 1, true, false, "domainevents.AccountCreated", null)),
Collections.emptyList(),
null);
assertNotNull(record);
}

Expand Down Expand Up @@ -86,9 +87,10 @@ public void testSerde() {
"Account-Commands",
"Account-DomainEvents",
AggregateServiceType.STANDARD,
List.of(new AggregateServiceCommandType("CreateAccount", 1, true, "commands.CreateAccount")),
List.of(new AggregateServiceDomainEventType("AccountCreated", 1, true, false, "domainevents.AccountCreated")),
Collections.emptyList());
List.of(new AggregateServiceCommandType("CreateAccount", 1, true, "commands.CreateAccount", null)),
List.of(new AggregateServiceDomainEventType("AccountCreated", 1, true, false, "domainevents.AccountCreated", null)),
Collections.emptyList(),
null);
byte[] serialized = serde.serializer().serialize("Akces-Control", record);
assertNotNull(serialized);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,37 @@

package org.elasticsoftware.akces.control;

import jakarta.annotation.Nullable;
import org.elasticsoftware.akces.aggregate.CommandType;
import org.elasticsoftware.akces.commands.Command;

import static org.elasticsoftware.akces.gdpr.GDPRAnnotationUtils.hasPIIDataAnnotation;

/**
* Describes a command type supported by an aggregate service, as published on the
* {@code Akces-Control} topic.
*
* @param typeName the logical name of the command type
* @param version the schema version of the command
* @param create whether this command creates a new aggregate instance
* @param schemaName the schema registry subject name for this command
* @param description a human-readable description of the command; may be {@code null}
*/
public record AggregateServiceCommandType(
String typeName,
int version,
boolean create,
String schemaName
String schemaName,
@Nullable String description
) {
/**
* Converts this service command type into a {@link CommandType} for use as an external
* command type within an aggregate runtime.
*
* @param typeClass the Java class of the command
* @param <C> the command type
* @return a new {@link CommandType} instance
*/
public <C extends Command> CommandType<C> toExternalCommandType(Class<C> typeClass) {
return new CommandType<>(typeName, version, typeClass, create, true, hasPIIDataAnnotation(typeClass));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,40 @@

package org.elasticsoftware.akces.control;

import jakarta.annotation.Nullable;
import org.elasticsoftware.akces.aggregate.DomainEventType;
import org.elasticsoftware.akces.events.DomainEvent;

import static org.elasticsoftware.akces.gdpr.GDPRAnnotationUtils.hasPIIDataAnnotation;

/**
* Describes a domain-event type produced or consumed by an aggregate service, as
* published on the {@code Akces-Control} topic.
*
* @param typeName the logical name of the domain-event type
* @param version the schema version of the domain event
* @param create whether this event creates a new aggregate instance
* @param external whether this event originates from another aggregate
* @param schemaName the schema registry subject name for this domain event
* @param description a human-readable description of the domain event; may be {@code null}
*/
public record AggregateServiceDomainEventType(
String typeName,
int version,
boolean create,
boolean external,
String schemaName
String schemaName,
@Nullable String description
) {
/**
* Converts this service domain-event type into a {@link DomainEventType} for use
* within an aggregate runtime.
*
* @param typeClass the Java class of the domain event
* @param error whether this event represents an error
* @param <E> the domain-event type
* @return a new {@link DomainEventType} instance
*/
public <E extends DomainEvent> DomainEventType<E> toLocalDomainEventType(Class<E> typeClass, boolean error) {
return new DomainEventType<>(typeName, version, typeClass, create, external, error, hasPIIDataAnnotation(typeClass));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.elasticsoftware.akces.control;

import jakarta.annotation.Nullable;

import java.util.List;

/**
Expand All @@ -26,13 +28,19 @@
* aggregate services. It defaults to {@link AggregateServiceType#STANDARD} for records
* that pre-date the introduction of this field (see {@link #effectiveType()}).
*
* <p>The optional {@link #description()} field provides a human-readable summary of
* the aggregate, sourced from the {@code @AggregateInfo} or {@code @AgenticAggregateInfo}
* annotation. It may be {@code null} for legacy records or when no description was
* specified in the annotation.
*
* @param aggregateName the logical name of the aggregate
* @param commandTopic the Kafka topic to which commands for this aggregate are sent
* @param domainEventTopic the Kafka topic on which this aggregate publishes domain events
* @param type the service type; may be {@code null} for legacy records
* @param supportedCommands the command types supported by this service
* @param producedEvents the domain-event types produced by this service
* @param consumedEvents the external domain-event types consumed by this service
* @param description a human-readable description of the aggregate; may be {@code null}
*/
public record AggregateServiceRecord(
String aggregateName,
Expand All @@ -41,7 +49,8 @@ public record AggregateServiceRecord(
AggregateServiceType type,
List<AggregateServiceCommandType> supportedCommands,
List<AggregateServiceDomainEventType> producedEvents,
List<AggregateServiceDomainEventType> consumedEvents
List<AggregateServiceDomainEventType> consumedEvents,
@Nullable String description
) implements AkcesControlRecord {

/**
Expand Down
Loading
Loading