diff --git a/gradlew b/gradlew old mode 100644 new mode 100755 diff --git a/src/main/java/com/project/consumer/PlanChangeConsumer.java b/src/main/java/com/project/consumer/PlanChangeConsumer.java new file mode 100644 index 0000000..8051414 --- /dev/null +++ b/src/main/java/com/project/consumer/PlanChangeConsumer.java @@ -0,0 +1,53 @@ +package com.project.consumer; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.kafka.support.Acknowledgment; +import org.springframework.stereotype.Component; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.project.consumer.util.PlanChangeUtil; +import com.project.consumer.util.RedisUtil; +import com.project.global.exception.ApplicationException; +import com.project.global.exception.code.domain.GlobalErrorCode; +import com.project.producer.schema.CalculatedLimitSchema; +import com.project.producer.schema.PlanChangeSchema; + +import lombok.RequiredArgsConstructor; + +@Component +@RequiredArgsConstructor +public class PlanChangeConsumer { + + private final ObjectMapper objectMapper; + private final PlanChangeUtil planChangeUtil; + private final RedisUtil redisUtil; + + @KafkaListener( + id = "plan-change-consumer", + topics = "change_plan", + groupId = "plan-change-consumer", + containerFactory = "batchKafkaListenerContainerFactory") + public void consume(List> records, Acknowledgment ack) { + if (records == null || records.isEmpty()) { + ack.acknowledge(); + return; + } + List events = new ArrayList<>(records.size()); + try { + for (ConsumerRecord rec : records) { + events.add(objectMapper.readValue(rec.value(), PlanChangeSchema.class)); + } + + List limits = planChangeUtil.calculate(events); + redisUtil.writePlanChangeBatch(limits); + + ack.acknowledge(); + } catch (Exception e) { + throw new ApplicationException(GlobalErrorCode.PLAN_CHANGE_EVENT_PRODUCE_INVALID); + } + } +} diff --git a/src/main/java/com/project/consumer/UsageConsumer.java b/src/main/java/com/project/consumer/UsageConsumer.java new file mode 100644 index 0000000..f8291e0 --- /dev/null +++ b/src/main/java/com/project/consumer/UsageConsumer.java @@ -0,0 +1,67 @@ +package com.project.consumer; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.kafka.support.Acknowledgment; +import org.springframework.stereotype.Component; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.project.consumer.util.RedisUtil; +import com.project.global.exception.ApplicationException; +import com.project.global.exception.code.domain.GlobalErrorCode; +import com.project.producer.NotificationProducer; +import com.project.producer.schema.UsageEventSchema; + +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +@Component +@RequiredArgsConstructor +public class UsageConsumer { + + private final ObjectMapper objectMapper; + private final RedisUtil redisUtil; + private final NotificationProducer notificationProducer; + + @KafkaListener( + id = "usage-batch-consumer", + topics = "usage-data", + groupId = "usage-consumer", + containerFactory = "batchKafkaListenerContainerFactory") + public void consume(List> records, Acknowledgment ack) { + + if (records == null || records.isEmpty()) { + ack.acknowledge(); + return; + } + + List events = new ArrayList<>(records.size()); + + try { + + for (ConsumerRecord rec : records) { + UsageEventSchema schema = + objectMapper.readValue(rec.value(), UsageEventSchema.class); + events.add(schema); + } + + List notifications = redisUtil.applyUsageBatch(events); + + // 임계치 넘은 것만 notification_topic으로 발행 + for (String notification : notifications) { + notificationProducer.sendNotification(notification); + } + + // 성공한 배치 ack + ack.acknowledge(); + } catch (Exception e) { + log.error("usage batch failed", e); + // 여기서 ack 안 하면 같은 배치가 재시도됨 (at-least-once) + throw new ApplicationException(GlobalErrorCode.NOTIFICATION_EVENT_PRODUCE_INVALID); + } + } +} diff --git a/src/main/java/com/project/consumer/util/LuaScriptLoader.java b/src/main/java/com/project/consumer/util/LuaScriptLoader.java new file mode 100644 index 0000000..b3fdb1d --- /dev/null +++ b/src/main/java/com/project/consumer/util/LuaScriptLoader.java @@ -0,0 +1,29 @@ +package com.project.consumer.util; + +import java.io.InputStream; +import java.nio.charset.StandardCharsets; + +import com.project.global.exception.ApplicationException; +import com.project.global.exception.code.domain.GlobalErrorCode; + +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public class LuaScriptLoader { + + public static String load(String path) { + try (InputStream is = LuaScriptLoader.class.getClassLoader().getResourceAsStream(path)) { + + if (is == null) { + throw new IllegalStateException("Lua script not found: " + path); + } + + return new String(is.readAllBytes(), StandardCharsets.UTF_8); + } catch (Exception e) { + log.error("Failed to load lua script"); + throw new ApplicationException(GlobalErrorCode.LUA_SCRIPT_LOAD_INVALID); + } + } + + private LuaScriptLoader() {} +} diff --git a/src/main/java/com/project/consumer/util/PlanChangeUtil.java b/src/main/java/com/project/consumer/util/PlanChangeUtil.java new file mode 100644 index 0000000..7e37603 --- /dev/null +++ b/src/main/java/com/project/consumer/util/PlanChangeUtil.java @@ -0,0 +1,108 @@ +package com.project.consumer.util; + +import static com.project.consumer.util.UsageTimeUtil.toYearMonth; + +import java.time.Duration; +import java.time.OffsetDateTime; +import java.time.YearMonth; +import java.time.ZoneId; +import java.util.ArrayList; +import java.util.List; + +import org.springframework.data.redis.core.StringRedisTemplate; +import org.springframework.stereotype.Service; + +import com.project.producer.schema.CalculatedLimitSchema; +import com.project.producer.schema.PlanChangeSchema; +import com.project.producer.test.PlanUnit; + +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +@Service +@RequiredArgsConstructor +public class PlanChangeUtil { + + private final StringRedisTemplate redisTemplate; + + public List calculate(List events) { + + List results = new ArrayList<>(); + + String yearMonth; + long finalLimit; + String processedKey; + Long added; + + for (PlanChangeSchema event : events) { + yearMonth = toYearMonth(event.changedAt().toString()); + + processedKey = "processed:plan:" + yearMonth + ":" + event.subscriptionId(); + added = redisTemplate.opsForSet().add(processedKey, event.eventId()); + + redisTemplate.expire( + processedKey, + Duration.ofSeconds( + UsageTimeUtil.ttlToNextMonthWithBufferSec( + event.changedAt().toString(), 2))); + + if (added == null || added == 0L) { + // 이미 처리된 요금제 변경 이벤트 → skip + continue; + } + + String unitKey = "plan:unit:" + event.subscriptionId(); + String prevUnit = redisTemplate.opsForValue().get(unitKey); + + if (event.unit().equals(PlanUnit.ULTIMATE)) { + finalLimit = -1L; + } else if (event.unit().equals(PlanUnit.DAY)) { + finalLimit = event.allowanceAmount(); + } else { + finalLimit = getMonthFinalLimit(yearMonth, event, prevUnit); + } + + long ttlSec = + UsageTimeUtil.ttlToNextMonthWithBufferSec(event.changedAt().toString(), 2); + + results.add( + new CalculatedLimitSchema( + event.subscriptionId(), yearMonth, finalLimit, ttlSec, event.unit())); + } + + return results; + } + + private long getPreviousLimit(String key) { + String value = redisTemplate.opsForValue().get(key); + return value == null ? 0L : Long.parseLong(value); + } + + // 사용자가 월 기준 중도에 요금제를 변경했을 경우 사용 가능 데이터 집계 로직 + private long getMonthFinalLimit(String yearMonth, PlanChangeSchema event, String prevUnit) { + OffsetDateTime kstTime = + event.changedAt().atZoneSameInstant(ZoneId.of("Asia/Seoul")).toOffsetDateTime(); + + YearMonth month = YearMonth.from(kstTime); + int totalDays = month.lengthOfMonth(); + int changeDay = kstTime.getDayOfMonth(); + + int daysBefore = changeDay - 1; + int daysAfter = totalDays - daysBefore; + + // 만약 전 요금제가 무제한 or DAY 요금제 였다면 새로 추가된 요금제의 데이터 양만 계산 + if (!"MONTH".equals(prevUnit)) { + return event.allowanceAmount() * daysAfter / totalDays; + } + + String limitKey = "limit:" + yearMonth + ":" + event.subscriptionId(); + + long prevLimit = getPreviousLimit(limitKey); + + long allowanceBefore = prevLimit * daysBefore / totalDays; + long allowanceAfter = event.allowanceAmount() * daysAfter / totalDays; + + return allowanceBefore + allowanceAfter; + } +} diff --git a/src/main/java/com/project/consumer/util/RedisUtil.java b/src/main/java/com/project/consumer/util/RedisUtil.java new file mode 100644 index 0000000..5b1289d --- /dev/null +++ b/src/main/java/com/project/consumer/util/RedisUtil.java @@ -0,0 +1,90 @@ +package com.project.consumer.util; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import org.springframework.data.redis.connection.RedisStringCommands; +import org.springframework.data.redis.core.RedisCallback; +import org.springframework.data.redis.core.StringRedisTemplate; +import org.springframework.data.redis.core.script.DefaultRedisScript; +import org.springframework.data.redis.core.types.Expiration; +import org.springframework.stereotype.Service; + +import com.project.producer.schema.CalculatedLimitSchema; +import com.project.producer.schema.UsageEventSchema; + +import lombok.RequiredArgsConstructor; + +@Service +@RequiredArgsConstructor +public class RedisUtil { + + private final StringRedisTemplate redisTemplate; + + private final DefaultRedisScript> script = + new DefaultRedisScript<>( + LuaScriptLoader.load("lua/usage_batch.lua"), + (Class>) (Class) List.class); + + public List applyUsageBatch(List events) { + if (events == null || events.isEmpty()) { + return List.of(); + } + + List args = new ArrayList<>(); + args.add(String.valueOf(events.size())); + + for (UsageEventSchema e : events) { + + args.add(String.valueOf(e.subscriptionId())); + args.add(e.eventId()); + args.add(String.valueOf(e.usageBytes())); + args.add(e.timeStamp()); + } + + Object result = redisTemplate.execute(script, Collections.emptyList(), args.toArray()); + return result == null ? List.of() : (List) result; + } + + public void writePlanChangeBatch(List limits) { + redisTemplate.executePipelined( + (RedisCallback) + connection -> { + for (CalculatedLimitSchema limit : limits) { + String ke = + "limit:" + limit.yearMonth() + ":" + limit.subscriptionId(); + + byte[] key = redisTemplate.getStringSerializer().serialize(ke); + byte[] value = + redisTemplate + .getStringSerializer() + .serialize(String.valueOf(limit.limit())); + + connection + .stringCommands() + .set( + key, + value, + Expiration.seconds(limit.ttlSec()), + RedisStringCommands.SetOption.UPSERT); + + String unitKey = "plan:unit:" + limit.subscriptionId(); + byte[] uk = redisTemplate.getStringSerializer().serialize(unitKey); + byte[] uv = + redisTemplate + .getStringSerializer() + .serialize(limit.unit().name()); + + connection + .stringCommands() + .set( + uk, + uv, + Expiration.seconds(limit.ttlSec()), + RedisStringCommands.SetOption.UPSERT); + } + return null; + }); + } +} diff --git a/src/main/java/com/project/consumer/util/UsageTimeUtil.java b/src/main/java/com/project/consumer/util/UsageTimeUtil.java new file mode 100644 index 0000000..27d149c --- /dev/null +++ b/src/main/java/com/project/consumer/util/UsageTimeUtil.java @@ -0,0 +1,30 @@ +package com.project.consumer.util; + +import java.time.Duration; +import java.time.LocalTime; +import java.time.OffsetDateTime; +import java.time.format.DateTimeFormatter; + +public class UsageTimeUtil { + + private static final DateTimeFormatter ISO = DateTimeFormatter.ISO_OFFSET_DATE_TIME; + + private UsageTimeUtil() {} + + public static String toYearMonth(String isoTs) { + OffsetDateTime odt = OffsetDateTime.parse(isoTs, ISO); + return odt.format(DateTimeFormatter.ofPattern("yyyyMM")); + } + + public static long ttlToNextMonthWithBufferSec(String isoTs, int bufferDays) { + OffsetDateTime now = OffsetDateTime.parse(isoTs, ISO); + + OffsetDateTime nextMonthStart = + now.withDayOfMonth(1).with(LocalTime.MIDNIGHT).plusMonths(1); + + Duration base = Duration.between(now, nextMonthStart); + Duration buffer = Duration.ofDays(bufferDays); + + return Math.max(base.plus(buffer).getSeconds(), 3600); + } +} diff --git a/src/main/java/com/project/example/controller/ExampleController.java b/src/main/java/com/project/example/controller/ExampleController.java deleted file mode 100644 index 2966fc0..0000000 --- a/src/main/java/com/project/example/controller/ExampleController.java +++ /dev/null @@ -1,33 +0,0 @@ -package com.project.example.controller; - -import org.springframework.http.ResponseEntity; -import org.springframework.web.bind.annotation.GetMapping; -import org.springframework.web.bind.annotation.PathVariable; -import org.springframework.web.bind.annotation.PostMapping; -import org.springframework.web.bind.annotation.RequestBody; -import org.springframework.web.bind.annotation.RequestMapping; -import org.springframework.web.bind.annotation.RestController; - -import com.project.example.controller.dto.SaveExampleRequest; -import com.project.example.infra.entity.ExampleEntity; -import com.project.example.service.ExampleService; - -import lombok.RequiredArgsConstructor; - -@RestController -@RequestMapping("/example") -@RequiredArgsConstructor -public class ExampleController { - - private final ExampleService exampleService; - - @GetMapping("/{exampleId}") - public ResponseEntity find(@PathVariable Long exampleId) { - return ResponseEntity.ok(exampleService.find(exampleId)); - } - - @PostMapping - public void save(@RequestBody SaveExampleRequest request) { - exampleService.save(request); - } -} diff --git a/src/main/java/com/project/example/controller/dto/SaveExampleRequest.java b/src/main/java/com/project/example/controller/dto/SaveExampleRequest.java deleted file mode 100644 index 9dc4e29..0000000 --- a/src/main/java/com/project/example/controller/dto/SaveExampleRequest.java +++ /dev/null @@ -1,3 +0,0 @@ -package com.project.example.controller.dto; - -public record SaveExampleRequest(String exampleName, String exampleContent) {} diff --git a/src/main/java/com/project/example/infra/entity/ExampleEntity.java b/src/main/java/com/project/example/infra/entity/ExampleEntity.java deleted file mode 100644 index 7f6bbb2..0000000 --- a/src/main/java/com/project/example/infra/entity/ExampleEntity.java +++ /dev/null @@ -1,33 +0,0 @@ -package com.project.example.infra.entity; - -import jakarta.persistence.Entity; -import jakarta.persistence.GeneratedValue; -import jakarta.persistence.Id; -import jakarta.persistence.Table; - -import com.project.example.controller.dto.SaveExampleRequest; - -import lombok.AllArgsConstructor; -import lombok.Builder; -import lombok.NoArgsConstructor; - -@Entity -@Table(name = "EXAMPLE") -@Builder -@NoArgsConstructor -@AllArgsConstructor -public class ExampleEntity { - - @Id @GeneratedValue private Long exampleId; - - private String exampleName; - - private String exampleContent; - - public static ExampleEntity create(SaveExampleRequest request) { - return ExampleEntity.builder() - .exampleName(request.exampleName()) - .exampleContent(request.exampleContent()) - .build(); - } -} diff --git a/src/main/java/com/project/example/infra/repository/ExampleJpaRepository.java b/src/main/java/com/project/example/infra/repository/ExampleJpaRepository.java deleted file mode 100644 index 36cfb05..0000000 --- a/src/main/java/com/project/example/infra/repository/ExampleJpaRepository.java +++ /dev/null @@ -1,7 +0,0 @@ -package com.project.example.infra.repository; - -import org.springframework.data.jpa.repository.JpaRepository; - -import com.project.example.infra.entity.ExampleEntity; - -public interface ExampleJpaRepository extends JpaRepository {} diff --git a/src/main/java/com/project/example/infra/repository/ExampleRepository.java b/src/main/java/com/project/example/infra/repository/ExampleRepository.java deleted file mode 100644 index 9bc2fcb..0000000 --- a/src/main/java/com/project/example/infra/repository/ExampleRepository.java +++ /dev/null @@ -1,10 +0,0 @@ -package com.project.example.infra.repository; - -import com.project.example.infra.entity.ExampleEntity; - -public interface ExampleRepository { - - ExampleEntity find(Long exampleId); - - void save(ExampleEntity exampleEntity); -} diff --git a/src/main/java/com/project/example/infra/repository/ExampleRepositoryImpl.java b/src/main/java/com/project/example/infra/repository/ExampleRepositoryImpl.java deleted file mode 100644 index cad16a1..0000000 --- a/src/main/java/com/project/example/infra/repository/ExampleRepositoryImpl.java +++ /dev/null @@ -1,26 +0,0 @@ -package com.project.example.infra.repository; - -import org.springframework.stereotype.Repository; - -import com.project.example.infra.entity.ExampleEntity; -import com.project.global.exception.ApplicationException; -import com.project.global.exception.code.domain.ExampleErrorCode; - -import lombok.RequiredArgsConstructor; - -@Repository -@RequiredArgsConstructor -public class ExampleRepositoryImpl implements ExampleRepository { - - private final ExampleJpaRepository exampleJpaRepository; - - public ExampleEntity find(Long exampleId) { - return exampleJpaRepository - .findById(exampleId) - .orElseThrow(() -> new ApplicationException(ExampleErrorCode.EXAMPLE_NOT_FOUND)); - } - - public void save(ExampleEntity example) { - exampleJpaRepository.save(example); - } -} diff --git a/src/main/java/com/project/example/service/ExampleService.java b/src/main/java/com/project/example/service/ExampleService.java deleted file mode 100644 index e16a063..0000000 --- a/src/main/java/com/project/example/service/ExampleService.java +++ /dev/null @@ -1,28 +0,0 @@ -package com.project.example.service; - -import org.springframework.stereotype.Service; -import org.springframework.transaction.annotation.Transactional; - -import com.project.example.controller.dto.SaveExampleRequest; -import com.project.example.infra.entity.ExampleEntity; -import com.project.example.infra.repository.ExampleRepository; - -import lombok.RequiredArgsConstructor; - -@Service -@RequiredArgsConstructor -public class ExampleService { - - private final ExampleRepository exampleRepository; - - @Transactional - public ExampleEntity find(Long exampleId) { - return exampleRepository.find(exampleId); - } - - @Transactional - public void save(SaveExampleRequest request) { - ExampleEntity exampleEntity = ExampleEntity.create(request); - exampleRepository.save(exampleEntity); - } -} diff --git a/src/main/java/com/project/global/config/KafkaConfig.java b/src/main/java/com/project/global/config/KafkaConfig.java index 1558140..054d946 100644 --- a/src/main/java/com/project/global/config/KafkaConfig.java +++ b/src/main/java/com/project/global/config/KafkaConfig.java @@ -6,6 +6,9 @@ import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.config.KafkaListenerContainerFactory; import org.springframework.kafka.core.ConsumerFactory; +import org.springframework.kafka.listener.ContainerProperties; +import org.springframework.kafka.listener.DefaultErrorHandler; +import org.springframework.util.backoff.FixedBackOff; @Configuration @EnableKafka @@ -16,8 +19,35 @@ public KafkaListenerContainerFactory kafkaListenerContainerFactory( ConsumerFactory consumerFactory) { ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); + factory.setConsumerFactory(consumerFactory); - factory.setAutoStartup(true); // 기본값이 true + + factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL); + + return factory; + } + + @Bean + public ConcurrentKafkaListenerContainerFactory + batchKafkaListenerContainerFactory(ConsumerFactory consumerFactory) { + ConcurrentKafkaListenerContainerFactory factory = + new ConcurrentKafkaListenerContainerFactory<>(); + + factory.setConsumerFactory(consumerFactory); + + factory.setBatchListener(true); + + factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL); + + factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL); + + DefaultErrorHandler errorHandler = + new DefaultErrorHandler( + new FixedBackOff(1000L, 9) // 총 10번 시도(초기+9) + ); + + factory.setCommonErrorHandler(errorHandler); + return factory; } } diff --git a/src/main/java/com/project/global/config/KafkaTestRunner.java b/src/main/java/com/project/global/config/KafkaTestRunner.java deleted file mode 100644 index 4e84472..0000000 --- a/src/main/java/com/project/global/config/KafkaTestRunner.java +++ /dev/null @@ -1,18 +0,0 @@ -package com.project.global.config; - -import org.springframework.boot.CommandLineRunner; -import org.springframework.stereotype.Component; - -import lombok.RequiredArgsConstructor; - -@Component -@RequiredArgsConstructor -public class KafkaTestRunner implements CommandLineRunner { - - private final Producer producer; - - @Override - public void run(String... args) throws Exception { - producer.sendUsageMessage("key", "사용량 알림입니다"); - } -} diff --git a/src/main/java/com/project/global/config/Producer.java b/src/main/java/com/project/global/config/Producer.java deleted file mode 100644 index 0081bef..0000000 --- a/src/main/java/com/project/global/config/Producer.java +++ /dev/null @@ -1,21 +0,0 @@ -package com.project.global.config; - -import org.springframework.kafka.core.KafkaTemplate; -import org.springframework.stereotype.Component; - -import lombok.RequiredArgsConstructor; - -@Component -@RequiredArgsConstructor -public class Producer { - - private final KafkaTemplate kafkaTemplate; - - public void sendUsageMessage(String key, String value) { - kafkaTemplate.send("usage_topic", key, value); - } - - public void sendNotificationMessage(String key, String value) { - kafkaTemplate.send("notification_topic", key, value); - } -} diff --git a/src/main/java/com/project/global/config/UsageConsumer.java b/src/main/java/com/project/global/config/UsageConsumer.java deleted file mode 100644 index 76772c2..0000000 --- a/src/main/java/com/project/global/config/UsageConsumer.java +++ /dev/null @@ -1,19 +0,0 @@ -package com.project.global.config; - -import org.springframework.kafka.annotation.KafkaListener; -import org.springframework.stereotype.Component; - -import lombok.RequiredArgsConstructor; - -@Component -@RequiredArgsConstructor -public class UsageConsumer { - - private final Producer producer; - - @KafkaListener(id = "usageConsumer", topics = "usage_topic", groupId = "usage-consumer") - public void consume(String message) { - System.out.println("usage consumer received data : " + message); - producer.sendNotificationMessage("key", "데이터 사용량 누적 임계치 경고 알림입니다"); - } -} diff --git a/src/main/java/com/project/global/exception/code/domain/GlobalErrorCode.java b/src/main/java/com/project/global/exception/code/domain/GlobalErrorCode.java index 07945f5..05d65b5 100644 --- a/src/main/java/com/project/global/exception/code/domain/GlobalErrorCode.java +++ b/src/main/java/com/project/global/exception/code/domain/GlobalErrorCode.java @@ -13,7 +13,12 @@ public enum GlobalErrorCode implements BaseErrorCode { METHOD_NOT_ALLOWED(HttpStatus.METHOD_NOT_ALLOWED, "COMMON_003", "지원하지 않은 Http Method 입니다."), INTERNAL_SERVER_ERROR(HttpStatus.INTERNAL_SERVER_ERROR, "COMMON_004", "서버 에러가 발생했습니다."), BLOCKED_API(HttpStatus.METHOD_NOT_ALLOWED, "COMMON_005", "운영 환경에서 사용할 수 없는 API 입니다."), - JSON_CONVERT_DISABLE(HttpStatus.BAD_REQUEST, "USER_005", "JSON으로 변환하는 과정에서 오류가 발생하였습니다"); + NOTIFICATION_EVENT_PRODUCE_INVALID( + HttpStatus.BAD_REQUEST, "COMMON_006", "Kafka Notification 이벤트 발행 과정에서 에러가 발생했습니다"), + PLAN_CHANGE_EVENT_PRODUCE_INVALID( + HttpStatus.BAD_REQUEST, "COMMON_007", "Kafka PlanChange 이벤트 발행 과정에서 에러가 발생했습니다"), + LUA_SCRIPT_LOAD_INVALID(HttpStatus.BAD_REQUEST, "COMMON_008", "LUA 스크립트를 불러오는 과정에서 에러가 발생했습니다"), + JSON_CONVERT_INVALID(HttpStatus.BAD_REQUEST, "COMMON_009", "JSON으로 변환하는 과정에서 에러가 발생했습니다"); private final HttpStatus httpStatus; private final String customCode; diff --git a/src/main/java/com/project/producer/NotificationProducer.java b/src/main/java/com/project/producer/NotificationProducer.java new file mode 100644 index 0000000..8d0985b --- /dev/null +++ b/src/main/java/com/project/producer/NotificationProducer.java @@ -0,0 +1,16 @@ +package com.project.producer; + +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.stereotype.Component; + +import lombok.RequiredArgsConstructor; + +@Component +@RequiredArgsConstructor +public class NotificationProducer { + private final KafkaTemplate kafkaTemplate; + + public void sendNotification(String payload) { + kafkaTemplate.send("notification_topic", payload); + } +} diff --git a/src/main/java/com/project/producer/PlanChangeProducer.java b/src/main/java/com/project/producer/PlanChangeProducer.java new file mode 100644 index 0000000..f2c589d --- /dev/null +++ b/src/main/java/com/project/producer/PlanChangeProducer.java @@ -0,0 +1,51 @@ +package com.project.producer; + +import java.time.OffsetDateTime; +import java.util.UUID; + +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.stereotype.Component; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.project.global.exception.ApplicationException; +import com.project.global.exception.code.domain.GlobalErrorCode; +import com.project.producer.schema.PlanChangeSchema; +import com.project.producer.test.PlanUnit; + +import lombok.RequiredArgsConstructor; + +@Component +@RequiredArgsConstructor +public class PlanChangeProducer { + + private final KafkaTemplate kafkaTemplate; + private final ObjectMapper objectMapper; + + public void sendPlanChangeEvent( + long subscriptionId, + PlanUnit unit, // MONTH / DAILY / UNLIMITED + long allowanceAmount, + OffsetDateTime changedAt, + String email, + String phone) { + PlanChangeSchema event = + new PlanChangeSchema( + UUID.randomUUID().toString(), + subscriptionId, + unit, + allowanceAmount, + changedAt, + email, + phone); + + try { + String value = objectMapper.writeValueAsString(event); + String key = String.valueOf(subscriptionId); + + kafkaTemplate.send("change_plan", key, value); + } catch (JsonProcessingException e) { + throw new ApplicationException(GlobalErrorCode.PLAN_CHANGE_EVENT_PRODUCE_INVALID); + } + } +} diff --git a/src/main/java/com/project/producer/UsageProducer.java b/src/main/java/com/project/producer/UsageProducer.java new file mode 100644 index 0000000..05ad6a4 --- /dev/null +++ b/src/main/java/com/project/producer/UsageProducer.java @@ -0,0 +1,43 @@ +package com.project.producer; + +import java.time.OffsetDateTime; +import java.util.UUID; + +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.stereotype.Component; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.project.global.exception.ApplicationException; +import com.project.global.exception.code.domain.GlobalErrorCode; +import com.project.producer.schema.UsageEventSchema; + +import lombok.RequiredArgsConstructor; + +@Component +@RequiredArgsConstructor +public class UsageProducer { + private final KafkaTemplate kafkaTemplate; + private final ObjectMapper objectMapper; + + public void sendUsageEvent( + long subscriptionId, long usageBytes, String yearMonth, long ttlSec) { + UsageEventSchema schema = + new UsageEventSchema( + UUID.randomUUID().toString(), + subscriptionId, + usageBytes, + OffsetDateTime.now().toString(), + yearMonth, + ttlSec); + + try { + String value = objectMapper.writeValueAsString(schema); + String key = String.valueOf(subscriptionId); + + kafkaTemplate.send("usage-data", key, value); + } catch (JsonProcessingException e) { + throw new ApplicationException(GlobalErrorCode.JSON_CONVERT_INVALID); + } + } +} diff --git a/src/main/java/com/project/producer/schema/CalculatedLimitSchema.java b/src/main/java/com/project/producer/schema/CalculatedLimitSchema.java new file mode 100644 index 0000000..d87ca7a --- /dev/null +++ b/src/main/java/com/project/producer/schema/CalculatedLimitSchema.java @@ -0,0 +1,6 @@ +package com.project.producer.schema; + +import com.project.producer.test.PlanUnit; + +public record CalculatedLimitSchema( + long subscriptionId, String yearMonth, long limit, long ttlSec, PlanUnit unit) {} diff --git a/src/main/java/com/project/producer/schema/PlanChangeSchema.java b/src/main/java/com/project/producer/schema/PlanChangeSchema.java new file mode 100644 index 0000000..a647165 --- /dev/null +++ b/src/main/java/com/project/producer/schema/PlanChangeSchema.java @@ -0,0 +1,14 @@ +package com.project.producer.schema; + +import java.time.OffsetDateTime; + +import com.project.producer.test.PlanUnit; + +public record PlanChangeSchema( + String eventId, // 멱등성/추적용 + long subscriptionId, // 회선 ID + PlanUnit unit, // MONTHLY | DAILY | UNLIMITED + long allowanceAmount, // 월 제공량 or 일 제공량 (bytes 기준) + OffsetDateTime changedAt, // 요금제 변경 시점 + String email, // 사용자 이메일 + String phone) {} diff --git a/src/main/java/com/project/producer/schema/UsageEventSchema.java b/src/main/java/com/project/producer/schema/UsageEventSchema.java new file mode 100644 index 0000000..4ee3ed3 --- /dev/null +++ b/src/main/java/com/project/producer/schema/UsageEventSchema.java @@ -0,0 +1,9 @@ +package com.project.producer.schema; + +public record UsageEventSchema( + String eventId, + long subscriptionId, + long usageBytes, + String timeStamp, + String event, + long ttlSec) {} diff --git a/src/main/java/com/project/producer/test/InitSubscriptionPlanRunner.java b/src/main/java/com/project/producer/test/InitSubscriptionPlanRunner.java new file mode 100644 index 0000000..b5353d3 --- /dev/null +++ b/src/main/java/com/project/producer/test/InitSubscriptionPlanRunner.java @@ -0,0 +1,50 @@ +package com.project.producer.test; + +import java.time.OffsetDateTime; +import java.util.concurrent.ThreadLocalRandom; + +import org.springframework.boot.CommandLineRunner; +import org.springframework.stereotype.Component; + +import com.project.producer.PlanChangeProducer; + +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +@Component +@RequiredArgsConstructor +public class InitSubscriptionPlanRunner implements CommandLineRunner { + + private final PlanChangeProducer planChangeProducer; + + private static final int SUB_START = 1; + private static final int SUB_END = 10_000; + + @Override + public void run(String... args) { + + OffsetDateTime baseTime = + OffsetDateTime.now().withDayOfMonth(1).withHour(0).withMinute(0).withSecond(0); + + PlanSeed[] plans = PlanSeed.values(); + ThreadLocalRandom random = ThreadLocalRandom.current(); + + for (long subId = SUB_START; subId <= SUB_END; subId++) { + + PlanSeed plan = plans[random.nextInt(plans.length)]; + + planChangeProducer.sendPlanChangeEvent( + subId, + plan.getUnit(), + plan.getAllowance(), + baseTime, + "user" + subId + "@test.com", + "010-" + + String.format( + "%04d-%04d", random.nextInt(10000), random.nextInt(10000))); + } + + log.info("Initial Plan Seeding Completed (1 ~ 10000)"); + } +} diff --git a/src/main/java/com/project/producer/test/PlanSeed.java b/src/main/java/com/project/producer/test/PlanSeed.java new file mode 100644 index 0000000..2c6d45c --- /dev/null +++ b/src/main/java/com/project/producer/test/PlanSeed.java @@ -0,0 +1,18 @@ +package com.project.producer.test; + +import lombok.AllArgsConstructor; +import lombok.Getter; + +@Getter +@AllArgsConstructor +public enum PlanSeed { + FIVE_G_SIGNATURE("5G 시그니처", -1, PlanUnit.ULTIMATE), + FIVE_G_STANDARD("5G 스탠다드", 153600, PlanUnit.MONTH), + FIVE_G_BASIC_PLUS("5G 베이직+", 24576, PlanUnit.MONTH), + LTE_33("LTE 데이터 33", 1536, PlanUnit.MONTH), + LTE_DIRECT_45("LTE 다이렉트 45", 5120, PlanUnit.DAY); + + private final String name; + private final long allowance; + private final PlanUnit unit; +} diff --git a/src/main/java/com/project/producer/test/PlanUnit.java b/src/main/java/com/project/producer/test/PlanUnit.java new file mode 100644 index 0000000..995a419 --- /dev/null +++ b/src/main/java/com/project/producer/test/PlanUnit.java @@ -0,0 +1,7 @@ +package com.project.producer.test; + +public enum PlanUnit { + MONTH, + DAY, + ULTIMATE +} diff --git a/src/main/resources/lua/usage_batch.lua b/src/main/resources/lua/usage_batch.lua new file mode 100644 index 0000000..0a5e401 --- /dev/null +++ b/src/main/resources/lua/usage_batch.lua @@ -0,0 +1,72 @@ +local out = {} +local n = tonumber(ARGV[1]) +local idx = 2 + +for i = 1, n do + local subId = ARGV[idx]; idx = idx + 1 + local eventId = ARGV[idx]; idx = idx + 1 + local bytes = tonumber(ARGV[idx]); idx = idx + 1 + local ts = ARGV[idx]; idx = idx + 1 + + local monthKeyTime = string.sub(ts, 1, 7):gsub('-', '') + local limitKey = 'limit:' .. monthKeyTime .. ':' .. subId + + local unit = redis.call('GET', 'plan:unit:' .. subId) or 'MONTH' + + local usageKeyTime + local thKeyTime + local ttlSec + + if unit == 'DAY' then + usageKeyTime = string.sub(ts, 1, 10):gsub('-', '') + thKeyTime = usageKeyTime + ttlSec = 86400 * 2 + else + usageKeyTime = monthKeyTime + thKeyTime = monthKeyTime + ttlSec = redis.call('TTL', limitKey) + if ttlSec < 0 then ttlSec = 86400 * 35 end + end + + local usageKey = 'usage:' .. usageKeyTime .. ':' .. subId + local processedKey = 'processed:usage:' .. usageKeyTime .. ':' .. subId + local thKey = 'th:' .. thKeyTime .. ':' .. subId + + if redis.call('SISMEMBER', processedKey, eventId) == 0 then + redis.call('SADD', processedKey, eventId) + if redis.call('TTL', processedKey) < 0 then + redis.call('EXPIRE', processedKey, ttlSec) + end + + local newTotal = redis.call('INCRBY', usageKey, bytes) + if redis.call('TTL', usageKey) < 0 then + redis.call('EXPIRE', usageKey, ttlSec) + end + + local limit = tonumber(redis.call('GET', limitKey) or '0') + if limit > 0 then + local percent = math.floor((newTotal * 100) / limit) + + local prev = tonumber(redis.call('GET', thKey) or '0') + local next = prev + + if percent >= 100 and prev < 100 then next = 100 + elseif percent >= 80 and prev < 80 then next = 80 + elseif percent >= 50 and prev < 50 then next = 50 + end + + if next ~= prev then + redis.call('SET', thKey, tostring(next)) + if redis.call('TTL', thKey) < 0 then + redis.call('EXPIRE', thKey, ttlSec) + end + + table.insert(out, + thKeyTime .. '|' .. subId .. '|' .. next .. '|' .. percent .. '|' .. + newTotal .. '|' .. limit .. '|' .. eventId .. '|' .. ts) + end + end + end +end + +return out \ No newline at end of file