diff --git a/src/main/java/com/project/controller/TestNotificationController.java b/src/main/java/com/project/controller/TestNotificationController.java index 61cbe03..33f0b61 100644 --- a/src/main/java/com/project/controller/TestNotificationController.java +++ b/src/main/java/com/project/controller/TestNotificationController.java @@ -7,7 +7,7 @@ import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RestController; -import com.project.notification.consumer.UsageNotificationEvent; +import com.project.notification.consumer.NotificationEvent; import com.project.notification.service.MessageSendService; import lombok.RequiredArgsConstructor; @@ -34,11 +34,11 @@ public String sendTest(@RequestBody Map request) { Map variables = (Map) request.get("variables"); // 2. 가짜 이벤트 객체(UsageNotificationEvent) 생성 - UsageNotificationEvent event = - new UsageNotificationEvent( + NotificationEvent event = + new NotificationEvent( UUID.randomUUID(), // 임의의 Event ID 생성 templateGroupId, - new UsageNotificationEvent.SubscriptionInfo(subId, phoneNumber, email), + new NotificationEvent.SubscriptionInfo(subId, phoneNumber, email), variables); log.info("[TEST TRIGGER] subId={}, groupId={}", subId, templateGroupId); diff --git a/src/main/java/com/project/notification/consumer/InvoiceConsumer.java b/src/main/java/com/project/notification/consumer/InvoiceConsumer.java new file mode 100644 index 0000000..b758e5f --- /dev/null +++ b/src/main/java/com/project/notification/consumer/InvoiceConsumer.java @@ -0,0 +1,98 @@ +package com.project.notification.consumer; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.TimeUnit; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.kafka.support.Acknowledgment; +import org.springframework.stereotype.Component; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.project.notification.service.MessageSendService; + +import io.micrometer.core.instrument.Counter; +import io.micrometer.core.instrument.Timer; + +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +@Component +@RequiredArgsConstructor +public class InvoiceConsumer { + + private final NotificationSendDedupService dedupService; + private final ObjectMapper objectMapper; + private final MessageSendService messageSendService; + private final Counter kafkaBatchProcessedCounter; + private final Timer kafkaBatchProcessingTimer; + + @KafkaListener( + topics = "invoice", + containerFactory = "kafkaListenerContainerFactory", + autoStartup = "true") + public void consume(List> records, Acknowledgment ack) { + + final String threadName = Thread.currentThread().getName(); + final int batchSize = records.size(); + + log.info("[BATCH START] thread={}, records={}", threadName, batchSize); + + long batchStart = System.currentTimeMillis(); + + List events = new ArrayList<>(batchSize); + List eventIds = new ArrayList<>(batchSize); + + for (ConsumerRecord record : records) { + try { + NotificationEvent event = + objectMapper.readValue(record.value(), NotificationEvent.class); + events.add(event); + eventIds.add(event.eventId().toString()); + } catch (Exception e) { + log.warn("[DESERIALIZE FAIL] offset={}", record.offset(), e); + } + } + + List dedupResults = dedupService.tryAcquireBatch(eventIds); + + int processed = 0; + int skipped = 0; + + for (int i = 0; i < events.size(); i++) { + if (!dedupResults.get(i)) { + skipped++; + continue; + } + + try { + NotificationEvent event = events.get(i); + messageSendService.processEvent(event); + processed++; + } catch (Exception e) { + log.error("[PROCESS FAIL] eventId={}", eventIds.get(i), e); + } + } + + ack.acknowledge(); + + long elapsedMs = System.currentTimeMillis() - batchStart; + double tps = elapsedMs > 0 ? processed / (elapsedMs / 1000.0) : 0; + + // Record metrics + kafkaBatchProcessedCounter.increment(processed); + kafkaBatchProcessingTimer.record(elapsedMs, TimeUnit.MILLISECONDS); + + log.info( + "[BATCH DONE] thread={}, batchSize={}, processed={}, skipped={}, elapsed={}ms," + + " tps={}", + threadName, + batchSize, + processed, + skipped, + elapsedMs, + String.format("%.0f", tps)); + } +} diff --git a/src/main/java/com/project/notification/consumer/NotificationConsumer.java b/src/main/java/com/project/notification/consumer/NotificationConsumer.java index 6d877fd..37d4af5 100644 --- a/src/main/java/com/project/notification/consumer/NotificationConsumer.java +++ b/src/main/java/com/project/notification/consumer/NotificationConsumer.java @@ -29,7 +29,10 @@ public class NotificationConsumer { private final Counter kafkaBatchProcessedCounter; private final Timer kafkaBatchProcessingTimer; - @KafkaListener(topics = "usage-noti", containerFactory = "kafkaListenerContainerFactory") + @KafkaListener( + topics = "usage", + containerFactory = "kafkaListenerContainerFactory", + autoStartup = "false") public void consume(List> records, Acknowledgment ack) { final String threadName = Thread.currentThread().getName(); @@ -39,13 +42,13 @@ public void consume(List> records, Acknowledgment long batchStart = System.currentTimeMillis(); - List events = new ArrayList<>(batchSize); + List events = new ArrayList<>(batchSize); List eventIds = new ArrayList<>(batchSize); for (ConsumerRecord record : records) { try { - UsageNotificationEvent event = - objectMapper.readValue(record.value(), UsageNotificationEvent.class); + NotificationEvent event = + objectMapper.readValue(record.value(), NotificationEvent.class); events.add(event); eventIds.add(event.eventId().toString()); } catch (Exception e) { @@ -65,7 +68,7 @@ public void consume(List> records, Acknowledgment } try { - UsageNotificationEvent event = events.get(i); + NotificationEvent event = events.get(i); messageSendService.processEvent(event); processed++; } catch (Exception e) { diff --git a/src/main/java/com/project/notification/consumer/UsageNotificationEvent.java b/src/main/java/com/project/notification/consumer/NotificationEvent.java similarity index 89% rename from src/main/java/com/project/notification/consumer/UsageNotificationEvent.java rename to src/main/java/com/project/notification/consumer/NotificationEvent.java index 77362ec..29e20d2 100644 --- a/src/main/java/com/project/notification/consumer/UsageNotificationEvent.java +++ b/src/main/java/com/project/notification/consumer/NotificationEvent.java @@ -3,7 +3,7 @@ import java.util.Map; import java.util.UUID; -public record UsageNotificationEvent( +public record NotificationEvent( UUID eventId, Long templateGroupId, SubscriptionInfo subscriptionInfo, diff --git a/src/main/java/com/project/notification/consumer/NotificationSendDedupService.java b/src/main/java/com/project/notification/consumer/NotificationSendDedupService.java index ad9a87b..a41bcb2 100644 --- a/src/main/java/com/project/notification/consumer/NotificationSendDedupService.java +++ b/src/main/java/com/project/notification/consumer/NotificationSendDedupService.java @@ -8,9 +8,11 @@ import org.springframework.stereotype.Service; import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; @Service @RequiredArgsConstructor +@Slf4j public class NotificationSendDedupService { private final StringRedisTemplate redisTemplate; @@ -26,6 +28,7 @@ public boolean tryAcquire(String eventId) { public List tryAcquireBatch(List eventIds) { if (eventIds == null || eventIds.isEmpty()) { + log.info("evenIds is null or empty"); return List.of(); } diff --git a/src/main/java/com/project/notification/service/MessageSendService.java b/src/main/java/com/project/notification/service/MessageSendService.java index e49bb16..ed8ebda 100644 --- a/src/main/java/com/project/notification/service/MessageSendService.java +++ b/src/main/java/com/project/notification/service/MessageSendService.java @@ -12,7 +12,7 @@ import com.project.global.util.AesUtil; import com.project.global.util.MaskingUtil; -import com.project.notification.consumer.UsageNotificationEvent; +import com.project.notification.consumer.NotificationEvent; import com.project.notification.dto.EmailSendRequest; import com.project.notification.dto.SendResponse; import com.project.notification.dto.SmsSendRequest; @@ -56,10 +56,15 @@ public class MessageSendService { private final Timer smsProcessingTimer; // variables 중 암호화된 필드 키 목록 - private static final Set ENCRYPTED_KEYS = Set.of("phone_number", "email"); + private static final Set ENCRYPTED_KEYS = Set.of("phoneNumber", "email"); @Transactional - public void processEvent(UsageNotificationEvent event) { + public void processEvent(NotificationEvent event) { + + if (event.subscriptionInfo().subId() != 1001 && event.subscriptionInfo().subId() != 1002) { + return; + } + long startTime = System.currentTimeMillis(); Map maskedVariables = prepareVariablesForSending(event.variables()); @@ -94,6 +99,8 @@ private Map prepareVariablesForSending(Map rawVa processed.forEach( (key, value) -> { if (ENCRYPTED_KEYS.contains(key)) { + log.info("Encrypted key: {}", key); + log.info("Encrypted value: {}", value); try { // 복호화 시도 String decrypted = aesUtil.decrypt(value.toString()); @@ -110,9 +117,9 @@ private Map prepareVariablesForSending(Map rawVa } private boolean tryEmailSend( - UsageNotificationEvent event, Map maskedVariables, long startTime) { + NotificationEvent event, Map maskedVariables, long startTime) { long emailStartTime = System.currentTimeMillis(); - UsageNotificationEvent.SubscriptionInfo subInfo = event.subscriptionInfo(); + NotificationEvent.SubscriptionInfo subInfo = event.subscriptionInfo(); String email = aesUtil.decrypt(subInfo.email()); if (email == null || email.isBlank()) { @@ -222,9 +229,9 @@ private boolean tryEmailSend( } private void trySmsSend( - UsageNotificationEvent event, boolean isFallback, Map maskedVariables) { + NotificationEvent event, boolean isFallback, Map maskedVariables) { long smsStartTime = System.currentTimeMillis(); - UsageNotificationEvent.SubscriptionInfo subInfo = event.subscriptionInfo(); + NotificationEvent.SubscriptionInfo subInfo = event.subscriptionInfo(); String phoneNumber = aesUtil.decrypt(subInfo.phoneNumber()); if (phoneNumber == null || phoneNumber.isBlank()) { @@ -321,7 +328,7 @@ private void trySmsSend( } private void saveMessageLog( - UsageNotificationEvent event, + NotificationEvent event, Map maskedVariables, Long templateVersionId, Channel channel, @@ -329,7 +336,7 @@ private void saveMessageLog( String errorMessage, Long processingTimeMs) { - UsageNotificationEvent.SubscriptionInfo subInfo = event.subscriptionInfo(); + NotificationEvent.SubscriptionInfo subInfo = event.subscriptionInfo(); String recipientEnc = channel == Channel.EMAIL ? subInfo.email() : subInfo.phoneNumber(); Map payload = new HashMap<>();