Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ dependencies {
implementation 'org.springframework.boot:spring-boot-starter-data-redis'
runtimeOnly 'com.mysql:mysql-connector-j'
implementation 'org.springframework.kafka:spring-kafka'
implementation 'com.github.da-bom:lib-kafka:v1.0.0'
implementation 'com.github.da-bom:lib-kafka:v1.0.1'

// QueryDSL
implementation 'com.querydsl:querydsl-jpa:5.1.0:jakarta'
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package com.project.domain.usage.entity;
package com.project.domain.eventoutbox.entity;

import java.time.LocalDateTime;

Expand All @@ -14,7 +14,7 @@
import jakarta.persistence.UniqueConstraint;

import com.project.common.util.BaseEntity;
import com.project.domain.usage.enums.UsageOutboxStatus;
import com.project.domain.eventoutbox.enums.EventOutboxStatus;

import lombok.AccessLevel;
import lombok.Builder;
Expand All @@ -32,7 +32,7 @@
indexes = {
@Index(name = "idx_usage_outbox_status_retry", columnList = "status, next_retry_at")
})
public class UsageEventOutbox extends BaseEntity {
public class EventOutbox extends BaseEntity {

@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
Expand All @@ -49,7 +49,7 @@ public class UsageEventOutbox extends BaseEntity {

@Enumerated(EnumType.STRING)
@Column(name = "status", nullable = false, length = 30)
private UsageOutboxStatus status;
private EventOutboxStatus status;

@Column(name = "payload_json", columnDefinition = "TEXT")
private String payloadJson;
Expand All @@ -64,12 +64,12 @@ public class UsageEventOutbox extends BaseEntity {
private String lastError;

@Builder
private UsageEventOutbox(
private EventOutbox(
Long id,
String eventId,
Long familyId,
Long customerId,
UsageOutboxStatus status,
EventOutboxStatus status,
String payloadJson,
int retryCount,
LocalDateTime nextRetryAt,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package com.project.domain.eventoutbox.enums;

public enum EventOutboxStatus {
PUBLISH_PENDING,
SENT,
FAILED
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package com.project.domain.usage.repository;
package com.project.domain.eventoutbox.repository;

import java.util.Optional;

Expand All @@ -7,11 +7,11 @@
import org.springframework.data.jpa.repository.Query;
import org.springframework.data.repository.query.Param;

import com.project.domain.usage.entity.UsageEventOutbox;
import com.project.domain.eventoutbox.entity.EventOutbox;

public interface UsageEventOutboxRepository extends JpaRepository<UsageEventOutbox, Long> {
public interface EventOutboxRepository extends JpaRepository<EventOutbox, Long> {

Optional<UsageEventOutbox> findByEventId(String eventId);
Optional<EventOutbox> findByEventId(String eventId);

@Modifying
@Query(
Expand All @@ -31,25 +31,25 @@ int insertPublishPendingIgnore(
@Modifying
@Query(
"""
update UsageEventOutbox o
update EventOutbox o
set o.payloadJson = :payloadJson,
o.nextRetryAt = null,
o.lastError = null
where o.eventId = :eventId
and o.status = com.project.domain.usage.enums.UsageOutboxStatus.PUBLISH_PENDING
and o.status = com.project.domain.eventoutbox.enums.EventOutboxStatus.PUBLISH_PENDING
""")
int refreshPendingPayload(
@Param("eventId") String eventId, @Param("payloadJson") String payloadJson);

@Modifying
@Query(
"""
update UsageEventOutbox o
set o.status = com.project.domain.usage.enums.UsageOutboxStatus.SENT,
update EventOutbox o
set o.status = com.project.domain.eventoutbox.enums.EventOutboxStatus.SENT,
o.nextRetryAt = null,
o.lastError = null
where o.id = :outboxId
and o.status = com.project.domain.usage.enums.UsageOutboxStatus.PUBLISH_PENDING
and o.status = com.project.domain.eventoutbox.enums.EventOutboxStatus.PUBLISH_PENDING
""")
int markSentIfPending(@Param("outboxId") Long outboxId);
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package com.project.domain.usage.service.helper;
package com.project.domain.eventoutbox.service;

import java.util.Optional;

Expand All @@ -9,18 +9,18 @@
import com.dabom.messaging.kafka.event.dto.notification.NotificationPayload;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.project.domain.usage.enums.UsageOutboxStatus;
import com.project.domain.usage.repository.UsageEventOutboxRepository;
import com.project.domain.eventoutbox.enums.EventOutboxStatus;
import com.project.domain.eventoutbox.repository.EventOutboxRepository;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;

@Slf4j
@Service
@RequiredArgsConstructor
public class UsageEventOutboxService {
public class EventOutboxService {

private final UsageEventOutboxRepository usageEventOutboxRepository;
private final EventOutboxRepository eventOutboxRepository;
private final ObjectMapper objectMapper;

// notification 대상인 경우에만 PUBLISH_PENDING row를 보장한다.
Expand All @@ -32,12 +32,12 @@ public Optional<PendingNotificationDispatch> stageAfterRedisApplied(
}

String payloadJson = toJson(payload);
usageEventOutboxRepository.insertPublishPendingIgnore(
eventOutboxRepository.insertPublishPendingIgnore(
eventId, payload.familyId(), payload.customerId(), payloadJson);
usageEventOutboxRepository.refreshPendingPayload(eventId, payloadJson);
return usageEventOutboxRepository
eventOutboxRepository.refreshPendingPayload(eventId, payloadJson);
return eventOutboxRepository
.findByEventId(eventId)
.filter(row -> row.getStatus() == UsageOutboxStatus.PUBLISH_PENDING)
.filter(row -> row.getStatus() == EventOutboxStatus.PUBLISH_PENDING)
.map(
row ->
new PendingNotificationDispatch(
Expand All @@ -48,9 +48,9 @@ public Optional<PendingNotificationDispatch> stageAfterRedisApplied(
// eventId 기준으로 아직 발행되지 않은 notification payload를 찾는다.
@Transactional(readOnly = true)
public Optional<PendingNotificationDispatch> findPendingDispatchByEventId(String eventId) {
return usageEventOutboxRepository
return eventOutboxRepository
.findByEventId(eventId)
.filter(row -> row.getStatus() == UsageOutboxStatus.PUBLISH_PENDING)
.filter(row -> row.getStatus() == EventOutboxStatus.PUBLISH_PENDING)
.map(
row ->
new PendingNotificationDispatch(
Expand All @@ -61,7 +61,7 @@ public Optional<PendingNotificationDispatch> findPendingDispatchByEventId(String
// 발행 성공 시 Outbox 상태를 SENT로 변경한다.
@Transactional
public void markSent(Long outboxId) {
int updated = usageEventOutboxRepository.markSentIfPending(outboxId);
int updated = eventOutboxRepository.markSentIfPending(outboxId);
if (updated == 0) {
log.debug("Skip markSent because outbox is no longer pending. outboxId={}", outboxId);
}
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@
import com.project.common.config.TimeConfig;
import com.project.common.util.LogSanitizer;
import com.project.common.util.RedisKeyGenerator;
import com.project.domain.eventoutbox.service.EventOutboxService;
import com.project.domain.policy.helper.PolicyConstraintWarmupHelper;
import com.project.domain.usage.service.dto.UsageUpdateResult;
import com.project.domain.usage.service.helper.UsageEventOutboxService;
import com.project.domain.usage.service.helper.UsageFamilyMembershipCacheHelper;
import com.project.domain.usage.service.helper.UsageLuaExecutor;
import com.project.domain.usage.service.helper.UsageNotificationPayloadMapper;
Expand All @@ -46,7 +46,7 @@ public class UsageSyncServiceImpl implements UsageSyncService {
private final PolicyConstraintWarmupHelper policyConstraintWarmupHelper;
private final UsageLuaExecutor usageLuaExecutor;
private final UsagePersistService usagePersistService;
private final UsageEventOutboxService usageEventOutboxService;
private final EventOutboxService eventOutboxService;
private final UsageProcessingDecisionMapper usageProcessingDecisionMapper;
private final UsageNotificationPayloadMapper usageNotificationPayloadMapper;
private final UsageNotificationPublisher usageNotificationPublisher;
Expand Down Expand Up @@ -167,7 +167,7 @@ public void syncUsage(String eventId, String eventTime, UsagePayload payload) {
usageNotificationPayloadMapper.toNotificationPayload(
eventId, resolvedEventDateTime, payload, decision.notificationStatus());

usageEventOutboxService
eventOutboxService
.stageAfterRedisApplied(eventId, notificationPayload, true)
.ifPresent(this::publishAsync);
}
Expand Down Expand Up @@ -214,17 +214,17 @@ private void ensureWarmupOrThrow(

// 이미 만들어진 pending notification이 있으면 다시 즉시 발행을 시도한다.
private void dispatchPendingNotificationIfExists(String eventId) {
usageEventOutboxService.findPendingDispatchByEventId(eventId).ifPresent(this::publishAsync);
eventOutboxService.findPendingDispatchByEventId(eventId).ifPresent(this::publishAsync);
}

// notification은 비동기로 발행하고 성공 시에만 SENT로 마감한다.
private void publishAsync(UsageEventOutboxService.PendingNotificationDispatch pending) {
private void publishAsync(EventOutboxService.PendingNotificationDispatch pending) {
usageNotificationPublisher
.publishAsync(pending.payload())
.whenComplete(
(SendResult<String, String> ignored, Throwable throwable) -> {
if (throwable == null) {
usageEventOutboxService.markSent(pending.outboxId());
eventOutboxService.markSent(pending.outboxId());
return;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ private NotificationPayload buildBlockedAlert(
return new NotificationPayload(
usagePayload.familyId(),
usagePayload.customerId(),
NotificationType.BLOCKED,
NotificationType.CUSTOMER_BLOCKED,
"데이터 사용 차단",
message,
data);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,9 @@
import com.dabom.messaging.kafka.metrics.KafkaMetrics;
import com.project.common.util.LogSanitizer;
import com.project.common.util.RedisKeyGenerator;
import com.project.domain.eventoutbox.service.EventOutboxService;
import com.project.domain.policy.helper.PolicyConstraintWarmupHelper;
import com.project.domain.usage.service.dto.UsageUpdateResult;
import com.project.domain.usage.service.helper.UsageEventOutboxService;
import com.project.domain.usage.service.helper.UsageFamilyMembershipCacheHelper;
import com.project.domain.usage.service.helper.UsageLuaExecutor;
import com.project.domain.usage.service.helper.UsageNotificationPayloadMapper;
Expand All @@ -57,7 +57,7 @@ class UsageSyncServiceImplTest {
@Mock private PolicyConstraintWarmupHelper policyConstraintWarmupHelper;
@Mock private UsageLuaExecutor usageLuaExecutor;
@Mock private UsagePersistService usagePersistService;
@Mock private UsageEventOutboxService usageEventOutboxService;
@Mock private EventOutboxService eventOutboxService;
@Mock private UsageProcessingDecisionMapper usageProcessingDecisionMapper;
@Mock private UsageNotificationPayloadMapper usageNotificationPayloadMapper;
@Mock private UsageNotificationPublisher usageNotificationPublisher;
Expand Down Expand Up @@ -101,10 +101,10 @@ void syncUsage_SuccessFlow() {
usageNotificationPayloadMapper.toNotificationPayload(
any(), any(), any(), eq("WARNING_10")))
.willReturn(notificationPayload);
given(usageEventOutboxService.stageAfterRedisApplied(eventId, notificationPayload, true))
given(eventOutboxService.stageAfterRedisApplied(eventId, notificationPayload, true))
.willReturn(
Optional.of(
new UsageEventOutboxService.PendingNotificationDispatch(
new EventOutboxService.PendingNotificationDispatch(
11L, notificationPayload)));
given(usageNotificationPublisher.publishAsync(notificationPayload))
.willReturn(CompletableFuture.completedFuture(null));
Expand Down Expand Up @@ -132,7 +132,7 @@ void syncUsage_SuccessFlow() {

verify(usagePersistService).persistFromUsageEvent(eventId, eventTime, payload, "ALLOWED");
verify(usageNotificationPublisher).publishAsync(notificationPayload);
verify(usageEventOutboxService).markSent(11L);
verify(eventOutboxService).markSent(11L);
}

@Test
Expand Down Expand Up @@ -195,10 +195,10 @@ void syncUsage_DuplicateStillReentersPersist() {
usageNotificationPayloadMapper.toNotificationPayload(
any(), any(), any(), eq("WARNING_10")))
.willReturn(notificationPayload);
given(usageEventOutboxService.stageAfterRedisApplied(eventId, notificationPayload, true))
given(eventOutboxService.stageAfterRedisApplied(eventId, notificationPayload, true))
.willReturn(
Optional.of(
new UsageEventOutboxService.PendingNotificationDispatch(
new EventOutboxService.PendingNotificationDispatch(
21L, notificationPayload)));
given(usageNotificationPublisher.publishAsync(notificationPayload))
.willReturn(CompletableFuture.completedFuture(null));
Expand Down Expand Up @@ -234,20 +234,20 @@ void syncUsage_DuplicateRepublishesExistingPendingNotification() {
new UsageUpdateResult(
5000L, 5000L, "NORMAL", 1000L, 0.1, 10000L, false, true));
given(usageProcessingDecisionMapper.fromLuaStatus("NORMAL")).willReturn(decision);
given(usageEventOutboxService.findPendingDispatchByEventId(eventId))
given(eventOutboxService.findPendingDispatchByEventId(eventId))
.willReturn(
Optional.of(
new UsageEventOutboxService.PendingNotificationDispatch(
new EventOutboxService.PendingNotificationDispatch(
31L, notificationPayload)));
given(usageNotificationPublisher.publishAsync(notificationPayload))
.willReturn(CompletableFuture.completedFuture(null));

usageSyncServiceImpl.syncUsage(eventId, eventTime, payload);

verify(usagePersistService).persistFromUsageEvent(eventId, eventTime, payload, "ALLOWED");
verify(usageEventOutboxService, never()).stageAfterRedisApplied(any(), any(), anyBoolean());
verify(eventOutboxService, never()).stageAfterRedisApplied(any(), any(), anyBoolean());
verify(usageNotificationPublisher).publishAsync(notificationPayload);
verify(usageEventOutboxService).markSent(31L);
verify(eventOutboxService).markSent(31L);
}

@Test
Expand All @@ -267,15 +267,15 @@ void syncUsage_SkipsNotificationWhenShouldNotifyFalse() {
new UsageUpdateResult(
5000L, 5000L, "APP_BLOCK", 1000L, 0.1, 10000L, false, false));
given(usageProcessingDecisionMapper.fromLuaStatus("APP_BLOCK")).willReturn(decision);
given(usageEventOutboxService.findPendingDispatchByEventId(eventId))
given(eventOutboxService.findPendingDispatchByEventId(eventId))
.willReturn(Optional.empty());

usageSyncServiceImpl.syncUsage(eventId, eventTime, payload);

verify(usagePersistService).persistFromUsageEvent(eventId, eventTime, payload, "APP_BLOCK");
verify(usageNotificationPayloadMapper, never())
.toNotificationPayload(any(), any(), any(), any());
verify(usageEventOutboxService, never()).stageAfterRedisApplied(any(), any(), anyBoolean());
verify(eventOutboxService, never()).stageAfterRedisApplied(any(), any(), anyBoolean());
verify(usageNotificationPublisher, never()).publishAsync(any());
}

Expand All @@ -296,15 +296,15 @@ void syncUsage_NormalEventSkipsPayloadCreation() {
new UsageUpdateResult(
5000L, 5000L, "NORMAL", 1000L, 0.1, 10000L, false, false));
given(usageProcessingDecisionMapper.fromLuaStatus("NORMAL")).willReturn(decision);
given(usageEventOutboxService.findPendingDispatchByEventId(eventId))
given(eventOutboxService.findPendingDispatchByEventId(eventId))
.willReturn(Optional.empty());

usageSyncServiceImpl.syncUsage(eventId, eventTime, payload);

verify(usagePersistService).persistFromUsageEvent(eventId, eventTime, payload, "ALLOWED");
verify(usageNotificationPayloadMapper, never())
.toNotificationPayload(any(), any(), any(), any());
verify(usageEventOutboxService, never()).stageAfterRedisApplied(any(), any(), anyBoolean());
verify(eventOutboxService, never()).stageAfterRedisApplied(any(), any(), anyBoolean());
}

@Test
Expand Down
Loading
Loading