From e435eba86d40b53cd3b4025ddaff5cb10e964039 Mon Sep 17 00:00:00 2001 From: andrew Date: Fri, 23 Jan 2026 11:41:48 +0900 Subject: [PATCH 1/2] =?UTF-8?q?UPLUS-134=20feat:=20=EC=95=8C=EB=A6=BC=20?= =?UTF-8?q?=EC=9D=B4=EB=B2=A4=ED=8A=B8=20Consumer=20=EA=B8=B0=EB=8A=A5=20?= =?UTF-8?q?=EA=B5=AC=ED=98=84(=EC=B4=88=EC=95=88)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- logs/notification-preview.log | 0 .../project/global/config/KafkaConfig.java | 13 +++++- .../consumer/NotificationConsumer.java | 45 ++++++++----------- .../NotificationSendDedupService.java | 21 +++++++++ .../consumer/SendNotificationLogger.java | 37 +++++++++++++++ .../consumer/UsageNotificationEvent.java | 14 ++++++ .../UsageNotificationMessageFormatter.java | 37 +++++++++++++++ 7 files changed, 138 insertions(+), 29 deletions(-) create mode 100644 logs/notification-preview.log create mode 100644 src/main/java/com/project/notification/consumer/NotificationSendDedupService.java create mode 100644 src/main/java/com/project/notification/consumer/SendNotificationLogger.java create mode 100644 src/main/java/com/project/notification/consumer/UsageNotificationEvent.java create mode 100644 src/main/java/com/project/notification/consumer/UsageNotificationMessageFormatter.java diff --git a/logs/notification-preview.log b/logs/notification-preview.log new file mode 100644 index 0000000..e69de29 diff --git a/src/main/java/com/project/global/config/KafkaConfig.java b/src/main/java/com/project/global/config/KafkaConfig.java index babf028..522a3e6 100644 --- a/src/main/java/com/project/global/config/KafkaConfig.java +++ b/src/main/java/com/project/global/config/KafkaConfig.java @@ -1,9 +1,13 @@ package com.project.global.config; +import java.util.Arrays; + import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; +import org.springframework.core.env.Environment; import org.springframework.kafka.annotation.EnableKafka; import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; +import org.springframework.kafka.config.KafkaListenerContainerFactory; import org.springframework.kafka.core.ConsumerFactory; import org.springframework.kafka.listener.ContainerProperties; @@ -12,12 +16,17 @@ public class KafkaConfig { @Bean - public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory( - ConsumerFactory consumerFactory) { + public KafkaListenerContainerFactory kafkaListenerContainerFactory( + ConsumerFactory consumerFactory, Environment env) { ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory); + + boolean isWorker = Arrays.asList(env.getActiveProfiles()).contains("notification-worker"); + + factory.setAutoStartup(isWorker); + factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL); return factory; diff --git a/src/main/java/com/project/notification/consumer/NotificationConsumer.java b/src/main/java/com/project/notification/consumer/NotificationConsumer.java index 764d74b..549af1b 100644 --- a/src/main/java/com/project/notification/consumer/NotificationConsumer.java +++ b/src/main/java/com/project/notification/consumer/NotificationConsumer.java @@ -1,18 +1,16 @@ package com.project.notification.consumer; +import java.time.LocalDateTime; import java.util.Map; import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Profile; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.support.Acknowledgment; import org.springframework.stereotype.Component; -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; import com.project.notification.dto.NotificationRequestEvent; -import com.project.notification.service.NotificationService; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; @@ -22,44 +20,37 @@ @RequiredArgsConstructor public class NotificationConsumer { - private final NotificationService notificationService; + private final NotificationSendDedupService dedupService; private final ObjectMapper objectMapper; - - @Value("${notification.consumer.concurrency:1}") - private int concurrency; + private final UsageNotificationMessageFormatter formatter; @KafkaListener( - id = "notification-consumer", - topics = "notification_topic", - groupId = "notification-consumer-group", + topics = "notification-usage", containerFactory = "kafkaListenerContainerFactory") + @Profile("notification-worker") public void consume(ConsumerRecord record, Acknowledgment ack) { - log.info( - "Received notification message. topic: {}, partition: {}, offset: {}", - record.topic(), - record.partition(), - record.offset()); + log.info("CONSUME START offset={}, value={}", record.offset(), record.value()); try { - Map rawPayload = - objectMapper.readValue(record.value(), new TypeReference<>() {}); + UsageNotificationEvent event = + objectMapper.readValue(record.value(), UsageNotificationEvent.class); - NotificationRequestEvent event = parseEvent(rawPayload); + String eventId = event.eventId().toString(); - if (event == null) { - log.error("Failed to parse notification event. value: {}", record.value()); + if (!dedupService.tryAcquire(eventId)) { + log.info("[SKIP] duplicated eventId={}", eventId); ack.acknowledge(); return; } - notificationService.processNotification(event, rawPayload); + String format = formatter.format(event, LocalDateTime.now()); + + SendNotificationLogger.write(format); - } catch (JsonProcessingException e) { - log.error("Failed to deserialize message. value: {}", record.value(), e); - } catch (Exception e) { - log.error("Failed to process notification. value: {}", record.value(), e); - } finally { ack.acknowledge(); + } catch (Exception e) { + log.error("[CONSUME FAIL]", e); + ack.acknowledge(); // 지금 구조상 스킵이 맞음 } } diff --git a/src/main/java/com/project/notification/consumer/NotificationSendDedupService.java b/src/main/java/com/project/notification/consumer/NotificationSendDedupService.java new file mode 100644 index 0000000..ea00c50 --- /dev/null +++ b/src/main/java/com/project/notification/consumer/NotificationSendDedupService.java @@ -0,0 +1,21 @@ +package com.project.notification.consumer; + +import java.time.Duration; + +import org.springframework.data.redis.core.StringRedisTemplate; +import org.springframework.stereotype.Service; + +import lombok.RequiredArgsConstructor; + +@Service +@RequiredArgsConstructor +public class NotificationSendDedupService { + private final StringRedisTemplate redisTemplate; + private static final Duration TTL = Duration.ofDays(7); + + public boolean tryAcquire(String eventId) { + Boolean success = + redisTemplate.opsForValue().setIfAbsent("notification:event:" + eventId, "1", TTL); + return Boolean.TRUE.equals(success); + } +} diff --git a/src/main/java/com/project/notification/consumer/SendNotificationLogger.java b/src/main/java/com/project/notification/consumer/SendNotificationLogger.java new file mode 100644 index 0000000..8095bba --- /dev/null +++ b/src/main/java/com/project/notification/consumer/SendNotificationLogger.java @@ -0,0 +1,37 @@ +package com.project.notification.consumer; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.StandardOpenOption; +import java.time.LocalDateTime; + +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public class SendNotificationLogger { + + private static final Path LOG_PATH = Path.of("logs/notification-preview.log"); + + public static void write(String content) { + try { + Files.createDirectories(LOG_PATH.getParent()); + + Files.writeString( + LOG_PATH, + """ + =============================== + %s + %s + =============================== + + """ + .formatted(LocalDateTime.now(), content), + StandardOpenOption.CREATE, + StandardOpenOption.WRITE, + StandardOpenOption.APPEND); + } catch (IOException e) { + log.error("Failed to write notification preview file", e); + } + } +} diff --git a/src/main/java/com/project/notification/consumer/UsageNotificationEvent.java b/src/main/java/com/project/notification/consumer/UsageNotificationEvent.java new file mode 100644 index 0000000..485f2eb --- /dev/null +++ b/src/main/java/com/project/notification/consumer/UsageNotificationEvent.java @@ -0,0 +1,14 @@ +package com.project.notification.consumer; + +import java.util.UUID; + +public record UsageNotificationEvent( + UUID eventId, + Long id, + Long subId, + String period, + String unit, + int threshold, + int percent, + long totalUsedMb, + long allotmentMb) {} diff --git a/src/main/java/com/project/notification/consumer/UsageNotificationMessageFormatter.java b/src/main/java/com/project/notification/consumer/UsageNotificationMessageFormatter.java new file mode 100644 index 0000000..ccbbb00 --- /dev/null +++ b/src/main/java/com/project/notification/consumer/UsageNotificationMessageFormatter.java @@ -0,0 +1,37 @@ +package com.project.notification.consumer; + +import java.time.LocalDateTime; +import java.time.format.DateTimeFormatter; + +import org.springframework.stereotype.Component; + +@Component +public class UsageNotificationMessageFormatter { + + public String format(UsageNotificationEvent event, LocalDateTime now) { + + String providedGb = formatGb(event.allotmentMb()); + String usedGb = formatGb(event.totalUsedMb()); + String time = now.format(DateTimeFormatter.ofPattern("MM/dd HH:mm:ss")); + + return """ + [LG U+] + [Web발신] + [LG U+] 이번 달 데이터 사용량 안내 + + 고객님, 「유쓰 5G 데이터 플러스」 + 요금제의 기본 데이터 사용량을 안내해 드립니다. + + ▶ 데이터 사용량 안내 + - 제공량: %sGB + - 사용량: %d%% %sGB + ※ %s 기준 + + """ + .formatted(providedGb, event.percent(), usedGb, time); + } + + private String formatGb(long mb) { + return String.format("%.2f", mb / 1024.0); + } +} From e97b37c2537bcd8d6f62208bd9f1a0670734340c Mon Sep 17 00:00:00 2001 From: andrew Date: Fri, 23 Jan 2026 11:42:55 +0900 Subject: [PATCH 2/2] =?UTF-8?q?UPLUS-134=20feat:=20=EC=84=A4=EC=A0=95?= =?UTF-8?q?=ED=8C=8C=EC=9D=BC=20=EC=88=98=EC=A0=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/main/resources/application.yml | 67 +++++++++++++++++++++++------- 1 file changed, 52 insertions(+), 15 deletions(-) diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index adcadbc..9846251 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -9,25 +9,19 @@ spring: application: name: api-message - kafka: - bootstrap-servers: ${KAFKA_BOOTSTRAP_SERVERS:localhost:9092} - consumer: - group-id: notification-consumer-group - auto-offset-reset: earliest - enable-auto-commit: false - max-poll-records: 1 - key-deserializer: org.apache.kafka.common.serialization.StringDeserializer - value-deserializer: org.apache.kafka.common.serialization.StringDeserializer - datasource: url: jdbc:postgresql://${POSTGRES_HOST:localhost}:${POSTGRES_PORT:5432}/${POSTGRES_DATABASE:app} username: ${POSTGRES_USER:postgres} password: ${POSTGRES_PASSWORD:postgres} driver-class-name: org.postgresql.Driver + batch: + job: + enabled: false + jpa: hibernate: - ddl-auto: update + ddl-auto: none show-sql: true database-platform: org.hibernate.dialect.PostgreSQLDialect properties: @@ -42,6 +36,52 @@ spring: redis: url: redis://${REDIS_USERNAME:default}:${REDIS_PASSWORD:}@${REDIS_ENDPOINT:localhost:6379} + kafka: + bootstrap-servers: ${KAFKA_BOOTSTRAP_SERVERS} + + properties: + security.protocol: SASL_SSL + sasl.mechanism: PLAIN + sasl.jaas.config: > + org.apache.kafka.common.security.plain.PlainLoginModule required + username='${KAFKA_API_KEY}' + password='${KAFKA_API_SECRET}'; + + producer: + retries: 2147483647 + batch-size: 65536 + compression-type: lz4 + buffer-memory: 67108864 + key-serializer: org.apache.kafka.common.serialization.StringSerializer + value-serializer: org.apache.kafka.common.serialization.StringSerializer + + properties: + enable-idempotence: true + linger.ms: 20 + compression.type: lz4 + max.in.flight.requests.per.connection: 5 + + consumer: + group-id: usage-notification-worker + auto-offset-reset: earliest + enable-auto-commit: false + + key-deserializer: org.apache.kafka.common.serialization.StringDeserializer + value-deserializer: org.apache.kafka.common.serialization.StringDeserializer + + max-poll-records: 1 + max-poll-interval-ms: 300000 + session-timeout-ms: 10000 + heartbeat-interval-ms: 3000 + + properties: + spring.json.trusted.packages: com.project.rdb.batch.model.dto + + listener: + auto-startup: false + + + server: port: 8080 @@ -85,9 +125,6 @@ logging: org.springframework.web: INFO org.hibernate.SQL: WARN + ureca: secret-key: ${AES_SECRET_KEY:12345678901234567890123456789012} - -notification: - consumer: - concurrency: ${CONSUMER_CONCURRENCY:1}