From 24bc54559f575aa2768b87b14dabf6b0a1eccb8e Mon Sep 17 00:00:00 2001 From: andrew Date: Wed, 21 Jan 2026 17:34:32 +0900 Subject: [PATCH 01/19] =?UTF-8?q?UPLUS-105=20fix=20:=20=EB=B6=88=ED=95=84?= =?UTF-8?q?=EC=9A=94=ED=95=9C=20=EB=94=94=EB=A0=89=ED=86=A0=EB=A6=AC=20?= =?UTF-8?q?=EB=B0=8F=20=ED=8C=8C=EC=9D=BC=20=EC=82=AD=EC=A0=9C?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../global/config/NotificationConsumer.java | 16 ------ .../test/InitSubscriptionPlanRunner.java | 50 ------------------- 2 files changed, 66 deletions(-) delete mode 100644 src/main/java/com/project/global/config/NotificationConsumer.java delete mode 100644 src/main/java/com/project/producer/test/InitSubscriptionPlanRunner.java diff --git a/src/main/java/com/project/global/config/NotificationConsumer.java b/src/main/java/com/project/global/config/NotificationConsumer.java deleted file mode 100644 index 96a5fff..0000000 --- a/src/main/java/com/project/global/config/NotificationConsumer.java +++ /dev/null @@ -1,16 +0,0 @@ -package com.project.global.config; - -import org.springframework.kafka.annotation.KafkaListener; -import org.springframework.stereotype.Component; - -@Component -public class NotificationConsumer { - - @KafkaListener( - id = "notificationConsumer", - topics = "notification_topic", - groupId = "notification-consumer-test-1") - public void consume(String message) { - System.out.println("notification consumer received data : " + message); - } -} diff --git a/src/main/java/com/project/producer/test/InitSubscriptionPlanRunner.java b/src/main/java/com/project/producer/test/InitSubscriptionPlanRunner.java deleted file mode 100644 index b5353d3..0000000 --- a/src/main/java/com/project/producer/test/InitSubscriptionPlanRunner.java +++ /dev/null @@ -1,50 +0,0 @@ -package com.project.producer.test; - -import java.time.OffsetDateTime; -import java.util.concurrent.ThreadLocalRandom; - -import org.springframework.boot.CommandLineRunner; -import org.springframework.stereotype.Component; - -import com.project.producer.PlanChangeProducer; - -import lombok.RequiredArgsConstructor; -import lombok.extern.slf4j.Slf4j; - -@Slf4j -@Component -@RequiredArgsConstructor -public class InitSubscriptionPlanRunner implements CommandLineRunner { - - private final PlanChangeProducer planChangeProducer; - - private static final int SUB_START = 1; - private static final int SUB_END = 10_000; - - @Override - public void run(String... args) { - - OffsetDateTime baseTime = - OffsetDateTime.now().withDayOfMonth(1).withHour(0).withMinute(0).withSecond(0); - - PlanSeed[] plans = PlanSeed.values(); - ThreadLocalRandom random = ThreadLocalRandom.current(); - - for (long subId = SUB_START; subId <= SUB_END; subId++) { - - PlanSeed plan = plans[random.nextInt(plans.length)]; - - planChangeProducer.sendPlanChangeEvent( - subId, - plan.getUnit(), - plan.getAllowance(), - baseTime, - "user" + subId + "@test.com", - "010-" - + String.format( - "%04d-%04d", random.nextInt(10000), random.nextInt(10000))); - } - - log.info("Initial Plan Seeding Completed (1 ~ 10000)"); - } -} From 7fefbf06d2725c7f78a2eddee81cbeb218a848fb Mon Sep 17 00:00:00 2001 From: andrew Date: Wed, 21 Jan 2026 17:38:28 +0900 Subject: [PATCH 02/19] =?UTF-8?q?UPLUS-105=20fix=20:=20=EB=94=94=EB=A0=89?= =?UTF-8?q?=ED=86=A0=EB=A6=AC=20=EB=B0=8F=20=ED=8C=8C=EC=9D=BC=20=EA=B5=AC?= =?UTF-8?q?=EC=A1=B0=20=EC=9E=AC=EC=A0=95=EC=9D=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../java/com/project/global/config/RedisConfig.java | 2 ++ .../{ => redis}/consumer/PlanChangeConsumer.java | 13 +++++++------ .../project/{ => redis}/consumer/UsageConsumer.java | 12 +++++++----- .../{ => redis}/consumer/util/LuaScriptLoader.java | 2 +- .../{ => redis}/consumer/util/PlanChangeUtil.java | 10 +++++----- .../{ => redis}/consumer/util/RedisUtil.java | 6 +++--- .../{ => redis}/consumer/util/UsageTimeUtil.java | 2 +- .../{ => redis}/producer/NotificationProducer.java | 2 +- .../{ => redis}/producer/PlanChangeProducer.java | 7 ++++--- .../project/{ => redis}/producer/UsageProducer.java | 5 +++-- .../producer/schema/CalculatedLimitSchema.java | 4 ++-- .../producer/schema/PlanChangeSchema.java | 4 ++-- .../producer/schema/UsageEventSchema.java | 2 +- .../project/{ => redis}/producer/test/PlanSeed.java | 2 +- .../project/{ => redis}/producer/test/PlanUnit.java | 2 +- 15 files changed, 41 insertions(+), 34 deletions(-) rename src/main/java/com/project/{ => redis}/consumer/PlanChangeConsumer.java (81%) rename src/main/java/com/project/{ => redis}/consumer/UsageConsumer.java (85%) rename src/main/java/com/project/{ => redis}/consumer/util/LuaScriptLoader.java (95%) rename src/main/java/com/project/{ => redis}/consumer/util/PlanChangeUtil.java (92%) rename src/main/java/com/project/{ => redis}/consumer/util/RedisUtil.java (95%) rename src/main/java/com/project/{ => redis}/consumer/util/UsageTimeUtil.java (95%) rename src/main/java/com/project/{ => redis}/producer/NotificationProducer.java (91%) rename src/main/java/com/project/{ => redis}/producer/PlanChangeProducer.java (88%) rename src/main/java/com/project/{ => redis}/producer/UsageProducer.java (90%) rename src/main/java/com/project/{ => redis}/producer/schema/CalculatedLimitSchema.java (58%) rename src/main/java/com/project/{ => redis}/producer/schema/PlanChangeSchema.java (82%) rename src/main/java/com/project/{ => redis}/producer/schema/UsageEventSchema.java (80%) rename src/main/java/com/project/{ => redis}/producer/test/PlanSeed.java (92%) rename src/main/java/com/project/{ => redis}/producer/test/PlanUnit.java (59%) diff --git a/src/main/java/com/project/global/config/RedisConfig.java b/src/main/java/com/project/global/config/RedisConfig.java index 3dd7f07..1a31131 100644 --- a/src/main/java/com/project/global/config/RedisConfig.java +++ b/src/main/java/com/project/global/config/RedisConfig.java @@ -2,11 +2,13 @@ import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.Profile; import org.springframework.data.redis.connection.RedisConnectionFactory; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer; import org.springframework.data.redis.serializer.StringRedisSerializer; +@Profile("!batch") @Configuration public class RedisConfig { diff --git a/src/main/java/com/project/consumer/PlanChangeConsumer.java b/src/main/java/com/project/redis/consumer/PlanChangeConsumer.java similarity index 81% rename from src/main/java/com/project/consumer/PlanChangeConsumer.java rename to src/main/java/com/project/redis/consumer/PlanChangeConsumer.java index 8051414..5c34656 100644 --- a/src/main/java/com/project/consumer/PlanChangeConsumer.java +++ b/src/main/java/com/project/redis/consumer/PlanChangeConsumer.java @@ -1,4 +1,4 @@ -package com.project.consumer; +package com.project.redis.consumer; import java.util.ArrayList; import java.util.List; @@ -9,12 +9,12 @@ import org.springframework.stereotype.Component; import com.fasterxml.jackson.databind.ObjectMapper; -import com.project.consumer.util.PlanChangeUtil; -import com.project.consumer.util.RedisUtil; +import com.project.redis.consumer.util.PlanChangeUtil; +import com.project.redis.consumer.util.RedisUtil; import com.project.global.exception.ApplicationException; import com.project.global.exception.code.domain.GlobalErrorCode; -import com.project.producer.schema.CalculatedLimitSchema; -import com.project.producer.schema.PlanChangeSchema; +import com.project.redis.producer.schema.CalculatedLimitSchema; +import com.project.redis.producer.schema.PlanChangeSchema; import lombok.RequiredArgsConstructor; @@ -30,7 +30,8 @@ public class PlanChangeConsumer { id = "plan-change-consumer", topics = "change_plan", groupId = "plan-change-consumer", - containerFactory = "batchKafkaListenerContainerFactory") + containerFactory = "kafkaListenerContainerFactory", + autoStartup = "false") public void consume(List> records, Acknowledgment ack) { if (records == null || records.isEmpty()) { ack.acknowledge(); diff --git a/src/main/java/com/project/consumer/UsageConsumer.java b/src/main/java/com/project/redis/consumer/UsageConsumer.java similarity index 85% rename from src/main/java/com/project/consumer/UsageConsumer.java rename to src/main/java/com/project/redis/consumer/UsageConsumer.java index f8291e0..fee4710 100644 --- a/src/main/java/com/project/consumer/UsageConsumer.java +++ b/src/main/java/com/project/redis/consumer/UsageConsumer.java @@ -1,19 +1,20 @@ -package com.project.consumer; +package com.project.redis.consumer; import java.util.ArrayList; import java.util.List; import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.springframework.context.annotation.Profile; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.support.Acknowledgment; import org.springframework.stereotype.Component; import com.fasterxml.jackson.databind.ObjectMapper; -import com.project.consumer.util.RedisUtil; +import com.project.redis.consumer.util.RedisUtil; import com.project.global.exception.ApplicationException; import com.project.global.exception.code.domain.GlobalErrorCode; -import com.project.producer.NotificationProducer; -import com.project.producer.schema.UsageEventSchema; +import com.project.redis.producer.NotificationProducer; +import com.project.redis.producer.schema.UsageEventSchema; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; @@ -31,7 +32,8 @@ public class UsageConsumer { id = "usage-batch-consumer", topics = "usage-data", groupId = "usage-consumer", - containerFactory = "batchKafkaListenerContainerFactory") + containerFactory = "kafkaListenerContainerFactory", + autoStartup = "false") public void consume(List> records, Acknowledgment ack) { if (records == null || records.isEmpty()) { diff --git a/src/main/java/com/project/consumer/util/LuaScriptLoader.java b/src/main/java/com/project/redis/consumer/util/LuaScriptLoader.java similarity index 95% rename from src/main/java/com/project/consumer/util/LuaScriptLoader.java rename to src/main/java/com/project/redis/consumer/util/LuaScriptLoader.java index b3fdb1d..a7af328 100644 --- a/src/main/java/com/project/consumer/util/LuaScriptLoader.java +++ b/src/main/java/com/project/redis/consumer/util/LuaScriptLoader.java @@ -1,4 +1,4 @@ -package com.project.consumer.util; +package com.project.redis.consumer.util; import java.io.InputStream; import java.nio.charset.StandardCharsets; diff --git a/src/main/java/com/project/consumer/util/PlanChangeUtil.java b/src/main/java/com/project/redis/consumer/util/PlanChangeUtil.java similarity index 92% rename from src/main/java/com/project/consumer/util/PlanChangeUtil.java rename to src/main/java/com/project/redis/consumer/util/PlanChangeUtil.java index 7e37603..50b253f 100644 --- a/src/main/java/com/project/consumer/util/PlanChangeUtil.java +++ b/src/main/java/com/project/redis/consumer/util/PlanChangeUtil.java @@ -1,6 +1,6 @@ -package com.project.consumer.util; +package com.project.redis.consumer.util; -import static com.project.consumer.util.UsageTimeUtil.toYearMonth; +import static com.project.redis.consumer.util.UsageTimeUtil.toYearMonth; import java.time.Duration; import java.time.OffsetDateTime; @@ -12,9 +12,9 @@ import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.stereotype.Service; -import com.project.producer.schema.CalculatedLimitSchema; -import com.project.producer.schema.PlanChangeSchema; -import com.project.producer.test.PlanUnit; +import com.project.redis.producer.schema.CalculatedLimitSchema; +import com.project.redis.producer.schema.PlanChangeSchema; +import com.project.redis.producer.test.PlanUnit; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; diff --git a/src/main/java/com/project/consumer/util/RedisUtil.java b/src/main/java/com/project/redis/consumer/util/RedisUtil.java similarity index 95% rename from src/main/java/com/project/consumer/util/RedisUtil.java rename to src/main/java/com/project/redis/consumer/util/RedisUtil.java index 5b1289d..fb3bf61 100644 --- a/src/main/java/com/project/consumer/util/RedisUtil.java +++ b/src/main/java/com/project/redis/consumer/util/RedisUtil.java @@ -1,4 +1,4 @@ -package com.project.consumer.util; +package com.project.redis.consumer.util; import java.util.ArrayList; import java.util.Collections; @@ -11,8 +11,8 @@ import org.springframework.data.redis.core.types.Expiration; import org.springframework.stereotype.Service; -import com.project.producer.schema.CalculatedLimitSchema; -import com.project.producer.schema.UsageEventSchema; +import com.project.redis.producer.schema.CalculatedLimitSchema; +import com.project.redis.producer.schema.UsageEventSchema; import lombok.RequiredArgsConstructor; diff --git a/src/main/java/com/project/consumer/util/UsageTimeUtil.java b/src/main/java/com/project/redis/consumer/util/UsageTimeUtil.java similarity index 95% rename from src/main/java/com/project/consumer/util/UsageTimeUtil.java rename to src/main/java/com/project/redis/consumer/util/UsageTimeUtil.java index 27d149c..1be27d4 100644 --- a/src/main/java/com/project/consumer/util/UsageTimeUtil.java +++ b/src/main/java/com/project/redis/consumer/util/UsageTimeUtil.java @@ -1,4 +1,4 @@ -package com.project.consumer.util; +package com.project.redis.consumer.util; import java.time.Duration; import java.time.LocalTime; diff --git a/src/main/java/com/project/producer/NotificationProducer.java b/src/main/java/com/project/redis/producer/NotificationProducer.java similarity index 91% rename from src/main/java/com/project/producer/NotificationProducer.java rename to src/main/java/com/project/redis/producer/NotificationProducer.java index 8d0985b..d9ad36a 100644 --- a/src/main/java/com/project/producer/NotificationProducer.java +++ b/src/main/java/com/project/redis/producer/NotificationProducer.java @@ -1,4 +1,4 @@ -package com.project.producer; +package com.project.redis.producer; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.stereotype.Component; diff --git a/src/main/java/com/project/producer/PlanChangeProducer.java b/src/main/java/com/project/redis/producer/PlanChangeProducer.java similarity index 88% rename from src/main/java/com/project/producer/PlanChangeProducer.java rename to src/main/java/com/project/redis/producer/PlanChangeProducer.java index f2c589d..15aabcc 100644 --- a/src/main/java/com/project/producer/PlanChangeProducer.java +++ b/src/main/java/com/project/redis/producer/PlanChangeProducer.java @@ -1,8 +1,9 @@ -package com.project.producer; +package com.project.redis.producer; import java.time.OffsetDateTime; import java.util.UUID; +import org.springframework.context.annotation.Profile; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.stereotype.Component; @@ -10,8 +11,8 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.project.global.exception.ApplicationException; import com.project.global.exception.code.domain.GlobalErrorCode; -import com.project.producer.schema.PlanChangeSchema; -import com.project.producer.test.PlanUnit; +import com.project.redis.producer.schema.PlanChangeSchema; +import com.project.redis.producer.test.PlanUnit; import lombok.RequiredArgsConstructor; diff --git a/src/main/java/com/project/producer/UsageProducer.java b/src/main/java/com/project/redis/producer/UsageProducer.java similarity index 90% rename from src/main/java/com/project/producer/UsageProducer.java rename to src/main/java/com/project/redis/producer/UsageProducer.java index 05ad6a4..c74cb13 100644 --- a/src/main/java/com/project/producer/UsageProducer.java +++ b/src/main/java/com/project/redis/producer/UsageProducer.java @@ -1,8 +1,9 @@ -package com.project.producer; +package com.project.redis.producer; import java.time.OffsetDateTime; import java.util.UUID; +import org.springframework.context.annotation.Profile; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.stereotype.Component; @@ -10,7 +11,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.project.global.exception.ApplicationException; import com.project.global.exception.code.domain.GlobalErrorCode; -import com.project.producer.schema.UsageEventSchema; +import com.project.redis.producer.schema.UsageEventSchema; import lombok.RequiredArgsConstructor; diff --git a/src/main/java/com/project/producer/schema/CalculatedLimitSchema.java b/src/main/java/com/project/redis/producer/schema/CalculatedLimitSchema.java similarity index 58% rename from src/main/java/com/project/producer/schema/CalculatedLimitSchema.java rename to src/main/java/com/project/redis/producer/schema/CalculatedLimitSchema.java index d87ca7a..61a8343 100644 --- a/src/main/java/com/project/producer/schema/CalculatedLimitSchema.java +++ b/src/main/java/com/project/redis/producer/schema/CalculatedLimitSchema.java @@ -1,6 +1,6 @@ -package com.project.producer.schema; +package com.project.redis.producer.schema; -import com.project.producer.test.PlanUnit; +import com.project.redis.producer.test.PlanUnit; public record CalculatedLimitSchema( long subscriptionId, String yearMonth, long limit, long ttlSec, PlanUnit unit) {} diff --git a/src/main/java/com/project/producer/schema/PlanChangeSchema.java b/src/main/java/com/project/redis/producer/schema/PlanChangeSchema.java similarity index 82% rename from src/main/java/com/project/producer/schema/PlanChangeSchema.java rename to src/main/java/com/project/redis/producer/schema/PlanChangeSchema.java index a647165..1637597 100644 --- a/src/main/java/com/project/producer/schema/PlanChangeSchema.java +++ b/src/main/java/com/project/redis/producer/schema/PlanChangeSchema.java @@ -1,8 +1,8 @@ -package com.project.producer.schema; +package com.project.redis.producer.schema; import java.time.OffsetDateTime; -import com.project.producer.test.PlanUnit; +import com.project.redis.producer.test.PlanUnit; public record PlanChangeSchema( String eventId, // 멱등성/추적용 diff --git a/src/main/java/com/project/producer/schema/UsageEventSchema.java b/src/main/java/com/project/redis/producer/schema/UsageEventSchema.java similarity index 80% rename from src/main/java/com/project/producer/schema/UsageEventSchema.java rename to src/main/java/com/project/redis/producer/schema/UsageEventSchema.java index 4ee3ed3..e9b13c1 100644 --- a/src/main/java/com/project/producer/schema/UsageEventSchema.java +++ b/src/main/java/com/project/redis/producer/schema/UsageEventSchema.java @@ -1,4 +1,4 @@ -package com.project.producer.schema; +package com.project.redis.producer.schema; public record UsageEventSchema( String eventId, diff --git a/src/main/java/com/project/producer/test/PlanSeed.java b/src/main/java/com/project/redis/producer/test/PlanSeed.java similarity index 92% rename from src/main/java/com/project/producer/test/PlanSeed.java rename to src/main/java/com/project/redis/producer/test/PlanSeed.java index 2c6d45c..3efb3ec 100644 --- a/src/main/java/com/project/producer/test/PlanSeed.java +++ b/src/main/java/com/project/redis/producer/test/PlanSeed.java @@ -1,4 +1,4 @@ -package com.project.producer.test; +package com.project.redis.producer.test; import lombok.AllArgsConstructor; import lombok.Getter; diff --git a/src/main/java/com/project/producer/test/PlanUnit.java b/src/main/java/com/project/redis/producer/test/PlanUnit.java similarity index 59% rename from src/main/java/com/project/producer/test/PlanUnit.java rename to src/main/java/com/project/redis/producer/test/PlanUnit.java index 995a419..227c562 100644 --- a/src/main/java/com/project/producer/test/PlanUnit.java +++ b/src/main/java/com/project/redis/producer/test/PlanUnit.java @@ -1,4 +1,4 @@ -package com.project.producer.test; +package com.project.redis.producer.test; public enum PlanUnit { MONTH, From 8c8908164622302b03dcc36ef37305dbae1ed0c9 Mon Sep 17 00:00:00 2001 From: andrew Date: Wed, 21 Jan 2026 17:39:24 +0900 Subject: [PATCH 03/19] =?UTF-8?q?UPLUS-105=20fix=20:=20Spring=20=EC=84=A4?= =?UTF-8?q?=EC=A0=95=ED=8C=8C=EC=9D=BC=20=EC=88=98=EC=A0=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/main/resources/application.yml | 37 +++++++++++++++++++++++++++++- 1 file changed, 36 insertions(+), 1 deletion(-) diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 1bf789d..ddd3459 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -15,9 +15,13 @@ spring: password: ${POSTGRES_PASSWORD:postgres} driver-class-name: org.postgresql.Driver + batch: + job: + enabled: false + jpa: hibernate: - ddl-auto: update + ddl-auto: validate show-sql: true database-platform: org.hibernate.dialect.PostgreSQLDialect properties: @@ -42,8 +46,39 @@ spring: org.apache.kafka.common.security.plain.PlainLoginModule required username='EPO6DS6OXW7GYVEE' password='cfltJj+U/TvM2TfzfqxpXIu5xDE/bPGe5sfIff5mrLZ9Usx4K9LudKBWPLIGG3QQ'; + + producer: + retries: 2147483647 + batch-size: 65536 + compression-type: lz4 + buffer-memory: 67108864 + key-serializer: org.apache.kafka.common.serialization.StringSerializer + value-serializer: org.apache.kafka.common.serialization.StringSerializer + + properties: + enable-idempotence: true + linger.ms: 20 + compression.type: lz4 + max.in.flight.requests.per.connection: 5 + consumer: + group-id: usage-notification-worker auto-offset-reset: earliest + enable-auto-commit: false + + key-deserializer: org.apache.kafka.common.serialization.StringDeserializer + value-deserializer: org.apache.kafka.common.serialization.StringDeserializer + + max-poll-records: 1 + max-poll-interval-ms: 300000 + session-timeout-ms: 10000 + heartbeat-interval-ms: 3000 + + properties: + spring.json.trusted.packages: com.project.rdb.batch.model.dto + + listener: + auto-startup: false From 7bfdc66db1e69b4ed354745cd67979ba747272dd Mon Sep 17 00:00:00 2001 From: andrew Date: Wed, 21 Jan 2026 17:39:55 +0900 Subject: [PATCH 04/19] =?UTF-8?q?UPLUS-105=20fix=20:=20Spring=20Batch=20?= =?UTF-8?q?=EC=9D=98=EC=A1=B4=EC=84=B1=20=EC=B6=94=EA=B0=80?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- build.gradle | 3 +++ 1 file changed, 3 insertions(+) diff --git a/build.gradle b/build.gradle index 2c1f300..745f8b2 100644 --- a/build.gradle +++ b/build.gradle @@ -61,6 +61,9 @@ dependencies { implementation 'org.springframework.boot:spring-boot-starter-data-redis' runtimeOnly 'org.postgresql:postgresql' + // Batch + implementation 'org.springframework.boot:spring-boot-starter-batch' + // Kafka implementation 'org.apache.kafka:kafka-clients' implementation 'org.springframework.kafka:spring-kafka' From 140dc2f7009f50c2817f96ff3bef464885b48e31 Mon Sep 17 00:00:00 2001 From: andrew Date: Wed, 21 Jan 2026 17:40:17 +0900 Subject: [PATCH 05/19] =?UTF-8?q?UPLUS-105=20fix=20:=20Kafka=20Consumer=20?= =?UTF-8?q?=EC=84=A4=EC=A0=95=20=ED=8C=8C=EC=9D=BC=20=EC=88=98=EC=A0=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../java/com/project/global/config/KafkaConfig.java | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/main/java/com/project/global/config/KafkaConfig.java b/src/main/java/com/project/global/config/KafkaConfig.java index 054d946..f96318e 100644 --- a/src/main/java/com/project/global/config/KafkaConfig.java +++ b/src/main/java/com/project/global/config/KafkaConfig.java @@ -1,8 +1,10 @@ package com.project.global.config; +import java.util.Arrays; + import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; -import org.springframework.kafka.annotation.EnableKafka; +import org.springframework.core.env.Environment; import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.config.KafkaListenerContainerFactory; import org.springframework.kafka.core.ConsumerFactory; @@ -11,7 +13,6 @@ import org.springframework.util.backoff.FixedBackOff; @Configuration -@EnableKafka public class KafkaConfig { @Bean @@ -36,14 +37,13 @@ public KafkaListenerContainerFactory kafkaListenerContainerFactory( factory.setConsumerFactory(consumerFactory); factory.setBatchListener(true); - - factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL); + factory.setAutoStartup(false); // ⭐ 핵심 factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL); DefaultErrorHandler errorHandler = new DefaultErrorHandler( - new FixedBackOff(1000L, 9) // 총 10번 시도(초기+9) + new FixedBackOff(1000L, 2) // 총 5번 시도(초기+9) ); factory.setCommonErrorHandler(errorHandler); From 5ae06797b28400d1f48edd77554f5c8025a6a688 Mon Sep 17 00:00:00 2001 From: andrew Date: Wed, 21 Jan 2026 17:40:32 +0900 Subject: [PATCH 06/19] =?UTF-8?q?UPLUS-105=20feat=20:=20=ED=95=84=EC=9A=94?= =?UTF-8?q?=ED=95=9C=20=EC=97=90=EB=9F=AC=EC=BD=94=EB=93=9C=20=EC=A0=95?= =?UTF-8?q?=EC=9D=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../global/exception/code/domain/GlobalErrorCode.java | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/src/main/java/com/project/global/exception/code/domain/GlobalErrorCode.java b/src/main/java/com/project/global/exception/code/domain/GlobalErrorCode.java index 05d65b5..8bba73c 100644 --- a/src/main/java/com/project/global/exception/code/domain/GlobalErrorCode.java +++ b/src/main/java/com/project/global/exception/code/domain/GlobalErrorCode.java @@ -18,7 +18,13 @@ public enum GlobalErrorCode implements BaseErrorCode { PLAN_CHANGE_EVENT_PRODUCE_INVALID( HttpStatus.BAD_REQUEST, "COMMON_007", "Kafka PlanChange 이벤트 발행 과정에서 에러가 발생했습니다"), LUA_SCRIPT_LOAD_INVALID(HttpStatus.BAD_REQUEST, "COMMON_008", "LUA 스크립트를 불러오는 과정에서 에러가 발생했습니다"), - JSON_CONVERT_INVALID(HttpStatus.BAD_REQUEST, "COMMON_009", "JSON으로 변환하는 과정에서 에러가 발생했습니다"); + JSON_CONVERT_INVALID(HttpStatus.BAD_REQUEST, "COMMON_009", "JSON으로 변환하는 과정에서 에러가 발생했습니다"), + USAGE_LOG_BATCH_FAILED(HttpStatus.BAD_REQUEST, "COMMON_010", "사용자 데이터 사용량 적재 배치 시스템이 실패하였습니다"), + USAGE_NOTIFICATION_PRODUCER_FAILED( + HttpStatus.BAD_REQUEST, "COMMON_011", "UsageNotification Produce 과정에서 에러가 발생하였습니다"), + USAGE_OUTBOX_WRITER_FAILED( + HttpStatus.BAD_REQUEST, "COMMON_012", "UsageOutbox Writer 배치 시스템이 실패하였습니다"), + PLAN_NOT_VALID(HttpStatus.BAD_REQUEST, "COMMON_013", "존재하지 않는 요금제입니다"); private final HttpStatus httpStatus; private final String customCode; From bc4d7ac3597a27e3435a72b32ace2f42815166a2 Mon Sep 17 00:00:00 2001 From: andrew Date: Wed, 21 Jan 2026 17:44:37 +0900 Subject: [PATCH 07/19] =?UTF-8?q?UPLUS-105=20feat=20:=20=EC=82=AC=EC=9A=A9?= =?UTF-8?q?=EB=9F=89=20=EC=A7=91=EA=B3=84=20=EB=B0=B0=EC=B9=98=20=EB=A1=9C?= =?UTF-8?q?=EC=A7=81=20=EA=B5=AC=EC=84=B1?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../config/UsageAggregationJobConfig.java | 82 +++++++++++++++++++ .../processor/UsageDailyProcessor.java | 21 +++++ .../processor/UsageMonthlyProcessor.java | 21 +++++ .../reader/UsageLogDailyReaderConfig.java | 58 +++++++++++++ .../reader/UsageLogMonthlyReaderConfig.java | 59 +++++++++++++ .../writer/UsageSummaryDailyWriter.java | 54 ++++++++++++ .../writer/UsageSummaryMonthlyWriter.java | 55 +++++++++++++ 7 files changed, 350 insertions(+) create mode 100644 src/main/java/com/project/rdb/batch/usageaggregate/config/UsageAggregationJobConfig.java create mode 100644 src/main/java/com/project/rdb/batch/usageaggregate/processor/UsageDailyProcessor.java create mode 100644 src/main/java/com/project/rdb/batch/usageaggregate/processor/UsageMonthlyProcessor.java create mode 100644 src/main/java/com/project/rdb/batch/usageaggregate/reader/UsageLogDailyReaderConfig.java create mode 100644 src/main/java/com/project/rdb/batch/usageaggregate/reader/UsageLogMonthlyReaderConfig.java create mode 100644 src/main/java/com/project/rdb/batch/usageaggregate/writer/UsageSummaryDailyWriter.java create mode 100644 src/main/java/com/project/rdb/batch/usageaggregate/writer/UsageSummaryMonthlyWriter.java diff --git a/src/main/java/com/project/rdb/batch/usageaggregate/config/UsageAggregationJobConfig.java b/src/main/java/com/project/rdb/batch/usageaggregate/config/UsageAggregationJobConfig.java new file mode 100644 index 0000000..915b707 --- /dev/null +++ b/src/main/java/com/project/rdb/batch/usageaggregate/config/UsageAggregationJobConfig.java @@ -0,0 +1,82 @@ +package com.project.rdb.batch.usageaggregate.config; + +import org.springframework.batch.core.Job; +import org.springframework.batch.core.Step; +import org.springframework.batch.core.configuration.annotation.JobScope; +import org.springframework.batch.core.job.builder.JobBuilder; +import org.springframework.batch.core.launch.support.RunIdIncrementer; +import org.springframework.batch.core.repository.JobRepository; +import org.springframework.batch.core.step.builder.StepBuilder; +import org.springframework.batch.item.database.JdbcCursorItemReader; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.transaction.PlatformTransactionManager; + +import com.project.rdb.batch.model.BatchStepMetricsListener; +import com.project.rdb.batch.model.dto.UsageDailyAggregation; +import com.project.rdb.batch.model.dto.UsageLogRow; +import com.project.rdb.batch.model.dto.UsageMonthlyAggregation; +import com.project.rdb.batch.usageaggregate.processor.UsageDailyProcessor; +import com.project.rdb.batch.usageaggregate.processor.UsageMonthlyProcessor; +import com.project.rdb.batch.usageaggregate.writer.UsageSummaryDailyWriter; +import com.project.rdb.batch.usageaggregate.writer.UsageSummaryMonthlyWriter; + +import lombok.RequiredArgsConstructor; + +@Configuration +@RequiredArgsConstructor +public class UsageAggregationJobConfig { + + private static final int CHUNK_SIZE = 20_000; + + private final JobRepository jobRepository; + private final PlatformTransactionManager txManager; + + @Qualifier("usageLogDailyReader") + private final JdbcCursorItemReader usageLogDailyReader; + + @Qualifier("usageLogMonthlyReader") + private final JdbcCursorItemReader usageLogMonthlyReader; + + private final UsageDailyProcessor usageDailyProcessor; + private final UsageMonthlyProcessor usageMonthlyProcessor; + + private final UsageSummaryDailyWriter usageSummaryDailyWriter; + private final UsageSummaryMonthlyWriter usageSummaryMonthlyWriter; + + private final BatchStepMetricsListener batchStepMetricsListener; + + @Bean + public Job usageAggregationJob() { + return new JobBuilder("usageAggregationJob", jobRepository) + .incrementer(new RunIdIncrementer()) + .start(dailyAggregationStep()) + .next(monthlyAggregationStep()) + .build(); + } + + @JobScope + @Bean + public Step dailyAggregationStep() { + return new StepBuilder("dailyAggregationStep", jobRepository) + .chunk(CHUNK_SIZE, txManager) + .reader(usageLogDailyReader) + .processor(usageDailyProcessor) + .writer(usageSummaryDailyWriter) + .listener(batchStepMetricsListener) + .build(); + } + + @JobScope + @Bean + public Step monthlyAggregationStep() { + return new StepBuilder("monthlyAggregationStep", jobRepository) + .chunk(CHUNK_SIZE, txManager) + .reader(usageLogMonthlyReader) + .processor(usageMonthlyProcessor) + .writer(usageSummaryMonthlyWriter) + .listener(batchStepMetricsListener) + .build(); + } +} diff --git a/src/main/java/com/project/rdb/batch/usageaggregate/processor/UsageDailyProcessor.java b/src/main/java/com/project/rdb/batch/usageaggregate/processor/UsageDailyProcessor.java new file mode 100644 index 0000000..b7a1b08 --- /dev/null +++ b/src/main/java/com/project/rdb/batch/usageaggregate/processor/UsageDailyProcessor.java @@ -0,0 +1,21 @@ +package com.project.rdb.batch.usageaggregate.processor; + +import java.time.format.DateTimeFormatter; + +import org.springframework.batch.item.ItemProcessor; +import org.springframework.stereotype.Component; + +import com.project.rdb.batch.model.dto.UsageDailyAggregation; +import com.project.rdb.batch.model.dto.UsageLogRow; + +@Component +public class UsageDailyProcessor implements ItemProcessor { + + private static final DateTimeFormatter DAY_FMT = DateTimeFormatter.ofPattern("yyyyMMdd"); + + @Override + public UsageDailyAggregation process(UsageLogRow log) { + return new UsageDailyAggregation( + log.subId(), log.eventTime().format(DAY_FMT), log.usedBytes()); + } +} diff --git a/src/main/java/com/project/rdb/batch/usageaggregate/processor/UsageMonthlyProcessor.java b/src/main/java/com/project/rdb/batch/usageaggregate/processor/UsageMonthlyProcessor.java new file mode 100644 index 0000000..0af4e1e --- /dev/null +++ b/src/main/java/com/project/rdb/batch/usageaggregate/processor/UsageMonthlyProcessor.java @@ -0,0 +1,21 @@ +package com.project.rdb.batch.usageaggregate.processor; + +import java.time.format.DateTimeFormatter; + +import org.springframework.batch.item.ItemProcessor; +import org.springframework.stereotype.Component; + +import com.project.rdb.batch.model.dto.UsageLogRow; +import com.project.rdb.batch.model.dto.UsageMonthlyAggregation; + +@Component +public class UsageMonthlyProcessor implements ItemProcessor { + + private static final DateTimeFormatter MONTH_FMT = DateTimeFormatter.ofPattern("yyyyMM"); + + @Override + public UsageMonthlyAggregation process(UsageLogRow log) { + return new UsageMonthlyAggregation( + log.subId(), log.eventTime().format(MONTH_FMT), log.usedBytes()); + } +} diff --git a/src/main/java/com/project/rdb/batch/usageaggregate/reader/UsageLogDailyReaderConfig.java b/src/main/java/com/project/rdb/batch/usageaggregate/reader/UsageLogDailyReaderConfig.java new file mode 100644 index 0000000..7c7a889 --- /dev/null +++ b/src/main/java/com/project/rdb/batch/usageaggregate/reader/UsageLogDailyReaderConfig.java @@ -0,0 +1,58 @@ +package com.project.rdb.batch.usageaggregate.reader; + +import java.time.LocalDateTime; + +import javax.sql.DataSource; + +import org.springframework.batch.core.configuration.annotation.StepScope; +import org.springframework.batch.item.database.JdbcCursorItemReader; +import org.springframework.batch.item.database.builder.JdbcCursorItemReaderBuilder; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +import com.project.rdb.batch.model.dto.UsageLogRow; + +@Configuration +public class UsageLogDailyReaderConfig { + + @Bean(name = "usageLogDailyReader") + @StepScope + public JdbcCursorItemReader usageLogDailyReader( + DataSource dataSource, + @Value("#{jobParameters['fromTime']}") LocalDateTime fromTime, + @Value("#{jobParameters['toTime']}") LocalDateTime toTime) { + String sql = + """ + SELECT + ul.sub_id, + ul.used_bytes, + ul.event_time + FROM usage_log ul + JOIN subscription_plan sp + ON sp.sub_id = ul.sub_id + WHERE ul.event_time >= ? + AND ul.event_time < ? + AND sp.allotment_amount = 5120 + ORDER BY ul.id + """; + + return new JdbcCursorItemReaderBuilder() + .name("usageLogDailyReader") + .dataSource(dataSource) + .sql(sql) + .fetchSize(1000) + .preparedStatementSetter( + ps -> { + ps.setObject(1, fromTime); + ps.setObject(2, toTime); + }) + .rowMapper( + (rs, rowNum) -> + new UsageLogRow( + rs.getLong("sub_id"), + rs.getLong("used_bytes"), + rs.getTimestamp("event_time").toLocalDateTime())) + .build(); + } +} diff --git a/src/main/java/com/project/rdb/batch/usageaggregate/reader/UsageLogMonthlyReaderConfig.java b/src/main/java/com/project/rdb/batch/usageaggregate/reader/UsageLogMonthlyReaderConfig.java new file mode 100644 index 0000000..306a885 --- /dev/null +++ b/src/main/java/com/project/rdb/batch/usageaggregate/reader/UsageLogMonthlyReaderConfig.java @@ -0,0 +1,59 @@ +package com.project.rdb.batch.usageaggregate.reader; + +import java.time.LocalDateTime; + +import javax.sql.DataSource; + +import org.springframework.batch.core.configuration.annotation.StepScope; +import org.springframework.batch.item.database.JdbcCursorItemReader; +import org.springframework.batch.item.database.builder.JdbcCursorItemReaderBuilder; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +import com.project.rdb.batch.model.dto.UsageLogRow; + +@Configuration +public class UsageLogMonthlyReaderConfig { + + @Bean(name = "usageLogMonthlyReader") + @StepScope + public JdbcCursorItemReader usageLogMonthlyReader( + DataSource dataSource, + @Value("#{jobParameters['fromTime']}") LocalDateTime fromTime, + @Value("#{jobParameters['toTime']}") LocalDateTime toTime) { + String sql = + """ + SELECT + ul.sub_id, + ul.used_bytes, + ul.event_time + FROM usage_log ul + JOIN subscription_plan sp + ON sp.sub_id = ul.sub_id + WHERE ul.event_time >= ? + AND ul.event_time < ? + AND sp.allotment_amount > 0 + AND sp.allotment_amount != 5120 + ORDER BY ul.id + """; + + return new JdbcCursorItemReaderBuilder() + .name("usageLogMonthlyReader") + .dataSource(dataSource) + .sql(sql) + .fetchSize(1000) + .preparedStatementSetter( + ps -> { + ps.setObject(1, fromTime); + ps.setObject(2, toTime); + }) + .rowMapper( + (rs, rowNum) -> + new UsageLogRow( + rs.getLong("sub_id"), + rs.getLong("used_bytes"), + rs.getTimestamp("event_time").toLocalDateTime())) + .build(); + } +} diff --git a/src/main/java/com/project/rdb/batch/usageaggregate/writer/UsageSummaryDailyWriter.java b/src/main/java/com/project/rdb/batch/usageaggregate/writer/UsageSummaryDailyWriter.java new file mode 100644 index 0000000..e33ad90 --- /dev/null +++ b/src/main/java/com/project/rdb/batch/usageaggregate/writer/UsageSummaryDailyWriter.java @@ -0,0 +1,54 @@ +package com.project.rdb.batch.usageaggregate.writer; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.springframework.batch.item.Chunk; +import org.springframework.batch.item.ItemWriter; +import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate; +import org.springframework.stereotype.Component; + +import com.project.rdb.batch.model.dto.UsageDailyAggregation; +import com.project.rdb.batch.model.dto.UsageDailyKey; +import com.project.rdb.batch.usageaggregate.config.Sqls; + +import lombok.RequiredArgsConstructor; + +@Component +@RequiredArgsConstructor +public class UsageSummaryDailyWriter implements ItemWriter { + + private final NamedParameterJdbcTemplate jdbcTemplate; + + @Override + public void write(Chunk chunk) throws Exception { + Map aggregated = new HashMap<>(); + + for (UsageDailyAggregation item : chunk) { + UsageDailyKey key = new UsageDailyKey(item.subId(), item.usageDate()); + + aggregated.merge(key, item.deltaBytes(), Long::sum); + } + + if (aggregated.isEmpty()) { + return; + } + + String sql = Sqls.DAILY_UPSERT; + + List> params = + aggregated.entrySet().stream() + .map( + e -> { + Map map = new HashMap<>(); + map.put("subId", e.getKey().subId()); + map.put("usageDate", e.getKey().usageDate()); + map.put("delta", e.getValue()); + return map; + }) + .toList(); + + jdbcTemplate.batchUpdate(sql, params.toArray(new Map[0])); + } +} diff --git a/src/main/java/com/project/rdb/batch/usageaggregate/writer/UsageSummaryMonthlyWriter.java b/src/main/java/com/project/rdb/batch/usageaggregate/writer/UsageSummaryMonthlyWriter.java new file mode 100644 index 0000000..e931e3b --- /dev/null +++ b/src/main/java/com/project/rdb/batch/usageaggregate/writer/UsageSummaryMonthlyWriter.java @@ -0,0 +1,55 @@ +package com.project.rdb.batch.usageaggregate.writer; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.springframework.batch.item.Chunk; +import org.springframework.batch.item.ItemWriter; +import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate; +import org.springframework.stereotype.Component; + +import com.project.rdb.batch.model.dto.UsageMonthlyAggregation; +import com.project.rdb.batch.model.dto.UsageMonthlyKey; +import com.project.rdb.batch.usageaggregate.config.Sqls; + +import lombok.RequiredArgsConstructor; + +@Component +@RequiredArgsConstructor +public class UsageSummaryMonthlyWriter implements ItemWriter { + + private final NamedParameterJdbcTemplate jdbcTemplate; + + @Override + public void write(Chunk chunk) { + + Map aggregated = new HashMap<>(); + + for (UsageMonthlyAggregation item : chunk) { + UsageMonthlyKey key = new UsageMonthlyKey(item.subId(), item.period()); + + aggregated.merge(key, item.deltaBytes(), Long::sum); + } + + if (aggregated.isEmpty()) { + return; + } + + String sql = Sqls.MONTHLY_UPSERT; + + List> params = + aggregated.entrySet().stream() + .map( + entry -> { + Map map = new HashMap<>(); + map.put("subId", entry.getKey().subId()); + map.put("period", entry.getKey().period()); + map.put("delta", entry.getValue()); + return map; + }) + .toList(); + + jdbcTemplate.batchUpdate(sql, params.toArray(new Map[0])); + } +} From 0317cde288a9215c900ccd2f34a0e213ed780534 Mon Sep 17 00:00:00 2001 From: andrew Date: Wed, 21 Jan 2026 17:45:30 +0900 Subject: [PATCH 08/19] =?UTF-8?q?UPLUS-105=20feat=20:=20=EC=82=AC=EC=9A=A9?= =?UTF-8?q?=EB=9F=89=20=EC=A7=91=EA=B3=84=20=EB=B0=B0=EC=B9=98=EC=97=90=20?= =?UTF-8?q?=ED=95=84=EC=9A=94=ED=95=9C=20Entity=20=EB=B0=8F=20Dto=20?= =?UTF-8?q?=EC=A0=95=EC=9D=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../model/dto/UsageDailyAggregation.java | 3 ++ .../rdb/batch/model/dto/UsageLogRow.java | 5 +++ .../model/dto/UsageMonthlyAggregation.java | 3 ++ .../rdb/batch/model/dto/UsageMonthlyKey.java | 18 ++++++++ .../rdb/batch/model/entity/UsageLog.java | 41 +++++++++++++++++++ 5 files changed, 70 insertions(+) create mode 100644 src/main/java/com/project/rdb/batch/model/dto/UsageDailyAggregation.java create mode 100644 src/main/java/com/project/rdb/batch/model/dto/UsageLogRow.java create mode 100644 src/main/java/com/project/rdb/batch/model/dto/UsageMonthlyAggregation.java create mode 100644 src/main/java/com/project/rdb/batch/model/dto/UsageMonthlyKey.java create mode 100644 src/main/java/com/project/rdb/batch/model/entity/UsageLog.java diff --git a/src/main/java/com/project/rdb/batch/model/dto/UsageDailyAggregation.java b/src/main/java/com/project/rdb/batch/model/dto/UsageDailyAggregation.java new file mode 100644 index 0000000..af7697c --- /dev/null +++ b/src/main/java/com/project/rdb/batch/model/dto/UsageDailyAggregation.java @@ -0,0 +1,3 @@ +package com.project.rdb.batch.model.dto; + +public record UsageDailyAggregation(Long subId, String usageDate, long deltaBytes) {} diff --git a/src/main/java/com/project/rdb/batch/model/dto/UsageLogRow.java b/src/main/java/com/project/rdb/batch/model/dto/UsageLogRow.java new file mode 100644 index 0000000..a45ea7d --- /dev/null +++ b/src/main/java/com/project/rdb/batch/model/dto/UsageLogRow.java @@ -0,0 +1,5 @@ +package com.project.rdb.batch.model.dto; + +import java.time.LocalDateTime; + +public record UsageLogRow(Long subId, Long usedBytes, LocalDateTime eventTime) {} diff --git a/src/main/java/com/project/rdb/batch/model/dto/UsageMonthlyAggregation.java b/src/main/java/com/project/rdb/batch/model/dto/UsageMonthlyAggregation.java new file mode 100644 index 0000000..e63dd9a --- /dev/null +++ b/src/main/java/com/project/rdb/batch/model/dto/UsageMonthlyAggregation.java @@ -0,0 +1,3 @@ +package com.project.rdb.batch.model.dto; + +public record UsageMonthlyAggregation(Long subId, String period, long deltaBytes) {} diff --git a/src/main/java/com/project/rdb/batch/model/dto/UsageMonthlyKey.java b/src/main/java/com/project/rdb/batch/model/dto/UsageMonthlyKey.java new file mode 100644 index 0000000..a6ec50f --- /dev/null +++ b/src/main/java/com/project/rdb/batch/model/dto/UsageMonthlyKey.java @@ -0,0 +1,18 @@ +package com.project.rdb.batch.model.dto; + +import java.util.Objects; + +public record UsageMonthlyKey(Long subId, String period) { + + @Override + public boolean equals(Object oj) { + if (this == oj) { + return true; + } + if (!(oj instanceof UsageMonthlyKey)) { + return false; + } + UsageMonthlyKey that = (UsageMonthlyKey) oj; + return Objects.equals(subId, that.subId) && Objects.equals(period, that.period); + } +} diff --git a/src/main/java/com/project/rdb/batch/model/entity/UsageLog.java b/src/main/java/com/project/rdb/batch/model/entity/UsageLog.java new file mode 100644 index 0000000..3b865d5 --- /dev/null +++ b/src/main/java/com/project/rdb/batch/model/entity/UsageLog.java @@ -0,0 +1,41 @@ +package com.project.rdb.batch.model.entity; + +import java.time.LocalDateTime; + +import jakarta.persistence.Column; +import jakarta.persistence.Entity; +import jakarta.persistence.GeneratedValue; +import jakarta.persistence.GenerationType; +import jakarta.persistence.Id; +import jakarta.persistence.Table; + +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.experimental.SuperBuilder; + +@Entity +@Table(name = "usage_log") +@Getter +@SuperBuilder +@NoArgsConstructor +public class UsageLog { + + @Id + @GeneratedValue(strategy = GenerationType.IDENTITY) + private Long id; + + @Column(name = "event_id", nullable = false, length = 64) + private String eventId; + + @Column(name = "sub_id", nullable = false) + private Long subId; + + @Column(name = "used_bytes", nullable = false) + private Long usedBytes; + + @Column(name = "event_time", nullable = false) + private LocalDateTime eventTime; + + @Column(name = "created_at", nullable = false) + private LocalDateTime createdAt; +} From a4eac93fb197dc17377ab42d70fac0d5fa081735 Mon Sep 17 00:00:00 2001 From: andrew Date: Wed, 21 Jan 2026 17:45:48 +0900 Subject: [PATCH 09/19] =?UTF-8?q?UPLUS-105=20feat=20:=20=EC=82=AC=EC=9A=A9?= =?UTF-8?q?=EB=9F=89=20=EC=A7=91=EA=B3=84=20=EB=B0=B0=EC=B9=98=EC=97=90=20?= =?UTF-8?q?=ED=95=84=EC=9A=94=ED=95=9C=20Entity=20=EB=B0=8F=20Dto=20?= =?UTF-8?q?=EC=A0=95=EC=9D=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../rdb/batch/model/dto/UsageDailyKey.java | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) create mode 100644 src/main/java/com/project/rdb/batch/model/dto/UsageDailyKey.java diff --git a/src/main/java/com/project/rdb/batch/model/dto/UsageDailyKey.java b/src/main/java/com/project/rdb/batch/model/dto/UsageDailyKey.java new file mode 100644 index 0000000..7e7f495 --- /dev/null +++ b/src/main/java/com/project/rdb/batch/model/dto/UsageDailyKey.java @@ -0,0 +1,17 @@ +package com.project.rdb.batch.model.dto; + +import java.util.Objects; + +public record UsageDailyKey(Long subId, String usageDate) { + @Override + public boolean equals(Object oj) { + if (this == oj) { + return true; + } + if (!(oj instanceof UsageDailyKey)) { + return false; + } + UsageDailyKey that = (UsageDailyKey) oj; + return Objects.equals(subId, that.subId) && Objects.equals(usageDate, that.usageDate); + } +} From 5d8f8487b84f59ef48ccb32662dcabd2d0da58fe Mon Sep 17 00:00:00 2001 From: andrew Date: Wed, 21 Jan 2026 17:46:44 +0900 Subject: [PATCH 10/19] =?UTF-8?q?UPLUS-105=20feat=20:=20=EC=82=AC=EC=9A=A9?= =?UTF-8?q?=EB=9F=89=20=EC=9E=84=EA=B3=84=EC=B9=98=20=EC=B4=88=EA=B3=BC=20?= =?UTF-8?q?=EA=B2=80=EC=A6=9D=20=EB=B0=B0=EC=B9=98=20=EB=A1=9C=EC=A7=81=20?= =?UTF-8?q?=EA=B5=AC=ED=98=84?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../config/UsageNotificationJobConfig.java | 75 +++++++++++++++++++ .../config/util/UsageNotificationPolicy.java | 48 ++++++++++++ .../processor/UsageNotificationProcessor.java | 24 ++++++ .../UsageNotificationDailyReaderConfig.java | 54 +++++++++++++ .../UsageNotificationMonthlyReaderConfig.java | 55 ++++++++++++++ .../writer/UsageNotificationWriter.java | 59 +++++++++++++++ 6 files changed, 315 insertions(+) create mode 100644 src/main/java/com/project/rdb/batch/usagenotification/config/UsageNotificationJobConfig.java create mode 100644 src/main/java/com/project/rdb/batch/usagenotification/config/util/UsageNotificationPolicy.java create mode 100644 src/main/java/com/project/rdb/batch/usagenotification/processor/UsageNotificationProcessor.java create mode 100644 src/main/java/com/project/rdb/batch/usagenotification/reader/UsageNotificationDailyReaderConfig.java create mode 100644 src/main/java/com/project/rdb/batch/usagenotification/reader/UsageNotificationMonthlyReaderConfig.java create mode 100644 src/main/java/com/project/rdb/batch/usagenotification/writer/UsageNotificationWriter.java diff --git a/src/main/java/com/project/rdb/batch/usagenotification/config/UsageNotificationJobConfig.java b/src/main/java/com/project/rdb/batch/usagenotification/config/UsageNotificationJobConfig.java new file mode 100644 index 0000000..7e3a8af --- /dev/null +++ b/src/main/java/com/project/rdb/batch/usagenotification/config/UsageNotificationJobConfig.java @@ -0,0 +1,75 @@ +package com.project.rdb.batch.usagenotification.config; + +import org.springframework.batch.core.Job; +import org.springframework.batch.core.Step; +import org.springframework.batch.core.configuration.annotation.JobScope; +import org.springframework.batch.core.job.builder.JobBuilder; +import org.springframework.batch.core.launch.support.RunIdIncrementer; +import org.springframework.batch.core.repository.JobRepository; +import org.springframework.batch.core.step.builder.StepBuilder; +import org.springframework.batch.item.database.JdbcCursorItemReader; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.transaction.PlatformTransactionManager; + +import com.project.rdb.batch.model.BatchStepMetricsListener; +import com.project.rdb.batch.model.dto.UsageNotificationCandidate; +import com.project.rdb.batch.model.dto.UsageNotificationSource; +import com.project.rdb.batch.usagenotification.processor.UsageNotificationProcessor; +import com.project.rdb.batch.usagenotification.writer.UsageNotificationWriter; + +import lombok.RequiredArgsConstructor; + +@Configuration +@RequiredArgsConstructor +public class UsageNotificationJobConfig { + + private static final int CHUNK_SIZE = 10_000; + + private final JobRepository jobRepository; + private final PlatformTransactionManager txManager; + + @Qualifier("usageNotificationMonthlyReader") + private final JdbcCursorItemReader usageNotificationMonthlyReader; + + @Qualifier("usageNotificationDailyReader") + private final JdbcCursorItemReader usageNotificationDailyReader; + + private final UsageNotificationProcessor usageNotificationProcessor; + private final UsageNotificationWriter usageNotificationWriter; + private final BatchStepMetricsListener batchStepMetricsListener; + + @Bean + public Job usageNotificationJob() { + return new JobBuilder("usageNotificationJob", jobRepository) + .incrementer(new RunIdIncrementer()) + .start(usageNotificationMonthlyStep()) + .next(usageNotificationDailyStep()) + .build(); + } + + @Bean + @JobScope + public Step usageNotificationMonthlyStep() { + return new StepBuilder("usageNotificationMonthlyStep", jobRepository) + .chunk(CHUNK_SIZE, txManager) + .reader(usageNotificationMonthlyReader) + .processor(usageNotificationProcessor) + .writer(usageNotificationWriter) + .listener(batchStepMetricsListener) + .build(); + } + + @Bean + @JobScope + public Step usageNotificationDailyStep() { + return new StepBuilder("usageNotificationDailyStep", jobRepository) + .chunk(CHUNK_SIZE, txManager) + .reader(usageNotificationDailyReader) + .processor(usageNotificationProcessor) + .writer(usageNotificationWriter) + .listener(batchStepMetricsListener) + .build(); + } +} diff --git a/src/main/java/com/project/rdb/batch/usagenotification/config/util/UsageNotificationPolicy.java b/src/main/java/com/project/rdb/batch/usagenotification/config/util/UsageNotificationPolicy.java new file mode 100644 index 0000000..d238204 --- /dev/null +++ b/src/main/java/com/project/rdb/batch/usagenotification/config/util/UsageNotificationPolicy.java @@ -0,0 +1,48 @@ +package com.project.rdb.batch.usagenotification.config.util; + +import java.util.Optional; + +import org.springframework.stereotype.Component; + +import com.project.rdb.batch.model.dto.UsageNotificationCandidate; +import com.project.rdb.batch.model.dto.UsageNotificationSource; + +@Component +public class UsageNotificationPolicy { + + public Optional evaluate(UsageNotificationSource source) { + if (source.allotmentAmount() <= 0) { + return Optional.empty(); + } + + long totalUsedMb = source.totalUsedBytes() / (1024 * 1024); + long allotmentMb = source.allotmentAmount(); + + int percent = (int) ((totalUsedMb * 100L) / allotmentMb); + + return decideThreshold(percent) + .map( + threshold -> + new UsageNotificationCandidate( + source.subId(), + source.period(), + source.unit(), + threshold, + percent, + totalUsedMb, + allotmentMb)); + } + + private Optional decideThreshold(int percent) { + if (percent >= 100) { + return Optional.of(100); + } + if (percent >= 80) { + return Optional.of(80); + } + if (percent >= 50) { + return Optional.of(50); + } + return Optional.empty(); + } +} diff --git a/src/main/java/com/project/rdb/batch/usagenotification/processor/UsageNotificationProcessor.java b/src/main/java/com/project/rdb/batch/usagenotification/processor/UsageNotificationProcessor.java new file mode 100644 index 0000000..d13f601 --- /dev/null +++ b/src/main/java/com/project/rdb/batch/usagenotification/processor/UsageNotificationProcessor.java @@ -0,0 +1,24 @@ +package com.project.rdb.batch.usagenotification.processor; + +import org.springframework.batch.item.ItemProcessor; +import org.springframework.stereotype.Component; + +import com.project.rdb.batch.model.dto.UsageNotificationCandidate; +import com.project.rdb.batch.model.dto.UsageNotificationSource; +import com.project.rdb.batch.usagenotification.config.util.UsageNotificationPolicy; + +import lombok.RequiredArgsConstructor; + +@Component +@RequiredArgsConstructor +public class UsageNotificationProcessor + implements ItemProcessor { + + private final UsageNotificationPolicy policy; + + @Override + public UsageNotificationCandidate process(UsageNotificationSource source) { + + return policy.evaluate(source).orElse(null); + } +} diff --git a/src/main/java/com/project/rdb/batch/usagenotification/reader/UsageNotificationDailyReaderConfig.java b/src/main/java/com/project/rdb/batch/usagenotification/reader/UsageNotificationDailyReaderConfig.java new file mode 100644 index 0000000..60f4cf8 --- /dev/null +++ b/src/main/java/com/project/rdb/batch/usagenotification/reader/UsageNotificationDailyReaderConfig.java @@ -0,0 +1,54 @@ +package com.project.rdb.batch.usagenotification.reader; + +import javax.sql.DataSource; + +import org.springframework.batch.core.configuration.annotation.StepScope; +import org.springframework.batch.item.database.JdbcCursorItemReader; +import org.springframework.batch.item.database.builder.JdbcCursorItemReaderBuilder; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +import com.project.rdb.batch.model.dto.UsageNotificationSource; + +@Configuration +public class UsageNotificationDailyReaderConfig { + + @Bean(name = "usageNotificationDailyReader") + @StepScope + public JdbcCursorItemReader usageNotificationDailyReader( + DataSource dataSource, @Value("#{jobParameters['usageDate']}") String usageDate) { + + String sql = + """ + SELECT + usd.sub_id, + usd.usage_date AS period, + 'DAY' AS unit, + usd.total_used_bytes, + sp.allotment_amount + FROM usage_summary_daily usd + JOIN subscription_plan sp + ON sp.sub_id = usd.sub_id + WHERE usd.usage_date = ? + AND sp.allotment_amount = 5120 + ORDER BY usd.sub_id + """; + + return new JdbcCursorItemReaderBuilder() + .name("usageNotificationDailyReader") + .dataSource(dataSource) + .sql(sql) + .fetchSize(1000) + .preparedStatementSetter(ps -> ps.setString(1, usageDate)) + .rowMapper( + (rs, rowNum) -> + new UsageNotificationSource( + rs.getLong("sub_id"), + rs.getString("period"), + rs.getString("unit"), + rs.getLong("total_used_bytes"), + rs.getLong("allotment_amount"))) + .build(); + } +} diff --git a/src/main/java/com/project/rdb/batch/usagenotification/reader/UsageNotificationMonthlyReaderConfig.java b/src/main/java/com/project/rdb/batch/usagenotification/reader/UsageNotificationMonthlyReaderConfig.java new file mode 100644 index 0000000..d87778e --- /dev/null +++ b/src/main/java/com/project/rdb/batch/usagenotification/reader/UsageNotificationMonthlyReaderConfig.java @@ -0,0 +1,55 @@ +package com.project.rdb.batch.usagenotification.reader; + +import javax.sql.DataSource; + +import org.springframework.batch.core.configuration.annotation.StepScope; +import org.springframework.batch.item.database.JdbcCursorItemReader; +import org.springframework.batch.item.database.builder.JdbcCursorItemReaderBuilder; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +import com.project.rdb.batch.model.dto.UsageNotificationSource; + +@Configuration +public class UsageNotificationMonthlyReaderConfig { + + @Bean(name = "usageNotificationMonthlyReader") + @StepScope + public JdbcCursorItemReader usageNotificationMonthlyReader( + DataSource dataSource, @Value("#{jobParameters['period']}") String period) { + + String sql = + """ + SELECT + us.sub_id, + us.period, + 'MONTH' AS unit, + us.total_used_bytes, + sp.allotment_amount + FROM usage_summary_monthly us + JOIN subscription_plan sp + ON sp.sub_id = us.sub_id + WHERE us.period = ? + AND sp.allotment_amount > 0 + AND sp.allotment_amount != 5120 + ORDER BY us.sub_id + """; + + return new JdbcCursorItemReaderBuilder() + .name("usageNotificationMonthlyReader") + .dataSource(dataSource) + .sql(sql) + .fetchSize(1000) + .preparedStatementSetter(ps -> ps.setString(1, period)) + .rowMapper( + (rs, rowNum) -> + new UsageNotificationSource( + rs.getLong("sub_id"), + rs.getString("period"), + rs.getString("unit"), + rs.getLong("total_used_bytes"), + rs.getLong("allotment_amount"))) + .build(); + } +} diff --git a/src/main/java/com/project/rdb/batch/usagenotification/writer/UsageNotificationWriter.java b/src/main/java/com/project/rdb/batch/usagenotification/writer/UsageNotificationWriter.java new file mode 100644 index 0000000..5a980ab --- /dev/null +++ b/src/main/java/com/project/rdb/batch/usagenotification/writer/UsageNotificationWriter.java @@ -0,0 +1,59 @@ +package com.project.rdb.batch.usagenotification.writer; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.springframework.batch.item.Chunk; +import org.springframework.batch.item.ItemWriter; +import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate; +import org.springframework.stereotype.Component; +import org.springframework.transaction.annotation.Transactional; + +import com.project.rdb.batch.model.dto.UsageNotificationCandidate; + +import lombok.RequiredArgsConstructor; + +@Component +@RequiredArgsConstructor +public class UsageNotificationWriter implements ItemWriter { + + private final NamedParameterJdbcTemplate jdbcTemplate; + + @Transactional + @Override + public void write(Chunk chunk) { + + if (chunk.isEmpty()) { + return; + } + + String sql = + """ + INSERT INTO usage_notification_outbox + (sub_id, period, unit, threshold, percent, total_used_mb, allotment_mb, status, created_at) + VALUES + (:subId, :period, :unit, :threshold, :percent, :totalUsedMb, :allotmentMb, 'PENDING', NOW()) + ON CONFLICT (sub_id, period, unit, threshold) + DO NOTHING + """; + + List> paramList = + chunk.getItems().stream() + .map( + c -> { + Map map = new HashMap<>(); + map.put("subId", c.subId()); + map.put("period", c.period()); + map.put("unit", c.unit()); + map.put("threshold", c.threshold()); + map.put("percent", c.percent()); + map.put("totalUsedMb", c.totalUsedMb()); + map.put("allotmentMb", c.allotmentMb()); + return map; + }) + .toList(); + + jdbcTemplate.batchUpdate(sql, paramList.toArray(new Map[0])); + } +} From a6607f45e0a7d42fc9b2a062e1e7dde24d3d88bf Mon Sep 17 00:00:00 2001 From: andrew Date: Wed, 21 Jan 2026 17:47:52 +0900 Subject: [PATCH 11/19] =?UTF-8?q?UPLUS-105=20feat=20:=20=EC=82=AC=EC=9A=A9?= =?UTF-8?q?=EB=9F=89=20=EC=9E=84=EA=B3=84=EC=B9=98=20=EC=B4=88=EA=B3=BC=20?= =?UTF-8?q?=EA=B2=80=EC=A6=9D=20=EB=B0=B0=EC=B9=98=EC=97=90=20=ED=95=84?= =?UTF-8?q?=EC=9A=94=ED=95=9C=20Entity=20=EB=B0=8F=20Dto=20=EA=B5=AC?= =?UTF-8?q?=ED=98=84?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../model/dto/UsageNotificationCandidate.java | 10 +++++ .../model/dto/UsageNotificationEvent.java | 14 +++++++ .../model/dto/UsageNotificationSource.java | 4 ++ .../rdb/batch/model/entity/UsageSummary.java | 27 +++++++++++++ .../batch/model/entity/UsageSummaryId.java | 40 +++++++++++++++++++ .../rdb/batch/usageaggregate/config/Sqls.java | 38 ++++++++++++++++++ 6 files changed, 133 insertions(+) create mode 100644 src/main/java/com/project/rdb/batch/model/dto/UsageNotificationCandidate.java create mode 100644 src/main/java/com/project/rdb/batch/model/dto/UsageNotificationEvent.java create mode 100644 src/main/java/com/project/rdb/batch/model/dto/UsageNotificationSource.java create mode 100644 src/main/java/com/project/rdb/batch/model/entity/UsageSummary.java create mode 100644 src/main/java/com/project/rdb/batch/model/entity/UsageSummaryId.java create mode 100644 src/main/java/com/project/rdb/batch/usageaggregate/config/Sqls.java diff --git a/src/main/java/com/project/rdb/batch/model/dto/UsageNotificationCandidate.java b/src/main/java/com/project/rdb/batch/model/dto/UsageNotificationCandidate.java new file mode 100644 index 0000000..89c20b5 --- /dev/null +++ b/src/main/java/com/project/rdb/batch/model/dto/UsageNotificationCandidate.java @@ -0,0 +1,10 @@ +package com.project.rdb.batch.model.dto; + +public record UsageNotificationCandidate( + Long subId, + String period, + String unit, + int threshold, + int percent, + long totalUsedMb, + long allotmentMb) {} diff --git a/src/main/java/com/project/rdb/batch/model/dto/UsageNotificationEvent.java b/src/main/java/com/project/rdb/batch/model/dto/UsageNotificationEvent.java new file mode 100644 index 0000000..15fd16b --- /dev/null +++ b/src/main/java/com/project/rdb/batch/model/dto/UsageNotificationEvent.java @@ -0,0 +1,14 @@ +package com.project.rdb.batch.model.dto; + +import java.util.UUID; + +public record UsageNotificationEvent( + UUID eventId, + Long id, + Long subId, + String period, + String unit, + int threshold, + int percent, + long totalUsedMb, + long allotmentMb) {} diff --git a/src/main/java/com/project/rdb/batch/model/dto/UsageNotificationSource.java b/src/main/java/com/project/rdb/batch/model/dto/UsageNotificationSource.java new file mode 100644 index 0000000..1c7754c --- /dev/null +++ b/src/main/java/com/project/rdb/batch/model/dto/UsageNotificationSource.java @@ -0,0 +1,4 @@ +package com.project.rdb.batch.model.dto; + +public record UsageNotificationSource( + Long subId, String period, String unit, long totalUsedBytes, long allotmentAmount) {} diff --git a/src/main/java/com/project/rdb/batch/model/entity/UsageSummary.java b/src/main/java/com/project/rdb/batch/model/entity/UsageSummary.java new file mode 100644 index 0000000..bcbfba8 --- /dev/null +++ b/src/main/java/com/project/rdb/batch/model/entity/UsageSummary.java @@ -0,0 +1,27 @@ +package com.project.rdb.batch.model.entity; + +import java.time.LocalDateTime; + +import jakarta.persistence.Column; +import jakarta.persistence.EmbeddedId; +import jakarta.persistence.Entity; +import jakarta.persistence.Table; + +import lombok.AccessLevel; +import lombok.Getter; +import lombok.NoArgsConstructor; + +@Entity +@Table(name = "usage_summary") +@Getter +@NoArgsConstructor(access = AccessLevel.PROTECTED) +public class UsageSummary { + + @EmbeddedId private UsageSummaryId id; + + @Column(nullable = false) + private long totalUsedBytes; + + @Column(nullable = false) + private LocalDateTime updatedAt; +} diff --git a/src/main/java/com/project/rdb/batch/model/entity/UsageSummaryId.java b/src/main/java/com/project/rdb/batch/model/entity/UsageSummaryId.java new file mode 100644 index 0000000..1e68ea9 --- /dev/null +++ b/src/main/java/com/project/rdb/batch/model/entity/UsageSummaryId.java @@ -0,0 +1,40 @@ +package com.project.rdb.batch.model.entity; + +import java.io.Serializable; +import java.util.Objects; + +import jakarta.persistence.Column; +import jakarta.persistence.Embeddable; + +import lombok.AllArgsConstructor; +import lombok.Getter; +import lombok.NoArgsConstructor; + +@Embeddable +@Getter +@NoArgsConstructor +@AllArgsConstructor +public class UsageSummaryId implements Serializable { + + @Column(name = "sub_id") + private Long subId; + + @Column(length = 6) + private String period; + + @Override + public boolean equals(Object oj) { + if (this == oj) { + return true; + } + if (!(oj instanceof UsageSummaryId that)) { + return false; + } + return subId.equals(that.subId) && period.equals(that.period); + } + + @Override + public int hashCode() { + return Objects.hash(subId, period); + } +} diff --git a/src/main/java/com/project/rdb/batch/usageaggregate/config/Sqls.java b/src/main/java/com/project/rdb/batch/usageaggregate/config/Sqls.java new file mode 100644 index 0000000..9a0bbc3 --- /dev/null +++ b/src/main/java/com/project/rdb/batch/usageaggregate/config/Sqls.java @@ -0,0 +1,38 @@ +package com.project.rdb.batch.usageaggregate.config; + +public class Sqls { + + private Sqls() {} + + public static final String DAILY_UPSERT = + """ + INSERT INTO usage_summary_daily ( + sub_id, usage_date, total_used_bytes, updated_at + ) + VALUES ( + :subId, :period, :delta, NOW() + ) + ON CONFLICT (sub_id, usage_date) + DO UPDATE SET + total_used_bytes = + usage_summary_daily.total_used_bytes + + EXCLUDED.total_used_bytes, + updated_at = NOW() + """; + + public static final String MONTHLY_UPSERT = + """ + INSERT INTO usage_summary_monthly ( + sub_id, period, total_used_bytes, updated_at + ) + VALUES ( + :subId, :period, :delta, NOW() + ) + ON CONFLICT (sub_id, period) + DO UPDATE SET + total_used_bytes = + usage_summary_monthly.total_used_bytes + + EXCLUDED.total_used_bytes, + updated_at = NOW() + """; +} From ba0dcea986bb50eea689f2a5195e2fd73e062b37 Mon Sep 17 00:00:00 2001 From: andrew Date: Wed, 21 Jan 2026 17:48:45 +0900 Subject: [PATCH 12/19] =?UTF-8?q?UPLUS-105=20feat=20:=20=EC=9E=84=EA=B3=84?= =?UTF-8?q?=EC=B9=98=20=EC=B4=88=EA=B3=BC=20=EC=82=AC=EC=9A=A9=EC=9E=90=20?= =?UTF-8?q?Kafka=20=EC=9D=B4=EB=B2=A4=ED=8A=B8=20=EB=B0=9C=ED=96=89=20?= =?UTF-8?q?=EB=B0=B0=EC=B9=98=20=EB=A1=9C=EC=A7=81=20=EA=B5=AC=ED=98=84?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../config/NotificationSendJobConfig.java | 53 +++++++++ .../processor/NotificationSendProcessor.java | 32 ++++++ .../reader/NotificationSendReaderConfig.java | 53 +++++++++ .../writer/NotificationSendWriter.java | 108 ++++++++++++++++++ 4 files changed, 246 insertions(+) create mode 100644 src/main/java/com/project/rdb/batch/notificationsend/config/NotificationSendJobConfig.java create mode 100644 src/main/java/com/project/rdb/batch/notificationsend/processor/NotificationSendProcessor.java create mode 100644 src/main/java/com/project/rdb/batch/notificationsend/reader/NotificationSendReaderConfig.java create mode 100644 src/main/java/com/project/rdb/batch/notificationsend/writer/NotificationSendWriter.java diff --git a/src/main/java/com/project/rdb/batch/notificationsend/config/NotificationSendJobConfig.java b/src/main/java/com/project/rdb/batch/notificationsend/config/NotificationSendJobConfig.java new file mode 100644 index 0000000..0c735c1 --- /dev/null +++ b/src/main/java/com/project/rdb/batch/notificationsend/config/NotificationSendJobConfig.java @@ -0,0 +1,53 @@ +package com.project.rdb.batch.notificationsend.config; + +import org.springframework.batch.core.Job; +import org.springframework.batch.core.Step; +import org.springframework.batch.core.configuration.annotation.JobScope; +import org.springframework.batch.core.job.builder.JobBuilder; +import org.springframework.batch.core.launch.support.RunIdIncrementer; +import org.springframework.batch.core.repository.JobRepository; +import org.springframework.batch.core.step.builder.StepBuilder; +import org.springframework.batch.item.database.JdbcCursorItemReader; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.transaction.PlatformTransactionManager; + +import com.project.rdb.batch.model.dto.UsageNotificationEvent; +import com.project.rdb.batch.model.dto.UsageNotificationOutboxRow; +import com.project.rdb.batch.notificationsend.processor.NotificationSendProcessor; +import com.project.rdb.batch.notificationsend.writer.NotificationSendWriter; + +import lombok.RequiredArgsConstructor; + +@Configuration +@RequiredArgsConstructor +public class NotificationSendJobConfig { + + private static final int CHUNK_SIZE = 5000; + + private final JobRepository jobRepository; + private final PlatformTransactionManager txManager; + + private final JdbcCursorItemReader outboxReader; + private final NotificationSendProcessor notificationSendProcessor; + private final NotificationSendWriter notificationSendWriter; + + @Bean + public Job notificationSendJob() { + return new JobBuilder("notificationSendJob", jobRepository) + .incrementer(new RunIdIncrementer()) + .start(notificationSendStep()) + .build(); + } + + @JobScope + @Bean + public Step notificationSendStep() { + return new StepBuilder("notificationSendStep", jobRepository) + .chunk(CHUNK_SIZE, txManager) + .reader(outboxReader) + .processor(notificationSendProcessor) + .writer(notificationSendWriter) + .build(); + } +} diff --git a/src/main/java/com/project/rdb/batch/notificationsend/processor/NotificationSendProcessor.java b/src/main/java/com/project/rdb/batch/notificationsend/processor/NotificationSendProcessor.java new file mode 100644 index 0000000..69a5b6d --- /dev/null +++ b/src/main/java/com/project/rdb/batch/notificationsend/processor/NotificationSendProcessor.java @@ -0,0 +1,32 @@ +package com.project.rdb.batch.notificationsend.processor; + +import java.util.UUID; + +import org.springframework.batch.item.ItemProcessor; +import org.springframework.stereotype.Component; + +import com.project.rdb.batch.model.dto.UsageNotificationEvent; +import com.project.rdb.batch.model.dto.UsageNotificationOutboxRow; + +import lombok.extern.slf4j.Slf4j; + +@Component +@Slf4j +public class NotificationSendProcessor + implements ItemProcessor { + + @Override + public UsageNotificationEvent process(UsageNotificationOutboxRow item) { + + return new UsageNotificationEvent( + UUID.randomUUID(), + item.id(), + item.subId(), + item.period(), + item.unit(), + item.threshold(), + item.percent(), + item.totalUsedMb(), + item.allotmentMb()); + } +} diff --git a/src/main/java/com/project/rdb/batch/notificationsend/reader/NotificationSendReaderConfig.java b/src/main/java/com/project/rdb/batch/notificationsend/reader/NotificationSendReaderConfig.java new file mode 100644 index 0000000..d1bcdb6 --- /dev/null +++ b/src/main/java/com/project/rdb/batch/notificationsend/reader/NotificationSendReaderConfig.java @@ -0,0 +1,53 @@ +package com.project.rdb.batch.notificationsend.reader; + +import javax.sql.DataSource; + +import org.springframework.batch.item.database.JdbcCursorItemReader; +import org.springframework.batch.item.database.builder.JdbcCursorItemReaderBuilder; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +import com.project.rdb.batch.model.dto.UsageNotificationOutboxRow; + +@Configuration +public class NotificationSendReaderConfig { + + @Bean(name = "notificationSendOutboxReader") + public JdbcCursorItemReader notificationSendOutboxReader( + DataSource dataSource) { + + String sql = + """ + SELECT + id, + sub_id, + period, + unit, + threshold, + percent, + total_used_mb, + allotment_mb + FROM usage_notification_outbox + WHERE status = 'PENDING' + ORDER BY id + """; + + return new JdbcCursorItemReaderBuilder() + .name("notificationSendOutboxReader") + .dataSource(dataSource) + .sql(sql) + .fetchSize(1_000) + .rowMapper( + (rs, rowNum) -> + new UsageNotificationOutboxRow( + rs.getLong("id"), + rs.getLong("sub_id"), + rs.getString("period"), + rs.getString("unit"), + rs.getInt("threshold"), + rs.getInt("percent"), + rs.getLong("total_used_mb"), + rs.getLong("allotment_mb"))) + .build(); + } +} diff --git a/src/main/java/com/project/rdb/batch/notificationsend/writer/NotificationSendWriter.java b/src/main/java/com/project/rdb/batch/notificationsend/writer/NotificationSendWriter.java new file mode 100644 index 0000000..0258adb --- /dev/null +++ b/src/main/java/com/project/rdb/batch/notificationsend/writer/NotificationSendWriter.java @@ -0,0 +1,108 @@ +package com.project.rdb.batch.notificationsend.writer; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; + +import org.springframework.batch.item.Chunk; +import org.springframework.batch.item.ItemWriter; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.support.SendResult; +import org.springframework.stereotype.Component; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.project.rdb.batch.model.dto.NotificationSendTask; +import com.project.rdb.batch.model.dto.UsageNotificationEvent; +import com.project.rdb.batch.model.repository.UsageNotificationOutboxRepository; + +import lombok.RequiredArgsConstructor; + +@Component +@RequiredArgsConstructor +public class NotificationSendWriter implements ItemWriter { + + private final KafkaTemplate kafkaTemplate; + private final UsageNotificationOutboxRepository repository; + private final ObjectMapper objectMapper; + private static final int MAX_FAILURE_REASON_LENGTH = 255; + + @Override + public void write(Chunk items) { + + // usageNotificationOutboxId 추출 + List ids = items.getItems().stream().map(UsageNotificationEvent::id).toList(); + + // PROCESSING Status 먼저 DB 반영 + repository.markProcessing(ids); + + // Kafka 비동기 발행 + List tasks = new ArrayList<>(); + + Map failedReasons = new HashMap<>(); + + for (UsageNotificationEvent event : items) { + try { + String payload = objectMapper.writeValueAsString(event); + + CompletableFuture> future = + kafkaTemplate.send("notification-usage", event.subId().toString(), payload); + + tasks.add(new NotificationSendTask(event, future)); + } catch (Exception e) { + failedReasons.put(event.id(), safeFailureReason(e)); + } + } + + // Chunk 단위 ACK 동기화 + List successIds = new ArrayList<>(); + + for (NotificationSendTask task : tasks) { + try { + task.future().join(); + successIds.add(task.event().id()); + } catch (Exception ex) { + Throwable cause = ex.getCause() != null ? ex.getCause() : ex; + + failedReasons.put(task.event().id(), safeFailureReason(cause)); + } + } + + // 최종 상태 DB 반영 + if (!successIds.isEmpty()) { + repository.markSent(successIds); + } + + if (!failedReasons.isEmpty()) { + repository.markFailedWithReasons(failedReasons); + } + } + + private String safeFailureReason(Throwable ex) { + if (ex == null) { + return "UNKNOWN_ERROR"; + } + + Throwable root = ex; + while (root.getCause() != null) { + root = root.getCause(); + } + + String exceptionName = root.getClass().getSimpleName(); + String message = root.getMessage(); + + String result; + if (message == null || message.isBlank()) { + result = exceptionName; + } else { + result = exceptionName + ": " + message; + } + + if (result.length() > MAX_FAILURE_REASON_LENGTH) { + result = result.substring(0, MAX_FAILURE_REASON_LENGTH); + } + + return result; + } +} From 316ef4a90404ba8b726c94a0b56c216b22770347 Mon Sep 17 00:00:00 2001 From: andrew Date: Wed, 21 Jan 2026 17:49:06 +0900 Subject: [PATCH 13/19] =?UTF-8?q?UPLUS-105=20feat=20:=20=EC=9E=84=EA=B3=84?= =?UTF-8?q?=EC=B9=98=20=EC=B4=88=EA=B3=BC=20=EC=82=AC=EC=9A=A9=EC=9E=90=20?= =?UTF-8?q?Kafka=20=EC=9D=B4=EB=B2=A4=ED=8A=B8=20=EB=B0=9C=ED=96=89=20?= =?UTF-8?q?=EB=B0=B0=EC=B9=98=EC=97=90=20=ED=95=84=EC=9A=94=ED=95=9C=20Ent?= =?UTF-8?q?ity=20=EB=B0=8F=20Dto=20=EA=B5=AC=ED=98=84?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../batch/model/dto/NotificationSendTask.java | 8 +++ .../model/dto/UsageNotificationOutboxRow.java | 11 ++++ .../rdb/batch/model/entity/OutboxStatus.java | 8 +++ .../model/entity/UsageNotificationOutbox.java | 61 +++++++++++++++++++ .../UsageNotificationOutboxRepository.java | 58 ++++++++++++++++++ 5 files changed, 146 insertions(+) create mode 100644 src/main/java/com/project/rdb/batch/model/dto/NotificationSendTask.java create mode 100644 src/main/java/com/project/rdb/batch/model/dto/UsageNotificationOutboxRow.java create mode 100644 src/main/java/com/project/rdb/batch/model/entity/OutboxStatus.java create mode 100644 src/main/java/com/project/rdb/batch/model/entity/UsageNotificationOutbox.java create mode 100644 src/main/java/com/project/rdb/batch/model/repository/UsageNotificationOutboxRepository.java diff --git a/src/main/java/com/project/rdb/batch/model/dto/NotificationSendTask.java b/src/main/java/com/project/rdb/batch/model/dto/NotificationSendTask.java new file mode 100644 index 0000000..c4a6e6b --- /dev/null +++ b/src/main/java/com/project/rdb/batch/model/dto/NotificationSendTask.java @@ -0,0 +1,8 @@ +package com.project.rdb.batch.model.dto; + +import java.util.concurrent.CompletableFuture; + +import org.springframework.kafka.support.SendResult; + +public record NotificationSendTask( + UsageNotificationEvent event, CompletableFuture> future) {} diff --git a/src/main/java/com/project/rdb/batch/model/dto/UsageNotificationOutboxRow.java b/src/main/java/com/project/rdb/batch/model/dto/UsageNotificationOutboxRow.java new file mode 100644 index 0000000..3dd49c3 --- /dev/null +++ b/src/main/java/com/project/rdb/batch/model/dto/UsageNotificationOutboxRow.java @@ -0,0 +1,11 @@ +package com.project.rdb.batch.model.dto; + +public record UsageNotificationOutboxRow( + Long id, + Long subId, + String period, + String unit, + int threshold, + int percent, + long totalUsedMb, + long allotmentMb) {} diff --git a/src/main/java/com/project/rdb/batch/model/entity/OutboxStatus.java b/src/main/java/com/project/rdb/batch/model/entity/OutboxStatus.java new file mode 100644 index 0000000..3106464 --- /dev/null +++ b/src/main/java/com/project/rdb/batch/model/entity/OutboxStatus.java @@ -0,0 +1,8 @@ +package com.project.rdb.batch.model.entity; + +public enum OutboxStatus { + PENDING, + SENT, + PROCESSING, + FAILED +} diff --git a/src/main/java/com/project/rdb/batch/model/entity/UsageNotificationOutbox.java b/src/main/java/com/project/rdb/batch/model/entity/UsageNotificationOutbox.java new file mode 100644 index 0000000..0aa412a --- /dev/null +++ b/src/main/java/com/project/rdb/batch/model/entity/UsageNotificationOutbox.java @@ -0,0 +1,61 @@ +package com.project.rdb.batch.model.entity; + +import java.time.LocalDateTime; + +import jakarta.persistence.Column; +import jakarta.persistence.Entity; +import jakarta.persistence.EnumType; +import jakarta.persistence.Enumerated; +import jakarta.persistence.GeneratedValue; +import jakarta.persistence.GenerationType; +import jakarta.persistence.Id; +import jakarta.persistence.Table; + +import lombok.AccessLevel; +import lombok.Getter; +import lombok.NoArgsConstructor; + +@Entity +@Table(name = "usage_notification_outbox") +@Getter +@NoArgsConstructor(access = AccessLevel.PROTECTED) +public class UsageNotificationOutbox { + + @Id + @GeneratedValue(strategy = GenerationType.IDENTITY) + private Long id; + + @Column(name = "sub_id", nullable = false) + private Long subId; + + @Column(nullable = false, length = 8) + private String period; + + @Column(nullable = false, length = 10) + private String unit; + + @Column(nullable = false) + private int threshold; + + @Column(nullable = false) + private int percent; + + @Column(name = "total_used_mb", nullable = false) + private Long totalUsedMb; + + @Column(name = "allotment_mb", nullable = false) + private Long allotmentMb; + + @Enumerated(EnumType.STRING) + @Column(nullable = false, length = 10) + private OutboxStatus status; + + @Column(name = "created_at", nullable = false) + private LocalDateTime createdAt; + + @Column(name = "sent_at") + private LocalDateTime sentAt; + + @Column(name = "failure_reason") + private String failureReason; +} diff --git a/src/main/java/com/project/rdb/batch/model/repository/UsageNotificationOutboxRepository.java b/src/main/java/com/project/rdb/batch/model/repository/UsageNotificationOutboxRepository.java new file mode 100644 index 0000000..acb535f --- /dev/null +++ b/src/main/java/com/project/rdb/batch/model/repository/UsageNotificationOutboxRepository.java @@ -0,0 +1,58 @@ +package com.project.rdb.batch.model.repository; + +import java.util.List; +import java.util.Map; + +import org.springframework.jdbc.core.JdbcTemplate; +import org.springframework.stereotype.Repository; + +import lombok.RequiredArgsConstructor; + +@Repository +@RequiredArgsConstructor +public class UsageNotificationOutboxRepository { + + private final JdbcTemplate jdbcTemplate; + + public void markProcessing(List ids) { + jdbcTemplate.batchUpdate( + "UPDATE usage_notification_outbox SET status = 'PROCESSING' WHERE id = ?", + ids, + ids.size(), + (ps, id) -> ps.setLong(1, id)); + } + + public void markSent(List ids) { + jdbcTemplate.batchUpdate( + "UPDATE usage_notification_outbox SET status = 'SENT', sent_at = now() WHERE id =" + + " ?", + ids, + ids.size(), + (ps, id) -> ps.setLong(1, id)); + } + + public void markFailed(List ids, String reason) { + jdbcTemplate.batchUpdate( + "UPDATE usage_notification_outbox SET status = 'FAILED', failure_reason = ? WHERE" + + " id = ?", + ids, + ids.size(), + (ps, id) -> { + ps.setString(1, reason); + ps.setLong(2, id); + }); + } + + public void markFailedWithReasons(Map reasons) { + jdbcTemplate.batchUpdate( + "UPDATE usage_notification_outbox " + + "SET status = 'FAILED', failure_reason = ? " + + "WHERE id = ?", + reasons.entrySet(), + reasons.size(), + (ps, entry) -> { + ps.setString(1, entry.getValue()); + ps.setLong(2, entry.getKey()); + }); + } +} From 5c98e607e86291441c350839876ba44a484e91f7 Mon Sep 17 00:00:00 2001 From: andrew Date: Wed, 21 Jan 2026 17:50:37 +0900 Subject: [PATCH 14/19] =?UTF-8?q?UPLUS-105=20feat=20:=20=EC=9E=84=EA=B3=84?= =?UTF-8?q?=EC=B9=98=20=EC=B4=88=EA=B3=BC=20=EC=82=AC=EC=9A=A9=EC=9E=90=20?= =?UTF-8?q?Kafka=20Consumer=20=EB=A1=9C=EC=A7=81=20=EA=B5=AC=ED=98=84?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../consumer/NotificationSendConsumer.java | 48 +++++++++++++++++ .../NotificationSendDedupService.java | 22 ++++++++ .../kafka/consumer/UsageEventConsumer.java | 52 +++++++++++++++++++ 3 files changed, 122 insertions(+) create mode 100644 src/main/java/com/project/rdb/kafka/consumer/NotificationSendConsumer.java create mode 100644 src/main/java/com/project/rdb/kafka/consumer/NotificationSendDedupService.java create mode 100644 src/main/java/com/project/rdb/kafka/consumer/UsageEventConsumer.java diff --git a/src/main/java/com/project/rdb/kafka/consumer/NotificationSendConsumer.java b/src/main/java/com/project/rdb/kafka/consumer/NotificationSendConsumer.java new file mode 100644 index 0000000..a613d55 --- /dev/null +++ b/src/main/java/com/project/rdb/kafka/consumer/NotificationSendConsumer.java @@ -0,0 +1,48 @@ +package com.project.rdb.kafka.consumer; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.kafka.support.Acknowledgment; +import org.springframework.stereotype.Component; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.project.rdb.batch.model.dto.UsageNotificationEvent; + +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +@Component +@RequiredArgsConstructor +public class NotificationSendConsumer { + + private final NotificationSendDedupService dedupService; + private final ObjectMapper objectMapper; + + @KafkaListener( + topics = "notification-usage", + containerFactory = "kafkaListenerContainerFactory") + public void consume(ConsumerRecord record, Acknowledgment ack) { + log.info("🔥 CONSUME START offset={}, value={}", record.offset(), record.value()); + + try { + UsageNotificationEvent event = + objectMapper.readValue(record.value(), UsageNotificationEvent.class); + + String eventId = event.eventId().toString(); + + if (!dedupService.tryAcquire(eventId)) { + log.info("[SKIP] duplicated eventId={}", eventId); + ack.acknowledge(); + return; + } + + log.info("[SUCCESS] finished eventId={}", eventId); + + ack.acknowledge(); + } catch (Exception e) { + log.error("[CONSUME FAIL]", e); + ack.acknowledge(); // 지금 구조상 스킵이 맞음 + } + } +} diff --git a/src/main/java/com/project/rdb/kafka/consumer/NotificationSendDedupService.java b/src/main/java/com/project/rdb/kafka/consumer/NotificationSendDedupService.java new file mode 100644 index 0000000..7d7b95c --- /dev/null +++ b/src/main/java/com/project/rdb/kafka/consumer/NotificationSendDedupService.java @@ -0,0 +1,22 @@ +package com.project.rdb.kafka.consumer; + +import java.time.Duration; + +import org.springframework.data.redis.core.StringRedisTemplate; +import org.springframework.stereotype.Service; + +import lombok.RequiredArgsConstructor; + +@Service +@RequiredArgsConstructor +public class NotificationSendDedupService { + + private final StringRedisTemplate redisTemplate; + private static final Duration TTL = Duration.ofDays(7); + + public boolean tryAcquire(String eventId) { + Boolean success = + redisTemplate.opsForValue().setIfAbsent("notification:event:" + eventId, "1", TTL); + return Boolean.TRUE.equals(success); + } +} diff --git a/src/main/java/com/project/rdb/kafka/consumer/UsageEventConsumer.java b/src/main/java/com/project/rdb/kafka/consumer/UsageEventConsumer.java new file mode 100644 index 0000000..0fc6366 --- /dev/null +++ b/src/main/java/com/project/rdb/kafka/consumer/UsageEventConsumer.java @@ -0,0 +1,52 @@ +package com.project.rdb.kafka.consumer; + +// @Profile("!batch") +// @Slf4j +// @Component +// @ConditionalOnProperty( +// name = "kafka.consumer.enabled", +// havingValue = "true" +// ) +// @RequiredArgsConstructor +// public class UsageEventConsumer { +// +// private final ObjectMapper objectMapper; +// private final UsageLogRepository usageLogRepository; +// +// @KafkaListener( +// id = "usage-log-consumer", +// topics = "usage-data", +// groupId = "usage-log-consumer", +// containerFactory = "batchKafkaListenerContainerFactory" +// ) +// public void consume(List> records, Acknowledgment ack) { +// if (records == null || records.isEmpty()) { +// ack.acknowledge(); +// return; +// } +// +// try { +// for (ConsumerRecord rec : records) { +// UsageEventSchema e = objectMapper.readValue(rec.value(), UsageEventSchema.class); +// +// UsageLog row = UsageLog.builder() +// .eventId(e.eventId()) +// .subId(e.subscriptionId()) +// .usedBytes(e.usageBytes()) +// .eventTime(LocalDateTime.parse(e.timeStamp())) +// .build(); +// +// try { +// usageLogRepository.save(row); +// } catch (DataIntegrityViolationException dup) { +// log.error("Duplicate record: {}", dup); +// } +// } +// +// ack.acknowledge(); +// } catch (Exception ex) { +// log.error("usage-log batch failed", ex); +// throw new ApplicationException(GlobalErrorCode.USAGE_LOG_BATCH_FAILED); +// } +// } +// } From 208f44f2a7490e711922811ec50704e95dfe396f Mon Sep 17 00:00:00 2001 From: andrew Date: Wed, 21 Jan 2026 17:50:52 +0900 Subject: [PATCH 15/19] =?UTF-8?q?UPLUS-105=20feat=20:=20=EB=B0=B0=EC=B9=98?= =?UTF-8?q?=20Job=20=EC=8B=A4=ED=96=89=ED=8C=8C=EC=9D=BC=20=EC=9E=84?= =?UTF-8?q?=EC=8B=9C=20=EA=B5=AC=ED=98=84?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../java/com/project/rdb/BatchJobRunner.java | 55 +++++++++++++++++++ 1 file changed, 55 insertions(+) create mode 100644 src/main/java/com/project/rdb/BatchJobRunner.java diff --git a/src/main/java/com/project/rdb/BatchJobRunner.java b/src/main/java/com/project/rdb/BatchJobRunner.java new file mode 100644 index 0000000..ba14d2f --- /dev/null +++ b/src/main/java/com/project/rdb/BatchJobRunner.java @@ -0,0 +1,55 @@ +// package com.project.rdb; +// +// import org.springframework.batch.core.Job; +// import org.springframework.batch.core.JobParameters; +// import org.springframework.batch.core.JobParametersBuilder; +// import org.springframework.batch.core.launch.JobLauncher; +// import org.springframework.boot.CommandLineRunner; +// import org.springframework.stereotype.Component; +// +// import lombok.RequiredArgsConstructor; +// +// @Component +// @RequiredArgsConstructor +// public class BatchJobRunner implements CommandLineRunner { +// +// private final JobLauncher jobLauncher; +// private final Job usageAggregationJob; +// private final Job usageNotificationJob; +// private final Job notificationSendJob; +// +// ////// @Override +// ////// public void run(String... args) throws Exception { +// ////// JobParameters params = new JobParametersBuilder() +// ////// .addString("fromTime", "2025-12-01T00:00:00") +// ////// .addString("toTime", "2025-12-31T11:59:59") +// ////// .addLong("run.id", System.currentTimeMillis()) +// ////// .toJobParameters(); +// ////// +// ////// jobLauncher.run(usageAggregationJob, params); +// ////// } +// //// +// ////// @Override +// ////// public void run(String... args) throws Exception { +// ////// +// ////// JobParameters params = new JobParametersBuilder() +// ////// // 월 요금제 Step용 +// ////// .addString("period", "202512") +// ////// // 일 요금제 Step용 +// ////// .addString("usageDate", "20251201") +// ////// .addLong("run.id", System.currentTimeMillis()) +// ////// .toJobParameters(); +// ////// +// ////// jobLauncher.run(usageNotificationJob, params); +// ////// } +// // +// @Override +// public void run(String... args) throws Exception { +// JobParameters params = +// new JobParametersBuilder() +// .addLong("runAt", System.currentTimeMillis()) // 중복 실행 방지 +// .toJobParameters(); +// +// jobLauncher.run(notificationSendJob, params); +// } +// } From bc9f28f6dfd132b675820c540384469fcff98e43 Mon Sep 17 00:00:00 2001 From: andrew Date: Wed, 21 Jan 2026 17:51:57 +0900 Subject: [PATCH 16/19] =?UTF-8?q?UPLUS-105=20feat=20:=20=EB=B0=B0=EC=B9=98?= =?UTF-8?q?=20=EC=9E=91=EC=97=85=20=EA=B8=B0=EB=A1=9D=20=ED=85=8C=EC=9D=B4?= =?UTF-8?q?=EB=B8=94=20=EB=B0=8F=20=EB=A1=9C=EC=A7=81=20=EA=B5=AC=ED=98=84?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../batch/model/BatchStepMetricsListener.java | 83 +++++++++++++++++++ .../model/entity/BatchExecutionReport.java | 52 ++++++++++++ .../BatchExecutionReportRepository.java | 7 ++ 3 files changed, 142 insertions(+) create mode 100644 src/main/java/com/project/rdb/batch/model/BatchStepMetricsListener.java create mode 100644 src/main/java/com/project/rdb/batch/model/entity/BatchExecutionReport.java create mode 100644 src/main/java/com/project/rdb/batch/model/repository/BatchExecutionReportRepository.java diff --git a/src/main/java/com/project/rdb/batch/model/BatchStepMetricsListener.java b/src/main/java/com/project/rdb/batch/model/BatchStepMetricsListener.java new file mode 100644 index 0000000..48e15af --- /dev/null +++ b/src/main/java/com/project/rdb/batch/model/BatchStepMetricsListener.java @@ -0,0 +1,83 @@ +package com.project.rdb.batch.model; + +import java.math.BigDecimal; +import java.math.RoundingMode; +import java.time.Duration; +import java.time.LocalDateTime; +import java.time.ZoneId; +import java.util.Objects; + +import org.springframework.batch.core.ExitStatus; +import org.springframework.batch.core.JobExecution; +import org.springframework.batch.core.StepExecution; +import org.springframework.batch.core.StepExecutionListener; +import org.springframework.stereotype.Component; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.project.rdb.batch.model.entity.BatchExecutionReport; +import com.project.rdb.batch.model.repository.BatchExecutionReportRepository; + +import lombok.RequiredArgsConstructor; + +@Component +@RequiredArgsConstructor +public class BatchStepMetricsListener implements StepExecutionListener { + private final BatchExecutionReportRepository reportRepository; + private final ObjectMapper objectMapper; + + @Override + public ExitStatus afterStep(StepExecution stepExecution) { + + JobExecution jobExecution = stepExecution.getJobExecution(); + + LocalDateTime start = + Objects.requireNonNull(stepExecution.getStartTime()) + .atZone(ZoneId.of("Asia/Seoul")) + .toLocalDateTime(); + + LocalDateTime end = + Objects.requireNonNull(stepExecution.getEndTime()) + .atZone(ZoneId.of("Asia/Seoul")) + .toLocalDateTime(); + + long durationMs = Duration.between(start, end).toMillis(); + + long readCount = stepExecution.getReadCount(); + BigDecimal tps = + durationMs > 0 + ? BigDecimal.valueOf(readCount) + .multiply(BigDecimal.valueOf(1000)) + .divide(BigDecimal.valueOf(durationMs), 2, RoundingMode.HALF_UP) + : BigDecimal.ZERO; + + String paramsJson; + try { + paramsJson = + objectMapper.writeValueAsString( + jobExecution.getJobParameters().getParameters()); + } catch (Exception e) { + paramsJson = "{}"; + } + + reportRepository.save( + BatchExecutionReport.builder() + .jobName(jobExecution.getJobInstance().getJobName()) + .stepName(stepExecution.getStepName()) + .status(stepExecution.getStatus().toString()) + .startedAt(start) + .endedAt(end) + .durationMs(durationMs) + .readCount(readCount) + .writeCount(stepExecution.getWriteCount()) + .filterCount(stepExecution.getFilterCount()) + .skipCount(stepExecution.getSkipCount()) + .commitCount(stepExecution.getCommitCount()) + .rollbackCount(stepExecution.getRollbackCount()) + .tps(tps) + .jobParameters(paramsJson) + .createdAt(LocalDateTime.now()) + .build()); + + return stepExecution.getExitStatus(); + } +} diff --git a/src/main/java/com/project/rdb/batch/model/entity/BatchExecutionReport.java b/src/main/java/com/project/rdb/batch/model/entity/BatchExecutionReport.java new file mode 100644 index 0000000..e2c4b36 --- /dev/null +++ b/src/main/java/com/project/rdb/batch/model/entity/BatchExecutionReport.java @@ -0,0 +1,52 @@ +package com.project.rdb.batch.model.entity; + +import java.math.BigDecimal; +import java.time.LocalDateTime; + +import jakarta.persistence.Column; +import jakarta.persistence.Entity; +import jakarta.persistence.GeneratedValue; +import jakarta.persistence.GenerationType; +import jakarta.persistence.Id; +import jakarta.persistence.Table; + +import lombok.AccessLevel; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Getter; +import lombok.NoArgsConstructor; + +@Entity +@Table(name = "batch_execution_report") +@Getter +@NoArgsConstructor(access = AccessLevel.PROTECTED) +@AllArgsConstructor +@Builder +public class BatchExecutionReport { + @Id + @GeneratedValue(strategy = GenerationType.IDENTITY) + private Long id; + + private String jobName; + private String stepName; + + private String status; + + private LocalDateTime startedAt; + private LocalDateTime endedAt; + private long durationMs; + + private long readCount; + private long writeCount; + private long filterCount; + private long skipCount; + private long commitCount; + private long rollbackCount; + + private BigDecimal tps; + + @Column(columnDefinition = "jsonb") + private String jobParameters; + + private LocalDateTime createdAt; +} diff --git a/src/main/java/com/project/rdb/batch/model/repository/BatchExecutionReportRepository.java b/src/main/java/com/project/rdb/batch/model/repository/BatchExecutionReportRepository.java new file mode 100644 index 0000000..196f464 --- /dev/null +++ b/src/main/java/com/project/rdb/batch/model/repository/BatchExecutionReportRepository.java @@ -0,0 +1,7 @@ +package com.project.rdb.batch.model.repository; + +import org.springframework.data.jpa.repository.JpaRepository; + +import com.project.rdb.batch.model.entity.BatchExecutionReport; + +public interface BatchExecutionReportRepository extends JpaRepository {} From 4c2ff1c491f766146ceed9324a188232fd56bfdb Mon Sep 17 00:00:00 2001 From: andrew Date: Wed, 21 Jan 2026 17:54:36 +0900 Subject: [PATCH 17/19] =?UTF-8?q?UPLUS-105=20feat=20:=20=EC=95=88=EC=93=B0?= =?UTF-8?q?=EB=8A=94=20import=20=EC=A0=9C=EA=B1=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../java/com/project/global/config/KafkaConfig.java | 3 --- .../project/rdb/batch/model/dto/UsageDailyKey.java | 3 +-- .../project/rdb/batch/model/dto/UsageMonthlyKey.java | 3 +-- .../UsageNotificationOutboxRepository.java | 12 ------------ .../writer/UsageSummaryDailyWriter.java | 2 +- .../com/project/redis/consumer/UsageConsumer.java | 1 - .../project/redis/producer/PlanChangeProducer.java | 1 - .../com/project/redis/producer/UsageProducer.java | 1 - 8 files changed, 3 insertions(+), 23 deletions(-) diff --git a/src/main/java/com/project/global/config/KafkaConfig.java b/src/main/java/com/project/global/config/KafkaConfig.java index f96318e..47465c4 100644 --- a/src/main/java/com/project/global/config/KafkaConfig.java +++ b/src/main/java/com/project/global/config/KafkaConfig.java @@ -1,10 +1,7 @@ package com.project.global.config; -import java.util.Arrays; - import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; -import org.springframework.core.env.Environment; import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.config.KafkaListenerContainerFactory; import org.springframework.kafka.core.ConsumerFactory; diff --git a/src/main/java/com/project/rdb/batch/model/dto/UsageDailyKey.java b/src/main/java/com/project/rdb/batch/model/dto/UsageDailyKey.java index 7e7f495..38de2ff 100644 --- a/src/main/java/com/project/rdb/batch/model/dto/UsageDailyKey.java +++ b/src/main/java/com/project/rdb/batch/model/dto/UsageDailyKey.java @@ -8,10 +8,9 @@ public boolean equals(Object oj) { if (this == oj) { return true; } - if (!(oj instanceof UsageDailyKey)) { + if (!(oj instanceof UsageDailyKey that)) { return false; } - UsageDailyKey that = (UsageDailyKey) oj; return Objects.equals(subId, that.subId) && Objects.equals(usageDate, that.usageDate); } } diff --git a/src/main/java/com/project/rdb/batch/model/dto/UsageMonthlyKey.java b/src/main/java/com/project/rdb/batch/model/dto/UsageMonthlyKey.java index a6ec50f..8089e34 100644 --- a/src/main/java/com/project/rdb/batch/model/dto/UsageMonthlyKey.java +++ b/src/main/java/com/project/rdb/batch/model/dto/UsageMonthlyKey.java @@ -9,10 +9,9 @@ public boolean equals(Object oj) { if (this == oj) { return true; } - if (!(oj instanceof UsageMonthlyKey)) { + if (!(oj instanceof UsageMonthlyKey that)) { return false; } - UsageMonthlyKey that = (UsageMonthlyKey) oj; return Objects.equals(subId, that.subId) && Objects.equals(period, that.period); } } diff --git a/src/main/java/com/project/rdb/batch/model/repository/UsageNotificationOutboxRepository.java b/src/main/java/com/project/rdb/batch/model/repository/UsageNotificationOutboxRepository.java index acb535f..c881e3f 100644 --- a/src/main/java/com/project/rdb/batch/model/repository/UsageNotificationOutboxRepository.java +++ b/src/main/java/com/project/rdb/batch/model/repository/UsageNotificationOutboxRepository.java @@ -31,18 +31,6 @@ public void markSent(List ids) { (ps, id) -> ps.setLong(1, id)); } - public void markFailed(List ids, String reason) { - jdbcTemplate.batchUpdate( - "UPDATE usage_notification_outbox SET status = 'FAILED', failure_reason = ? WHERE" - + " id = ?", - ids, - ids.size(), - (ps, id) -> { - ps.setString(1, reason); - ps.setLong(2, id); - }); - } - public void markFailedWithReasons(Map reasons) { jdbcTemplate.batchUpdate( "UPDATE usage_notification_outbox " diff --git a/src/main/java/com/project/rdb/batch/usageaggregate/writer/UsageSummaryDailyWriter.java b/src/main/java/com/project/rdb/batch/usageaggregate/writer/UsageSummaryDailyWriter.java index e33ad90..d291f0f 100644 --- a/src/main/java/com/project/rdb/batch/usageaggregate/writer/UsageSummaryDailyWriter.java +++ b/src/main/java/com/project/rdb/batch/usageaggregate/writer/UsageSummaryDailyWriter.java @@ -22,7 +22,7 @@ public class UsageSummaryDailyWriter implements ItemWriter chunk) throws Exception { + public void write(Chunk chunk) { Map aggregated = new HashMap<>(); for (UsageDailyAggregation item : chunk) { diff --git a/src/main/java/com/project/redis/consumer/UsageConsumer.java b/src/main/java/com/project/redis/consumer/UsageConsumer.java index fee4710..8eb1927 100644 --- a/src/main/java/com/project/redis/consumer/UsageConsumer.java +++ b/src/main/java/com/project/redis/consumer/UsageConsumer.java @@ -4,7 +4,6 @@ import java.util.List; import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.springframework.context.annotation.Profile; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.support.Acknowledgment; import org.springframework.stereotype.Component; diff --git a/src/main/java/com/project/redis/producer/PlanChangeProducer.java b/src/main/java/com/project/redis/producer/PlanChangeProducer.java index 15aabcc..f775574 100644 --- a/src/main/java/com/project/redis/producer/PlanChangeProducer.java +++ b/src/main/java/com/project/redis/producer/PlanChangeProducer.java @@ -3,7 +3,6 @@ import java.time.OffsetDateTime; import java.util.UUID; -import org.springframework.context.annotation.Profile; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.stereotype.Component; diff --git a/src/main/java/com/project/redis/producer/UsageProducer.java b/src/main/java/com/project/redis/producer/UsageProducer.java index c74cb13..8075288 100644 --- a/src/main/java/com/project/redis/producer/UsageProducer.java +++ b/src/main/java/com/project/redis/producer/UsageProducer.java @@ -3,7 +3,6 @@ import java.time.OffsetDateTime; import java.util.UUID; -import org.springframework.context.annotation.Profile; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.stereotype.Component; From 12c1183c61b9b33331c02d04c4a4f9f2a0e09264 Mon Sep 17 00:00:00 2001 From: andrew Date: Wed, 21 Jan 2026 17:55:24 +0900 Subject: [PATCH 18/19] =?UTF-8?q?UPLUS-105=20feat=20:=20spotless=20?= =?UTF-8?q?=EA=B2=80=EC=A6=9D?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../java/com/project/redis/consumer/PlanChangeConsumer.java | 4 ++-- src/main/java/com/project/redis/consumer/UsageConsumer.java | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/main/java/com/project/redis/consumer/PlanChangeConsumer.java b/src/main/java/com/project/redis/consumer/PlanChangeConsumer.java index 5c34656..2e1df69 100644 --- a/src/main/java/com/project/redis/consumer/PlanChangeConsumer.java +++ b/src/main/java/com/project/redis/consumer/PlanChangeConsumer.java @@ -9,10 +9,10 @@ import org.springframework.stereotype.Component; import com.fasterxml.jackson.databind.ObjectMapper; -import com.project.redis.consumer.util.PlanChangeUtil; -import com.project.redis.consumer.util.RedisUtil; import com.project.global.exception.ApplicationException; import com.project.global.exception.code.domain.GlobalErrorCode; +import com.project.redis.consumer.util.PlanChangeUtil; +import com.project.redis.consumer.util.RedisUtil; import com.project.redis.producer.schema.CalculatedLimitSchema; import com.project.redis.producer.schema.PlanChangeSchema; diff --git a/src/main/java/com/project/redis/consumer/UsageConsumer.java b/src/main/java/com/project/redis/consumer/UsageConsumer.java index 8eb1927..fb6fc64 100644 --- a/src/main/java/com/project/redis/consumer/UsageConsumer.java +++ b/src/main/java/com/project/redis/consumer/UsageConsumer.java @@ -9,9 +9,9 @@ import org.springframework.stereotype.Component; import com.fasterxml.jackson.databind.ObjectMapper; -import com.project.redis.consumer.util.RedisUtil; import com.project.global.exception.ApplicationException; import com.project.global.exception.code.domain.GlobalErrorCode; +import com.project.redis.consumer.util.RedisUtil; import com.project.redis.producer.NotificationProducer; import com.project.redis.producer.schema.UsageEventSchema; From 59322c71d3fdc03439381a65bd5e2f163bffb083 Mon Sep 17 00:00:00 2001 From: andrew Date: Wed, 21 Jan 2026 18:33:24 +0900 Subject: [PATCH 19/19] =?UTF-8?q?UPLUS-105=20feat=20:=20=ED=99=98=EA=B2=BD?= =?UTF-8?q?=EB=B3=80=EC=88=98=20=EC=A0=81=EC=9A=A9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/main/resources/application.yml | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index ddd3459..22864ee 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -37,15 +37,15 @@ spring: url: redis://${REDIS_USERNAME:default}:${REDIS_PASSWORD:}@${REDIS_ENDPOINT:localhost:6379} kafka: - bootstrap-servers: pkc-oxqxx9.us-east-1.aws.confluent.cloud:9092 + bootstrap-servers: ${KAFKA_BOOTSTRAP_SERVERS} properties: security.protocol: SASL_SSL sasl.mechanism: PLAIN sasl.jaas.config: > org.apache.kafka.common.security.plain.PlainLoginModule required - username='EPO6DS6OXW7GYVEE' - password='cfltJj+U/TvM2TfzfqxpXIu5xDE/bPGe5sfIff5mrLZ9Usx4K9LudKBWPLIGG3QQ'; + username='${KAFKA_API_KEY}' + password='${KAFKA_API_SECRET}'; producer: retries: 2147483647