Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Empty file modified gradlew
100644 → 100755
Empty file.
15 changes: 4 additions & 11 deletions src/main/java/com/project/global/config/KafkaConfig.java
Original file line number Diff line number Diff line change
@@ -1,13 +1,10 @@
package com.project.global.config;

import java.util.Arrays;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.env.Environment;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.ContainerProperties;

Expand All @@ -16,17 +13,13 @@
public class KafkaConfig {

@Bean
public KafkaListenerContainerFactory<?> kafkaListenerContainerFactory(
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
ConsumerFactory<String, String> consumerFactory, Environment env) {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();

var factory = new ConcurrentKafkaListenerContainerFactory<String, String>();
factory.setConsumerFactory(consumerFactory);

boolean isWorker = Arrays.asList(env.getActiveProfiles()).contains("notification-worker");

factory.setAutoStartup(isWorker);

factory.setBatchListener(true);
factory.setConcurrency(6); // 🔥 핵심
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Kafka 컨슈머의 동시성(concurrency)을 6으로 하드코딩하셨습니다. 이 값은 실행 환경에 따라 튜닝이 필요할 수 있으므로, application.yml 파일에서 설정으로 관리하는 것이 유지보수 측면에서 더 좋습니다. Environment 객체를 사용하여 프로퍼티 값을 주입받는 것을 권장합니다. 예를 들어 application.ymlspring.kafka.listener.concurrency: 6과 같이 설정할 수 있습니다.

Suggested change
factory.setConcurrency(6); // 🔥 핵심
factory.setConcurrency(env.getProperty("spring.kafka.listener.concurrency", Integer.class, 6)); // 🔥 핵심

factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);

return factory;
Expand Down
21 changes: 21 additions & 0 deletions src/main/java/com/project/global/config/RedisLuaConfig.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package com.project.global.config;

import java.util.List;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.data.redis.core.script.RedisScript;

@Configuration
public class RedisLuaConfig {

@Bean
public RedisScript<List> dedupBatchScript() {
DefaultRedisScript<List> script = new DefaultRedisScript<>();
Comment on lines +15 to +16
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

RedisScriptDefaultRedisScript에 raw type인 List를 사용하고 있습니다. NotificationSendDedupService에서 List<Long>으로 캐스팅하는 것을 고려할 때, RedisScript<List<Long>>과 같이 타입을 명시해주는 것이 타입 안정성을 높이고 코드 가독성을 향상시킵니다.

Suggested change
public RedisScript<List> dedupBatchScript() {
DefaultRedisScript<List> script = new DefaultRedisScript<>();
public RedisScript<List<Long>> dedupBatchScript() {
DefaultRedisScript<List<Long>> script = new DefaultRedisScript<>();

script.setLocation(new ClassPathResource("redis/dedup_batch.lua"));
script.setResultType(List.class);
return script;
}
}
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
package com.project.notification.consumer;

import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.context.annotation.Profile;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;
Expand All @@ -24,34 +26,65 @@ public class NotificationConsumer {
private final ObjectMapper objectMapper;
private final UsageNotificationMessageFormatter formatter;

@KafkaListener(
topics = "notification-usage",
containerFactory = "kafkaListenerContainerFactory")
@Profile("notification-worker")
public void consume(ConsumerRecord<String, String> record, Acknowledgment ack) {
log.info("CONSUME START offset={}, value={}", record.offset(), record.value());
@KafkaListener(topics = "usage-noti", containerFactory = "kafkaListenerContainerFactory")
public void consume(List<ConsumerRecord<String, String>> records, Acknowledgment ack) {

try {
UsageNotificationEvent event =
objectMapper.readValue(record.value(), UsageNotificationEvent.class);
final String threadName = Thread.currentThread().getName();
final int batchSize = records.size();

log.info("[BATCH START] thread={}, records={}", threadName, batchSize);

long batchStart = System.currentTimeMillis();

String eventId = event.eventId().toString();
// 1️⃣ eventId 수집
List<UsageNotificationEvent> events = new ArrayList<>(batchSize);
List<String> eventIds = new ArrayList<>(batchSize);

if (!dedupService.tryAcquire(eventId)) {
log.info("[SKIP] duplicated eventId={}", eventId);
ack.acknowledge();
return;
for (ConsumerRecord<String, String> record : records) {
try {
UsageNotificationEvent event =
objectMapper.readValue(record.value(), UsageNotificationEvent.class);
events.add(event);
eventIds.add(event.eventId().toString());
} catch (Exception e) {
log.warn("[DESERIALIZE FAIL] offset={}", record.offset(), e);
}
Comment on lines +49 to 51
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

critical

메시지 역직렬화에 실패했을 때 log.warn으로 기록만 하고 넘어간 뒤, 메소드 마지막에서 ack.acknowledge()를 호출하고 있습니다. 이 경우 역직렬화에 실패한 메시지는 처리되지 않고 유실될 수 있습니다. 데이터 유실은 심각한 문제로 이어질 수 있으므로, 실패한 메시지를 별도의 Dead Letter Queue(DLQ)로 보내거나, 배치를 재처리하는 등의 강력한 에러 처리 전략을 도입해야 합니다. 최소한 실패한 메시지의 원본 값이라도 로그로 남겨 추후 복구를 용이하게 하는 것을 고려해주세요.

}

String format = formatter.format(event, LocalDateTime.now());
// redis Lua dedup (단 1회 호출)
List<Boolean> dedupResults = dedupService.tryAcquireBatch(eventIds);

SendNotificationLogger.write(format);
int processed = 0;
int skipped = 0;

ack.acknowledge();
} catch (Exception e) {
log.error("[CONSUME FAIL]", e);
ack.acknowledge(); // 지금 구조상 스킵이 맞음
// 3️⃣ 결과 기반 처리
for (int i = 0; i < events.size(); i++) {
if (!dedupResults.get(i)) {
skipped++;
continue;
}

UsageNotificationEvent event = events.get(i);
String message = formatter.format(event, LocalDateTime.now(ZoneId.of("Asia/Seoul")));
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

ZoneId.of("Asia/Seoul")과 같이 타임존을 하드코딩하면 다른 시간대에서 애플리케이션을 실행해야 할 때 문제가 될 수 있습니다. 타임존 정보는 application.yml과 같은 설정 파일로 분리하여 유연성을 확보하는 것이 좋습니다.


SendNotificationLogger.write(message);
processed++;
}

ack.acknowledge();

long elapsedMs = System.currentTimeMillis() - batchStart;
double tps = processed / (elapsedMs / 1000.0);
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

TPS를 계산하는 과정에서 elapsedMs가 0이 될 경우 Division by zero 예외가 발생할 수 있습니다. 배치 처리가 매우 빠르게 완료될 경우 가능한 시나리오입니다. elapsedMs가 0인지 확인하는 방어 코드를 추가하는 것이 안전합니다.

Suggested change
double tps = processed / (elapsedMs / 1000.0);
double tps = (elapsedMs > 0) ? (double) processed / (elapsedMs / 1000.0) : 0.0;


log.info(
"[BATCH DONE] thread={}, batchSize={}, processed={}, skipped={}, elapsed={}ms,"
+ " tps={}",
threadName,
batchSize,
processed,
skipped,
elapsedMs,
String.format("%.0f", tps));
}

@SuppressWarnings("unchecked")
Expand Down
Original file line number Diff line number Diff line change
@@ -1,21 +1,34 @@
package com.project.notification.consumer;

import java.time.Duration;
import java.util.List;

import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.core.script.RedisScript;
import org.springframework.stereotype.Service;

import lombok.RequiredArgsConstructor;

@Service
@RequiredArgsConstructor
public class NotificationSendDedupService {

private final StringRedisTemplate redisTemplate;

private static final Duration TTL = Duration.ofDays(7);

public boolean tryAcquire(String eventId) {
Boolean success =
redisTemplate.opsForValue().setIfAbsent("notification:event:" + eventId, "1", TTL);
return Boolean.TRUE.equals(success);
private final RedisScript<List> dedupBatchScript;

public List<Boolean> tryAcquireBatch(List<String> eventIds) {

List<String> keys = eventIds.stream().map(id -> "notification:event:" + id).toList();

@SuppressWarnings("unchecked")
List<Long> results =
(List<Long>)
redisTemplate.execute(
dedupBatchScript, keys, String.valueOf(TTL.toSeconds()));

return results.stream().map(v -> v == 1L).toList();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
@Slf4j
public class SendNotificationLogger {

private static final Path LOG_PATH = Path.of("logs/notification-preview.log");
private static final Path LOG_PATH = Path.of("logs/notification.log");

public static void write(String content) {
try {
Expand Down
30 changes: 15 additions & 15 deletions src/main/resources/application.yml
Original file line number Diff line number Diff line change
@@ -1,19 +1,16 @@
project:
name: API Message
name: Backend API
version: 1.0.0

cors:
allowed-origins: ${FRONTEND_URL:http://localhost:3000}

spring:
profiles:
include: secret

application:
name: api-message

datasource:
url: jdbc:postgresql://${POSTGRES_HOST:localhost}:${POSTGRES_PORT:5433}/${POSTGRES_DATABASE:app}
url: jdbc:postgresql://${POSTGRES_HOST:localhost}:${POSTGRES_PORT:5432}/${POSTGRES_DATABASE:app}
username: ${POSTGRES_USER:postgres}
password: ${POSTGRES_PASSWORD:postgres}
driver-class-name: org.postgresql.Driver
Expand Down Expand Up @@ -73,25 +70,24 @@ spring:
max.in.flight.requests.per.connection: 5

consumer:
group-id: usage-notification-worker
group-id: usage-noti-worker
auto-offset-reset: earliest
enable-auto-commit: false

key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

max-poll-records: 1
max-poll-interval-ms: 300000
session-timeout-ms: 10000
heartbeat-interval-ms: 3000

properties:
spring.json.trusted.packages: com.project.rdb.batch.model.dto
max-poll-records: 10000
fetch.min.bytes: 1048576 # 1MB
fetch.max.wait.ms: 500

listener:
auto-startup: false
fetch.max.bytes: 52428800 # 50MB
max.partition.fetch.bytes: 10485760 # 10MB

max-poll-interval-ms: 600000

listener:
auto-startup: true

server:
port: 8080
Expand Down Expand Up @@ -147,3 +143,7 @@ logging:
org.hibernate.SQL: WARN
pattern:
level: "%5p [${spring.application.name:},%X{traceId:-},%X{spanId:-}]"


ureca:
secret-key: ${AES_SECRET_KEY}
14 changes: 14 additions & 0 deletions src/main/resources/redis/dedup_batch.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
-- redis/dedup_batch.lua
local ttl = tonumber(ARGV[1])
local results = {}

for i, key in ipairs(KEYS) do
if redis.call('SETNX', key, 1) == 1 then
redis.call('EXPIRE', key, ttl)
results[i] = 1
else
results[i] = 0
end
Comment on lines +6 to +11
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

SETNXEXPIRE를 순차적으로 호출하고 있습니다. SETNX 성공 후 EXPIRE가 실행되기 전에 Redis 서버가 다운되거나 스크립트 실행이 중단되면, TTL이 설정되지 않은 키가 영구적으로 남게 될 수 있습니다. 이는 의도치 않은 중복 제거 및 메모리 누수로 이어질 수 있습니다. Redis 2.6.12 버전부터 지원되는 SET key value EX seconds NX 명령어를 사용하면 SETNXEXPIRE를 원자적으로 실행할 수 있어 이 문제를 해결할 수 있습니다.

	if redis.call('SET', key, 1, 'EX', ttl, 'NX') then
		results[i] = 1
	else
		results[i] = 0
	end

end

return results
Loading