diff --git a/gradlew b/gradlew old mode 100644 new mode 100755 diff --git a/src/main/java/com/project/global/config/KafkaConfig.java b/src/main/java/com/project/global/config/KafkaConfig.java index 522a3e6..b27ebd1 100644 --- a/src/main/java/com/project/global/config/KafkaConfig.java +++ b/src/main/java/com/project/global/config/KafkaConfig.java @@ -1,13 +1,10 @@ package com.project.global.config; -import java.util.Arrays; - import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.core.env.Environment; import org.springframework.kafka.annotation.EnableKafka; import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; -import org.springframework.kafka.config.KafkaListenerContainerFactory; import org.springframework.kafka.core.ConsumerFactory; import org.springframework.kafka.listener.ContainerProperties; @@ -16,17 +13,13 @@ public class KafkaConfig { @Bean - public KafkaListenerContainerFactory kafkaListenerContainerFactory( + public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory( ConsumerFactory consumerFactory, Environment env) { - ConcurrentKafkaListenerContainerFactory factory = - new ConcurrentKafkaListenerContainerFactory<>(); - + var factory = new ConcurrentKafkaListenerContainerFactory(); factory.setConsumerFactory(consumerFactory); - boolean isWorker = Arrays.asList(env.getActiveProfiles()).contains("notification-worker"); - - factory.setAutoStartup(isWorker); - + factory.setBatchListener(true); + factory.setConcurrency(6); // 🔥 핵심 factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL); return factory; diff --git a/src/main/java/com/project/global/config/RedisLuaConfig.java b/src/main/java/com/project/global/config/RedisLuaConfig.java new file mode 100644 index 0000000..02cb743 --- /dev/null +++ b/src/main/java/com/project/global/config/RedisLuaConfig.java @@ -0,0 +1,21 @@ +package com.project.global.config; + +import java.util.List; + +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.core.io.ClassPathResource; +import org.springframework.data.redis.core.script.DefaultRedisScript; +import org.springframework.data.redis.core.script.RedisScript; + +@Configuration +public class RedisLuaConfig { + + @Bean + public RedisScript dedupBatchScript() { + DefaultRedisScript script = new DefaultRedisScript<>(); + script.setLocation(new ClassPathResource("redis/dedup_batch.lua")); + script.setResultType(List.class); + return script; + } +} diff --git a/src/main/java/com/project/notification/consumer/NotificationConsumer.java b/src/main/java/com/project/notification/consumer/NotificationConsumer.java index 549af1b..308c590 100644 --- a/src/main/java/com/project/notification/consumer/NotificationConsumer.java +++ b/src/main/java/com/project/notification/consumer/NotificationConsumer.java @@ -1,10 +1,12 @@ package com.project.notification.consumer; import java.time.LocalDateTime; +import java.time.ZoneId; +import java.util.ArrayList; +import java.util.List; import java.util.Map; import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.springframework.context.annotation.Profile; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.support.Acknowledgment; import org.springframework.stereotype.Component; @@ -24,34 +26,65 @@ public class NotificationConsumer { private final ObjectMapper objectMapper; private final UsageNotificationMessageFormatter formatter; - @KafkaListener( - topics = "notification-usage", - containerFactory = "kafkaListenerContainerFactory") - @Profile("notification-worker") - public void consume(ConsumerRecord record, Acknowledgment ack) { - log.info("CONSUME START offset={}, value={}", record.offset(), record.value()); + @KafkaListener(topics = "usage-noti", containerFactory = "kafkaListenerContainerFactory") + public void consume(List> records, Acknowledgment ack) { - try { - UsageNotificationEvent event = - objectMapper.readValue(record.value(), UsageNotificationEvent.class); + final String threadName = Thread.currentThread().getName(); + final int batchSize = records.size(); + + log.info("[BATCH START] thread={}, records={}", threadName, batchSize); + + long batchStart = System.currentTimeMillis(); - String eventId = event.eventId().toString(); + // 1️⃣ eventId 수집 + List events = new ArrayList<>(batchSize); + List eventIds = new ArrayList<>(batchSize); - if (!dedupService.tryAcquire(eventId)) { - log.info("[SKIP] duplicated eventId={}", eventId); - ack.acknowledge(); - return; + for (ConsumerRecord record : records) { + try { + UsageNotificationEvent event = + objectMapper.readValue(record.value(), UsageNotificationEvent.class); + events.add(event); + eventIds.add(event.eventId().toString()); + } catch (Exception e) { + log.warn("[DESERIALIZE FAIL] offset={}", record.offset(), e); } + } - String format = formatter.format(event, LocalDateTime.now()); + // redis Lua dedup (단 1회 호출) + List dedupResults = dedupService.tryAcquireBatch(eventIds); - SendNotificationLogger.write(format); + int processed = 0; + int skipped = 0; - ack.acknowledge(); - } catch (Exception e) { - log.error("[CONSUME FAIL]", e); - ack.acknowledge(); // 지금 구조상 스킵이 맞음 + // 3️⃣ 결과 기반 처리 + for (int i = 0; i < events.size(); i++) { + if (!dedupResults.get(i)) { + skipped++; + continue; + } + + UsageNotificationEvent event = events.get(i); + String message = formatter.format(event, LocalDateTime.now(ZoneId.of("Asia/Seoul"))); + + SendNotificationLogger.write(message); + processed++; } + + ack.acknowledge(); + + long elapsedMs = System.currentTimeMillis() - batchStart; + double tps = processed / (elapsedMs / 1000.0); + + log.info( + "[BATCH DONE] thread={}, batchSize={}, processed={}, skipped={}, elapsed={}ms," + + " tps={}", + threadName, + batchSize, + processed, + skipped, + elapsedMs, + String.format("%.0f", tps)); } @SuppressWarnings("unchecked") diff --git a/src/main/java/com/project/notification/consumer/NotificationSendDedupService.java b/src/main/java/com/project/notification/consumer/NotificationSendDedupService.java index ea00c50..882284d 100644 --- a/src/main/java/com/project/notification/consumer/NotificationSendDedupService.java +++ b/src/main/java/com/project/notification/consumer/NotificationSendDedupService.java @@ -1,8 +1,10 @@ package com.project.notification.consumer; import java.time.Duration; +import java.util.List; import org.springframework.data.redis.core.StringRedisTemplate; +import org.springframework.data.redis.core.script.RedisScript; import org.springframework.stereotype.Service; import lombok.RequiredArgsConstructor; @@ -10,12 +12,23 @@ @Service @RequiredArgsConstructor public class NotificationSendDedupService { + private final StringRedisTemplate redisTemplate; + private static final Duration TTL = Duration.ofDays(7); - public boolean tryAcquire(String eventId) { - Boolean success = - redisTemplate.opsForValue().setIfAbsent("notification:event:" + eventId, "1", TTL); - return Boolean.TRUE.equals(success); + private final RedisScript dedupBatchScript; + + public List tryAcquireBatch(List eventIds) { + + List keys = eventIds.stream().map(id -> "notification:event:" + id).toList(); + + @SuppressWarnings("unchecked") + List results = + (List) + redisTemplate.execute( + dedupBatchScript, keys, String.valueOf(TTL.toSeconds())); + + return results.stream().map(v -> v == 1L).toList(); } } diff --git a/src/main/java/com/project/notification/consumer/SendNotificationLogger.java b/src/main/java/com/project/notification/consumer/SendNotificationLogger.java index 8095bba..f636522 100644 --- a/src/main/java/com/project/notification/consumer/SendNotificationLogger.java +++ b/src/main/java/com/project/notification/consumer/SendNotificationLogger.java @@ -11,7 +11,7 @@ @Slf4j public class SendNotificationLogger { - private static final Path LOG_PATH = Path.of("logs/notification-preview.log"); + private static final Path LOG_PATH = Path.of("logs/notification.log"); public static void write(String content) { try { diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index b222242..718475e 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -1,19 +1,16 @@ project: - name: API Message + name: Backend API version: 1.0.0 cors: allowed-origins: ${FRONTEND_URL:http://localhost:3000} spring: - profiles: - include: secret - application: name: api-message datasource: - url: jdbc:postgresql://${POSTGRES_HOST:localhost}:${POSTGRES_PORT:5433}/${POSTGRES_DATABASE:app} + url: jdbc:postgresql://${POSTGRES_HOST:localhost}:${POSTGRES_PORT:5432}/${POSTGRES_DATABASE:app} username: ${POSTGRES_USER:postgres} password: ${POSTGRES_PASSWORD:postgres} driver-class-name: org.postgresql.Driver @@ -73,25 +70,24 @@ spring: max.in.flight.requests.per.connection: 5 consumer: - group-id: usage-notification-worker + group-id: usage-noti-worker auto-offset-reset: earliest enable-auto-commit: false key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializer - max-poll-records: 1 - max-poll-interval-ms: 300000 - session-timeout-ms: 10000 - heartbeat-interval-ms: 3000 - - properties: - spring.json.trusted.packages: com.project.rdb.batch.model.dto + max-poll-records: 10000 + fetch.min.bytes: 1048576 # 1MB + fetch.max.wait.ms: 500 - listener: - auto-startup: false + fetch.max.bytes: 52428800 # 50MB + max.partition.fetch.bytes: 10485760 # 10MB + max-poll-interval-ms: 600000 + listener: + auto-startup: true server: port: 8080 @@ -147,3 +143,7 @@ logging: org.hibernate.SQL: WARN pattern: level: "%5p [${spring.application.name:},%X{traceId:-},%X{spanId:-}]" + + +ureca: + secret-key: ${AES_SECRET_KEY} diff --git a/src/main/resources/redis/dedup_batch.lua b/src/main/resources/redis/dedup_batch.lua new file mode 100644 index 0000000..ba13536 --- /dev/null +++ b/src/main/resources/redis/dedup_batch.lua @@ -0,0 +1,14 @@ +-- redis/dedup_batch.lua +local ttl = tonumber(ARGV[1]) +local results = {} + +for i, key in ipairs(KEYS) do + if redis.call('SETNX', key, 1) == 1 then + redis.call('EXPIRE', key, ttl) + results[i] = 1 + else + results[i] = 0 + end +end + +return results \ No newline at end of file