From ff3ff06ed478ddf804d764a3ab80949b312b630b Mon Sep 17 00:00:00 2001 From: Pol Pinol Castuera Date: Thu, 29 May 2025 22:16:31 +0200 Subject: [PATCH 01/13] Dont add @Id in alerts read model --- .../infrastructure/read_models/alerts/DecisionDocument.java | 1 - 1 file changed, 1 deletion(-) diff --git a/src/main/java/io/autoinvestor/infrastructure/read_models/alerts/DecisionDocument.java b/src/main/java/io/autoinvestor/infrastructure/read_models/alerts/DecisionDocument.java index 42772d9..a7eae2e 100644 --- a/src/main/java/io/autoinvestor/infrastructure/read_models/alerts/DecisionDocument.java +++ b/src/main/java/io/autoinvestor/infrastructure/read_models/alerts/DecisionDocument.java @@ -13,7 +13,6 @@ @Getter @Document(collection = "alerts") public class DecisionDocument { - @Id private String userId; private String assetId; private String type; From dc7dc7a59ada54556e61bd16e2d5a7046d3d6a4e Mon Sep 17 00:00:00 2001 From: Pol Pinol Castuera Date: Sat, 31 May 2025 10:27:36 +0200 Subject: [PATCH 02/13] Add sonar and linter --- HELP.md | 1 - build.gradle | 80 ++++++++++++------- .../io/autoinvestor/application/AlertDTO.java | 7 +- .../AlertsReadModelRepository.java | 1 + .../application/EmitAlertsCommand.java | 7 +- .../application/EmitAlertsCommandHandler.java | 38 ++++++--- .../application/GetAlertsQueryHandler.java | 10 +-- .../application/GetAlertsQueryResponse.java | 6 +- .../application/GetDecisionsQuery.java | 2 +- .../application/InboxReadModelRepository.java | 1 + .../RegisterPortfolioAssetCommand.java | 3 +- .../RegisterPortfolioAssetCommandHandler.java | 13 +-- .../application/RegisterUserCommand.java | 3 +- .../RegisterUserCommandHandler.java | 9 ++- src/main/java/io/autoinvestor/domain/Id.java | 6 +- .../domain/PortfolioRepository.java | 3 + .../domain/events/AlertEmittedEvent.java | 35 ++++---- .../events/AlertEmittedEventPayload.java | 10 +-- .../io/autoinvestor/domain/events/Event.java | 3 +- .../autoinvestor/domain/events/EventId.java | 1 - .../domain/events/EventSourcedEntity.java | 1 - .../domain/events/EventStoreRepository.java | 3 +- .../domain/events/InboxCreatedEvent.java | 28 +++---- .../events/InboxCreatedEventPayload.java | 7 +- .../events/SubscriptionCreatedEvent.java | 29 ++++--- .../SubscriptionCreatedEventPayload.java | 8 +- .../io/autoinvestor/domain/model/Alert.java | 3 +- .../io/autoinvestor/domain/model/Inbox.java | 25 +++--- .../io/autoinvestor/domain/model/InboxId.java | 1 - .../autoinvestor/domain/model/InboxState.java | 34 ++++---- .../io/autoinvestor/domain/model/UserId.java | 1 - .../event_publishers/EventMessageMapper.java | 24 +++--- .../InMemoryEventPublisher.java | 12 ++- .../PubsubEventPublisher.java | 33 +++++--- .../PubsubDecisionsEventSubscriber.java | 77 +++++++++++------- .../infrastructure/listeners/PubsubEvent.java | 7 +- .../listeners/PubsubEventMapper.java | 6 +- .../PubsubPortfolioAssetEventSubscriber.java | 73 ++++++++++------- .../listeners/PubsubUsersEventSubscriber.java | 68 ++++++++++------ .../read_models/alerts/DecisionDocument.java | 8 +- .../read_models/alerts/DecisionMapper.java | 16 +--- .../InMemoryAlertsReadModelRepository.java | 30 +++++-- .../MongoAlertsReadModelRepository.java | 16 ++-- .../read_models/users/DecisionDocument.java | 13 +-- .../InMemoryInboxReadModelRepository.java | 9 +-- .../users/MongoInboxReadModelRepository.java | 5 +- .../event_store/EventDocument.java | 50 ++++++------ .../repositories/event_store/EventMapper.java | 15 ++-- .../InMemoryEventStoreRepository.java | 16 ++-- .../MongoEventStoreRepository.java | 35 ++++---- .../InMemoryPortfolioRepository.java | 5 +- .../portfolio/MongoPortfolioRepository.java | 35 ++++---- .../portfolio/PortfolioDocument.java | 6 +- .../repositories/portfolio/UserDocument.java | 4 +- .../autoinvestor/ui/GetAlertsController.java | 30 ++++--- .../java/io/autoinvestor/ui/GetAlertsDTO.java | 3 +- 56 files changed, 504 insertions(+), 471 deletions(-) diff --git a/HELP.md b/HELP.md index f876abc..bdbf568 100644 --- a/HELP.md +++ b/HELP.md @@ -11,4 +11,3 @@ For further reference, please consider the following sections: These additional references should also help you: * [Gradle Build Scans – insights for your project's build](https://scans.gradle.com#gradle) - diff --git a/build.gradle b/build.gradle index 2e88f93..db7692c 100644 --- a/build.gradle +++ b/build.gradle @@ -1,49 +1,69 @@ plugins { - id 'java' - id 'org.springframework.boot' version '3.4.4' - id 'io.spring.dependency-management' version '1.1.7' - id 'io.freefair.lombok' version '8.13.1' + id 'java' + id 'org.springframework.boot' version '3.4.4' + id 'io.spring.dependency-management' version '1.1.7' + id 'io.freefair.lombok' version '8.13.1' + id 'com.diffplug.spotless' version '7.0.4' } group = 'io.autoinvestor' java { - toolchain { - languageVersion = JavaLanguageVersion.of(21) - } + toolchain { + languageVersion = JavaLanguageVersion.of(21) + } } repositories { - mavenCentral() + mavenCentral() } dependencies { - implementation 'org.springframework.boot:spring-boot-starter-web' - testImplementation 'org.springframework.boot:spring-boot-starter-test' - testRuntimeOnly 'org.junit.platform:junit-platform-launcher' - - implementation 'com.google.cloud:google-cloud-pubsub:1.123.0' - implementation "com.google.cloud:spring-cloud-gcp-starter-pubsub:6.1.1" - implementation 'com.fasterxml.jackson.core:jackson-databind' - implementation 'org.springframework.integration:spring-integration-core' - implementation 'org.springframework.boot:spring-boot-starter-data-mongodb' - - testImplementation 'org.springframework.boot:spring-boot-testcontainers' - testImplementation 'org.testcontainers:testcontainers' - testImplementation 'org.testcontainers:junit-jupiter' - testImplementation 'org.testcontainers:gcloud' - - compileOnly 'org.projectlombok:lombok:1.18.38' - annotationProcessor 'org.projectlombok:lombok:1.18.38' - - testCompileOnly 'org.projectlombok:lombok:1.18.38' - testAnnotationProcessor 'org.projectlombok:lombok:1.18.38' + implementation 'org.springframework.boot:spring-boot-starter-web' + testImplementation 'org.springframework.boot:spring-boot-starter-test' + testRuntimeOnly 'org.junit.platform:junit-platform-launcher' + + implementation 'com.google.cloud:google-cloud-pubsub:1.123.0' + implementation "com.google.cloud:spring-cloud-gcp-starter-pubsub:6.1.1" + implementation 'com.fasterxml.jackson.core:jackson-databind' + implementation 'org.springframework.integration:spring-integration-core' + implementation 'org.springframework.boot:spring-boot-starter-data-mongodb' + + testImplementation 'org.springframework.boot:spring-boot-testcontainers' + testImplementation 'org.testcontainers:testcontainers' + testImplementation 'org.testcontainers:junit-jupiter' + testImplementation 'org.testcontainers:gcloud' + + compileOnly 'org.projectlombok:lombok:1.18.38' + annotationProcessor 'org.projectlombok:lombok:1.18.38' + + testCompileOnly 'org.projectlombok:lombok:1.18.38' + testAnnotationProcessor 'org.projectlombok:lombok:1.18.38' } tasks.named('test') { - useJUnitPlatform() + useJUnitPlatform() } bootBuildImage { - publish = false + publish = false +} + +spotless { + java { + googleJavaFormat('1.22.0').aosp() + removeUnusedImports() + trimTrailingWhitespace() + leadingTabsToSpaces() + endWithNewline() + importOrder '', 'java', 'javax', 'org', 'com' + target 'src/**/*.java' + } + + format 'misc', { + target '*.gradle', '*.md', '.gitignore' + leadingTabsToSpaces() + trimTrailingWhitespace() + endWithNewline() + } } diff --git a/src/main/java/io/autoinvestor/application/AlertDTO.java b/src/main/java/io/autoinvestor/application/AlertDTO.java index 35d6c7e..dd7eb76 100644 --- a/src/main/java/io/autoinvestor/application/AlertDTO.java +++ b/src/main/java/io/autoinvestor/application/AlertDTO.java @@ -2,9 +2,4 @@ import java.util.Date; -public record AlertDTO( - String userId, - String assetId, - String type, - Date date -) {} +public record AlertDTO(String userId, String assetId, String type, Date date) {} diff --git a/src/main/java/io/autoinvestor/application/AlertsReadModelRepository.java b/src/main/java/io/autoinvestor/application/AlertsReadModelRepository.java index 2dcc95a..874779c 100644 --- a/src/main/java/io/autoinvestor/application/AlertsReadModelRepository.java +++ b/src/main/java/io/autoinvestor/application/AlertsReadModelRepository.java @@ -4,5 +4,6 @@ public interface AlertsReadModelRepository { void save(AlertDTO alertDTO); + List get(String userId); } diff --git a/src/main/java/io/autoinvestor/application/EmitAlertsCommand.java b/src/main/java/io/autoinvestor/application/EmitAlertsCommand.java index e324d5b..d7d16e7 100644 --- a/src/main/java/io/autoinvestor/application/EmitAlertsCommand.java +++ b/src/main/java/io/autoinvestor/application/EmitAlertsCommand.java @@ -1,8 +1,3 @@ package io.autoinvestor.application; - -public record EmitAlertsCommand( - String assetId, - String decision, - int riskLevel -) { } +public record EmitAlertsCommand(String assetId, String decision, int riskLevel) {} diff --git a/src/main/java/io/autoinvestor/application/EmitAlertsCommandHandler.java b/src/main/java/io/autoinvestor/application/EmitAlertsCommandHandler.java index 170f39a..ee52cb2 100644 --- a/src/main/java/io/autoinvestor/application/EmitAlertsCommandHandler.java +++ b/src/main/java/io/autoinvestor/application/EmitAlertsCommandHandler.java @@ -10,11 +10,12 @@ import io.autoinvestor.domain.model.UserId; import io.autoinvestor.exceptions.InternalErrorException; import lombok.extern.slf4j.Slf4j; -import org.springframework.stereotype.Service; import java.util.List; import java.util.Optional; +import org.springframework.stereotype.Service; + @Slf4j @Service public class EmitAlertsCommandHandler { @@ -25,9 +26,12 @@ public class EmitAlertsCommandHandler { private final InboxReadModelRepository inboxReadModel; private final EventPublisher eventPublisher; - public EmitAlertsCommandHandler(EventStoreRepository eventStore, PortfolioRepository portfolioRepository, - AlertsReadModelRepository alertsReadModel, InboxReadModelRepository inboxReadModel, - EventPublisher eventPublisher) { + public EmitAlertsCommandHandler( + EventStoreRepository eventStore, + PortfolioRepository portfolioRepository, + AlertsReadModelRepository alertsReadModel, + InboxReadModelRepository inboxReadModel, + EventPublisher eventPublisher) { this.eventStore = eventStore; this.portfolioRepository = portfolioRepository; this.alertsReadModel = alertsReadModel; @@ -36,7 +40,9 @@ public EmitAlertsCommandHandler(EventStoreRepository eventStore, PortfolioReposi } public void handle(EmitAlertsCommand command) { - List usersId = this.portfolioRepository.getUsersIdByAssetAndRiskLevel(command.assetId(), command.riskLevel()); + List usersId = + this.portfolioRepository.getUsersIdByAssetAndRiskLevel( + command.assetId(), command.riskLevel()); for (UserId userId : usersId) { Optional inboxIdOpt = this.inboxReadModel.getInboxId(userId); @@ -60,15 +66,21 @@ public void handle(EmitAlertsCommand command) { this.eventStore.save(inbox); - Alert alert = inbox.getState().getLastAlert() - .orElseThrow(() -> new InternalErrorException("No alert found after emitting one for userId " + userId.value())); + Alert alert = + inbox.getState() + .getLastAlert() + .orElseThrow( + () -> + new InternalErrorException( + "No alert found after emitting one for userId " + + userId.value())); - AlertDTO alertDTO = new AlertDTO( - userId.value(), - command.assetId(), - alert.decision().name(), - alert.date() - ); + AlertDTO alertDTO = + new AlertDTO( + userId.value(), + command.assetId(), + alert.decision().name(), + alert.date()); this.alertsReadModel.save(alertDTO); this.eventPublisher.publish(events); diff --git a/src/main/java/io/autoinvestor/application/GetAlertsQueryHandler.java b/src/main/java/io/autoinvestor/application/GetAlertsQueryHandler.java index efacd61..5d0e8c5 100644 --- a/src/main/java/io/autoinvestor/application/GetAlertsQueryHandler.java +++ b/src/main/java/io/autoinvestor/application/GetAlertsQueryHandler.java @@ -1,10 +1,10 @@ package io.autoinvestor.application; -import org.springframework.stereotype.Service; - import java.util.List; import java.util.stream.Collectors; +import org.springframework.stereotype.Service; + @Service public class GetAlertsQueryHandler { @@ -18,11 +18,7 @@ public List handle(GetDecisionsQuery query) { List decisions = this.readModel.get(query.userId()); return decisions.stream() - .map(d -> new GetAlertsQueryResponse( - d.assetId(), - d.type(), - d.date() - )) + .map(d -> new GetAlertsQueryResponse(d.assetId(), d.type(), d.date())) .collect(Collectors.toList()); } } diff --git a/src/main/java/io/autoinvestor/application/GetAlertsQueryResponse.java b/src/main/java/io/autoinvestor/application/GetAlertsQueryResponse.java index bb5dbff..f91e27f 100644 --- a/src/main/java/io/autoinvestor/application/GetAlertsQueryResponse.java +++ b/src/main/java/io/autoinvestor/application/GetAlertsQueryResponse.java @@ -2,8 +2,4 @@ import java.util.Date; -public record GetAlertsQueryResponse( - String assetId, - String type, - Date date -) { } \ No newline at end of file +public record GetAlertsQueryResponse(String assetId, String type, Date date) {} diff --git a/src/main/java/io/autoinvestor/application/GetDecisionsQuery.java b/src/main/java/io/autoinvestor/application/GetDecisionsQuery.java index 5958289..03006ba 100644 --- a/src/main/java/io/autoinvestor/application/GetDecisionsQuery.java +++ b/src/main/java/io/autoinvestor/application/GetDecisionsQuery.java @@ -1,3 +1,3 @@ package io.autoinvestor.application; -public record GetDecisionsQuery(String userId) { } +public record GetDecisionsQuery(String userId) {} diff --git a/src/main/java/io/autoinvestor/application/InboxReadModelRepository.java b/src/main/java/io/autoinvestor/application/InboxReadModelRepository.java index 01508c7..bee76ce 100644 --- a/src/main/java/io/autoinvestor/application/InboxReadModelRepository.java +++ b/src/main/java/io/autoinvestor/application/InboxReadModelRepository.java @@ -7,5 +7,6 @@ public interface InboxReadModelRepository { void save(UserId userId, InboxId inboxId); + Optional getInboxId(UserId userId); } diff --git a/src/main/java/io/autoinvestor/application/RegisterPortfolioAssetCommand.java b/src/main/java/io/autoinvestor/application/RegisterPortfolioAssetCommand.java index 4a50678..80f055a 100644 --- a/src/main/java/io/autoinvestor/application/RegisterPortfolioAssetCommand.java +++ b/src/main/java/io/autoinvestor/application/RegisterPortfolioAssetCommand.java @@ -1,4 +1,3 @@ package io.autoinvestor.application; -public record RegisterPortfolioAssetCommand(String userId, String assetId) { -} +public record RegisterPortfolioAssetCommand(String userId, String assetId) {} diff --git a/src/main/java/io/autoinvestor/application/RegisterPortfolioAssetCommandHandler.java b/src/main/java/io/autoinvestor/application/RegisterPortfolioAssetCommandHandler.java index 8ca0609..475d43e 100644 --- a/src/main/java/io/autoinvestor/application/RegisterPortfolioAssetCommandHandler.java +++ b/src/main/java/io/autoinvestor/application/RegisterPortfolioAssetCommandHandler.java @@ -7,13 +7,12 @@ import io.autoinvestor.domain.model.Inbox; import io.autoinvestor.domain.model.InboxId; import io.autoinvestor.domain.model.UserId; -import io.autoinvestor.exceptions.InternalErrorException; import lombok.extern.slf4j.Slf4j; -import org.springframework.stereotype.Service; import java.util.List; import java.util.Optional; +import org.springframework.stereotype.Service; @Slf4j @Service @@ -24,8 +23,11 @@ public class RegisterPortfolioAssetCommandHandler { private final InboxReadModelRepository inboxReadModel; private final EventPublisher eventPublisher; - public RegisterPortfolioAssetCommandHandler(EventStoreRepository eventStore, PortfolioRepository portfolioRepository, - InboxReadModelRepository inboxReadModel, EventPublisher eventPublisher) { + public RegisterPortfolioAssetCommandHandler( + EventStoreRepository eventStore, + PortfolioRepository portfolioRepository, + InboxReadModelRepository inboxReadModel, + EventPublisher eventPublisher) { this.eventStore = eventStore; this.portfolioRepository = portfolioRepository; this.inboxReadModel = inboxReadModel; @@ -34,7 +36,8 @@ public RegisterPortfolioAssetCommandHandler(EventStoreRepository eventStore, Por public void handle(RegisterPortfolioAssetCommand command) { if (this.portfolioRepository.existsPortfolioAsset(command.userId(), command.assetId())) { - log.info("Asset {} already registered for user {}", command.assetId(), command.userId()); + log.info( + "Asset {} already registered for user {}", command.assetId(), command.userId()); return; } diff --git a/src/main/java/io/autoinvestor/application/RegisterUserCommand.java b/src/main/java/io/autoinvestor/application/RegisterUserCommand.java index d9c100b..7dc384f 100644 --- a/src/main/java/io/autoinvestor/application/RegisterUserCommand.java +++ b/src/main/java/io/autoinvestor/application/RegisterUserCommand.java @@ -1,4 +1,3 @@ package io.autoinvestor.application; -public record RegisterUserCommand(String userId, int riskLevel) { -} +public record RegisterUserCommand(String userId, int riskLevel) {} diff --git a/src/main/java/io/autoinvestor/application/RegisterUserCommandHandler.java b/src/main/java/io/autoinvestor/application/RegisterUserCommandHandler.java index 386ff95..2b79f42 100644 --- a/src/main/java/io/autoinvestor/application/RegisterUserCommandHandler.java +++ b/src/main/java/io/autoinvestor/application/RegisterUserCommandHandler.java @@ -7,10 +7,10 @@ import io.autoinvestor.domain.model.Inbox; import io.autoinvestor.domain.model.UserId; import lombok.extern.slf4j.Slf4j; -import org.springframework.stereotype.Service; import java.util.List; +import org.springframework.stereotype.Service; @Slf4j @Service @@ -21,8 +21,11 @@ public class RegisterUserCommandHandler { private final InboxReadModelRepository inboxReadModel; private final EventPublisher eventPublisher; - public RegisterUserCommandHandler(EventStoreRepository eventStore, PortfolioRepository portfolioRepository, - InboxReadModelRepository inboxReadModel, EventPublisher eventPublisher) { + public RegisterUserCommandHandler( + EventStoreRepository eventStore, + PortfolioRepository portfolioRepository, + InboxReadModelRepository inboxReadModel, + EventPublisher eventPublisher) { this.eventStore = eventStore; this.portfolioRepository = portfolioRepository; this.inboxReadModel = inboxReadModel; diff --git a/src/main/java/io/autoinvestor/domain/Id.java b/src/main/java/io/autoinvestor/domain/Id.java index 7e541e8..4af1ee3 100644 --- a/src/main/java/io/autoinvestor/domain/Id.java +++ b/src/main/java/io/autoinvestor/domain/Id.java @@ -20,10 +20,8 @@ protected static String generateId() { @Override public boolean equals(Object o) { - if (this == o) - return true; - if (!(o instanceof Id that)) - return false; + if (this == o) return true; + if (!(o instanceof Id that)) return false; return id.equals(that.id); } diff --git a/src/main/java/io/autoinvestor/domain/PortfolioRepository.java b/src/main/java/io/autoinvestor/domain/PortfolioRepository.java index cf89ec0..30acca6 100644 --- a/src/main/java/io/autoinvestor/domain/PortfolioRepository.java +++ b/src/main/java/io/autoinvestor/domain/PortfolioRepository.java @@ -6,7 +6,10 @@ public interface PortfolioRepository { List getUsersIdByAssetAndRiskLevel(String assetId, int riskLevel); + void addUser(String userId, int riskLevel); + void addPortfolioAsset(String userId, String assetId); + boolean existsPortfolioAsset(String userId, String assetId); } diff --git a/src/main/java/io/autoinvestor/domain/events/AlertEmittedEvent.java b/src/main/java/io/autoinvestor/domain/events/AlertEmittedEvent.java index 069f181..1e8df7f 100644 --- a/src/main/java/io/autoinvestor/domain/events/AlertEmittedEvent.java +++ b/src/main/java/io/autoinvestor/domain/events/AlertEmittedEvent.java @@ -2,13 +2,12 @@ import io.autoinvestor.domain.Id; import io.autoinvestor.domain.model.AssetId; +import io.autoinvestor.domain.model.Decision; import io.autoinvestor.domain.model.InboxId; import io.autoinvestor.domain.model.UserId; -import io.autoinvestor.domain.model.Decision; import java.util.Date; - public class AlertEmittedEvent extends Event { public static final String TYPE = "ALERT_EMITTED_EVENT"; @@ -17,28 +16,28 @@ private AlertEmittedEvent(Id aggregateId, AlertEmittedEventPayload payload) { super(aggregateId, TYPE, payload); } - protected AlertEmittedEvent(EventId id, - Id aggregateId, - AlertEmittedEventPayload payload, - Date occurredAt, - int version) { + protected AlertEmittedEvent( + EventId id, + Id aggregateId, + AlertEmittedEventPayload payload, + Date occurredAt, + int version) { super(id, aggregateId, TYPE, payload, occurredAt, version); } - public static AlertEmittedEvent with(InboxId inboxId, UserId userId, AssetId assetId, Decision decision) { - AlertEmittedEventPayload payload = new AlertEmittedEventPayload( - userId.value(), - assetId.value(), - decision.name() - ); + public static AlertEmittedEvent with( + InboxId inboxId, UserId userId, AssetId assetId, Decision decision) { + AlertEmittedEventPayload payload = + new AlertEmittedEventPayload(userId.value(), assetId.value(), decision.name()); return new AlertEmittedEvent(inboxId, payload); } - public static AlertEmittedEvent hydrate(EventId id, - Id aggregateId, - AlertEmittedEventPayload payload, - Date occurredAt, - int version) { + public static AlertEmittedEvent hydrate( + EventId id, + Id aggregateId, + AlertEmittedEventPayload payload, + Date occurredAt, + int version) { return new AlertEmittedEvent(id, aggregateId, payload, occurredAt, version); } } diff --git a/src/main/java/io/autoinvestor/domain/events/AlertEmittedEventPayload.java b/src/main/java/io/autoinvestor/domain/events/AlertEmittedEventPayload.java index 2f6e6e4..1abb2ac 100644 --- a/src/main/java/io/autoinvestor/domain/events/AlertEmittedEventPayload.java +++ b/src/main/java/io/autoinvestor/domain/events/AlertEmittedEventPayload.java @@ -2,18 +2,14 @@ import java.util.Map; -public record AlertEmittedEventPayload( - String userId, - String assetId, - String decision -) implements EventPayload { +public record AlertEmittedEventPayload(String userId, String assetId, String decision) + implements EventPayload { @Override public Map asMap() { return Map.of( "userId", userId, "assetId", assetId, - "decision", decision - ); + "decision", decision); } } diff --git a/src/main/java/io/autoinvestor/domain/events/Event.java b/src/main/java/io/autoinvestor/domain/events/Event.java index 940207c..fe6725e 100644 --- a/src/main/java/io/autoinvestor/domain/events/Event.java +++ b/src/main/java/io/autoinvestor/domain/events/Event.java @@ -27,7 +27,8 @@ protected Event(Id aggregateId, String type, P payload, int version) { this.version = version; } - protected Event(EventId id, Id aggregateId, String type, P payload, Date occurredAt, int version) { + protected Event( + EventId id, Id aggregateId, String type, P payload, Date occurredAt, int version) { this.id = id; this.aggregateId = aggregateId; this.type = type; diff --git a/src/main/java/io/autoinvestor/domain/events/EventId.java b/src/main/java/io/autoinvestor/domain/events/EventId.java index 04eb658..1b08665 100644 --- a/src/main/java/io/autoinvestor/domain/events/EventId.java +++ b/src/main/java/io/autoinvestor/domain/events/EventId.java @@ -1,6 +1,5 @@ package io.autoinvestor.domain.events; - import io.autoinvestor.domain.Id; public class EventId extends Id { diff --git a/src/main/java/io/autoinvestor/domain/events/EventSourcedEntity.java b/src/main/java/io/autoinvestor/domain/events/EventSourcedEntity.java index e9b67b0..911eb3b 100644 --- a/src/main/java/io/autoinvestor/domain/events/EventSourcedEntity.java +++ b/src/main/java/io/autoinvestor/domain/events/EventSourcedEntity.java @@ -37,4 +37,3 @@ public void markEventsAsCommitted() { appliedEvents.clear(); } } - diff --git a/src/main/java/io/autoinvestor/domain/events/EventStoreRepository.java b/src/main/java/io/autoinvestor/domain/events/EventStoreRepository.java index d9bc041..233c26e 100644 --- a/src/main/java/io/autoinvestor/domain/events/EventStoreRepository.java +++ b/src/main/java/io/autoinvestor/domain/events/EventStoreRepository.java @@ -5,9 +5,10 @@ import java.util.Optional; - public interface EventStoreRepository { boolean exists(InboxId inboxId); + Optional get(InboxId inboxId); + void save(Inbox inbox); } diff --git a/src/main/java/io/autoinvestor/domain/events/InboxCreatedEvent.java b/src/main/java/io/autoinvestor/domain/events/InboxCreatedEvent.java index 8a5698a..4d17d88 100644 --- a/src/main/java/io/autoinvestor/domain/events/InboxCreatedEvent.java +++ b/src/main/java/io/autoinvestor/domain/events/InboxCreatedEvent.java @@ -6,7 +6,6 @@ import java.util.Date; - public class InboxCreatedEvent extends Event { public static final String TYPE = "INBOX_CREATED"; @@ -15,27 +14,26 @@ private InboxCreatedEvent(Id aggregateId, InboxCreatedEventPayload payload) { super(aggregateId, TYPE, payload); } - protected InboxCreatedEvent(EventId id, - Id aggregateId, - InboxCreatedEventPayload payload, - Date occurredAt, - int version) { + protected InboxCreatedEvent( + EventId id, + Id aggregateId, + InboxCreatedEventPayload payload, + Date occurredAt, + int version) { super(id, aggregateId, TYPE, payload, occurredAt, version); } public static InboxCreatedEvent with(InboxId inboxId, UserId userId, int riskLevel) { - InboxCreatedEventPayload payload = new InboxCreatedEventPayload( - userId.value(), - riskLevel - ); + InboxCreatedEventPayload payload = new InboxCreatedEventPayload(userId.value(), riskLevel); return new InboxCreatedEvent(inboxId, payload); } - public static InboxCreatedEvent hydrate(EventId id, - Id aggregateId, - InboxCreatedEventPayload payload, - Date occurredAt, - int version) { + public static InboxCreatedEvent hydrate( + EventId id, + Id aggregateId, + InboxCreatedEventPayload payload, + Date occurredAt, + int version) { return new InboxCreatedEvent(id, aggregateId, payload, occurredAt, version); } } diff --git a/src/main/java/io/autoinvestor/domain/events/InboxCreatedEventPayload.java b/src/main/java/io/autoinvestor/domain/events/InboxCreatedEventPayload.java index 1e32655..fbd497f 100644 --- a/src/main/java/io/autoinvestor/domain/events/InboxCreatedEventPayload.java +++ b/src/main/java/io/autoinvestor/domain/events/InboxCreatedEventPayload.java @@ -2,15 +2,12 @@ import java.util.Map; -public record InboxCreatedEventPayload( - String userId, int riskLevel -) implements EventPayload { +public record InboxCreatedEventPayload(String userId, int riskLevel) implements EventPayload { @Override public Map asMap() { return Map.of( "userId", userId, - "riskLevel", riskLevel - ); + "riskLevel", riskLevel); } } diff --git a/src/main/java/io/autoinvestor/domain/events/SubscriptionCreatedEvent.java b/src/main/java/io/autoinvestor/domain/events/SubscriptionCreatedEvent.java index b5b7eb5..059eef2 100644 --- a/src/main/java/io/autoinvestor/domain/events/SubscriptionCreatedEvent.java +++ b/src/main/java/io/autoinvestor/domain/events/SubscriptionCreatedEvent.java @@ -3,11 +3,9 @@ import io.autoinvestor.domain.Id; import io.autoinvestor.domain.model.AssetId; import io.autoinvestor.domain.model.InboxId; -import io.autoinvestor.domain.model.UserId; import java.util.Date; - public class SubscriptionCreatedEvent extends Event { public static final String TYPE = "SUBSCRIPTION_CREATED"; @@ -16,26 +14,27 @@ private SubscriptionCreatedEvent(Id aggregateId, SubscriptionCreatedEventPayload super(aggregateId, TYPE, payload); } - protected SubscriptionCreatedEvent(EventId id, - Id aggregateId, - SubscriptionCreatedEventPayload payload, - Date occurredAt, - int version) { + protected SubscriptionCreatedEvent( + EventId id, + Id aggregateId, + SubscriptionCreatedEventPayload payload, + Date occurredAt, + int version) { super(id, aggregateId, TYPE, payload, occurredAt, version); } public static SubscriptionCreatedEvent with(InboxId inboxId, AssetId assetId) { - SubscriptionCreatedEventPayload payload = new SubscriptionCreatedEventPayload( - assetId.value() - ); + SubscriptionCreatedEventPayload payload = + new SubscriptionCreatedEventPayload(assetId.value()); return new SubscriptionCreatedEvent(inboxId, payload); } - public static SubscriptionCreatedEvent hydrate(EventId id, - Id aggregateId, - SubscriptionCreatedEventPayload payload, - Date occurredAt, - int version) { + public static SubscriptionCreatedEvent hydrate( + EventId id, + Id aggregateId, + SubscriptionCreatedEventPayload payload, + Date occurredAt, + int version) { return new SubscriptionCreatedEvent(id, aggregateId, payload, occurredAt, version); } } diff --git a/src/main/java/io/autoinvestor/domain/events/SubscriptionCreatedEventPayload.java b/src/main/java/io/autoinvestor/domain/events/SubscriptionCreatedEventPayload.java index fbd0b83..30156d8 100644 --- a/src/main/java/io/autoinvestor/domain/events/SubscriptionCreatedEventPayload.java +++ b/src/main/java/io/autoinvestor/domain/events/SubscriptionCreatedEventPayload.java @@ -2,14 +2,10 @@ import java.util.Map; -public record SubscriptionCreatedEventPayload( - String assetId -) implements EventPayload { +public record SubscriptionCreatedEventPayload(String assetId) implements EventPayload { @Override public Map asMap() { - return Map.of( - "assetId", assetId - ); + return Map.of("assetId", assetId); } } diff --git a/src/main/java/io/autoinvestor/domain/model/Alert.java b/src/main/java/io/autoinvestor/domain/model/Alert.java index adaf33b..ac93e59 100644 --- a/src/main/java/io/autoinvestor/domain/model/Alert.java +++ b/src/main/java/io/autoinvestor/domain/model/Alert.java @@ -2,5 +2,4 @@ import java.util.Date; -public record Alert(AssetId assetId, Decision decision, Date date) { -} +public record Alert(AssetId assetId, Decision decision, Date date) {} diff --git a/src/main/java/io/autoinvestor/domain/model/Inbox.java b/src/main/java/io/autoinvestor/domain/model/Inbox.java index 385261d..2c690d3 100644 --- a/src/main/java/io/autoinvestor/domain/model/Inbox.java +++ b/src/main/java/io/autoinvestor/domain/model/Inbox.java @@ -29,29 +29,24 @@ public static Inbox from(List> stream) { public static Inbox create(String userId, int riskLevel) { Inbox inbox = Inbox.empty(); - inbox.apply(InboxCreatedEvent.with( - inbox.getState().getInboxId(), - UserId.from(userId), - riskLevel - )); + inbox.apply( + InboxCreatedEvent.with( + inbox.getState().getInboxId(), UserId.from(userId), riskLevel)); return inbox; } public void addPortfolioAsset(String assetId) { - this.apply(SubscriptionCreatedEvent.with( - this.state.getInboxId(), - AssetId.of(assetId) - )); + this.apply(SubscriptionCreatedEvent.with(this.state.getInboxId(), AssetId.of(assetId))); } public void emitAlert(String assetId, String decision) { - this.apply(AlertEmittedEvent.with( - this.state.getInboxId(), - this.state.getUserId(), - AssetId.of(assetId), - Decision.from(decision) - )); + this.apply( + AlertEmittedEvent.with( + this.state.getInboxId(), + this.state.getUserId(), + AssetId.of(assetId), + Decision.from(decision))); } @Override diff --git a/src/main/java/io/autoinvestor/domain/model/InboxId.java b/src/main/java/io/autoinvestor/domain/model/InboxId.java index 5f3a12f..2750261 100644 --- a/src/main/java/io/autoinvestor/domain/model/InboxId.java +++ b/src/main/java/io/autoinvestor/domain/model/InboxId.java @@ -2,7 +2,6 @@ import io.autoinvestor.domain.Id; - public class InboxId extends Id { InboxId(String id) { super(id); diff --git a/src/main/java/io/autoinvestor/domain/model/InboxState.java b/src/main/java/io/autoinvestor/domain/model/InboxState.java index 21d19f5..f06f060 100644 --- a/src/main/java/io/autoinvestor/domain/model/InboxState.java +++ b/src/main/java/io/autoinvestor/domain/model/InboxState.java @@ -15,7 +15,12 @@ public class InboxState { private final Set portfolioAssets; private final List alerts; - private InboxState(InboxId inboxId, UserId userId, int riskLevel, Set portfolioAssets, List alerts) { + private InboxState( + InboxId inboxId, + UserId userId, + int riskLevel, + Set portfolioAssets, + List alerts) { this.inboxId = inboxId; this.userId = userId; this.riskLevel = riskLevel; @@ -37,8 +42,7 @@ public InboxState withInboxCreated(InboxCreatedEvent event) { UserId.from(event.getPayload().userId()), event.getPayload().riskLevel(), Set.of(), - List.of() - ); + List.of()); } public InboxState withSubscriptionCreated(SubscriptionCreatedEvent event) { @@ -47,29 +51,19 @@ public InboxState withSubscriptionCreated(SubscriptionCreatedEvent event) { updatedAssets.add(AssetId.of(assetId)); return new InboxState( - this.inboxId, - this.userId, - this.riskLevel, - updatedAssets, - this.alerts - ); + this.inboxId, this.userId, this.riskLevel, updatedAssets, this.alerts); } public InboxState withAlertEmitted(AlertEmittedEvent event) { - Alert alert = new Alert( - AssetId.of(event.getPayload().assetId()), - Decision.from(event.getPayload().decision()), - event.getOccurredAt() - ); + Alert alert = + new Alert( + AssetId.of(event.getPayload().assetId()), + Decision.from(event.getPayload().decision()), + event.getOccurredAt()); List updatedAlerts = new ArrayList<>(this.alerts); updatedAlerts.add(alert); return new InboxState( - this.inboxId, - this.userId, - this.riskLevel, - this.portfolioAssets, - updatedAlerts - ); + this.inboxId, this.userId, this.riskLevel, this.portfolioAssets, updatedAlerts); } } diff --git a/src/main/java/io/autoinvestor/domain/model/UserId.java b/src/main/java/io/autoinvestor/domain/model/UserId.java index 49cfe0f..b8d922d 100644 --- a/src/main/java/io/autoinvestor/domain/model/UserId.java +++ b/src/main/java/io/autoinvestor/domain/model/UserId.java @@ -2,7 +2,6 @@ import io.autoinvestor.domain.Id; - public class UserId extends Id { UserId(String id) { super(id); diff --git a/src/main/java/io/autoinvestor/infrastructure/event_publishers/EventMessageMapper.java b/src/main/java/io/autoinvestor/infrastructure/event_publishers/EventMessageMapper.java index 5f2c762..a1a81d2 100644 --- a/src/main/java/io/autoinvestor/infrastructure/event_publishers/EventMessageMapper.java +++ b/src/main/java/io/autoinvestor/infrastructure/event_publishers/EventMessageMapper.java @@ -1,9 +1,5 @@ package io.autoinvestor.infrastructure.event_publishers; -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.protobuf.ByteString; -import com.google.pubsub.v1.PubsubMessage; import io.autoinvestor.domain.events.Event; import io.autoinvestor.exceptions.InternalErrorException; @@ -11,6 +7,10 @@ import java.util.HashMap; import java.util.Map; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.protobuf.ByteString; +import com.google.pubsub.v1.PubsubMessage; final class EventMessageMapper { @@ -23,18 +23,16 @@ final class EventMessageMapper { PubsubMessage toMessage(Event event) { try { Map envelope = new HashMap<>(); - envelope.put("payload", event.getPayload().asMap()); - envelope.put("eventId", event.getId().value()); - envelope.put("type", event.getType()); + envelope.put("payload", event.getPayload().asMap()); + envelope.put("eventId", event.getId().value()); + envelope.put("type", event.getType()); envelope.put("aggregateId", event.getAggregateId().value()); - envelope.put("occurredAt", - Instant.ofEpochMilli(event.getOccurredAt().getTime()).toString()); - envelope.put("version", event.getVersion()); + envelope.put( + "occurredAt", Instant.ofEpochMilli(event.getOccurredAt().getTime()).toString()); + envelope.put("version", event.getVersion()); String json = objectMapper.writeValueAsString(envelope); - return PubsubMessage.newBuilder() - .setData(ByteString.copyFromUtf8(json)) - .build(); + return PubsubMessage.newBuilder().setData(ByteString.copyFromUtf8(json)).build(); } catch (JsonProcessingException ex) { throw new InternalErrorException("Failed to serialise domain event"); diff --git a/src/main/java/io/autoinvestor/infrastructure/event_publishers/InMemoryEventPublisher.java b/src/main/java/io/autoinvestor/infrastructure/event_publishers/InMemoryEventPublisher.java index e29f9ea..f2e4760 100644 --- a/src/main/java/io/autoinvestor/infrastructure/event_publishers/InMemoryEventPublisher.java +++ b/src/main/java/io/autoinvestor/infrastructure/event_publishers/InMemoryEventPublisher.java @@ -2,13 +2,14 @@ import io.autoinvestor.domain.events.Event; import io.autoinvestor.domain.events.EventPublisher; -import org.springframework.context.ApplicationEventPublisher; -import org.springframework.context.annotation.Profile; -import org.springframework.stereotype.Component; import java.util.ArrayList; import java.util.List; +import org.springframework.context.ApplicationEventPublisher; +import org.springframework.context.annotation.Profile; +import org.springframework.stereotype.Component; + @Component @Profile("local") public class InMemoryEventPublisher implements EventPublisher { @@ -28,6 +29,9 @@ public void publish(List> events) { public boolean hasPublishedEvent(String type, String aggregateId) { return publishedEvents.stream() - .anyMatch(event -> event.getType().equals(type) && event.getAggregateId().value().equals(aggregateId)); + .anyMatch( + event -> + event.getType().equals(type) + && event.getAggregateId().value().equals(aggregateId)); } } diff --git a/src/main/java/io/autoinvestor/infrastructure/event_publishers/PubsubEventPublisher.java b/src/main/java/io/autoinvestor/infrastructure/event_publishers/PubsubEventPublisher.java index 488a965..8c3a585 100644 --- a/src/main/java/io/autoinvestor/infrastructure/event_publishers/PubsubEventPublisher.java +++ b/src/main/java/io/autoinvestor/infrastructure/event_publishers/PubsubEventPublisher.java @@ -1,18 +1,20 @@ package io.autoinvestor.infrastructure.event_publishers; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.cloud.pubsub.v1.Publisher; -import com.google.pubsub.v1.ProjectTopicName; import io.autoinvestor.domain.events.Event; import io.autoinvestor.domain.events.EventPublisher; import jakarta.annotation.PreDestroy; import lombok.extern.slf4j.Slf4j; + +import java.util.List; +import java.util.concurrent.TimeUnit; + import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Profile; import org.springframework.stereotype.Component; -import java.util.List; -import java.util.concurrent.TimeUnit; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.cloud.pubsub.v1.Publisher; +import com.google.pubsub.v1.ProjectTopicName; @Slf4j @Component @@ -25,8 +27,8 @@ public class PubsubEventPublisher implements EventPublisher { public PubsubEventPublisher( @Value("${GCP_PROJECT}") String projectId, @Value("${PUBSUB_TOPIC}") String topic, - ObjectMapper objectMapper - ) throws Exception { + ObjectMapper objectMapper) + throws Exception { this.mapper = new EventMessageMapper(objectMapper); ProjectTopicName topicName = ProjectTopicName.of(projectId, topic); this.publisher = Publisher.newBuilder(topicName).build(); @@ -45,12 +47,17 @@ public void publish(List> events) { events.stream() .map(mapper::toMessage) - .forEach(msg -> { - publisher.publish(msg).addListener( - () -> log.debug("Published msgId={}", msg.getMessageId()), - Runnable::run - ); - }); + .forEach( + msg -> { + publisher + .publish(msg) + .addListener( + () -> + log.debug( + "Published msgId={}", + msg.getMessageId()), + Runnable::run); + }); } @PreDestroy diff --git a/src/main/java/io/autoinvestor/infrastructure/listeners/PubsubDecisionsEventSubscriber.java b/src/main/java/io/autoinvestor/infrastructure/listeners/PubsubDecisionsEventSubscriber.java index bdd94d6..64f48bd 100644 --- a/src/main/java/io/autoinvestor/infrastructure/listeners/PubsubDecisionsEventSubscriber.java +++ b/src/main/java/io/autoinvestor/infrastructure/listeners/PubsubDecisionsEventSubscriber.java @@ -1,24 +1,26 @@ package io.autoinvestor.infrastructure.listeners; -import com.fasterxml.jackson.core.type.TypeReference; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.api.core.ApiService.Listener; -import com.google.api.core.ApiService.State; -import com.google.cloud.pubsub.v1.AckReplyConsumer; -import com.google.cloud.pubsub.v1.MessageReceiver; -import com.google.cloud.pubsub.v1.Subscriber; -import com.google.pubsub.v1.ProjectSubscriptionName; -import com.google.pubsub.v1.PubsubMessage; import io.autoinvestor.application.EmitAlertsCommand; import io.autoinvestor.application.EmitAlertsCommandHandler; import jakarta.annotation.PostConstruct; import jakarta.annotation.PreDestroy; import lombok.extern.slf4j.Slf4j; + +import java.util.Map; + import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Profile; import org.springframework.stereotype.Component; -import java.util.Map; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.api.core.ApiService.Listener; +import com.google.api.core.ApiService.State; +import com.google.cloud.pubsub.v1.AckReplyConsumer; +import com.google.cloud.pubsub.v1.MessageReceiver; +import com.google.cloud.pubsub.v1.Subscriber; +import com.google.pubsub.v1.ProjectSubscriptionName; +import com.google.pubsub.v1.PubsubMessage; @Slf4j @Component @@ -37,8 +39,8 @@ public PubsubDecisionsEventSubscriber( PubsubEventMapper eventMapper, @Value("${GCP_PROJECT}") String projectId, @Value("${PUBSUB_SUBSCRIPTION_DECISION_MAKING}") String subscriptionId) { - this.commandHandler = commandHandler; - this.eventMapper = eventMapper; + this.commandHandler = commandHandler; + this.eventMapper = eventMapper; this.subscriptionName = ProjectSubscriptionName.of(projectId, subscriptionId); } @@ -49,11 +51,18 @@ public void listen() { MessageReceiver receiver = this::processMessage; this.subscriber = Subscriber.newBuilder(subscriptionName, receiver).build(); - this.subscriber.addListener(new Listener() { - @Override public void failed(State from, Throwable failure) { - log.error("Subscriber failed from state {}: {}", from, failure.toString(), failure); - } - }, Runnable::run); + this.subscriber.addListener( + new Listener() { + @Override + public void failed(State from, Throwable failure) { + log.error( + "Subscriber failed from state {}: {}", + from, + failure.toString(), + failure); + } + }, + Runnable::run); this.subscriber.startAsync().awaitRunning(); log.info("Subscriber running"); } @@ -71,34 +80,46 @@ private void processMessage(PubsubMessage message, AckReplyConsumer consumer) { log.debug("Received message msgId={} size={}B", msgId, message.getData().size()); try { - Map raw = objectMapper.readValue(message.getData().toByteArray(), new TypeReference<>() {}); + Map raw = + objectMapper.readValue( + message.getData().toByteArray(), new TypeReference<>() {}); PubsubEvent event = eventMapper.fromMap(raw); log.info("Processing event type={} msgId={}", event.getType(), msgId); if ("ASSET_DECISION_TAKEN".equals(event.getType())) { if (!event.getPayload().containsKey("assetId")) { - log.warn("Event payload missing 'assetId' field, ignoring event msgId={}", msgId); + log.warn( + "Event payload missing 'assetId' field, ignoring event msgId={}", + msgId); consumer.nack(); return; } if (!event.getPayload().containsKey("decision")) { - log.warn("Event payload missing 'decision' field, ignoring event msgId={}", msgId); + log.warn( + "Event payload missing 'decision' field, ignoring event msgId={}", + msgId); consumer.nack(); return; } if (!event.getPayload().containsKey("riskLevel")) { - log.warn("Event payload missing 'riskLevel' field, ignoring event msgId={}", msgId); + log.warn( + "Event payload missing 'riskLevel' field, ignoring event msgId={}", + msgId); consumer.nack(); return; } - EmitAlertsCommand cmd = new EmitAlertsCommand( - (String) event.getPayload().get("assetId"), - (String) event.getPayload().get("decision"), - (int) event.getPayload().get("riskLevel") - ); + EmitAlertsCommand cmd = + new EmitAlertsCommand( + (String) event.getPayload().get("assetId"), + (String) event.getPayload().get("decision"), + (int) event.getPayload().get("riskLevel")); this.commandHandler.handle(cmd); - log.info("Decision registered for asset={} decision={} riskLevel={} msgId={}", - cmd.assetId(), cmd.decision(), cmd.riskLevel(), msgId); + log.info( + "Decision registered for asset={} decision={} riskLevel={} msgId={}", + cmd.assetId(), + cmd.decision(), + cmd.riskLevel(), + msgId); } else { log.debug("Ignored unsupported event type={} msgId={}", event.getType(), msgId); } diff --git a/src/main/java/io/autoinvestor/infrastructure/listeners/PubsubEvent.java b/src/main/java/io/autoinvestor/infrastructure/listeners/PubsubEvent.java index 41e0dd1..b212c11 100644 --- a/src/main/java/io/autoinvestor/infrastructure/listeners/PubsubEvent.java +++ b/src/main/java/io/autoinvestor/infrastructure/listeners/PubsubEvent.java @@ -1,6 +1,5 @@ package io.autoinvestor.infrastructure.listeners; -import com.fasterxml.jackson.annotation.JsonIgnoreProperties; import lombok.AllArgsConstructor; import lombok.Builder; import lombok.Data; @@ -8,9 +7,13 @@ import java.util.Map; +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; @JsonIgnoreProperties(ignoreUnknown = true) -@Data @Builder @NoArgsConstructor @AllArgsConstructor +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor public class PubsubEvent { private String aggregateId; private String type; diff --git a/src/main/java/io/autoinvestor/infrastructure/listeners/PubsubEventMapper.java b/src/main/java/io/autoinvestor/infrastructure/listeners/PubsubEventMapper.java index 3c1384e..5b50853 100644 --- a/src/main/java/io/autoinvestor/infrastructure/listeners/PubsubEventMapper.java +++ b/src/main/java/io/autoinvestor/infrastructure/listeners/PubsubEventMapper.java @@ -1,12 +1,12 @@ package io.autoinvestor.infrastructure.listeners; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.PropertyNamingStrategies; import lombok.RequiredArgsConstructor; -import org.springframework.stereotype.Component; import java.util.Map; +import org.springframework.stereotype.Component; + +import com.fasterxml.jackson.databind.ObjectMapper; @Component @RequiredArgsConstructor diff --git a/src/main/java/io/autoinvestor/infrastructure/listeners/PubsubPortfolioAssetEventSubscriber.java b/src/main/java/io/autoinvestor/infrastructure/listeners/PubsubPortfolioAssetEventSubscriber.java index db54fdc..e82bddd 100644 --- a/src/main/java/io/autoinvestor/infrastructure/listeners/PubsubPortfolioAssetEventSubscriber.java +++ b/src/main/java/io/autoinvestor/infrastructure/listeners/PubsubPortfolioAssetEventSubscriber.java @@ -1,25 +1,26 @@ package io.autoinvestor.infrastructure.listeners; -import com.fasterxml.jackson.core.type.TypeReference; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.api.core.ApiService.Listener; -import com.google.api.core.ApiService.State; -import com.google.cloud.pubsub.v1.AckReplyConsumer; -import com.google.cloud.pubsub.v1.MessageReceiver; -import com.google.cloud.pubsub.v1.Subscriber; -import com.google.pubsub.v1.ProjectSubscriptionName; -import com.google.pubsub.v1.PubsubMessage; import io.autoinvestor.application.RegisterPortfolioAssetCommand; import io.autoinvestor.application.RegisterPortfolioAssetCommandHandler; -import io.autoinvestor.application.RegisterUserCommand; import jakarta.annotation.PostConstruct; import jakarta.annotation.PreDestroy; import lombok.extern.slf4j.Slf4j; + +import java.util.Map; + import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Profile; import org.springframework.stereotype.Component; -import java.util.Map; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.api.core.ApiService.Listener; +import com.google.api.core.ApiService.State; +import com.google.cloud.pubsub.v1.AckReplyConsumer; +import com.google.cloud.pubsub.v1.MessageReceiver; +import com.google.cloud.pubsub.v1.Subscriber; +import com.google.pubsub.v1.ProjectSubscriptionName; +import com.google.pubsub.v1.PubsubMessage; @Slf4j @Component @@ -38,8 +39,8 @@ public PubsubPortfolioAssetEventSubscriber( PubsubEventMapper eventMapper, @Value("${GCP_PROJECT}") String projectId, @Value("${PUBSUB_SUBSCRIPTION_PORTFOLIO}") String subscriptionId) { - this.commandHandler = commandHandler; - this.eventMapper = eventMapper; + this.commandHandler = commandHandler; + this.eventMapper = eventMapper; this.subscriptionName = ProjectSubscriptionName.of(projectId, subscriptionId); } @@ -50,11 +51,18 @@ public void listen() { MessageReceiver receiver = this::processMessage; this.subscriber = Subscriber.newBuilder(subscriptionName, receiver).build(); - this.subscriber.addListener(new Listener() { - @Override public void failed(State from, Throwable failure) { - log.error("Subscriber failed from state {}: {}", from, failure.toString(), failure); // ERROR - } - }, Runnable::run); + this.subscriber.addListener( + new Listener() { + @Override + public void failed(State from, Throwable failure) { + log.error( + "Subscriber failed from state {}: {}", + from, + failure.toString(), + failure); // ERROR + } + }, + Runnable::run); this.subscriber.startAsync().awaitRunning(); log.info("Subscriber running"); } @@ -72,25 +80,34 @@ private void processMessage(PubsubMessage message, AckReplyConsumer consumer) { log.debug("Received message msgId={} size={}B", msgId, message.getData().size()); try { - Map raw = objectMapper.readValue(message.getData().toByteArray(), new TypeReference<>() {}); + Map raw = + objectMapper.readValue( + message.getData().toByteArray(), new TypeReference<>() {}); PubsubEvent event = eventMapper.fromMap(raw); log.info("Processing event type={} msgId={}", event.getType(), msgId); if ("PORTFOLIO_ASSET_ADDED".equals(event.getType())) { - if (event.getAggregateId() == null || event.getPayload() == null || - !event.getPayload().containsKey("userId") || !event.getPayload().containsKey("assetId")) { - log.warn("Malformed event: Skipping PORTFOLIO_ASSET_ADDED event with missing fields msgId={}", msgId); + if (event.getAggregateId() == null + || event.getPayload() == null + || !event.getPayload().containsKey("userId") + || !event.getPayload().containsKey("assetId")) { + log.warn( + "Malformed event: Skipping PORTFOLIO_ASSET_ADDED event with missing fields msgId={}", + msgId); consumer.ack(); return; } - RegisterPortfolioAssetCommand cmd = new RegisterPortfolioAssetCommand( - (String) event.getPayload().get("userId"), - (String) event.getPayload().get("assetId") - ); + RegisterPortfolioAssetCommand cmd = + new RegisterPortfolioAssetCommand( + (String) event.getPayload().get("userId"), + (String) event.getPayload().get("assetId")); this.commandHandler.handle(cmd); - log.info("Decision registered for userId={} assetId={} msgId={}", - cmd.userId(), cmd.assetId(), msgId); + log.info( + "Decision registered for userId={} assetId={} msgId={}", + cmd.userId(), + cmd.assetId(), + msgId); } else { log.debug("Ignored unsupported event type={} msgId={}", event.getType(), msgId); } diff --git a/src/main/java/io/autoinvestor/infrastructure/listeners/PubsubUsersEventSubscriber.java b/src/main/java/io/autoinvestor/infrastructure/listeners/PubsubUsersEventSubscriber.java index 179e827..7cbc6bd 100644 --- a/src/main/java/io/autoinvestor/infrastructure/listeners/PubsubUsersEventSubscriber.java +++ b/src/main/java/io/autoinvestor/infrastructure/listeners/PubsubUsersEventSubscriber.java @@ -1,24 +1,26 @@ package io.autoinvestor.infrastructure.listeners; -import com.fasterxml.jackson.core.type.TypeReference; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.api.core.ApiService.Listener; -import com.google.api.core.ApiService.State; -import com.google.cloud.pubsub.v1.AckReplyConsumer; -import com.google.cloud.pubsub.v1.MessageReceiver; -import com.google.cloud.pubsub.v1.Subscriber; -import com.google.pubsub.v1.ProjectSubscriptionName; -import com.google.pubsub.v1.PubsubMessage; import io.autoinvestor.application.RegisterUserCommand; import io.autoinvestor.application.RegisterUserCommandHandler; import jakarta.annotation.PostConstruct; import jakarta.annotation.PreDestroy; import lombok.extern.slf4j.Slf4j; + +import java.util.Map; + import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Profile; import org.springframework.stereotype.Component; -import java.util.Map; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.api.core.ApiService.Listener; +import com.google.api.core.ApiService.State; +import com.google.cloud.pubsub.v1.AckReplyConsumer; +import com.google.cloud.pubsub.v1.MessageReceiver; +import com.google.cloud.pubsub.v1.Subscriber; +import com.google.pubsub.v1.ProjectSubscriptionName; +import com.google.pubsub.v1.PubsubMessage; @Slf4j @Component @@ -37,8 +39,8 @@ public PubsubUsersEventSubscriber( PubsubEventMapper eventMapper, @Value("${GCP_PROJECT}") String projectId, @Value("${PUBSUB_SUBSCRIPTION_USERS}") String subscriptionId) { - this.commandHandler = commandHandler; - this.eventMapper = eventMapper; + this.commandHandler = commandHandler; + this.eventMapper = eventMapper; this.subscriptionName = ProjectSubscriptionName.of(projectId, subscriptionId); } @@ -49,11 +51,18 @@ public void listen() { MessageReceiver receiver = this::processMessage; this.subscriber = Subscriber.newBuilder(subscriptionName, receiver).build(); - this.subscriber.addListener(new Listener() { - @Override public void failed(State from, Throwable failure) { - log.error("Subscriber failed from state {}: {}", from, failure.toString(), failure); // ERROR - } - }, Runnable::run); + this.subscriber.addListener( + new Listener() { + @Override + public void failed(State from, Throwable failure) { + log.error( + "Subscriber failed from state {}: {}", + from, + failure.toString(), + failure); // ERROR + } + }, + Runnable::run); this.subscriber.startAsync().awaitRunning(); log.info("Subscriber running"); } @@ -71,24 +80,31 @@ private void processMessage(PubsubMessage message, AckReplyConsumer consumer) { log.debug("Received message msgId={} size={}B", msgId, message.getData().size()); try { - Map raw = objectMapper.readValue(message.getData().toByteArray(), new TypeReference<>() {}); + Map raw = + objectMapper.readValue( + message.getData().toByteArray(), new TypeReference<>() {}); PubsubEvent event = eventMapper.fromMap(raw); log.info("Processing event type={} msgId={}", event.getType(), msgId); if ("USER_CREATED".equals(event.getType())) { - if (event.getAggregateId() == null || !event.getPayload().containsKey("riskLevel")) { - log.warn("Malformed event: Skipping USER_CREATED event with missing aggregateId or riskLevel msgId={}", msgId); + if (event.getAggregateId() == null + || !event.getPayload().containsKey("riskLevel")) { + log.warn( + "Malformed event: Skipping USER_CREATED event with missing aggregateId or riskLevel msgId={}", + msgId); consumer.ack(); return; } - RegisterUserCommand cmd = new RegisterUserCommand( - event.getAggregateId(), - (int) event.getPayload().get("riskLevel") - ); + RegisterUserCommand cmd = + new RegisterUserCommand( + event.getAggregateId(), (int) event.getPayload().get("riskLevel")); this.commandHandler.handle(cmd); - log.info("Decision registered for userId={} riskLevel={} msgId={}", - cmd.userId(), cmd.riskLevel(), msgId); + log.info( + "Decision registered for userId={} riskLevel={} msgId={}", + cmd.userId(), + cmd.riskLevel(), + msgId); } else { log.debug("Ignored unsupported event type={} msgId={}", event.getType(), msgId); } diff --git a/src/main/java/io/autoinvestor/infrastructure/read_models/alerts/DecisionDocument.java b/src/main/java/io/autoinvestor/infrastructure/read_models/alerts/DecisionDocument.java index a7eae2e..bacd072 100644 --- a/src/main/java/io/autoinvestor/infrastructure/read_models/alerts/DecisionDocument.java +++ b/src/main/java/io/autoinvestor/infrastructure/read_models/alerts/DecisionDocument.java @@ -2,13 +2,11 @@ import lombok.Getter; import lombok.Setter; -import org.springframework.data.annotation.Id; -import org.springframework.data.mongodb.core.index.CompoundIndex; -import org.springframework.data.mongodb.core.index.CompoundIndexes; -import org.springframework.data.mongodb.core.mapping.Document; import java.util.Date; +import org.springframework.data.mongodb.core.mapping.Document; + @Setter @Getter @Document(collection = "alerts") @@ -18,7 +16,7 @@ public class DecisionDocument { private String type; private Date date; - public DecisionDocument() { } + public DecisionDocument() {} public DecisionDocument(String userId, String assetId, String type, Date date) { this.userId = userId; diff --git a/src/main/java/io/autoinvestor/infrastructure/read_models/alerts/DecisionMapper.java b/src/main/java/io/autoinvestor/infrastructure/read_models/alerts/DecisionMapper.java index bfc4109..5e2e901 100644 --- a/src/main/java/io/autoinvestor/infrastructure/read_models/alerts/DecisionMapper.java +++ b/src/main/java/io/autoinvestor/infrastructure/read_models/alerts/DecisionMapper.java @@ -1,27 +1,17 @@ package io.autoinvestor.infrastructure.read_models.alerts; import io.autoinvestor.application.AlertDTO; -import org.springframework.stereotype.Component; +import org.springframework.stereotype.Component; @Component public class DecisionMapper { public DecisionDocument toDocument(AlertDTO dto) { - return new DecisionDocument( - dto.userId(), - dto.assetId(), - dto.type(), - dto.date() - ); + return new DecisionDocument(dto.userId(), dto.assetId(), dto.type(), dto.date()); } public AlertDTO toDTO(DecisionDocument doc) { - return new AlertDTO( - doc.getUserId(), - doc.getAssetId(), - doc.getType(), - doc.getDate() - ); + return new AlertDTO(doc.getUserId(), doc.getAssetId(), doc.getType(), doc.getDate()); } } diff --git a/src/main/java/io/autoinvestor/infrastructure/read_models/alerts/InMemoryAlertsReadModelRepository.java b/src/main/java/io/autoinvestor/infrastructure/read_models/alerts/InMemoryAlertsReadModelRepository.java index fd49908..5f435c6 100644 --- a/src/main/java/io/autoinvestor/infrastructure/read_models/alerts/InMemoryAlertsReadModelRepository.java +++ b/src/main/java/io/autoinvestor/infrastructure/read_models/alerts/InMemoryAlertsReadModelRepository.java @@ -2,24 +2,38 @@ import io.autoinvestor.application.AlertDTO; import io.autoinvestor.application.AlertsReadModelRepository; -import org.springframework.context.annotation.Profile; -import org.springframework.stereotype.Repository; import java.util.ArrayList; import java.util.Date; import java.util.List; import java.util.stream.Collectors; +import org.springframework.context.annotation.Profile; +import org.springframework.stereotype.Repository; + @Repository @Profile("local") public class InMemoryAlertsReadModelRepository implements AlertsReadModelRepository { - private final List alerts = new ArrayList<>(List.of( - new AlertDTO("user1", "BTC", "SELL", new Date(System.currentTimeMillis() - 86_400_000L)), - new AlertDTO("user1", "ETH", "HOLD", new Date(System.currentTimeMillis() - 3_600_000L)), - new AlertDTO("user2", "AAPL","SELL", new Date()), - new AlertDTO("user3", "GOOG","BUY", new Date(System.currentTimeMillis() - 7_200_000L)) - )); + private final List alerts = + new ArrayList<>( + List.of( + new AlertDTO( + "user1", + "BTC", + "SELL", + new Date(System.currentTimeMillis() - 86_400_000L)), + new AlertDTO( + "user1", + "ETH", + "HOLD", + new Date(System.currentTimeMillis() - 3_600_000L)), + new AlertDTO("user2", "AAPL", "SELL", new Date()), + new AlertDTO( + "user3", + "GOOG", + "BUY", + new Date(System.currentTimeMillis() - 7_200_000L)))); @Override public void save(AlertDTO alertDTO) { diff --git a/src/main/java/io/autoinvestor/infrastructure/read_models/alerts/MongoAlertsReadModelRepository.java b/src/main/java/io/autoinvestor/infrastructure/read_models/alerts/MongoAlertsReadModelRepository.java index 5d3cecf..b9183d3 100644 --- a/src/main/java/io/autoinvestor/infrastructure/read_models/alerts/MongoAlertsReadModelRepository.java +++ b/src/main/java/io/autoinvestor/infrastructure/read_models/alerts/MongoAlertsReadModelRepository.java @@ -2,18 +2,15 @@ import io.autoinvestor.application.AlertDTO; import io.autoinvestor.application.AlertsReadModelRepository; + +import java.util.List; + import org.springframework.context.annotation.Profile; -import org.springframework.data.domain.Sort; import org.springframework.data.mongodb.core.MongoTemplate; -import org.springframework.data.mongodb.core.index.Index; -import org.springframework.data.mongodb.core.index.IndexOperations; import org.springframework.data.mongodb.core.query.Criteria; import org.springframework.data.mongodb.core.query.Query; import org.springframework.stereotype.Repository; -import java.util.List; - - @Repository @Profile("prod") public class MongoAlertsReadModelRepository implements AlertsReadModelRepository { @@ -35,11 +32,8 @@ public void save(AlertDTO alertDTO) { @Override public List get(String userId) { - Query query = Query.query( - Criteria.where("userId").is(userId) - ); - return template.find(query, DecisionDocument.class, COLLECTION) - .stream() + Query query = Query.query(Criteria.where("userId").is(userId)); + return template.find(query, DecisionDocument.class, COLLECTION).stream() .map(mapper::toDTO) .toList(); } diff --git a/src/main/java/io/autoinvestor/infrastructure/read_models/users/DecisionDocument.java b/src/main/java/io/autoinvestor/infrastructure/read_models/users/DecisionDocument.java index b960622..7546d70 100644 --- a/src/main/java/io/autoinvestor/infrastructure/read_models/users/DecisionDocument.java +++ b/src/main/java/io/autoinvestor/infrastructure/read_models/users/DecisionDocument.java @@ -2,25 +2,20 @@ import lombok.Getter; import lombok.Setter; + import org.springframework.data.annotation.Id; -import org.springframework.data.mongodb.core.index.CompoundIndex; -import org.springframework.data.mongodb.core.index.CompoundIndexes; import org.springframework.data.mongodb.core.mapping.Document; -import java.util.Date; - @Setter @Getter @Document(collection = "inbox") public class DecisionDocument { - @Id - private String userId; + @Id private String userId; private String inboxId; - public DecisionDocument() { } + public DecisionDocument() {} - public DecisionDocument(String userId, - String inboxId) { + public DecisionDocument(String userId, String inboxId) { this.userId = userId; this.inboxId = inboxId; } diff --git a/src/main/java/io/autoinvestor/infrastructure/read_models/users/InMemoryInboxReadModelRepository.java b/src/main/java/io/autoinvestor/infrastructure/read_models/users/InMemoryInboxReadModelRepository.java index e67e80b..394aa72 100644 --- a/src/main/java/io/autoinvestor/infrastructure/read_models/users/InMemoryInboxReadModelRepository.java +++ b/src/main/java/io/autoinvestor/infrastructure/read_models/users/InMemoryInboxReadModelRepository.java @@ -3,13 +3,14 @@ import io.autoinvestor.application.InboxReadModelRepository; import io.autoinvestor.domain.model.InboxId; import io.autoinvestor.domain.model.UserId; -import org.springframework.context.annotation.Profile; -import org.springframework.stereotype.Repository; import java.util.HashMap; import java.util.Map; import java.util.Optional; +import org.springframework.context.annotation.Profile; +import org.springframework.stereotype.Repository; + @Repository @Profile("local") public class InMemoryInboxReadModelRepository implements InboxReadModelRepository { @@ -29,8 +30,6 @@ public void save(UserId userId, InboxId inboxId) { @Override public Optional getInboxId(UserId userId) { String raw = inbox.get(userId.value()); - return raw != null - ? Optional.of(InboxId.from(raw)) - : Optional.empty(); + return raw != null ? Optional.of(InboxId.from(raw)) : Optional.empty(); } } diff --git a/src/main/java/io/autoinvestor/infrastructure/read_models/users/MongoInboxReadModelRepository.java b/src/main/java/io/autoinvestor/infrastructure/read_models/users/MongoInboxReadModelRepository.java index aff4fde..979a564 100644 --- a/src/main/java/io/autoinvestor/infrastructure/read_models/users/MongoInboxReadModelRepository.java +++ b/src/main/java/io/autoinvestor/infrastructure/read_models/users/MongoInboxReadModelRepository.java @@ -3,12 +3,13 @@ import io.autoinvestor.application.InboxReadModelRepository; import io.autoinvestor.domain.model.InboxId; import io.autoinvestor.domain.model.UserId; + +import java.util.Optional; + import org.springframework.context.annotation.Profile; import org.springframework.data.mongodb.core.MongoTemplate; import org.springframework.stereotype.Repository; -import java.util.Optional; - @Repository @Profile("prod") public class MongoInboxReadModelRepository implements InboxReadModelRepository { diff --git a/src/main/java/io/autoinvestor/infrastructure/repositories/event_store/EventDocument.java b/src/main/java/io/autoinvestor/infrastructure/repositories/event_store/EventDocument.java index 4f2af5d..5b7cd75 100644 --- a/src/main/java/io/autoinvestor/infrastructure/repositories/event_store/EventDocument.java +++ b/src/main/java/io/autoinvestor/infrastructure/repositories/event_store/EventDocument.java @@ -2,49 +2,45 @@ import lombok.Getter; import lombok.Setter; -import org.springframework.data.annotation.Id; -import org.springframework.data.mongodb.core.mapping.Document; -import org.springframework.data.mongodb.core.mapping.Field; import java.util.Date; import java.util.Map; +import org.springframework.data.annotation.Id; +import org.springframework.data.mongodb.core.mapping.Document; +import org.springframework.data.mongodb.core.mapping.Field; + @Getter @Setter @Document(collection = "events") public class EventDocument { - @Id - private String id; + @Id private String id; - @Field - private String aggregateId; + @Field private String aggregateId; - @Field - private String type; + @Field private String type; - @Field - private Map payload; + @Field private Map payload; - @Field - private Date occurredAt; + @Field private Date occurredAt; - @Field - private int version; + @Field private int version; - public EventDocument() { } + public EventDocument() {} - public EventDocument(String id, - String aggregateId, - String type, - Map payload, - Date occurredAt, - int version) { - this.id = id; + public EventDocument( + String id, + String aggregateId, + String type, + Map payload, + Date occurredAt, + int version) { + this.id = id; this.aggregateId = aggregateId; - this.type = type; - this.payload = payload; - this.occurredAt = occurredAt; - this.version = version; + this.type = type; + this.payload = payload; + this.occurredAt = occurredAt; + this.version = version; } } diff --git a/src/main/java/io/autoinvestor/infrastructure/repositories/event_store/EventMapper.java b/src/main/java/io/autoinvestor/infrastructure/repositories/event_store/EventMapper.java index d4a941f..6e2a13b 100644 --- a/src/main/java/io/autoinvestor/infrastructure/repositories/event_store/EventMapper.java +++ b/src/main/java/io/autoinvestor/infrastructure/repositories/event_store/EventMapper.java @@ -1,14 +1,16 @@ package io.autoinvestor.infrastructure.repositories.event_store; -import com.fasterxml.jackson.core.type.TypeReference; -import com.fasterxml.jackson.databind.ObjectMapper; import io.autoinvestor.domain.events.*; import io.autoinvestor.domain.model.UserId; -import org.springframework.stereotype.Component; import java.util.Date; import java.util.Map; +import org.springframework.stereotype.Component; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; + @Component public class EventMapper { @@ -24,8 +26,7 @@ public

EventDocument toDocument(Event

evt) { evt.getType(), payloadMap, evt.getOccurredAt(), - evt.getVersion() - ); + evt.getVersion()); } public Event toDomain(EventDocument doc) { @@ -53,9 +54,7 @@ public Event toDomain(EventDocument doc) { return AlertEmittedEvent.hydrate(id, aggId, payload, occurred, version); } - default -> throw new IllegalArgumentException( - "Unknown event type: " + doc.getType() - ); + default -> throw new IllegalArgumentException("Unknown event type: " + doc.getType()); } } } diff --git a/src/main/java/io/autoinvestor/infrastructure/repositories/event_store/InMemoryEventStoreRepository.java b/src/main/java/io/autoinvestor/infrastructure/repositories/event_store/InMemoryEventStoreRepository.java index 3ef8cbd..0cdadb5 100644 --- a/src/main/java/io/autoinvestor/infrastructure/repositories/event_store/InMemoryEventStoreRepository.java +++ b/src/main/java/io/autoinvestor/infrastructure/repositories/event_store/InMemoryEventStoreRepository.java @@ -4,15 +4,16 @@ import io.autoinvestor.domain.events.EventStoreRepository; import io.autoinvestor.domain.model.Inbox; import io.autoinvestor.domain.model.InboxId; -import org.springframework.context.annotation.Profile; -import org.springframework.stereotype.Repository; +import java.util.Comparator; import java.util.List; import java.util.Optional; import java.util.concurrent.CopyOnWriteArrayList; -import java.util.Comparator; import java.util.stream.Collectors; +import org.springframework.context.annotation.Profile; +import org.springframework.stereotype.Repository; + @Repository @Profile("local") public class InMemoryEventStoreRepository implements EventStoreRepository { @@ -26,10 +27,11 @@ public void save(Inbox inbox) { @Override public Optional get(InboxId inboxId) { - List> events = eventStore.stream() - .filter(e -> e.getAggregateId().value().equals(inboxId.value())) - .sorted(Comparator.comparingLong(Event::getVersion)) - .collect(Collectors.toList()); + List> events = + eventStore.stream() + .filter(e -> e.getAggregateId().value().equals(inboxId.value())) + .sorted(Comparator.comparingLong(Event::getVersion)) + .collect(Collectors.toList()); if (events.isEmpty()) { return Optional.empty(); diff --git a/src/main/java/io/autoinvestor/infrastructure/repositories/event_store/MongoEventStoreRepository.java b/src/main/java/io/autoinvestor/infrastructure/repositories/event_store/MongoEventStoreRepository.java index ef3c5d4..78c87b7 100644 --- a/src/main/java/io/autoinvestor/infrastructure/repositories/event_store/MongoEventStoreRepository.java +++ b/src/main/java/io/autoinvestor/infrastructure/repositories/event_store/MongoEventStoreRepository.java @@ -4,7 +4,11 @@ import io.autoinvestor.domain.events.EventStoreRepository; import io.autoinvestor.domain.model.Inbox; import io.autoinvestor.domain.model.InboxId; -import io.autoinvestor.domain.model.UserId; + +import java.util.List; +import java.util.Optional; +import java.util.stream.Collectors; + import org.springframework.context.annotation.Profile; import org.springframework.data.domain.Sort; import org.springframework.data.mongodb.core.MongoTemplate; @@ -12,10 +16,6 @@ import org.springframework.data.mongodb.core.query.Query; import org.springframework.stereotype.Repository; -import java.util.List; -import java.util.Optional; -import java.util.stream.Collectors; - @Repository @Profile("prod") public class MongoEventStoreRepository implements EventStoreRepository { @@ -31,20 +31,18 @@ public MongoEventStoreRepository(MongoTemplate template, EventMapper mapper) { @Override public void save(Inbox inbox) { - List docs = inbox.getUncommittedEvents() - .stream() - .map(mapper::toDocument) - .collect(Collectors.toList()); + List docs = + inbox.getUncommittedEvents().stream() + .map(mapper::toDocument) + .collect(Collectors.toList()); template.insertAll(docs); } @Override public Optional get(InboxId inboxId) { - Query q = Query.query( - Criteria.where("aggregateId") - .is(inboxId.value()) - ) - .with(Sort.by("version")); + Query q = + Query.query(Criteria.where("aggregateId").is(inboxId.value())) + .with(Sort.by("version")); List docs = template.find(q, EventDocument.class, COLLECTION); @@ -52,9 +50,7 @@ public Optional get(InboxId inboxId) { return Optional.empty(); } - List> events = docs.stream() - .map(mapper::toDomain) - .collect(Collectors.toList()); + List> events = docs.stream().map(mapper::toDomain).collect(Collectors.toList()); if (events.isEmpty()) { return Optional.empty(); @@ -65,10 +61,7 @@ public Optional get(InboxId inboxId) { @Override public boolean exists(InboxId inboxId) { - Query q = Query.query( - Criteria.where("aggregateId") - .is(inboxId.value()) - ); + Query q = Query.query(Criteria.where("aggregateId").is(inboxId.value())); return template.exists(q, EventDocument.class, COLLECTION); } diff --git a/src/main/java/io/autoinvestor/infrastructure/repositories/portfolio/InMemoryPortfolioRepository.java b/src/main/java/io/autoinvestor/infrastructure/repositories/portfolio/InMemoryPortfolioRepository.java index 0e159a9..3c4715c 100644 --- a/src/main/java/io/autoinvestor/infrastructure/repositories/portfolio/InMemoryPortfolioRepository.java +++ b/src/main/java/io/autoinvestor/infrastructure/repositories/portfolio/InMemoryPortfolioRepository.java @@ -2,13 +2,14 @@ import io.autoinvestor.domain.PortfolioRepository; import io.autoinvestor.domain.model.UserId; -import org.springframework.context.annotation.Profile; -import org.springframework.stereotype.Repository; import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.stream.Collectors; +import org.springframework.context.annotation.Profile; +import org.springframework.stereotype.Repository; + @Repository @Profile("local") public class InMemoryPortfolioRepository implements PortfolioRepository { diff --git a/src/main/java/io/autoinvestor/infrastructure/repositories/portfolio/MongoPortfolioRepository.java b/src/main/java/io/autoinvestor/infrastructure/repositories/portfolio/MongoPortfolioRepository.java index 69d264b..ddf2107 100644 --- a/src/main/java/io/autoinvestor/infrastructure/repositories/portfolio/MongoPortfolioRepository.java +++ b/src/main/java/io/autoinvestor/infrastructure/repositories/portfolio/MongoPortfolioRepository.java @@ -1,21 +1,23 @@ package io.autoinvestor.infrastructure.repositories.portfolio; +import static org.springframework.data.mongodb.core.aggregation.Aggregation.*; + import io.autoinvestor.domain.PortfolioRepository; import io.autoinvestor.domain.model.UserId; import lombok.Getter; import lombok.Setter; + +import java.util.List; +import java.util.stream.Collectors; + import org.springframework.context.annotation.Profile; import org.springframework.data.mongodb.core.MongoTemplate; import org.springframework.data.mongodb.core.aggregation.Aggregation; +import org.springframework.data.mongodb.core.aggregation.AggregationResults; import org.springframework.data.mongodb.core.query.Criteria; import org.springframework.data.mongodb.core.query.Query; -import org.springframework.data.mongodb.core.aggregation.AggregationResults; -import static org.springframework.data.mongodb.core.aggregation.Aggregation.*; import org.springframework.stereotype.Repository; -import java.util.List; -import java.util.stream.Collectors; - @Repository @Profile("prod") public class MongoPortfolioRepository implements PortfolioRepository { @@ -30,16 +32,13 @@ public MongoPortfolioRepository(MongoTemplate template) { @Override public List getUsersIdByAssetAndRiskLevel(String assetId, int riskLevel) { - Aggregation agg = newAggregation( - match(Criteria.where("assetId").is(assetId)), - lookup(USERS_COLLECTION, - "userId", - "userId", - "userDocs"), - unwind("userDocs"), - match(Criteria.where("userDocs.riskLevel").is(riskLevel)), - project("userId") - ); + Aggregation agg = + newAggregation( + match(Criteria.where("assetId").is(assetId)), + lookup(USERS_COLLECTION, "userId", "userId", "userDocs"), + unwind("userDocs"), + match(Criteria.where("userDocs.riskLevel").is(riskLevel)), + project("userId")); AggregationResults results = template.aggregate(agg, PORTFOLIO_COLLECTION, IdProjection.class); @@ -68,9 +67,7 @@ public void addPortfolioAsset(String userId, String assetId) { @Override public boolean existsPortfolioAsset(String userId, String assetId) { return template.exists( - Query.query(Criteria.where("userId").is(userId) - .and("assetId").is(assetId)), - PortfolioDocument.class - ); + Query.query(Criteria.where("userId").is(userId).and("assetId").is(assetId)), + PortfolioDocument.class); } } diff --git a/src/main/java/io/autoinvestor/infrastructure/repositories/portfolio/PortfolioDocument.java b/src/main/java/io/autoinvestor/infrastructure/repositories/portfolio/PortfolioDocument.java index 712b600..e1b1c9b 100644 --- a/src/main/java/io/autoinvestor/infrastructure/repositories/portfolio/PortfolioDocument.java +++ b/src/main/java/io/autoinvestor/infrastructure/repositories/portfolio/PortfolioDocument.java @@ -3,6 +3,7 @@ import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; + import org.springframework.data.annotation.Id; import org.springframework.data.mongodb.core.mapping.Document; @@ -11,8 +12,7 @@ @AllArgsConstructor @Document(collection = "portfolio") public class PortfolioDocument { - @Id - private String id; + @Id private String id; private String userId; private String assetId; -} \ No newline at end of file +} diff --git a/src/main/java/io/autoinvestor/infrastructure/repositories/portfolio/UserDocument.java b/src/main/java/io/autoinvestor/infrastructure/repositories/portfolio/UserDocument.java index 46fad71..15cda79 100644 --- a/src/main/java/io/autoinvestor/infrastructure/repositories/portfolio/UserDocument.java +++ b/src/main/java/io/autoinvestor/infrastructure/repositories/portfolio/UserDocument.java @@ -3,6 +3,7 @@ import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; + import org.springframework.data.annotation.Id; import org.springframework.data.mongodb.core.mapping.Document; @@ -11,8 +12,7 @@ @AllArgsConstructor @Document(collection = "users") public class UserDocument { - @Id - private String id; + @Id private String id; private String userId; private int riskLevel; } diff --git a/src/main/java/io/autoinvestor/ui/GetAlertsController.java b/src/main/java/io/autoinvestor/ui/GetAlertsController.java index 5559a65..2b2594e 100644 --- a/src/main/java/io/autoinvestor/ui/GetAlertsController.java +++ b/src/main/java/io/autoinvestor/ui/GetAlertsController.java @@ -1,13 +1,14 @@ package io.autoinvestor.ui; -import io.autoinvestor.application.GetDecisionsQuery; import io.autoinvestor.application.GetAlertsQueryHandler; import io.autoinvestor.application.GetAlertsQueryResponse; -import org.springframework.http.ResponseEntity; -import org.springframework.web.bind.annotation.*; +import io.autoinvestor.application.GetDecisionsQuery; import java.util.List; +import org.springframework.http.ResponseEntity; +import org.springframework.web.bind.annotation.*; + @RestController @RequestMapping("/alerts") public class GetAlertsController { @@ -19,19 +20,16 @@ public GetAlertsController(GetAlertsQueryHandler handler) { } @GetMapping - public ResponseEntity> getAlerts(@RequestHeader(value = "X-User-Id") String userId) { - - List queryResponse = this.handler.handle( - new GetDecisionsQuery(userId) - ); - - List dto = queryResponse.stream() - .map(d -> new GetAlertsDTO( - d.assetId(), - d.type(), - d.date() - )) - .toList(); + public ResponseEntity> getAlerts( + @RequestHeader(value = "X-User-Id") String userId) { + + List queryResponse = + this.handler.handle(new GetDecisionsQuery(userId)); + + List dto = + queryResponse.stream() + .map(d -> new GetAlertsDTO(d.assetId(), d.type(), d.date())) + .toList(); return ResponseEntity.ok(dto); } diff --git a/src/main/java/io/autoinvestor/ui/GetAlertsDTO.java b/src/main/java/io/autoinvestor/ui/GetAlertsDTO.java index 41e66a9..12b0ae3 100644 --- a/src/main/java/io/autoinvestor/ui/GetAlertsDTO.java +++ b/src/main/java/io/autoinvestor/ui/GetAlertsDTO.java @@ -2,5 +2,4 @@ import java.util.Date; -public record GetAlertsDTO(String assetId, String type, Date date) { -} +public record GetAlertsDTO(String assetId, String type, Date date) {} From 511ab6561140f63aafa5062e88e1403475be8d38 Mon Sep 17 00:00:00 2001 From: Pol Pinol Castuera Date: Sat, 31 May 2025 10:55:21 +0200 Subject: [PATCH 03/13] Reduce duplicated code --- .../AbstractPubsubEventSubscriber.java | 95 +++++++++++++ .../PubsubDecisionsEventSubscriber.java | 128 ++++-------------- .../PubsubPortfolioAssetEventSubscriber.java | 117 ++++------------ .../listeners/PubsubUsersEventSubscriber.java | 107 ++++----------- 4 files changed, 179 insertions(+), 268 deletions(-) create mode 100644 src/main/java/io/autoinvestor/infrastructure/listeners/AbstractPubsubEventSubscriber.java diff --git a/src/main/java/io/autoinvestor/infrastructure/listeners/AbstractPubsubEventSubscriber.java b/src/main/java/io/autoinvestor/infrastructure/listeners/AbstractPubsubEventSubscriber.java new file mode 100644 index 0000000..ae2c286 --- /dev/null +++ b/src/main/java/io/autoinvestor/infrastructure/listeners/AbstractPubsubEventSubscriber.java @@ -0,0 +1,95 @@ +package io.autoinvestor.infrastructure.listeners; + +import jakarta.annotation.PostConstruct; +import jakarta.annotation.PreDestroy; +import lombok.extern.slf4j.Slf4j; + +import java.util.Map; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.api.core.ApiService.Listener; +import com.google.api.core.ApiService.State; +import com.google.cloud.pubsub.v1.AckReplyConsumer; +import com.google.cloud.pubsub.v1.MessageReceiver; +import com.google.cloud.pubsub.v1.Subscriber; +import com.google.pubsub.v1.ProjectSubscriptionName; +import com.google.pubsub.v1.PubsubMessage; + +@Slf4j +public abstract class AbstractPubsubEventSubscriber { + + private final PubsubEventMapper eventMapper; + private final ProjectSubscriptionName subscriptionName; + private final ObjectMapper objectMapper = new ObjectMapper(); + private Subscriber subscriber; + + protected AbstractPubsubEventSubscriber( + PubsubEventMapper eventMapper, String projectId, String subscriptionId) { + this.eventMapper = eventMapper; + this.subscriptionName = ProjectSubscriptionName.of(projectId, subscriptionId); + } + + @PostConstruct + public void listen() { + log.info("Starting Pub/Sub subscriber for {}", subscriptionName); + + MessageReceiver receiver = this::processMessage; + this.subscriber = Subscriber.newBuilder(subscriptionName, receiver).build(); + + this.subscriber.addListener( + new Listener() { + @Override + public void failed(State from, Throwable failure) { + log.error( + "Subscriber failed from state {}: {}", + from, + failure.toString(), + failure); + } + }, + Runnable::run); + + this.subscriber.startAsync().awaitRunning(); + log.info("Subscriber running"); + } + + @PreDestroy + public void stop() { + if (this.subscriber != null) { + log.info("Stopping subscriber..."); + this.subscriber.stopAsync(); + } + } + + private void processMessage(PubsubMessage message, AckReplyConsumer consumer) { + String msgId = message.getMessageId(); + log.debug("Received message msgId={} size={}B", msgId, message.getData().size()); + + try { + // 1) deserialize into Map + Map raw = + objectMapper.readValue( + message.getData().toByteArray(), new TypeReference<>() {}); + + // 2) convert into our domain‐level PubsubEvent + PubsubEvent event = eventMapper.fromMap(raw); + log.info("Processing event type={} msgId={}", event.getType(), msgId); + + // 3) only dispatch if it matches the one type this subscriber cares about + if (getEventType().equals(event.getType())) { + handleEvent(event, msgId, consumer); + } else { + log.debug("Ignored unsupported event type={} msgId={}", event.getType(), msgId); + consumer.ack(); + } + } catch (Exception ex) { + log.error("Failed to handle msgId={} — nacking", msgId, ex); + consumer.nack(); + } + } + + protected abstract String getEventType(); + + protected abstract void handleEvent(PubsubEvent event, String msgId, AckReplyConsumer consumer); +} diff --git a/src/main/java/io/autoinvestor/infrastructure/listeners/PubsubDecisionsEventSubscriber.java b/src/main/java/io/autoinvestor/infrastructure/listeners/PubsubDecisionsEventSubscriber.java index 64f48bd..3695ec5 100644 --- a/src/main/java/io/autoinvestor/infrastructure/listeners/PubsubDecisionsEventSubscriber.java +++ b/src/main/java/io/autoinvestor/infrastructure/listeners/PubsubDecisionsEventSubscriber.java @@ -2,8 +2,6 @@ import io.autoinvestor.application.EmitAlertsCommand; import io.autoinvestor.application.EmitAlertsCommandHandler; -import jakarta.annotation.PostConstruct; -import jakarta.annotation.PreDestroy; import lombok.extern.slf4j.Slf4j; import java.util.Map; @@ -12,122 +10,56 @@ import org.springframework.context.annotation.Profile; import org.springframework.stereotype.Component; -import com.fasterxml.jackson.core.type.TypeReference; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.api.core.ApiService.Listener; -import com.google.api.core.ApiService.State; import com.google.cloud.pubsub.v1.AckReplyConsumer; -import com.google.cloud.pubsub.v1.MessageReceiver; -import com.google.cloud.pubsub.v1.Subscriber; -import com.google.pubsub.v1.ProjectSubscriptionName; -import com.google.pubsub.v1.PubsubMessage; @Slf4j @Component @Profile("prod") -public class PubsubDecisionsEventSubscriber { +public class PubsubDecisionsEventSubscriber extends AbstractPubsubEventSubscriber { private final EmitAlertsCommandHandler commandHandler; - private final PubsubEventMapper eventMapper; - private final ProjectSubscriptionName subscriptionName; - private final ObjectMapper objectMapper = new ObjectMapper(); - - private Subscriber subscriber; public PubsubDecisionsEventSubscriber( EmitAlertsCommandHandler commandHandler, PubsubEventMapper eventMapper, @Value("${GCP_PROJECT}") String projectId, @Value("${PUBSUB_SUBSCRIPTION_DECISION_MAKING}") String subscriptionId) { + super(eventMapper, projectId, subscriptionId); this.commandHandler = commandHandler; - this.eventMapper = eventMapper; - this.subscriptionName = ProjectSubscriptionName.of(projectId, subscriptionId); } - @PostConstruct - public void listen() { - log.info("Starting Pub/Sub subscriber for {}", subscriptionName); - - MessageReceiver receiver = this::processMessage; - - this.subscriber = Subscriber.newBuilder(subscriptionName, receiver).build(); - this.subscriber.addListener( - new Listener() { - @Override - public void failed(State from, Throwable failure) { - log.error( - "Subscriber failed from state {}: {}", - from, - failure.toString(), - failure); - } - }, - Runnable::run); - this.subscriber.startAsync().awaitRunning(); - log.info("Subscriber running"); + @Override + protected String getEventType() { + return "ASSET_DECISION_TAKEN"; } - @PreDestroy - public void stop() { - if (this.subscriber != null) { - log.info("Stopping subscriber..."); - this.subscriber.stopAsync(); - } - } - - private void processMessage(PubsubMessage message, AckReplyConsumer consumer) { - String msgId = message.getMessageId(); - log.debug("Received message msgId={} size={}B", msgId, message.getData().size()); - - try { - Map raw = - objectMapper.readValue( - message.getData().toByteArray(), new TypeReference<>() {}); - PubsubEvent event = eventMapper.fromMap(raw); - log.info("Processing event type={} msgId={}", event.getType(), msgId); - - if ("ASSET_DECISION_TAKEN".equals(event.getType())) { - if (!event.getPayload().containsKey("assetId")) { - log.warn( - "Event payload missing 'assetId' field, ignoring event msgId={}", - msgId); - consumer.nack(); - return; - } - if (!event.getPayload().containsKey("decision")) { - log.warn( - "Event payload missing 'decision' field, ignoring event msgId={}", - msgId); - consumer.nack(); - return; - } - if (!event.getPayload().containsKey("riskLevel")) { - log.warn( - "Event payload missing 'riskLevel' field, ignoring event msgId={}", - msgId); - consumer.nack(); - return; - } - EmitAlertsCommand cmd = - new EmitAlertsCommand( - (String) event.getPayload().get("assetId"), - (String) event.getPayload().get("decision"), - (int) event.getPayload().get("riskLevel")); - this.commandHandler.handle(cmd); - log.info( - "Decision registered for asset={} decision={} riskLevel={} msgId={}", - cmd.assetId(), - cmd.decision(), - cmd.riskLevel(), - msgId); - } else { - log.debug("Ignored unsupported event type={} msgId={}", event.getType(), msgId); - } + @Override + protected void handleEvent(PubsubEvent event, String msgId, AckReplyConsumer consumer) { + Map payload = event.getPayload(); + if (!payload.containsKey("assetId") + || !payload.containsKey("decision") + || !payload.containsKey("riskLevel")) { + log.warn( + "Malformed event: Event payload missing required fields (assetId, decision, riskLevel). Ignoring event msgId={}", + msgId); consumer.ack(); - } catch (Exception ex) { - log.error("Failed to handle msgId={} — nacking", msgId, ex); - consumer.nack(); + return; } + + EmitAlertsCommand cmd = + new EmitAlertsCommand( + (String) payload.get("assetId"), + (String) payload.get("decision"), + (int) payload.get("riskLevel")); + this.commandHandler.handle(cmd); + + log.info( + "Decision registered for asset={} decision={} riskLevel={} msgId={}", + cmd.assetId(), + cmd.decision(), + cmd.riskLevel(), + msgId); + consumer.ack(); } } diff --git a/src/main/java/io/autoinvestor/infrastructure/listeners/PubsubPortfolioAssetEventSubscriber.java b/src/main/java/io/autoinvestor/infrastructure/listeners/PubsubPortfolioAssetEventSubscriber.java index e82bddd..fea8255 100644 --- a/src/main/java/io/autoinvestor/infrastructure/listeners/PubsubPortfolioAssetEventSubscriber.java +++ b/src/main/java/io/autoinvestor/infrastructure/listeners/PubsubPortfolioAssetEventSubscriber.java @@ -2,8 +2,6 @@ import io.autoinvestor.application.RegisterPortfolioAssetCommand; import io.autoinvestor.application.RegisterPortfolioAssetCommandHandler; -import jakarta.annotation.PostConstruct; -import jakarta.annotation.PreDestroy; import lombok.extern.slf4j.Slf4j; import java.util.Map; @@ -12,110 +10,55 @@ import org.springframework.context.annotation.Profile; import org.springframework.stereotype.Component; -import com.fasterxml.jackson.core.type.TypeReference; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.api.core.ApiService.Listener; -import com.google.api.core.ApiService.State; import com.google.cloud.pubsub.v1.AckReplyConsumer; -import com.google.cloud.pubsub.v1.MessageReceiver; -import com.google.cloud.pubsub.v1.Subscriber; -import com.google.pubsub.v1.ProjectSubscriptionName; -import com.google.pubsub.v1.PubsubMessage; @Slf4j @Component @Profile("prod") -public class PubsubPortfolioAssetEventSubscriber { +public class PubsubPortfolioAssetEventSubscriber extends AbstractPubsubEventSubscriber { private final RegisterPortfolioAssetCommandHandler commandHandler; - private final PubsubEventMapper eventMapper; - private final ProjectSubscriptionName subscriptionName; - private final ObjectMapper objectMapper = new ObjectMapper(); - - private Subscriber subscriber; public PubsubPortfolioAssetEventSubscriber( RegisterPortfolioAssetCommandHandler commandHandler, PubsubEventMapper eventMapper, @Value("${GCP_PROJECT}") String projectId, @Value("${PUBSUB_SUBSCRIPTION_PORTFOLIO}") String subscriptionId) { + super(eventMapper, projectId, subscriptionId); this.commandHandler = commandHandler; - this.eventMapper = eventMapper; - this.subscriptionName = ProjectSubscriptionName.of(projectId, subscriptionId); - } - - @PostConstruct - public void listen() { - log.info("Starting Pub/Sub subscriber for {}", subscriptionName); - - MessageReceiver receiver = this::processMessage; - - this.subscriber = Subscriber.newBuilder(subscriptionName, receiver).build(); - this.subscriber.addListener( - new Listener() { - @Override - public void failed(State from, Throwable failure) { - log.error( - "Subscriber failed from state {}: {}", - from, - failure.toString(), - failure); // ERROR - } - }, - Runnable::run); - this.subscriber.startAsync().awaitRunning(); - log.info("Subscriber running"); } - @PreDestroy - public void stop() { - if (this.subscriber != null) { - log.info("Stopping subscriber..."); - this.subscriber.stopAsync(); - } + @Override + protected String getEventType() { + return "PORTFOLIO_ASSET_ADDED"; } - private void processMessage(PubsubMessage message, AckReplyConsumer consumer) { - String msgId = message.getMessageId(); - log.debug("Received message msgId={} size={}B", msgId, message.getData().size()); - - try { - Map raw = - objectMapper.readValue( - message.getData().toByteArray(), new TypeReference<>() {}); - PubsubEvent event = eventMapper.fromMap(raw); - log.info("Processing event type={} msgId={}", event.getType(), msgId); - - if ("PORTFOLIO_ASSET_ADDED".equals(event.getType())) { - if (event.getAggregateId() == null - || event.getPayload() == null - || !event.getPayload().containsKey("userId") - || !event.getPayload().containsKey("assetId")) { - log.warn( - "Malformed event: Skipping PORTFOLIO_ASSET_ADDED event with missing fields msgId={}", - msgId); - consumer.ack(); - return; - } - - RegisterPortfolioAssetCommand cmd = - new RegisterPortfolioAssetCommand( - (String) event.getPayload().get("userId"), - (String) event.getPayload().get("assetId")); - this.commandHandler.handle(cmd); - log.info( - "Decision registered for userId={} assetId={} msgId={}", - cmd.userId(), - cmd.assetId(), - msgId); - } else { - log.debug("Ignored unsupported event type={} msgId={}", event.getType(), msgId); - } - + @Override + protected void handleEvent(PubsubEvent event, String msgId, AckReplyConsumer consumer) { + Map payload = event.getPayload(); + + if (event.getAggregateId() == null + || payload == null + || !payload.containsKey("userId") + || !payload.containsKey("assetId")) { + log.warn( + "Malformed event: Skipping PORTFOLIO_ASSET_ADDED " + + "event with missing fields msgId={}", + msgId); consumer.ack(); - } catch (Exception ex) { - log.error("Failed to handle msgId={} — nacking", msgId, ex); - consumer.nack(); + return; } + + RegisterPortfolioAssetCommand cmd = + new RegisterPortfolioAssetCommand( + (String) payload.get("userId"), (String) payload.get("assetId")); + this.commandHandler.handle(cmd); + + log.info( + "Portfolio asset registered for userId={} assetId={} msgId={}", + cmd.userId(), + cmd.assetId(), + msgId); + consumer.ack(); } } diff --git a/src/main/java/io/autoinvestor/infrastructure/listeners/PubsubUsersEventSubscriber.java b/src/main/java/io/autoinvestor/infrastructure/listeners/PubsubUsersEventSubscriber.java index 7cbc6bd..0b39557 100644 --- a/src/main/java/io/autoinvestor/infrastructure/listeners/PubsubUsersEventSubscriber.java +++ b/src/main/java/io/autoinvestor/infrastructure/listeners/PubsubUsersEventSubscriber.java @@ -2,8 +2,6 @@ import io.autoinvestor.application.RegisterUserCommand; import io.autoinvestor.application.RegisterUserCommandHandler; -import jakarta.annotation.PostConstruct; -import jakarta.annotation.PreDestroy; import lombok.extern.slf4j.Slf4j; import java.util.Map; @@ -12,107 +10,50 @@ import org.springframework.context.annotation.Profile; import org.springframework.stereotype.Component; -import com.fasterxml.jackson.core.type.TypeReference; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.api.core.ApiService.Listener; -import com.google.api.core.ApiService.State; import com.google.cloud.pubsub.v1.AckReplyConsumer; -import com.google.cloud.pubsub.v1.MessageReceiver; -import com.google.cloud.pubsub.v1.Subscriber; -import com.google.pubsub.v1.ProjectSubscriptionName; -import com.google.pubsub.v1.PubsubMessage; @Slf4j @Component @Profile("prod") -public class PubsubUsersEventSubscriber { +public class PubsubUsersEventSubscriber extends AbstractPubsubEventSubscriber { private final RegisterUserCommandHandler commandHandler; - private final PubsubEventMapper eventMapper; - private final ProjectSubscriptionName subscriptionName; - private final ObjectMapper objectMapper = new ObjectMapper(); - - private Subscriber subscriber; public PubsubUsersEventSubscriber( RegisterUserCommandHandler commandHandler, PubsubEventMapper eventMapper, @Value("${GCP_PROJECT}") String projectId, @Value("${PUBSUB_SUBSCRIPTION_USERS}") String subscriptionId) { + super(eventMapper, projectId, subscriptionId); this.commandHandler = commandHandler; - this.eventMapper = eventMapper; - this.subscriptionName = ProjectSubscriptionName.of(projectId, subscriptionId); - } - - @PostConstruct - public void listen() { - log.info("Starting Pub/Sub subscriber for {}", subscriptionName); - - MessageReceiver receiver = this::processMessage; - - this.subscriber = Subscriber.newBuilder(subscriptionName, receiver).build(); - this.subscriber.addListener( - new Listener() { - @Override - public void failed(State from, Throwable failure) { - log.error( - "Subscriber failed from state {}: {}", - from, - failure.toString(), - failure); // ERROR - } - }, - Runnable::run); - this.subscriber.startAsync().awaitRunning(); - log.info("Subscriber running"); } - @PreDestroy - public void stop() { - if (this.subscriber != null) { - log.info("Stopping subscriber..."); - this.subscriber.stopAsync(); - } + @Override + protected String getEventType() { + return "USER_CREATED"; } - private void processMessage(PubsubMessage message, AckReplyConsumer consumer) { - String msgId = message.getMessageId(); - log.debug("Received message msgId={} size={}B", msgId, message.getData().size()); - - try { - Map raw = - objectMapper.readValue( - message.getData().toByteArray(), new TypeReference<>() {}); - PubsubEvent event = eventMapper.fromMap(raw); - log.info("Processing event type={} msgId={}", event.getType(), msgId); - - if ("USER_CREATED".equals(event.getType())) { - if (event.getAggregateId() == null - || !event.getPayload().containsKey("riskLevel")) { - log.warn( - "Malformed event: Skipping USER_CREATED event with missing aggregateId or riskLevel msgId={}", - msgId); - consumer.ack(); - return; - } - - RegisterUserCommand cmd = - new RegisterUserCommand( - event.getAggregateId(), (int) event.getPayload().get("riskLevel")); - this.commandHandler.handle(cmd); - log.info( - "Decision registered for userId={} riskLevel={} msgId={}", - cmd.userId(), - cmd.riskLevel(), - msgId); - } else { - log.debug("Ignored unsupported event type={} msgId={}", event.getType(), msgId); - } + @Override + protected void handleEvent(PubsubEvent event, String msgId, AckReplyConsumer consumer) { + Map payload = event.getPayload(); + if (event.getAggregateId() == null || !payload.containsKey("riskLevel")) { + log.warn( + "Malformed event: Skipping USER_CREATED event with missing aggregateId or riskLevel msgId={}", + msgId); consumer.ack(); - } catch (Exception ex) { - log.error("Failed to handle msgId={} — nacking", msgId, ex); - consumer.nack(); + return; } + + RegisterUserCommand cmd = + new RegisterUserCommand(event.getAggregateId(), (int) payload.get("riskLevel")); + this.commandHandler.handle(cmd); + + log.info( + "User registered userId={} riskLevel={} msgId={}", + cmd.userId(), + cmd.riskLevel(), + msgId); + consumer.ack(); } } From aadb63023a19a6148b559ad482aa8872d54c0758 Mon Sep 17 00:00:00 2001 From: Pol Pinol Castuera Date: Tue, 3 Jun 2025 19:25:30 +0200 Subject: [PATCH 04/13] Add logs and tests --- .github/workflows/post-commit-pipeline.yml | 3 +- .../PubsubDecisionsEventSubscriber.java | 39 +++- .../PubsubPortfolioAssetEventSubscriber.java | 40 ++-- .../listeners/PubsubUsersEventSubscriber.java | 38 +++- .../InMemoryAlertsReadModelRepository.java | 4 + .../MongoAlertsReadModelRepository.java | 34 ++- .../InMemoryInboxReadModelRepository.java | 4 + .../users/MongoInboxReadModelRepository.java | 37 +++- .../MongoEventStoreRepository.java | 67 +++++- .../InMemoryPortfolioRepository.java | 5 + .../portfolio/MongoPortfolioRepository.java | 70 ++++++- src/main/resources/application.properties | 2 +- .../autoinvestor/AlertsIntegrationTest.java | 197 ++++++++++++++++++ 13 files changed, 473 insertions(+), 67 deletions(-) create mode 100644 src/test/java/io/autoinvestor/AlertsIntegrationTest.java diff --git a/.github/workflows/post-commit-pipeline.yml b/.github/workflows/post-commit-pipeline.yml index 5ea97eb..00b736b 100644 --- a/.github/workflows/post-commit-pipeline.yml +++ b/.github/workflows/post-commit-pipeline.yml @@ -13,8 +13,7 @@ jobs: build_test_release: runs-on: ubuntu-22.04 env: - SPRING_PROFILES_ACTIVE: prod - BP_SPRING_PROFILES_ACTIVE: prod + SPRING_PROFILES_ACTIVE: local outputs: new-version: ${{ (steps.version_increment.outputs.bump != 'none' && steps.calculate_version.outputs.new_version) || '' }} steps: diff --git a/src/main/java/io/autoinvestor/infrastructure/listeners/PubsubDecisionsEventSubscriber.java b/src/main/java/io/autoinvestor/infrastructure/listeners/PubsubDecisionsEventSubscriber.java index 3695ec5..c49b266 100644 --- a/src/main/java/io/autoinvestor/infrastructure/listeners/PubsubDecisionsEventSubscriber.java +++ b/src/main/java/io/autoinvestor/infrastructure/listeners/PubsubDecisionsEventSubscriber.java @@ -35,13 +35,18 @@ protected String getEventType() { @Override protected void handleEvent(PubsubEvent event, String msgId, AckReplyConsumer consumer) { - Map payload = event.getPayload(); + log.debug( + "Received ASSET_DECISION_TAKEN event msgId={} payloadKeys={}", + msgId, + event.getPayload().keySet()); - if (!payload.containsKey("assetId") + Map payload = event.getPayload(); + if (payload == null + || !payload.containsKey("assetId") || !payload.containsKey("decision") || !payload.containsKey("riskLevel")) { log.warn( - "Malformed event: Event payload missing required fields (assetId, decision, riskLevel). Ignoring event msgId={}", + "Malformed ASSET_DECISION_TAKEN event (missing assetId/decision/riskLevel). Skipping msgId={}", msgId); consumer.ack(); return; @@ -52,14 +57,26 @@ protected void handleEvent(PubsubEvent event, String msgId, AckReplyConsumer con (String) payload.get("assetId"), (String) payload.get("decision"), (int) payload.get("riskLevel")); - this.commandHandler.handle(cmd); - log.info( - "Decision registered for asset={} decision={} riskLevel={} msgId={}", - cmd.assetId(), - cmd.decision(), - cmd.riskLevel(), - msgId); - consumer.ack(); + try { + this.commandHandler.handle(cmd); + log.info( + "Handled ASSET_DECISION_TAKEN: assetId={} decision={} riskLevel={} msgId={}", + cmd.assetId(), + cmd.decision(), + cmd.riskLevel(), + msgId); + consumer.ack(); + } catch (Exception ex) { + log.error( + "Error while handling ASSET_DECISION_TAKEN for assetId={} decision={} riskLevel={}, msgId={}: {}", + cmd.assetId(), + cmd.decision(), + cmd.riskLevel(), + msgId, + ex.getMessage(), + ex); + consumer.nack(); + } } } diff --git a/src/main/java/io/autoinvestor/infrastructure/listeners/PubsubPortfolioAssetEventSubscriber.java b/src/main/java/io/autoinvestor/infrastructure/listeners/PubsubPortfolioAssetEventSubscriber.java index fea8255..1212e3e 100644 --- a/src/main/java/io/autoinvestor/infrastructure/listeners/PubsubPortfolioAssetEventSubscriber.java +++ b/src/main/java/io/autoinvestor/infrastructure/listeners/PubsubPortfolioAssetEventSubscriber.java @@ -35,15 +35,15 @@ protected String getEventType() { @Override protected void handleEvent(PubsubEvent event, String msgId, AckReplyConsumer consumer) { - Map payload = event.getPayload(); + log.debug( + "Received PORTFOLIO_ASSET_ADDED event msgId={} payloadKeys={}", + msgId, + event.getPayload().keySet()); - if (event.getAggregateId() == null - || payload == null - || !payload.containsKey("userId") - || !payload.containsKey("assetId")) { + Map payload = event.getPayload(); + if (payload == null || !payload.containsKey("userId") || !payload.containsKey("assetId")) { log.warn( - "Malformed event: Skipping PORTFOLIO_ASSET_ADDED " - + "event with missing fields msgId={}", + "Malformed PORTFOLIO_ASSET_ADDED event (missing userId or assetId). Skipping msgId={}", msgId); consumer.ack(); return; @@ -52,13 +52,23 @@ protected void handleEvent(PubsubEvent event, String msgId, AckReplyConsumer con RegisterPortfolioAssetCommand cmd = new RegisterPortfolioAssetCommand( (String) payload.get("userId"), (String) payload.get("assetId")); - this.commandHandler.handle(cmd); - - log.info( - "Portfolio asset registered for userId={} assetId={} msgId={}", - cmd.userId(), - cmd.assetId(), - msgId); - consumer.ack(); + try { + this.commandHandler.handle(cmd); + log.info( + "Handled PORTFOLIO_ASSET_ADDED: userId={} assetId={} msgId={}", + cmd.userId(), + cmd.assetId(), + msgId); + consumer.ack(); + } catch (Exception ex) { + log.error( + "Error while handling PORTFOLIO_ASSET_ADDED for userId={} assetId={}, msgId={}: {}", + cmd.userId(), + cmd.assetId(), + msgId, + ex.getMessage(), + ex); + consumer.nack(); + } } } diff --git a/src/main/java/io/autoinvestor/infrastructure/listeners/PubsubUsersEventSubscriber.java b/src/main/java/io/autoinvestor/infrastructure/listeners/PubsubUsersEventSubscriber.java index 0b39557..8608570 100644 --- a/src/main/java/io/autoinvestor/infrastructure/listeners/PubsubUsersEventSubscriber.java +++ b/src/main/java/io/autoinvestor/infrastructure/listeners/PubsubUsersEventSubscriber.java @@ -35,11 +35,17 @@ protected String getEventType() { @Override protected void handleEvent(PubsubEvent event, String msgId, AckReplyConsumer consumer) { - Map payload = event.getPayload(); + log.debug( + "Received USER_CREATED event msgId={} aggregateId={}", + msgId, + event.getAggregateId()); - if (event.getAggregateId() == null || !payload.containsKey("riskLevel")) { + Map payload = event.getPayload(); + if (event.getAggregateId() == null + || payload == null + || !payload.containsKey("riskLevel")) { log.warn( - "Malformed event: Skipping USER_CREATED event with missing aggregateId or riskLevel msgId={}", + "Malformed USER_CREATED event (missing aggregateId or riskLevel). Skipping msgId={}", msgId); consumer.ack(); return; @@ -47,13 +53,23 @@ protected void handleEvent(PubsubEvent event, String msgId, AckReplyConsumer con RegisterUserCommand cmd = new RegisterUserCommand(event.getAggregateId(), (int) payload.get("riskLevel")); - this.commandHandler.handle(cmd); - - log.info( - "User registered userId={} riskLevel={} msgId={}", - cmd.userId(), - cmd.riskLevel(), - msgId); - consumer.ack(); + try { + this.commandHandler.handle(cmd); + log.info( + "Handled USER_CREATED: userId={} riskLevel={} msgId={}", + cmd.userId(), + cmd.riskLevel(), + msgId); + consumer.ack(); + } catch (Exception ex) { + log.error( + "Error while handling USER_CREATED for userId={} riskLevel={}, msgId={}: {}", + cmd.userId(), + cmd.riskLevel(), + msgId, + ex.getMessage(), + ex); + consumer.nack(); + } } } diff --git a/src/main/java/io/autoinvestor/infrastructure/read_models/alerts/InMemoryAlertsReadModelRepository.java b/src/main/java/io/autoinvestor/infrastructure/read_models/alerts/InMemoryAlertsReadModelRepository.java index 5f435c6..fb828e4 100644 --- a/src/main/java/io/autoinvestor/infrastructure/read_models/alerts/InMemoryAlertsReadModelRepository.java +++ b/src/main/java/io/autoinvestor/infrastructure/read_models/alerts/InMemoryAlertsReadModelRepository.java @@ -46,4 +46,8 @@ public List get(String userId) { .filter(alert -> alert.userId().equals(userId)) .collect(Collectors.toList()); } + + public void clear() { + alerts.clear(); + } } diff --git a/src/main/java/io/autoinvestor/infrastructure/read_models/alerts/MongoAlertsReadModelRepository.java b/src/main/java/io/autoinvestor/infrastructure/read_models/alerts/MongoAlertsReadModelRepository.java index b9183d3..a0e57f7 100644 --- a/src/main/java/io/autoinvestor/infrastructure/read_models/alerts/MongoAlertsReadModelRepository.java +++ b/src/main/java/io/autoinvestor/infrastructure/read_models/alerts/MongoAlertsReadModelRepository.java @@ -2,6 +2,7 @@ import io.autoinvestor.application.AlertDTO; import io.autoinvestor.application.AlertsReadModelRepository; +import lombok.extern.slf4j.Slf4j; import java.util.List; @@ -13,6 +14,7 @@ @Repository @Profile("prod") +@Slf4j public class MongoAlertsReadModelRepository implements AlertsReadModelRepository { private static final String COLLECTION = "alerts"; @@ -23,18 +25,42 @@ public class MongoAlertsReadModelRepository implements AlertsReadModelRepository public MongoAlertsReadModelRepository(MongoTemplate template, DecisionMapper mapper) { this.template = template; this.mapper = mapper; + log.info("MongoAlertsReadModelRepository initialized."); } @Override public void save(AlertDTO alertDTO) { - template.save(mapper.toDocument(alertDTO), COLLECTION); + try { + template.save(mapper.toDocument(alertDTO), COLLECTION); + log.info("Saved AlertDTO for userId={}", alertDTO.userId()); + } catch (Exception ex) { + log.error( + "Failed to save AlertDTO[userId={}, assetId={}]: {}", + alertDTO.userId(), + alertDTO.assetId(), + ex.getMessage(), + ex); + throw ex; + } } @Override public List get(String userId) { Query query = Query.query(Criteria.where("userId").is(userId)); - return template.find(query, DecisionDocument.class, COLLECTION).stream() - .map(mapper::toDTO) - .toList(); + List dtos; + try { + dtos = + template.find(query, DecisionDocument.class, COLLECTION).stream() + .map(mapper::toDTO) + .toList(); + } catch (Exception ex) { + log.error("Error retrieving alerts for userId={}: {}", userId, ex.getMessage(), ex); + throw ex; + } + + if (dtos.isEmpty()) { + log.warn("No alerts found for userId={}", userId); + } + return dtos; } } diff --git a/src/main/java/io/autoinvestor/infrastructure/read_models/users/InMemoryInboxReadModelRepository.java b/src/main/java/io/autoinvestor/infrastructure/read_models/users/InMemoryInboxReadModelRepository.java index 394aa72..3a51415 100644 --- a/src/main/java/io/autoinvestor/infrastructure/read_models/users/InMemoryInboxReadModelRepository.java +++ b/src/main/java/io/autoinvestor/infrastructure/read_models/users/InMemoryInboxReadModelRepository.java @@ -32,4 +32,8 @@ public Optional getInboxId(UserId userId) { String raw = inbox.get(userId.value()); return raw != null ? Optional.of(InboxId.from(raw)) : Optional.empty(); } + + public void clear() { + inbox.clear(); + } } diff --git a/src/main/java/io/autoinvestor/infrastructure/read_models/users/MongoInboxReadModelRepository.java b/src/main/java/io/autoinvestor/infrastructure/read_models/users/MongoInboxReadModelRepository.java index 979a564..9296208 100644 --- a/src/main/java/io/autoinvestor/infrastructure/read_models/users/MongoInboxReadModelRepository.java +++ b/src/main/java/io/autoinvestor/infrastructure/read_models/users/MongoInboxReadModelRepository.java @@ -3,6 +3,7 @@ import io.autoinvestor.application.InboxReadModelRepository; import io.autoinvestor.domain.model.InboxId; import io.autoinvestor.domain.model.UserId; +import lombok.extern.slf4j.Slf4j; import java.util.Optional; @@ -12,29 +13,55 @@ @Repository @Profile("prod") +@Slf4j public class MongoInboxReadModelRepository implements InboxReadModelRepository { private final MongoTemplate template; public MongoInboxReadModelRepository(MongoTemplate template) { this.template = template; + log.info("MongoInboxReadModelRepository initialized."); } @Override public void save(UserId userId, InboxId inboxId) { - String userIdStr = userId.value(); - String inboxIdStr = inboxId.value(); - DecisionDocument doc = new DecisionDocument(userIdStr, inboxIdStr); - template.save(doc); + DecisionDocument doc = new DecisionDocument(userId.value(), inboxId.value()); + try { + template.save(doc); + log.info( + "Saved DecisionDocument[userId={} -> inboxId={}]", + userId.value(), + inboxId.value()); + } catch (Exception ex) { + log.error( + "Failed to save DecisionDocument[userId={}]: {}", + userId.value(), + ex.getMessage(), + ex); + throw ex; + } } @Override public Optional getInboxId(UserId userId) { String userIdStr = userId.value(); - DecisionDocument doc = template.findById(userIdStr, DecisionDocument.class); + DecisionDocument doc; + try { + doc = template.findById(userIdStr, DecisionDocument.class); + } catch (Exception ex) { + log.error( + "Error fetching DecisionDocument for userId={}: {}", + userIdStr, + ex.getMessage(), + ex); + throw ex; + } + if (doc == null) { + log.warn("No DecisionDocument found for userId={}", userIdStr); return Optional.empty(); } + return Optional.of(InboxId.from(doc.getInboxId())); } } diff --git a/src/main/java/io/autoinvestor/infrastructure/repositories/event_store/MongoEventStoreRepository.java b/src/main/java/io/autoinvestor/infrastructure/repositories/event_store/MongoEventStoreRepository.java index 78c87b7..3b028e7 100644 --- a/src/main/java/io/autoinvestor/infrastructure/repositories/event_store/MongoEventStoreRepository.java +++ b/src/main/java/io/autoinvestor/infrastructure/repositories/event_store/MongoEventStoreRepository.java @@ -4,6 +4,7 @@ import io.autoinvestor.domain.events.EventStoreRepository; import io.autoinvestor.domain.model.Inbox; import io.autoinvestor.domain.model.InboxId; +import lombok.extern.slf4j.Slf4j; import java.util.List; import java.util.Optional; @@ -18,6 +19,7 @@ @Repository @Profile("prod") +@Slf4j public class MongoEventStoreRepository implements EventStoreRepository { private static final String COLLECTION = "events"; @@ -27,15 +29,32 @@ public class MongoEventStoreRepository implements EventStoreRepository { public MongoEventStoreRepository(MongoTemplate template, EventMapper mapper) { this.template = template; this.mapper = mapper; + log.info("MongoEventStoreRepository initialized."); } @Override public void save(Inbox inbox) { - List docs = - inbox.getUncommittedEvents().stream() - .map(mapper::toDocument) - .collect(Collectors.toList()); - template.insertAll(docs); + List> uncommitted = inbox.getUncommittedEvents(); + if (uncommitted.isEmpty()) { + log.debug( + "No uncommitted events to save for inboxId={}", + inbox.getState().getInboxId().value()); + return; + } + + try { + List docs = + uncommitted.stream().map(mapper::toDocument).collect(Collectors.toList()); + template.insertAll(docs); + log.info("Inserted {} event(s) into '{}'", docs.size(), COLLECTION); + } catch (Exception ex) { + log.error( + "Failed to insert events for inboxId={}: {}", + inbox.getState().getInboxId().value(), + ex.getMessage(), + ex); + throw ex; + } } @Override @@ -43,16 +62,31 @@ public Optional get(InboxId inboxId) { Query q = Query.query(Criteria.where("aggregateId").is(inboxId.value())) .with(Sort.by("version")); + log.debug("Querying '{}' for aggregateId={}", COLLECTION, inboxId.value()); - List docs = template.find(q, EventDocument.class, COLLECTION); + List docs; + try { + docs = template.find(q, EventDocument.class, COLLECTION); + } catch (Exception ex) { + log.error( + "Error querying events for inboxId={}: {}", + inboxId.value(), + ex.getMessage(), + ex); + throw ex; + } if (docs.isEmpty()) { + log.warn("No EventDocument found for inboxId={}", inboxId.value()); return Optional.empty(); } List> events = docs.stream().map(mapper::toDomain).collect(Collectors.toList()); - if (events.isEmpty()) { + log.warn( + "Mapped {} EventDocument(s) but got 0 domain events for inboxId={}", + docs.size(), + inboxId.value()); return Optional.empty(); } @@ -61,8 +95,21 @@ public Optional get(InboxId inboxId) { @Override public boolean exists(InboxId inboxId) { - Query q = Query.query(Criteria.where("aggregateId").is(inboxId.value())); - - return template.exists(q, EventDocument.class, COLLECTION); + try { + boolean exists = + template.exists( + Query.query(Criteria.where("aggregateId").is(inboxId.value())), + EventDocument.class, + COLLECTION); + log.debug("exists(inboxId={}) = {}", inboxId.value(), exists); + return exists; + } catch (Exception ex) { + log.error( + "Error checking existence of events for inboxId={}: {}", + inboxId.value(), + ex.getMessage(), + ex); + throw ex; + } } } diff --git a/src/main/java/io/autoinvestor/infrastructure/repositories/portfolio/InMemoryPortfolioRepository.java b/src/main/java/io/autoinvestor/infrastructure/repositories/portfolio/InMemoryPortfolioRepository.java index 3c4715c..2a47435 100644 --- a/src/main/java/io/autoinvestor/infrastructure/repositories/portfolio/InMemoryPortfolioRepository.java +++ b/src/main/java/io/autoinvestor/infrastructure/repositories/portfolio/InMemoryPortfolioRepository.java @@ -48,4 +48,9 @@ public boolean existsPortfolioAsset(String userId, String assetId) { .map(set -> set.contains(assetId)) .orElse(false); } + + public void clear() { + users.clear(); + portfolio.clear(); + } } diff --git a/src/main/java/io/autoinvestor/infrastructure/repositories/portfolio/MongoPortfolioRepository.java b/src/main/java/io/autoinvestor/infrastructure/repositories/portfolio/MongoPortfolioRepository.java index ddf2107..9290dcf 100644 --- a/src/main/java/io/autoinvestor/infrastructure/repositories/portfolio/MongoPortfolioRepository.java +++ b/src/main/java/io/autoinvestor/infrastructure/repositories/portfolio/MongoPortfolioRepository.java @@ -6,6 +6,7 @@ import io.autoinvestor.domain.model.UserId; import lombok.Getter; import lombok.Setter; +import lombok.extern.slf4j.Slf4j; import java.util.List; import java.util.stream.Collectors; @@ -20,6 +21,7 @@ @Repository @Profile("prod") +@Slf4j public class MongoPortfolioRepository implements PortfolioRepository { public static final String PORTFOLIO_COLLECTION = "portfolio"; public static final String USERS_COLLECTION = "users"; @@ -28,6 +30,7 @@ public class MongoPortfolioRepository implements PortfolioRepository { public MongoPortfolioRepository(MongoTemplate template) { this.template = template; + log.info("MongoPortfolioRepository initialized."); } @Override @@ -40,12 +43,27 @@ public List getUsersIdByAssetAndRiskLevel(String assetId, int riskLevel) match(Criteria.where("userDocs.riskLevel").is(riskLevel)), project("userId")); + log.debug("Aggregation pipeline for assetId={}, riskLevel={}: {}", assetId, riskLevel, agg); + AggregationResults results = template.aggregate(agg, PORTFOLIO_COLLECTION, IdProjection.class); - return results.getMappedResults().stream() - .map(p -> UserId.from(p.getUserId())) - .collect(Collectors.toList()); + List raw = results.getMappedResults(); + + if (raw.isEmpty()) { + log.warn("No users found for assetId='{}' with riskLevel={}", assetId, riskLevel); + return List.of(); + } + + List userIds = + raw.stream().map(p -> UserId.from(p.getUserId())).collect(Collectors.toList()); + + log.info( + "Found {} user(s) for assetId='{}', riskLevel={}", + userIds.size(), + assetId, + riskLevel); + return userIds; } @Setter @@ -56,18 +74,54 @@ private static class IdProjection { @Override public void addUser(String userId, int riskLevel) { - template.save(new UserDocument(null, userId, riskLevel)); + try { + template.save(new UserDocument(null, userId, riskLevel)); + log.info("Saved UserDocument[userId={}]", userId); + } catch (Exception ex) { + log.error( + "Failed to save UserDocument[userId={}, riskLevel={}]: {}", + userId, + riskLevel, + ex.getMessage(), + ex); + throw ex; + } } @Override public void addPortfolioAsset(String userId, String assetId) { - template.save(new PortfolioDocument(null, userId, assetId)); + try { + template.save(new PortfolioDocument(null, userId, assetId)); + log.info("Saved PortfolioDocument[userId={}, assetId={}]", userId, assetId); + } catch (Exception ex) { + log.error( + "Failed to save PortfolioDocument[userId={}, assetId={}]: {}", + userId, + assetId, + ex.getMessage(), + ex); + throw ex; + } } @Override public boolean existsPortfolioAsset(String userId, String assetId) { - return template.exists( - Query.query(Criteria.where("userId").is(userId).and("assetId").is(assetId)), - PortfolioDocument.class); + try { + boolean exists = + template.exists( + Query.query( + Criteria.where("userId").is(userId).and("assetId").is(assetId)), + PortfolioDocument.class); + log.debug("existsPortfolioAsset(userId={}, assetId={}) = {}", userId, assetId, exists); + return exists; + } catch (Exception ex) { + log.error( + "Error checking existence of PortfolioDocument[userId={}, assetId={}]: {}", + userId, + assetId, + ex.getMessage(), + ex); + throw ex; + } } } diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties index 283e1d0..3aa94d1 100644 --- a/src/main/resources/application.properties +++ b/src/main/resources/application.properties @@ -1,2 +1,2 @@ spring.application.name=alerts -spring.profiles.active=local +spring.profiles.active=prod diff --git a/src/test/java/io/autoinvestor/AlertsIntegrationTest.java b/src/test/java/io/autoinvestor/AlertsIntegrationTest.java new file mode 100644 index 0000000..35dde3e --- /dev/null +++ b/src/test/java/io/autoinvestor/AlertsIntegrationTest.java @@ -0,0 +1,197 @@ +package io.autoinvestor; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.get; +import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.*; + +import io.autoinvestor.application.*; +import io.autoinvestor.domain.model.Decision; +import io.autoinvestor.domain.model.UserId; +import io.autoinvestor.infrastructure.read_models.alerts.InMemoryAlertsReadModelRepository; +import io.autoinvestor.infrastructure.read_models.users.InMemoryInboxReadModelRepository; +import io.autoinvestor.infrastructure.repositories.portfolio.InMemoryPortfolioRepository; + +import java.util.List; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.autoconfigure.web.servlet.AutoConfigureMockMvc; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.http.MediaType; +import org.springframework.test.context.ActiveProfiles; +import org.springframework.test.web.servlet.MockMvc; + +@SpringBootTest +@AutoConfigureMockMvc +@ActiveProfiles("local") +class AlertsIntegrationTest { + + @Autowired private RegisterUserCommandHandler registerUserHandler; + + @Autowired private RegisterPortfolioAssetCommandHandler registerPortfolioAssetHandler; + + @Autowired private EmitAlertsCommandHandler emitAlertsHandler; + + @Autowired private GetAlertsQueryHandler getAlertsQueryHandler; + + @Autowired private InMemoryPortfolioRepository portfolioRepository; + + @Autowired private InMemoryInboxReadModelRepository inboxReadModel; + + @Autowired private InMemoryAlertsReadModelRepository alertsReadModel; + + @Autowired private MockMvc mockMvc; + + @BeforeEach + void resetState() { + inboxReadModel.clear(); + alertsReadModel.clear(); + portfolioRepository.clear(); + } + + @Test + void registerUserHandler_shouldCreateInboxAndPortfolioUser() { + // GIVEN: a brand-new userId that does not exist yet + String userId = "user-1"; + int riskLevel = 10; + + // PRECONDITION: inboxReadModel has no mapping for "user-1" + assertThat(inboxReadModel.getInboxId(UserId.from(userId))).isEmpty(); + + // WHEN: we invoke the RegisterUserCommandHandler + RegisterUserCommand cmd = new RegisterUserCommand(userId, riskLevel); + registerUserHandler.handle(cmd); + + // THEN: the InMemoryInboxReadModelRepository should now contain an inboxId for "user-1" + assertThat(inboxReadModel.getInboxId(UserId.from(userId))).isPresent(); + + // AND: the InMemoryPortfolioRepository should have registered the user internally + // We know that existsPortfolioAsset returns false for any asset because none added yet, + // so simply verify that getUsersIdByAssetAndRiskLevel yields an empty list for an arbitrary + // asset. + List usersForBtcRisk10 = + portfolioRepository.getUsersIdByAssetAndRiskLevel("BTC", 10); + assertThat(usersForBtcRisk10).isEmpty(); + } + + @Test + void registerPortfolioAssetHandler_shouldAttachAssetToExistingUser() { + String userId = "user-2"; + int riskLevel = 5; + String assetId = "AAPL"; + + // First, register the user so that an inbox exists and portfolioRepository has that user + registerUserHandler.handle(new RegisterUserCommand(userId, riskLevel)); + + // PRECONDITION: this user does not yet have "AAPL" in their portfolio + boolean before = portfolioRepository.existsPortfolioAsset(userId, assetId); + assertThat(before).isFalse(); + + // WHEN: we add the portfolio asset + RegisterPortfolioAssetCommand addCmd = new RegisterPortfolioAssetCommand(userId, assetId); + registerPortfolioAssetHandler.handle(addCmd); + + // THEN: the portfolioRepository should reflect that "user-2" now has AAPL + boolean after = portfolioRepository.existsPortfolioAsset(userId, assetId); + assertThat(after).isTrue(); + + // AND: getUsersIdByAssetAndRiskLevel("AAPL", 5) should return exactly user-2 + List matched = + portfolioRepository.getUsersIdByAssetAndRiskLevel(assetId, riskLevel); + assertThat(matched).hasSize(1).allMatch(uid -> uid.value().equals(userId)); + } + + @Test + void emitAlertsHandler_shouldCreateAndStoreAlertForMatchingUsers() { + String userId = "user-3"; + int riskLevel = 3; + String assetId = "GOOG"; + Decision decision = Decision.BUY; + + // 1) register the user: + registerUserHandler.handle(new RegisterUserCommand(userId, riskLevel)); + + // 2) attach "GOOG" to user-3's portfolio: + registerPortfolioAssetHandler.handle(new RegisterPortfolioAssetCommand(userId, assetId)); + + // PRECONDITION: alertsReadModel.get(userId) is empty + List beforeAlerts = alertsReadModel.get(userId); + assertThat(beforeAlerts).isEmpty(); + + // WHEN: we emit an alert for asset "GOOG" at riskLevel=3 with decision=BUY + EmitAlertsCommand emitCmd = new EmitAlertsCommand(assetId, decision.name(), riskLevel); + emitAlertsHandler.handle(emitCmd); + + // THEN: alertsReadModel.get(userId) should contain exactly one AlertDTO + List afterAlerts = alertsReadModel.get(userId); + assertThat(afterAlerts).hasSize(1); + + AlertDTO alertDto = afterAlerts.getFirst(); + assertThat(alertDto.userId()).isEqualTo(userId); + assertThat(alertDto.assetId()).isEqualTo(assetId); + assertThat(alertDto.type()).isEqualTo(decision.name()); + assertThat(alertDto.date()).isNotNull(); + } + + @Test + void getAlertsQueryHandler_shouldReturnAllSavedAlertsForUser() { + String userId = "user-4"; + int riskLevel = 2; + String assetId1 = "MSFT"; + String assetId2 = "TSLA"; + Decision decision1 = Decision.SELL; + Decision decision2 = Decision.HOLD; + + // Setup: register user, attach two assets, emit two alerts + registerUserHandler.handle(new RegisterUserCommand(userId, riskLevel)); + registerPortfolioAssetHandler.handle(new RegisterPortfolioAssetCommand(userId, assetId1)); + registerPortfolioAssetHandler.handle(new RegisterPortfolioAssetCommand(userId, assetId2)); + + // Emit first alert (MSFT, SELL) + emitAlertsHandler.handle(new EmitAlertsCommand(assetId1, decision1.name(), riskLevel)); + + // Emit second alert (TSLA, HOLD) + emitAlertsHandler.handle(new EmitAlertsCommand(assetId2, decision2.name(), riskLevel)); + + // WHEN: we query via GetAlertsQueryHandler + GetDecisionsQuery query = new GetDecisionsQuery(userId); + List responses = getAlertsQueryHandler.handle(query); + + // THEN: We should see exactly two entries, one for MSFT and one for TSLA (in insertion + // order) + assertThat(responses).hasSize(2); + + // Order is not strictly guaranteed, so verify that the set of (assetId, type) matches + assertThat(responses) + .extracting(GetAlertsQueryResponse::assetId, GetAlertsQueryResponse::type) + .containsExactlyInAnyOrder( + org.assertj.core.groups.Tuple.tuple(assetId1, decision1.name()), + org.assertj.core.groups.Tuple.tuple(assetId2, decision2.name())); + } + + @Test + void getAlertsController_endpointShouldReturnJsonListOfAlerts() throws Exception { + String userId = "user-5"; + int riskLevel = 7; + String assetId = "NFLX"; + Decision decision = Decision.SELL; + + // 1) Register user & portfolio & emit one alert (same setup as above) + registerUserHandler.handle(new RegisterUserCommand(userId, riskLevel)); + registerPortfolioAssetHandler.handle(new RegisterPortfolioAssetCommand(userId, assetId)); + emitAlertsHandler.handle(new EmitAlertsCommand(assetId, decision.name(), riskLevel)); + + // 2) Perform an HTTP GET against /alerts with X-User-Id=user-5 + mockMvc.perform( + get("/alerts") + .header("X-User-Id", userId) + .accept(MediaType.APPLICATION_JSON)) + .andExpect(status().isOk()) + // Expect a JSON array of size 1 + .andExpect(jsonPath("$.length()").value(1)) + // The first (and only) element should have "assetId": "NFLX" and "type": "SELL" + .andExpect(jsonPath("$[0].assetId").value(assetId)) + .andExpect(jsonPath("$[0].type").value(decision.name())); + } +} From 641e61e7b167e3f5cf9d840b3794c31244cd928d Mon Sep 17 00:00:00 2001 From: Pol Pinol Castuera Date: Tue, 3 Jun 2025 19:36:51 +0200 Subject: [PATCH 05/13] Apply lint --- .../read_models/alerts/MongoAlertsReadModelRepository.java | 3 --- .../repositories/event_store/MongoEventStoreRepository.java | 3 +-- 2 files changed, 1 insertion(+), 5 deletions(-) diff --git a/src/main/java/io/autoinvestor/infrastructure/read_models/alerts/MongoAlertsReadModelRepository.java b/src/main/java/io/autoinvestor/infrastructure/read_models/alerts/MongoAlertsReadModelRepository.java index 404d4b9..2202c52 100644 --- a/src/main/java/io/autoinvestor/infrastructure/read_models/alerts/MongoAlertsReadModelRepository.java +++ b/src/main/java/io/autoinvestor/infrastructure/read_models/alerts/MongoAlertsReadModelRepository.java @@ -2,9 +2,6 @@ import io.autoinvestor.application.AlertDTO; import io.autoinvestor.application.AlertsReadModelRepository; - -import java.util.List; - import lombok.extern.slf4j.Slf4j; import java.util.List; diff --git a/src/main/java/io/autoinvestor/infrastructure/repositories/event_store/MongoEventStoreRepository.java b/src/main/java/io/autoinvestor/infrastructure/repositories/event_store/MongoEventStoreRepository.java index 2db64d0..d3cf883 100644 --- a/src/main/java/io/autoinvestor/infrastructure/repositories/event_store/MongoEventStoreRepository.java +++ b/src/main/java/io/autoinvestor/infrastructure/repositories/event_store/MongoEventStoreRepository.java @@ -4,13 +4,12 @@ import io.autoinvestor.domain.events.EventStoreRepository; import io.autoinvestor.domain.model.Inbox; import io.autoinvestor.domain.model.InboxId; +import lombok.extern.slf4j.Slf4j; import java.util.List; import java.util.Optional; import java.util.stream.Collectors; -import lombok.extern.slf4j.Slf4j; - import org.springframework.context.annotation.Profile; import org.springframework.data.domain.Sort; import org.springframework.data.mongodb.core.MongoTemplate; From f5328ff0b7403ea4234b17572f61babfa02f72c8 Mon Sep 17 00:00:00 2001 From: Pol Pinol Castuera Date: Tue, 3 Jun 2025 19:38:32 +0200 Subject: [PATCH 06/13] Fix sonar cloud --- .../repositories/event_store/MongoEventStoreRepository.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/main/java/io/autoinvestor/infrastructure/repositories/event_store/MongoEventStoreRepository.java b/src/main/java/io/autoinvestor/infrastructure/repositories/event_store/MongoEventStoreRepository.java index d3cf883..3d02e2d 100644 --- a/src/main/java/io/autoinvestor/infrastructure/repositories/event_store/MongoEventStoreRepository.java +++ b/src/main/java/io/autoinvestor/infrastructure/repositories/event_store/MongoEventStoreRepository.java @@ -43,8 +43,7 @@ public void save(Inbox inbox) { } try { - List docs = - uncommitted.stream().map(mapper::toDocument).collect(Collectors.toList()); + List docs = uncommitted.stream().map(mapper::toDocument).toList(); template.insertAll(docs); log.info("Inserted {} event(s) into '{}'", docs.size(), COLLECTION); } catch (Exception ex) { From 353b9c873f152ce833b1131773392ca0af81883c Mon Sep 17 00:00:00 2001 From: Pol Pinol Castuera Date: Tue, 3 Jun 2025 19:44:31 +0200 Subject: [PATCH 07/13] remove local --- .github/workflows/pull-request-pipeline.yml | 2 -- 1 file changed, 2 deletions(-) diff --git a/.github/workflows/pull-request-pipeline.yml b/.github/workflows/pull-request-pipeline.yml index 11f9959..c3a02a7 100644 --- a/.github/workflows/pull-request-pipeline.yml +++ b/.github/workflows/pull-request-pipeline.yml @@ -8,8 +8,6 @@ on: jobs: build-and-test-pipeline: runs-on: ubuntu-22.04 - env: - SPRING_PROFILES_ACTIVE: local steps: - name: Check out the repository uses: actions/checkout@v4 From 1e3885741036d61acb3dcb9527efc21551b74ae1 Mon Sep 17 00:00:00 2001 From: Pol Pinol Castuera Date: Tue, 3 Jun 2025 19:47:27 +0200 Subject: [PATCH 08/13] Push properties for test --- src/test/resources/application-local.properties | 7 +++++++ src/test/resources/application-prod.properties | 6 ++++++ src/test/resources/application.properties | 2 ++ 3 files changed, 15 insertions(+) create mode 100644 src/test/resources/application-local.properties create mode 100644 src/test/resources/application-prod.properties create mode 100644 src/test/resources/application.properties diff --git a/src/test/resources/application-local.properties b/src/test/resources/application-local.properties new file mode 100644 index 0000000..1ed44a7 --- /dev/null +++ b/src/test/resources/application-local.properties @@ -0,0 +1,7 @@ +spring.autoconfigure.exclude=\ + org.springframework.boot.autoconfigure.mongo.MongoAutoConfiguration,\ + org.springframework.boot.autoconfigure.data.mongo.MongoDataAutoConfiguration + +spring.autoconfigure.exclude+=,\ + com.google.cloud.spring.autoconfigure.core.GcpContextAutoConfiguration,\ + com.google.cloud.spring.autoconfigure.pubsub.GcpPubSubAutoConfiguration \ No newline at end of file diff --git a/src/test/resources/application-prod.properties b/src/test/resources/application-prod.properties new file mode 100644 index 0000000..7a67f94 --- /dev/null +++ b/src/test/resources/application-prod.properties @@ -0,0 +1,6 @@ +spring.data.mongodb.uri=${MONGODB_URI} +spring.data.mongodb.database=${MONGODB_DB} +GCP_PROJECT=${GCP_PROJECT} +PUBSUB_TOPIC=${PUBSUB_TOPIC} +PUBSUB_SUBSCRIPTION_DECISION_MAKING=${PUBSUB_SUBSCRIPTION_DECISION_MAKING} +PUBSUB_SUBSCRIPTION_PORTFOLIO=${PUBSUB_SUBSCRIPTION_PORTFOLIO} \ No newline at end of file diff --git a/src/test/resources/application.properties b/src/test/resources/application.properties new file mode 100644 index 0000000..3aa94d1 --- /dev/null +++ b/src/test/resources/application.properties @@ -0,0 +1,2 @@ +spring.application.name=alerts +spring.profiles.active=prod From 95de8ad8a813e50a3768b3a2ac173afda7e578f2 Mon Sep 17 00:00:00 2001 From: Pol Pinol Castuera Date: Tue, 3 Jun 2025 19:49:21 +0200 Subject: [PATCH 09/13] resources for test --- src/test/resources/application-local.properties | 7 ------- src/test/resources/application-prod.properties | 6 ------ src/test/resources/application.properties | 9 ++++++++- 3 files changed, 8 insertions(+), 14 deletions(-) delete mode 100644 src/test/resources/application-local.properties delete mode 100644 src/test/resources/application-prod.properties diff --git a/src/test/resources/application-local.properties b/src/test/resources/application-local.properties deleted file mode 100644 index 1ed44a7..0000000 --- a/src/test/resources/application-local.properties +++ /dev/null @@ -1,7 +0,0 @@ -spring.autoconfigure.exclude=\ - org.springframework.boot.autoconfigure.mongo.MongoAutoConfiguration,\ - org.springframework.boot.autoconfigure.data.mongo.MongoDataAutoConfiguration - -spring.autoconfigure.exclude+=,\ - com.google.cloud.spring.autoconfigure.core.GcpContextAutoConfiguration,\ - com.google.cloud.spring.autoconfigure.pubsub.GcpPubSubAutoConfiguration \ No newline at end of file diff --git a/src/test/resources/application-prod.properties b/src/test/resources/application-prod.properties deleted file mode 100644 index 7a67f94..0000000 --- a/src/test/resources/application-prod.properties +++ /dev/null @@ -1,6 +0,0 @@ -spring.data.mongodb.uri=${MONGODB_URI} -spring.data.mongodb.database=${MONGODB_DB} -GCP_PROJECT=${GCP_PROJECT} -PUBSUB_TOPIC=${PUBSUB_TOPIC} -PUBSUB_SUBSCRIPTION_DECISION_MAKING=${PUBSUB_SUBSCRIPTION_DECISION_MAKING} -PUBSUB_SUBSCRIPTION_PORTFOLIO=${PUBSUB_SUBSCRIPTION_PORTFOLIO} \ No newline at end of file diff --git a/src/test/resources/application.properties b/src/test/resources/application.properties index 3aa94d1..7aaf4f5 100644 --- a/src/test/resources/application.properties +++ b/src/test/resources/application.properties @@ -1,2 +1,9 @@ spring.application.name=alerts -spring.profiles.active=prod + +spring.autoconfigure.exclude=\ + org.springframework.boot.autoconfigure.mongo.MongoAutoConfiguration,\ + org.springframework.boot.autoconfigure.data.mongo.MongoDataAutoConfiguration + +spring.autoconfigure.exclude+=,\ + com.google.cloud.spring.autoconfigure.core.GcpContextAutoConfiguration,\ + com.google.cloud.spring.autoconfigure.pubsub.GcpPubSubAutoConfiguration \ No newline at end of file From b6b401d93fe0ab9320ce1ffe3cbf4901654f4861 Mon Sep 17 00:00:00 2001 From: Pol Pinol Castuera Date: Tue, 3 Jun 2025 19:56:18 +0200 Subject: [PATCH 10/13] add more resources for test --- src/test/resources/application-local.properties | 7 +++++++ 1 file changed, 7 insertions(+) create mode 100644 src/test/resources/application-local.properties diff --git a/src/test/resources/application-local.properties b/src/test/resources/application-local.properties new file mode 100644 index 0000000..195f192 --- /dev/null +++ b/src/test/resources/application-local.properties @@ -0,0 +1,7 @@ +spring.main.banner-mode=off +spring.application.name=alerts + +# 1) Disable Mongo auto-config (you already had these) +spring.autoconfigure.exclude=\ + org.springframework.boot.autoconfigure.mongo.MongoAutoConfiguration,\ + org.springframework.boot.autoconfigure.data.mongo.MongoDataAutoConfiguration From 0dcb8049224dd6eb18957d5c598c6fa2d98c2161 Mon Sep 17 00:00:00 2001 From: Pol Pinol Castuera Date: Tue, 3 Jun 2025 19:59:42 +0200 Subject: [PATCH 11/13] fix resoruces for test --- src/test/resources/application-local.properties | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/src/test/resources/application-local.properties b/src/test/resources/application-local.properties index 195f192..2675adc 100644 --- a/src/test/resources/application-local.properties +++ b/src/test/resources/application-local.properties @@ -1,7 +1,15 @@ spring.main.banner-mode=off + spring.application.name=alerts -# 1) Disable Mongo auto-config (you already had these) spring.autoconfigure.exclude=\ org.springframework.boot.autoconfigure.mongo.MongoAutoConfiguration,\ - org.springframework.boot.autoconfigure.data.mongo.MongoDataAutoConfiguration + org.springframework.boot.autoconfigure.data.mongo.MongoDataAutoConfiguration,\ + com.google.cloud.spring.autoconfigure.core.GcpContextAutoConfiguration,\ + com.google.cloud.spring.autoconfigure.pubsub.GcpPubSubAutoConfiguration,\ + org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration,\ + org.springframework.boot.autoconfigure.orm.jpa.HibernateJpaAutoConfiguration,\ + org.springframework.boot.autoconfigure.data.redis.RedisAutoConfiguration,\ + org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration + +inbox.initial-size=10 From 5b1889ace836c82f253b515ecc6b819dcb226bd4 Mon Sep 17 00:00:00 2001 From: Pol Pinol Castuera Date: Tue, 3 Jun 2025 20:03:56 +0200 Subject: [PATCH 12/13] fix resoruces for test --- .../resources/application-local.properties | 18 +++++------------- src/test/resources/application.properties | 9 --------- 2 files changed, 5 insertions(+), 22 deletions(-) delete mode 100644 src/test/resources/application.properties diff --git a/src/test/resources/application-local.properties b/src/test/resources/application-local.properties index 2675adc..1ed44a7 100644 --- a/src/test/resources/application-local.properties +++ b/src/test/resources/application-local.properties @@ -1,15 +1,7 @@ -spring.main.banner-mode=off - -spring.application.name=alerts - spring.autoconfigure.exclude=\ - org.springframework.boot.autoconfigure.mongo.MongoAutoConfiguration,\ - org.springframework.boot.autoconfigure.data.mongo.MongoDataAutoConfiguration,\ - com.google.cloud.spring.autoconfigure.core.GcpContextAutoConfiguration,\ - com.google.cloud.spring.autoconfigure.pubsub.GcpPubSubAutoConfiguration,\ - org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration,\ - org.springframework.boot.autoconfigure.orm.jpa.HibernateJpaAutoConfiguration,\ - org.springframework.boot.autoconfigure.data.redis.RedisAutoConfiguration,\ - org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration + org.springframework.boot.autoconfigure.mongo.MongoAutoConfiguration,\ + org.springframework.boot.autoconfigure.data.mongo.MongoDataAutoConfiguration -inbox.initial-size=10 +spring.autoconfigure.exclude+=,\ + com.google.cloud.spring.autoconfigure.core.GcpContextAutoConfiguration,\ + com.google.cloud.spring.autoconfigure.pubsub.GcpPubSubAutoConfiguration \ No newline at end of file diff --git a/src/test/resources/application.properties b/src/test/resources/application.properties deleted file mode 100644 index 7aaf4f5..0000000 --- a/src/test/resources/application.properties +++ /dev/null @@ -1,9 +0,0 @@ -spring.application.name=alerts - -spring.autoconfigure.exclude=\ - org.springframework.boot.autoconfigure.mongo.MongoAutoConfiguration,\ - org.springframework.boot.autoconfigure.data.mongo.MongoDataAutoConfiguration - -spring.autoconfigure.exclude+=,\ - com.google.cloud.spring.autoconfigure.core.GcpContextAutoConfiguration,\ - com.google.cloud.spring.autoconfigure.pubsub.GcpPubSubAutoConfiguration \ No newline at end of file From f7bc85fca683fce62dac14b3c3b02a9db6068248 Mon Sep 17 00:00:00 2001 From: Pol Pinol Castuera Date: Tue, 3 Jun 2025 20:07:54 +0200 Subject: [PATCH 13/13] exclude everuthing --- src/main/resources/application-local.properties | 10 ++++------ src/test/resources/application-local.properties | 10 ++++------ 2 files changed, 8 insertions(+), 12 deletions(-) diff --git a/src/main/resources/application-local.properties b/src/main/resources/application-local.properties index 1ed44a7..ef1eefe 100644 --- a/src/main/resources/application-local.properties +++ b/src/main/resources/application-local.properties @@ -1,7 +1,5 @@ spring.autoconfigure.exclude=\ - org.springframework.boot.autoconfigure.mongo.MongoAutoConfiguration,\ - org.springframework.boot.autoconfigure.data.mongo.MongoDataAutoConfiguration - -spring.autoconfigure.exclude+=,\ - com.google.cloud.spring.autoconfigure.core.GcpContextAutoConfiguration,\ - com.google.cloud.spring.autoconfigure.pubsub.GcpPubSubAutoConfiguration \ No newline at end of file + org.springframework.boot.autoconfigure.mongo.MongoAutoConfiguration,\ + org.springframework.boot.autoconfigure.data.mongo.MongoDataAutoConfiguration,\ + com.google.cloud.spring.autoconfigure.core.GcpContextAutoConfiguration,\ + com.google.cloud.spring.autoconfigure.pubsub.GcpPubSubAutoConfiguration diff --git a/src/test/resources/application-local.properties b/src/test/resources/application-local.properties index 1ed44a7..ef1eefe 100644 --- a/src/test/resources/application-local.properties +++ b/src/test/resources/application-local.properties @@ -1,7 +1,5 @@ spring.autoconfigure.exclude=\ - org.springframework.boot.autoconfigure.mongo.MongoAutoConfiguration,\ - org.springframework.boot.autoconfigure.data.mongo.MongoDataAutoConfiguration - -spring.autoconfigure.exclude+=,\ - com.google.cloud.spring.autoconfigure.core.GcpContextAutoConfiguration,\ - com.google.cloud.spring.autoconfigure.pubsub.GcpPubSubAutoConfiguration \ No newline at end of file + org.springframework.boot.autoconfigure.mongo.MongoAutoConfiguration,\ + org.springframework.boot.autoconfigure.data.mongo.MongoDataAutoConfiguration,\ + com.google.cloud.spring.autoconfigure.core.GcpContextAutoConfiguration,\ + com.google.cloud.spring.autoconfigure.pubsub.GcpPubSubAutoConfiguration