From 04268e0790ca52735160895970b4cc3c82286de4 Mon Sep 17 00:00:00 2001 From: andrew Date: Mon, 26 Jan 2026 18:10:35 +0900 Subject: [PATCH 1/2] =?UTF-8?q?UPLUS-143=20feat:=20=EC=B2=AD=EA=B5=AC?= =?UTF-8?q?=EC=84=9C=20=EC=95=8C=EB=A6=BC=20Consumer=20=EB=A1=9C=EC=A7=81?= =?UTF-8?q?=20=EA=B5=AC=ED=98=84?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../TestNotificationController.java | 8 +- .../consumer/InvoiceConsumer.java | 95 +++++++++++++++++++ .../consumer/NotificationConsumer.java | 8 +- ...ationEvent.java => NotificationEvent.java} | 2 +- .../service/MessageSendService.java | 23 +++-- 5 files changed, 118 insertions(+), 18 deletions(-) create mode 100644 src/main/java/com/project/notification/consumer/InvoiceConsumer.java rename src/main/java/com/project/notification/consumer/{UsageNotificationEvent.java => NotificationEvent.java} (89%) diff --git a/src/main/java/com/project/controller/TestNotificationController.java b/src/main/java/com/project/controller/TestNotificationController.java index 61cbe03..33f0b61 100644 --- a/src/main/java/com/project/controller/TestNotificationController.java +++ b/src/main/java/com/project/controller/TestNotificationController.java @@ -7,7 +7,7 @@ import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RestController; -import com.project.notification.consumer.UsageNotificationEvent; +import com.project.notification.consumer.NotificationEvent; import com.project.notification.service.MessageSendService; import lombok.RequiredArgsConstructor; @@ -34,11 +34,11 @@ public String sendTest(@RequestBody Map request) { Map variables = (Map) request.get("variables"); // 2. 가짜 이벤트 객체(UsageNotificationEvent) 생성 - UsageNotificationEvent event = - new UsageNotificationEvent( + NotificationEvent event = + new NotificationEvent( UUID.randomUUID(), // 임의의 Event ID 생성 templateGroupId, - new UsageNotificationEvent.SubscriptionInfo(subId, phoneNumber, email), + new NotificationEvent.SubscriptionInfo(subId, phoneNumber, email), variables); log.info("[TEST TRIGGER] subId={}, groupId={}", subId, templateGroupId); diff --git a/src/main/java/com/project/notification/consumer/InvoiceConsumer.java b/src/main/java/com/project/notification/consumer/InvoiceConsumer.java new file mode 100644 index 0000000..11121f6 --- /dev/null +++ b/src/main/java/com/project/notification/consumer/InvoiceConsumer.java @@ -0,0 +1,95 @@ +package com.project.notification.consumer; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.TimeUnit; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.kafka.support.Acknowledgment; +import org.springframework.stereotype.Component; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.project.notification.service.MessageSendService; + +import io.micrometer.core.instrument.Counter; +import io.micrometer.core.instrument.Timer; + +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +@Component +@RequiredArgsConstructor +public class InvoiceConsumer { + + private final NotificationSendDedupService dedupService; + private final ObjectMapper objectMapper; + private final MessageSendService messageSendService; + private final Counter kafkaBatchProcessedCounter; + private final Timer kafkaBatchProcessingTimer; + + @KafkaListener(topics = "invoice_tp", containerFactory = "kafkaListenerContainerFactory") + public void consume(List> records, Acknowledgment ack) { + + final String threadName = Thread.currentThread().getName(); + final int batchSize = records.size(); + + log.info("[BATCH START] thread={}, records={}", threadName, batchSize); + + long batchStart = System.currentTimeMillis(); + + List events = new ArrayList<>(batchSize); + List eventIds = new ArrayList<>(batchSize); + + for (ConsumerRecord record : records) { + try { + NotificationEvent event = + objectMapper.readValue(record.value(), NotificationEvent.class); + events.add(event); + eventIds.add(event.eventId().toString()); + } catch (Exception e) { + log.warn("[DESERIALIZE FAIL] offset={}", record.offset(), e); + } + } + + List dedupResults = dedupService.tryAcquireBatch(eventIds); + + int processed = 0; + int skipped = 0; + + for (int i = 0; i < events.size(); i++) { + if (!dedupResults.get(i)) { + skipped++; + continue; + } + + try { + NotificationEvent event = events.get(i); + messageSendService.processEvent(event); + processed++; + } catch (Exception e) { + log.error("[PROCESS FAIL] eventId={}", eventIds.get(i), e); + } + } + + ack.acknowledge(); + + long elapsedMs = System.currentTimeMillis() - batchStart; + double tps = elapsedMs > 0 ? processed / (elapsedMs / 1000.0) : 0; + + // Record metrics + kafkaBatchProcessedCounter.increment(processed); + kafkaBatchProcessingTimer.record(elapsedMs, TimeUnit.MILLISECONDS); + + log.info( + "[BATCH DONE] thread={}, batchSize={}, processed={}, skipped={}, elapsed={}ms," + + " tps={}", + threadName, + batchSize, + processed, + skipped, + elapsedMs, + String.format("%.0f", tps)); + } +} diff --git a/src/main/java/com/project/notification/consumer/NotificationConsumer.java b/src/main/java/com/project/notification/consumer/NotificationConsumer.java index 6d877fd..416e70c 100644 --- a/src/main/java/com/project/notification/consumer/NotificationConsumer.java +++ b/src/main/java/com/project/notification/consumer/NotificationConsumer.java @@ -39,13 +39,13 @@ public void consume(List> records, Acknowledgment long batchStart = System.currentTimeMillis(); - List events = new ArrayList<>(batchSize); + List events = new ArrayList<>(batchSize); List eventIds = new ArrayList<>(batchSize); for (ConsumerRecord record : records) { try { - UsageNotificationEvent event = - objectMapper.readValue(record.value(), UsageNotificationEvent.class); + NotificationEvent event = + objectMapper.readValue(record.value(), NotificationEvent.class); events.add(event); eventIds.add(event.eventId().toString()); } catch (Exception e) { @@ -65,7 +65,7 @@ public void consume(List> records, Acknowledgment } try { - UsageNotificationEvent event = events.get(i); + NotificationEvent event = events.get(i); messageSendService.processEvent(event); processed++; } catch (Exception e) { diff --git a/src/main/java/com/project/notification/consumer/UsageNotificationEvent.java b/src/main/java/com/project/notification/consumer/NotificationEvent.java similarity index 89% rename from src/main/java/com/project/notification/consumer/UsageNotificationEvent.java rename to src/main/java/com/project/notification/consumer/NotificationEvent.java index 77362ec..29e20d2 100644 --- a/src/main/java/com/project/notification/consumer/UsageNotificationEvent.java +++ b/src/main/java/com/project/notification/consumer/NotificationEvent.java @@ -3,7 +3,7 @@ import java.util.Map; import java.util.UUID; -public record UsageNotificationEvent( +public record NotificationEvent( UUID eventId, Long templateGroupId, SubscriptionInfo subscriptionInfo, diff --git a/src/main/java/com/project/notification/service/MessageSendService.java b/src/main/java/com/project/notification/service/MessageSendService.java index e49bb16..e6b8743 100644 --- a/src/main/java/com/project/notification/service/MessageSendService.java +++ b/src/main/java/com/project/notification/service/MessageSendService.java @@ -12,7 +12,7 @@ import com.project.global.util.AesUtil; import com.project.global.util.MaskingUtil; -import com.project.notification.consumer.UsageNotificationEvent; +import com.project.notification.consumer.NotificationEvent; import com.project.notification.dto.EmailSendRequest; import com.project.notification.dto.SendResponse; import com.project.notification.dto.SmsSendRequest; @@ -56,10 +56,15 @@ public class MessageSendService { private final Timer smsProcessingTimer; // variables 중 암호화된 필드 키 목록 - private static final Set ENCRYPTED_KEYS = Set.of("phone_number", "email"); + private static final Set ENCRYPTED_KEYS = Set.of("phoneNumber", "email"); @Transactional - public void processEvent(UsageNotificationEvent event) { + public void processEvent(NotificationEvent event) { + + if (event.subscriptionInfo().subId() != 1001) { + return; + } + long startTime = System.currentTimeMillis(); Map maskedVariables = prepareVariablesForSending(event.variables()); @@ -110,9 +115,9 @@ private Map prepareVariablesForSending(Map rawVa } private boolean tryEmailSend( - UsageNotificationEvent event, Map maskedVariables, long startTime) { + NotificationEvent event, Map maskedVariables, long startTime) { long emailStartTime = System.currentTimeMillis(); - UsageNotificationEvent.SubscriptionInfo subInfo = event.subscriptionInfo(); + NotificationEvent.SubscriptionInfo subInfo = event.subscriptionInfo(); String email = aesUtil.decrypt(subInfo.email()); if (email == null || email.isBlank()) { @@ -222,9 +227,9 @@ private boolean tryEmailSend( } private void trySmsSend( - UsageNotificationEvent event, boolean isFallback, Map maskedVariables) { + NotificationEvent event, boolean isFallback, Map maskedVariables) { long smsStartTime = System.currentTimeMillis(); - UsageNotificationEvent.SubscriptionInfo subInfo = event.subscriptionInfo(); + NotificationEvent.SubscriptionInfo subInfo = event.subscriptionInfo(); String phoneNumber = aesUtil.decrypt(subInfo.phoneNumber()); if (phoneNumber == null || phoneNumber.isBlank()) { @@ -321,7 +326,7 @@ private void trySmsSend( } private void saveMessageLog( - UsageNotificationEvent event, + NotificationEvent event, Map maskedVariables, Long templateVersionId, Channel channel, @@ -329,7 +334,7 @@ private void saveMessageLog( String errorMessage, Long processingTimeMs) { - UsageNotificationEvent.SubscriptionInfo subInfo = event.subscriptionInfo(); + NotificationEvent.SubscriptionInfo subInfo = event.subscriptionInfo(); String recipientEnc = channel == Channel.EMAIL ? subInfo.email() : subInfo.phoneNumber(); Map payload = new HashMap<>(); From c7a68a15ef39a97ea2bac1a223b5525b418070e8 Mon Sep 17 00:00:00 2001 From: andrew Date: Mon, 26 Jan 2026 22:50:18 +0900 Subject: [PATCH 2/2] =?UTF-8?q?UPLUS-143=20feat:=20=EC=B2=AD=EA=B5=AC?= =?UTF-8?q?=EC=84=9C=20=EC=95=8C=EB=A6=BC=20=EC=9D=B4=EB=B2=A4=ED=8A=B8=20?= =?UTF-8?q?=EC=88=98=EC=8B=A0=20=EB=A1=9C=EC=A7=81=20=EC=88=98=EC=A0=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../com/project/notification/consumer/InvoiceConsumer.java | 5 ++++- .../project/notification/consumer/NotificationConsumer.java | 5 ++++- .../notification/consumer/NotificationSendDedupService.java | 3 +++ .../com/project/notification/service/MessageSendService.java | 4 +++- 4 files changed, 14 insertions(+), 3 deletions(-) diff --git a/src/main/java/com/project/notification/consumer/InvoiceConsumer.java b/src/main/java/com/project/notification/consumer/InvoiceConsumer.java index 11121f6..b758e5f 100644 --- a/src/main/java/com/project/notification/consumer/InvoiceConsumer.java +++ b/src/main/java/com/project/notification/consumer/InvoiceConsumer.java @@ -29,7 +29,10 @@ public class InvoiceConsumer { private final Counter kafkaBatchProcessedCounter; private final Timer kafkaBatchProcessingTimer; - @KafkaListener(topics = "invoice_tp", containerFactory = "kafkaListenerContainerFactory") + @KafkaListener( + topics = "invoice", + containerFactory = "kafkaListenerContainerFactory", + autoStartup = "true") public void consume(List> records, Acknowledgment ack) { final String threadName = Thread.currentThread().getName(); diff --git a/src/main/java/com/project/notification/consumer/NotificationConsumer.java b/src/main/java/com/project/notification/consumer/NotificationConsumer.java index 416e70c..37d4af5 100644 --- a/src/main/java/com/project/notification/consumer/NotificationConsumer.java +++ b/src/main/java/com/project/notification/consumer/NotificationConsumer.java @@ -29,7 +29,10 @@ public class NotificationConsumer { private final Counter kafkaBatchProcessedCounter; private final Timer kafkaBatchProcessingTimer; - @KafkaListener(topics = "usage-noti", containerFactory = "kafkaListenerContainerFactory") + @KafkaListener( + topics = "usage", + containerFactory = "kafkaListenerContainerFactory", + autoStartup = "false") public void consume(List> records, Acknowledgment ack) { final String threadName = Thread.currentThread().getName(); diff --git a/src/main/java/com/project/notification/consumer/NotificationSendDedupService.java b/src/main/java/com/project/notification/consumer/NotificationSendDedupService.java index ad9a87b..a41bcb2 100644 --- a/src/main/java/com/project/notification/consumer/NotificationSendDedupService.java +++ b/src/main/java/com/project/notification/consumer/NotificationSendDedupService.java @@ -8,9 +8,11 @@ import org.springframework.stereotype.Service; import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; @Service @RequiredArgsConstructor +@Slf4j public class NotificationSendDedupService { private final StringRedisTemplate redisTemplate; @@ -26,6 +28,7 @@ public boolean tryAcquire(String eventId) { public List tryAcquireBatch(List eventIds) { if (eventIds == null || eventIds.isEmpty()) { + log.info("evenIds is null or empty"); return List.of(); } diff --git a/src/main/java/com/project/notification/service/MessageSendService.java b/src/main/java/com/project/notification/service/MessageSendService.java index e6b8743..ed8ebda 100644 --- a/src/main/java/com/project/notification/service/MessageSendService.java +++ b/src/main/java/com/project/notification/service/MessageSendService.java @@ -61,7 +61,7 @@ public class MessageSendService { @Transactional public void processEvent(NotificationEvent event) { - if (event.subscriptionInfo().subId() != 1001) { + if (event.subscriptionInfo().subId() != 1001 && event.subscriptionInfo().subId() != 1002) { return; } @@ -99,6 +99,8 @@ private Map prepareVariablesForSending(Map rawVa processed.forEach( (key, value) -> { if (ENCRYPTED_KEYS.contains(key)) { + log.info("Encrypted key: {}", key); + log.info("Encrypted value: {}", value); try { // 복호화 시도 String decrypted = aesUtil.decrypt(value.toString());