From 070fb543b9df3e85264468b971bb71329935eb74 Mon Sep 17 00:00:00 2001 From: andrew Date: Sun, 25 Jan 2026 14:20:19 +0900 Subject: [PATCH 1/4] =?UTF-8?q?UPLUS-134=20feat:=20Kafka=20Consumer=20?= =?UTF-8?q?=EC=84=B1=EB=8A=A5=20=EA=B0=9C=EC=84=A0=20=EB=B0=8F=20Redis=20?= =?UTF-8?q?=EC=84=B1=EB=8A=A5=EA=B0=9C=EC=84=A0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../project/global/config/KafkaConfig.java | 9 ++- .../project/global/config/RedisLuaConfig.java | 21 ++++++ .../consumer/NotificationConsumer.java | 69 ++++++++++++++----- .../NotificationSendDedupService.java | 21 ++++-- .../consumer/SendNotificationLogger.java | 2 +- src/main/resources/redis/dedup_batch.lua | 14 ++++ 6 files changed, 110 insertions(+), 26 deletions(-) create mode 100644 src/main/java/com/project/global/config/RedisLuaConfig.java create mode 100644 src/main/resources/redis/dedup_batch.lua diff --git a/src/main/java/com/project/global/config/KafkaConfig.java b/src/main/java/com/project/global/config/KafkaConfig.java index 522a3e6..f172b9d 100644 --- a/src/main/java/com/project/global/config/KafkaConfig.java +++ b/src/main/java/com/project/global/config/KafkaConfig.java @@ -7,7 +7,6 @@ import org.springframework.core.env.Environment; import org.springframework.kafka.annotation.EnableKafka; import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; -import org.springframework.kafka.config.KafkaListenerContainerFactory; import org.springframework.kafka.core.ConsumerFactory; import org.springframework.kafka.listener.ContainerProperties; @@ -16,17 +15,17 @@ public class KafkaConfig { @Bean - public KafkaListenerContainerFactory kafkaListenerContainerFactory( + public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory( ConsumerFactory consumerFactory, Environment env) { - ConcurrentKafkaListenerContainerFactory factory = - new ConcurrentKafkaListenerContainerFactory<>(); - + var factory = new ConcurrentKafkaListenerContainerFactory(); factory.setConsumerFactory(consumerFactory); boolean isWorker = Arrays.asList(env.getActiveProfiles()).contains("notification-worker"); factory.setAutoStartup(isWorker); + factory.setBatchListener(true); + factory.setConcurrency(6); // 🔥 핵심 factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL); return factory; diff --git a/src/main/java/com/project/global/config/RedisLuaConfig.java b/src/main/java/com/project/global/config/RedisLuaConfig.java new file mode 100644 index 0000000..02cb743 --- /dev/null +++ b/src/main/java/com/project/global/config/RedisLuaConfig.java @@ -0,0 +1,21 @@ +package com.project.global.config; + +import java.util.List; + +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.core.io.ClassPathResource; +import org.springframework.data.redis.core.script.DefaultRedisScript; +import org.springframework.data.redis.core.script.RedisScript; + +@Configuration +public class RedisLuaConfig { + + @Bean + public RedisScript dedupBatchScript() { + DefaultRedisScript script = new DefaultRedisScript<>(); + script.setLocation(new ClassPathResource("redis/dedup_batch.lua")); + script.setResultType(List.class); + return script; + } +} diff --git a/src/main/java/com/project/notification/consumer/NotificationConsumer.java b/src/main/java/com/project/notification/consumer/NotificationConsumer.java index 549af1b..a72f1a6 100644 --- a/src/main/java/com/project/notification/consumer/NotificationConsumer.java +++ b/src/main/java/com/project/notification/consumer/NotificationConsumer.java @@ -1,6 +1,9 @@ package com.project.notification.consumer; import java.time.LocalDateTime; +import java.time.ZoneId; +import java.util.ArrayList; +import java.util.List; import java.util.Map; import org.apache.kafka.clients.consumer.ConsumerRecord; @@ -28,30 +31,64 @@ public class NotificationConsumer { topics = "notification-usage", containerFactory = "kafkaListenerContainerFactory") @Profile("notification-worker") - public void consume(ConsumerRecord record, Acknowledgment ack) { - log.info("CONSUME START offset={}, value={}", record.offset(), record.value()); + public void consume(List> records, Acknowledgment ack) { - try { - UsageNotificationEvent event = - objectMapper.readValue(record.value(), UsageNotificationEvent.class); + final String threadName = Thread.currentThread().getName(); + final int batchSize = records.size(); + + log.info("[BATCH START] thread={}, records={}", threadName, batchSize); + + long batchStart = System.currentTimeMillis(); - String eventId = event.eventId().toString(); + // 1️⃣ eventId 수집 + List events = new ArrayList<>(batchSize); + List eventIds = new ArrayList<>(batchSize); - if (!dedupService.tryAcquire(eventId)) { - log.info("[SKIP] duplicated eventId={}", eventId); - ack.acknowledge(); - return; + for (ConsumerRecord record : records) { + try { + UsageNotificationEvent event = + objectMapper.readValue(record.value(), UsageNotificationEvent.class); + events.add(event); + eventIds.add(event.eventId().toString()); + } catch (Exception e) { + log.warn("[DESERIALIZE FAIL] offset={}", record.offset(), e); } + } - String format = formatter.format(event, LocalDateTime.now()); + // 2️⃣ Redis Lua dedup (단 1회 호출) + List dedupResults = dedupService.tryAcquireBatch(eventIds); - SendNotificationLogger.write(format); + int processed = 0; + int skipped = 0; - ack.acknowledge(); - } catch (Exception e) { - log.error("[CONSUME FAIL]", e); - ack.acknowledge(); // 지금 구조상 스킵이 맞음 + // 3️⃣ 결과 기반 처리 + for (int i = 0; i < events.size(); i++) { + if (!dedupResults.get(i)) { + skipped++; + continue; + } + + UsageNotificationEvent event = events.get(i); + String message = formatter.format(event, LocalDateTime.now(ZoneId.of("Asia/Seoul"))); + + SendNotificationLogger.write(message); + processed++; } + + ack.acknowledge(); + + long elapsedMs = System.currentTimeMillis() - batchStart; + double tps = processed / (elapsedMs / 1000.0); + + log.info( + "[BATCH DONE] thread={}, batchSize={}, processed={}, skipped={}, elapsed={}ms," + + " tps={}", + threadName, + batchSize, + processed, + skipped, + elapsedMs, + String.format("%.0f", tps)); } @SuppressWarnings("unchecked") diff --git a/src/main/java/com/project/notification/consumer/NotificationSendDedupService.java b/src/main/java/com/project/notification/consumer/NotificationSendDedupService.java index ea00c50..882284d 100644 --- a/src/main/java/com/project/notification/consumer/NotificationSendDedupService.java +++ b/src/main/java/com/project/notification/consumer/NotificationSendDedupService.java @@ -1,8 +1,10 @@ package com.project.notification.consumer; import java.time.Duration; +import java.util.List; import org.springframework.data.redis.core.StringRedisTemplate; +import org.springframework.data.redis.core.script.RedisScript; import org.springframework.stereotype.Service; import lombok.RequiredArgsConstructor; @@ -10,12 +12,23 @@ @Service @RequiredArgsConstructor public class NotificationSendDedupService { + private final StringRedisTemplate redisTemplate; + private static final Duration TTL = Duration.ofDays(7); - public boolean tryAcquire(String eventId) { - Boolean success = - redisTemplate.opsForValue().setIfAbsent("notification:event:" + eventId, "1", TTL); - return Boolean.TRUE.equals(success); + private final RedisScript dedupBatchScript; + + public List tryAcquireBatch(List eventIds) { + + List keys = eventIds.stream().map(id -> "notification:event:" + id).toList(); + + @SuppressWarnings("unchecked") + List results = + (List) + redisTemplate.execute( + dedupBatchScript, keys, String.valueOf(TTL.toSeconds())); + + return results.stream().map(v -> v == 1L).toList(); } } diff --git a/src/main/java/com/project/notification/consumer/SendNotificationLogger.java b/src/main/java/com/project/notification/consumer/SendNotificationLogger.java index 8095bba..f636522 100644 --- a/src/main/java/com/project/notification/consumer/SendNotificationLogger.java +++ b/src/main/java/com/project/notification/consumer/SendNotificationLogger.java @@ -11,7 +11,7 @@ @Slf4j public class SendNotificationLogger { - private static final Path LOG_PATH = Path.of("logs/notification-preview.log"); + private static final Path LOG_PATH = Path.of("logs/notification.log"); public static void write(String content) { try { diff --git a/src/main/resources/redis/dedup_batch.lua b/src/main/resources/redis/dedup_batch.lua new file mode 100644 index 0000000..ba13536 --- /dev/null +++ b/src/main/resources/redis/dedup_batch.lua @@ -0,0 +1,14 @@ +-- redis/dedup_batch.lua +local ttl = tonumber(ARGV[1]) +local results = {} + +for i, key in ipairs(KEYS) do + if redis.call('SETNX', key, 1) == 1 then + redis.call('EXPIRE', key, ttl) + results[i] = 1 + else + results[i] = 0 + end +end + +return results \ No newline at end of file From 502b624daaff6494729c43255ec734b5f701b76a Mon Sep 17 00:00:00 2001 From: andrew Date: Sun, 25 Jan 2026 14:43:31 +0900 Subject: [PATCH 2/4] =?UTF-8?q?UPLUS-134=20fix:=20Kafka=20Config=20?= =?UTF-8?q?=EC=84=A4=EC=A0=95=20=EB=B3=80=EA=B2=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../project/global/config/KafkaConfig.java | 4 ---- .../consumer/NotificationConsumer.java | 5 ++--- src/main/resources/application.yml | 21 +++++++++---------- 3 files changed, 12 insertions(+), 18 deletions(-) diff --git a/src/main/java/com/project/global/config/KafkaConfig.java b/src/main/java/com/project/global/config/KafkaConfig.java index f172b9d..2bb459c 100644 --- a/src/main/java/com/project/global/config/KafkaConfig.java +++ b/src/main/java/com/project/global/config/KafkaConfig.java @@ -20,10 +20,6 @@ public ConcurrentKafkaListenerContainerFactory kafkaListenerCont var factory = new ConcurrentKafkaListenerContainerFactory(); factory.setConsumerFactory(consumerFactory); - boolean isWorker = Arrays.asList(env.getActiveProfiles()).contains("notification-worker"); - - factory.setAutoStartup(isWorker); - factory.setBatchListener(true); factory.setConcurrency(6); // 🔥 핵심 factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL); diff --git a/src/main/java/com/project/notification/consumer/NotificationConsumer.java b/src/main/java/com/project/notification/consumer/NotificationConsumer.java index a72f1a6..551ea3c 100644 --- a/src/main/java/com/project/notification/consumer/NotificationConsumer.java +++ b/src/main/java/com/project/notification/consumer/NotificationConsumer.java @@ -28,9 +28,8 @@ public class NotificationConsumer { private final UsageNotificationMessageFormatter formatter; @KafkaListener( - topics = "notification-usage", + topics = "noti-tp", containerFactory = "kafkaListenerContainerFactory") - @Profile("notification-worker") public void consume(List> records, Acknowledgment ack) { final String threadName = Thread.currentThread().getName(); @@ -55,7 +54,7 @@ public void consume(List> records, Acknowledgment } } - // 2️⃣ Redis Lua dedup (단 1회 호출) + // redis Lua dedup (단 1회 호출) List dedupResults = dedupService.tryAcquireBatch(eventIds); int processed = 0; diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 9846251..03672d9 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -62,25 +62,24 @@ spring: max.in.flight.requests.per.connection: 5 consumer: - group-id: usage-notification-worker + group-id: usage-noti-worker-v2 auto-offset-reset: earliest enable-auto-commit: false key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializer - max-poll-records: 1 - max-poll-interval-ms: 300000 - session-timeout-ms: 10000 - heartbeat-interval-ms: 3000 + max-poll-records: 10000 + fetch.min.bytes: 1048576 # 1MB + fetch.max.wait.ms: 500 - properties: - spring.json.trusted.packages: com.project.rdb.batch.model.dto - - listener: - auto-startup: false + fetch.max.bytes: 52428800 # 50MB + max.partition.fetch.bytes: 10485760 # 10MB + max-poll-interval-ms: 600000 + listener: + auto-startup: true server: port: 8080 @@ -127,4 +126,4 @@ logging: ureca: - secret-key: ${AES_SECRET_KEY:12345678901234567890123456789012} + secret-key: ${AES_SECRET_KEY} From 6eafcc60360a5308edca600516c134757652b403 Mon Sep 17 00:00:00 2001 From: andrew Date: Sun, 25 Jan 2026 14:44:37 +0900 Subject: [PATCH 3/4] =?UTF-8?q?UPLUS-134=20fix:=20spotless=20Check=20?= =?UTF-8?q?=EC=8B=A4=ED=96=89?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/main/java/com/project/global/config/KafkaConfig.java | 2 -- .../project/notification/consumer/NotificationConsumer.java | 5 +---- 2 files changed, 1 insertion(+), 6 deletions(-) diff --git a/src/main/java/com/project/global/config/KafkaConfig.java b/src/main/java/com/project/global/config/KafkaConfig.java index 2bb459c..b27ebd1 100644 --- a/src/main/java/com/project/global/config/KafkaConfig.java +++ b/src/main/java/com/project/global/config/KafkaConfig.java @@ -1,7 +1,5 @@ package com.project.global.config; -import java.util.Arrays; - import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.core.env.Environment; diff --git a/src/main/java/com/project/notification/consumer/NotificationConsumer.java b/src/main/java/com/project/notification/consumer/NotificationConsumer.java index 551ea3c..017ff2e 100644 --- a/src/main/java/com/project/notification/consumer/NotificationConsumer.java +++ b/src/main/java/com/project/notification/consumer/NotificationConsumer.java @@ -7,7 +7,6 @@ import java.util.Map; import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.springframework.context.annotation.Profile; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.support.Acknowledgment; import org.springframework.stereotype.Component; @@ -27,9 +26,7 @@ public class NotificationConsumer { private final ObjectMapper objectMapper; private final UsageNotificationMessageFormatter formatter; - @KafkaListener( - topics = "noti-tp", - containerFactory = "kafkaListenerContainerFactory") + @KafkaListener(topics = "noti-tp", containerFactory = "kafkaListenerContainerFactory") public void consume(List> records, Acknowledgment ack) { final String threadName = Thread.currentThread().getName(); From 7fdd483cd2ff353a5f914e8b0b4b5aa702da9d99 Mon Sep 17 00:00:00 2001 From: andrew Date: Sun, 25 Jan 2026 14:53:50 +0900 Subject: [PATCH 4/4] =?UTF-8?q?UPLUS-134=20fix:=20topic=20=EC=88=98?= =?UTF-8?q?=EC=A0=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../com/project/notification/consumer/NotificationConsumer.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/com/project/notification/consumer/NotificationConsumer.java b/src/main/java/com/project/notification/consumer/NotificationConsumer.java index 017ff2e..308c590 100644 --- a/src/main/java/com/project/notification/consumer/NotificationConsumer.java +++ b/src/main/java/com/project/notification/consumer/NotificationConsumer.java @@ -26,7 +26,7 @@ public class NotificationConsumer { private final ObjectMapper objectMapper; private final UsageNotificationMessageFormatter formatter; - @KafkaListener(topics = "noti-tp", containerFactory = "kafkaListenerContainerFactory") + @KafkaListener(topics = "usage-noti", containerFactory = "kafkaListenerContainerFactory") public void consume(List> records, Acknowledgment ack) { final String threadName = Thread.currentThread().getName();