diff --git a/src/main/java/com/project/controller/BatchController.java b/src/main/java/com/project/controller/BatchController.java new file mode 100644 index 0000000..67a9561 --- /dev/null +++ b/src/main/java/com/project/controller/BatchController.java @@ -0,0 +1,29 @@ +package com.project.controller; + +import org.springframework.http.ResponseEntity; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; + +import com.project.rdb.batch.orchestrator.UsageOrchestrator; + +import lombok.RequiredArgsConstructor; + +@RestController +@RequestMapping("/api/batch") +@RequiredArgsConstructor +public class BatchController { + + private final UsageOrchestrator usageOrchestrator; + + @PostMapping("/usage/run") + public ResponseEntity runUsagePipeline() { + try { + usageOrchestrator.run(); + return ResponseEntity.ok("✅ usage batch pipeline started"); + } catch (Exception e) { + return ResponseEntity.internalServerError() + .body("❌ batch execution failed: " + e.getMessage()); + } + } +} diff --git a/src/main/java/com/project/global/exception/code/domain/GlobalErrorCode.java b/src/main/java/com/project/global/exception/code/domain/GlobalErrorCode.java index 8bba73c..0c006b9 100644 --- a/src/main/java/com/project/global/exception/code/domain/GlobalErrorCode.java +++ b/src/main/java/com/project/global/exception/code/domain/GlobalErrorCode.java @@ -24,7 +24,8 @@ public enum GlobalErrorCode implements BaseErrorCode { HttpStatus.BAD_REQUEST, "COMMON_011", "UsageNotification Produce 과정에서 에러가 발생하였습니다"), USAGE_OUTBOX_WRITER_FAILED( HttpStatus.BAD_REQUEST, "COMMON_012", "UsageOutbox Writer 배치 시스템이 실패하였습니다"), - PLAN_NOT_VALID(HttpStatus.BAD_REQUEST, "COMMON_013", "존재하지 않는 요금제입니다"); + PLAN_NOT_VALID(HttpStatus.BAD_REQUEST, "COMMON_013", "존재하지 않는 요금제입니다"), + BATCH_NOT_FINISHED(HttpStatus.BAD_REQUEST, "COMMON_014", "이전 배치 작업이 끝나지 않았습니다"); private final HttpStatus httpStatus; private final String customCode; diff --git a/src/main/java/com/project/rdb/BatchJobRunner.java b/src/main/java/com/project/rdb/BatchJobRunner.java deleted file mode 100644 index ba14d2f..0000000 --- a/src/main/java/com/project/rdb/BatchJobRunner.java +++ /dev/null @@ -1,55 +0,0 @@ -// package com.project.rdb; -// -// import org.springframework.batch.core.Job; -// import org.springframework.batch.core.JobParameters; -// import org.springframework.batch.core.JobParametersBuilder; -// import org.springframework.batch.core.launch.JobLauncher; -// import org.springframework.boot.CommandLineRunner; -// import org.springframework.stereotype.Component; -// -// import lombok.RequiredArgsConstructor; -// -// @Component -// @RequiredArgsConstructor -// public class BatchJobRunner implements CommandLineRunner { -// -// private final JobLauncher jobLauncher; -// private final Job usageAggregationJob; -// private final Job usageNotificationJob; -// private final Job notificationSendJob; -// -// ////// @Override -// ////// public void run(String... args) throws Exception { -// ////// JobParameters params = new JobParametersBuilder() -// ////// .addString("fromTime", "2025-12-01T00:00:00") -// ////// .addString("toTime", "2025-12-31T11:59:59") -// ////// .addLong("run.id", System.currentTimeMillis()) -// ////// .toJobParameters(); -// ////// -// ////// jobLauncher.run(usageAggregationJob, params); -// ////// } -// //// -// ////// @Override -// ////// public void run(String... args) throws Exception { -// ////// -// ////// JobParameters params = new JobParametersBuilder() -// ////// // 월 요금제 Step용 -// ////// .addString("period", "202512") -// ////// // 일 요금제 Step용 -// ////// .addString("usageDate", "20251201") -// ////// .addLong("run.id", System.currentTimeMillis()) -// ////// .toJobParameters(); -// ////// -// ////// jobLauncher.run(usageNotificationJob, params); -// ////// } -// // -// @Override -// public void run(String... args) throws Exception { -// JobParameters params = -// new JobParametersBuilder() -// .addLong("runAt", System.currentTimeMillis()) // 중복 실행 방지 -// .toJobParameters(); -// -// jobLauncher.run(notificationSendJob, params); -// } -// } diff --git a/src/main/java/com/project/rdb/batch/model/dto/NotificationSendTask.java b/src/main/java/com/project/rdb/batch/model/dto/NotificationSendTask.java deleted file mode 100644 index c4a6e6b..0000000 --- a/src/main/java/com/project/rdb/batch/model/dto/NotificationSendTask.java +++ /dev/null @@ -1,8 +0,0 @@ -package com.project.rdb.batch.model.dto; - -import java.util.concurrent.CompletableFuture; - -import org.springframework.kafka.support.SendResult; - -public record NotificationSendTask( - UsageNotificationEvent event, CompletableFuture> future) {} diff --git a/src/main/java/com/project/rdb/batch/model/dto/UsageNotificationEvent.java b/src/main/java/com/project/rdb/batch/model/dto/UsageNotificationEvent.java deleted file mode 100644 index 15fd16b..0000000 --- a/src/main/java/com/project/rdb/batch/model/dto/UsageNotificationEvent.java +++ /dev/null @@ -1,14 +0,0 @@ -package com.project.rdb.batch.model.dto; - -import java.util.UUID; - -public record UsageNotificationEvent( - UUID eventId, - Long id, - Long subId, - String period, - String unit, - int threshold, - int percent, - long totalUsedMb, - long allotmentMb) {} diff --git a/src/main/java/com/project/rdb/batch/model/dto/UsageNotificationOutboxRow.java b/src/main/java/com/project/rdb/batch/model/dto/UsageNotificationOutboxRow.java deleted file mode 100644 index 3dd49c3..0000000 --- a/src/main/java/com/project/rdb/batch/model/dto/UsageNotificationOutboxRow.java +++ /dev/null @@ -1,11 +0,0 @@ -package com.project.rdb.batch.model.dto; - -public record UsageNotificationOutboxRow( - Long id, - Long subId, - String period, - String unit, - int threshold, - int percent, - long totalUsedMb, - long allotmentMb) {} diff --git a/src/main/java/com/project/rdb/batch/model/dto/UsageNotificationSource.java b/src/main/java/com/project/rdb/batch/model/dto/UsageNotificationSource.java deleted file mode 100644 index 1c7754c..0000000 --- a/src/main/java/com/project/rdb/batch/model/dto/UsageNotificationSource.java +++ /dev/null @@ -1,4 +0,0 @@ -package com.project.rdb.batch.model.dto; - -public record UsageNotificationSource( - Long subId, String period, String unit, long totalUsedBytes, long allotmentAmount) {} diff --git a/src/main/java/com/project/rdb/batch/model/entity/UsageNotificationOutbox.java b/src/main/java/com/project/rdb/batch/model/entity/UsageNotificationOutbox.java index 0aa412a..6904aae 100644 --- a/src/main/java/com/project/rdb/batch/model/entity/UsageNotificationOutbox.java +++ b/src/main/java/com/project/rdb/batch/model/entity/UsageNotificationOutbox.java @@ -38,7 +38,7 @@ public class UsageNotificationOutbox { private int threshold; @Column(nullable = false) - private int percent; + private double percent; @Column(name = "total_used_mb", nullable = false) private Long totalUsedMb; diff --git a/src/main/java/com/project/rdb/batch/notificationsend/config/NotificationSendJobConfig.java b/src/main/java/com/project/rdb/batch/notificationsend/config/NotificationSendJobConfig.java index 0c735c1..8416ce7 100644 --- a/src/main/java/com/project/rdb/batch/notificationsend/config/NotificationSendJobConfig.java +++ b/src/main/java/com/project/rdb/batch/notificationsend/config/NotificationSendJobConfig.java @@ -12,10 +12,10 @@ import org.springframework.context.annotation.Configuration; import org.springframework.transaction.PlatformTransactionManager; -import com.project.rdb.batch.model.dto.UsageNotificationEvent; -import com.project.rdb.batch.model.dto.UsageNotificationOutboxRow; +import com.project.rdb.batch.notificationsend.dto.NotificationMessage; import com.project.rdb.batch.notificationsend.processor.NotificationSendProcessor; import com.project.rdb.batch.notificationsend.writer.NotificationSendWriter; +import com.project.rdb.batch.usagenotification.dto.UsageNotificationOutboxRow; import lombok.RequiredArgsConstructor; @@ -44,7 +44,7 @@ public Job notificationSendJob() { @Bean public Step notificationSendStep() { return new StepBuilder("notificationSendStep", jobRepository) - .chunk(CHUNK_SIZE, txManager) + .chunk(CHUNK_SIZE, txManager) .reader(outboxReader) .processor(notificationSendProcessor) .writer(notificationSendWriter) diff --git a/src/main/java/com/project/rdb/batch/notificationsend/dto/NotificationMessage.java b/src/main/java/com/project/rdb/batch/notificationsend/dto/NotificationMessage.java new file mode 100644 index 0000000..f11bd65 --- /dev/null +++ b/src/main/java/com/project/rdb/batch/notificationsend/dto/NotificationMessage.java @@ -0,0 +1,11 @@ +package com.project.rdb.batch.notificationsend.dto; + +import java.util.Map; +import java.util.UUID; + +public record NotificationMessage( + UUID eventId, + Long id, + Long templateGroupId, + Map subscriptionInfo, + Map variables) {} diff --git a/src/main/java/com/project/rdb/batch/notificationsend/dto/NotificationSendTask.java b/src/main/java/com/project/rdb/batch/notificationsend/dto/NotificationSendTask.java new file mode 100644 index 0000000..168c8d6 --- /dev/null +++ b/src/main/java/com/project/rdb/batch/notificationsend/dto/NotificationSendTask.java @@ -0,0 +1,8 @@ +package com.project.rdb.batch.notificationsend.dto; + +import java.util.concurrent.CompletableFuture; + +import org.springframework.kafka.support.SendResult; + +public record NotificationSendTask( + NotificationMessage event, CompletableFuture> future) {} diff --git a/src/main/java/com/project/rdb/batch/notificationsend/processor/NotificationSendProcessor.java b/src/main/java/com/project/rdb/batch/notificationsend/processor/NotificationSendProcessor.java index 69a5b6d..4895564 100644 --- a/src/main/java/com/project/rdb/batch/notificationsend/processor/NotificationSendProcessor.java +++ b/src/main/java/com/project/rdb/batch/notificationsend/processor/NotificationSendProcessor.java @@ -1,32 +1,40 @@ package com.project.rdb.batch.notificationsend.processor; +import java.util.Map; import java.util.UUID; import org.springframework.batch.item.ItemProcessor; import org.springframework.stereotype.Component; -import com.project.rdb.batch.model.dto.UsageNotificationEvent; -import com.project.rdb.batch.model.dto.UsageNotificationOutboxRow; +import com.project.rdb.batch.notificationsend.dto.NotificationMessage; +import com.project.rdb.batch.usagenotification.dto.UsageNotificationOutboxRow; import lombok.extern.slf4j.Slf4j; @Component @Slf4j public class NotificationSendProcessor - implements ItemProcessor { + implements ItemProcessor { @Override - public UsageNotificationEvent process(UsageNotificationOutboxRow item) { + public NotificationMessage process(UsageNotificationOutboxRow item) { - return new UsageNotificationEvent( + return new NotificationMessage( UUID.randomUUID(), item.id(), - item.subId(), - item.period(), - item.unit(), - item.threshold(), - item.percent(), - item.totalUsedMb(), - item.allotmentMb()); + 101L, + Map.of( + "subId", item.subId(), + "phoneNumber", item.phoneNumber(), + "email", item.email()), + Map.of( + "period", item.period(), + "threshold", item.threshold(), + "percent", item.percent(), + "totalUsedMb", item.totalUsedMb(), + "allotmentMb", item.allotmentMb(), + "phoneNumber", item.phoneNumber(), + "email", item.email(), + "createdAt", item.createdAt())); } } diff --git a/src/main/java/com/project/rdb/batch/notificationsend/reader/NotificationSendReaderConfig.java b/src/main/java/com/project/rdb/batch/notificationsend/reader/NotificationSendReaderConfig.java index d1bcdb6..141e1f8 100644 --- a/src/main/java/com/project/rdb/batch/notificationsend/reader/NotificationSendReaderConfig.java +++ b/src/main/java/com/project/rdb/batch/notificationsend/reader/NotificationSendReaderConfig.java @@ -7,7 +7,7 @@ import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; -import com.project.rdb.batch.model.dto.UsageNotificationOutboxRow; +import com.project.rdb.batch.usagenotification.dto.UsageNotificationOutboxRow; @Configuration public class NotificationSendReaderConfig { @@ -19,17 +19,24 @@ public JdbcCursorItemReader notificationSendOutboxRe String sql = """ SELECT - id, - sub_id, - period, - unit, - threshold, - percent, - total_used_mb, - allotment_mb - FROM usage_notification_outbox - WHERE status = 'PENDING' - ORDER BY id + o.id, + o.sub_id, + o.period, + o.plan_name, + o.threshold, + o.percent, + o.total_used_mb, + o.allotment_mb, + s.phone_number, + c.email_enc, + o.created_at + FROM usage_notification_outbox o + JOIN subscription s + ON s.sub_id = o.sub_id + JOIN customer c + ON c.customer_id = s.customer_id + WHERE o.status = 'PENDING' + ORDER BY o.id """; return new JdbcCursorItemReaderBuilder() @@ -43,11 +50,14 @@ public JdbcCursorItemReader notificationSendOutboxRe rs.getLong("id"), rs.getLong("sub_id"), rs.getString("period"), - rs.getString("unit"), + rs.getString("plan_name"), rs.getInt("threshold"), - rs.getInt("percent"), + rs.getDouble("percent"), rs.getLong("total_used_mb"), - rs.getLong("allotment_mb"))) + rs.getLong("allotment_mb"), + rs.getString("phone_number"), + rs.getString("email_enc"), + rs.getTimestamp("created_at").toLocalDateTime())) .build(); } } diff --git a/src/main/java/com/project/rdb/batch/notificationsend/writer/NotificationSendWriter.java b/src/main/java/com/project/rdb/batch/notificationsend/writer/NotificationSendWriter.java index 0258adb..a61a1a5 100644 --- a/src/main/java/com/project/rdb/batch/notificationsend/writer/NotificationSendWriter.java +++ b/src/main/java/com/project/rdb/batch/notificationsend/writer/NotificationSendWriter.java @@ -13,15 +13,15 @@ import org.springframework.stereotype.Component; import com.fasterxml.jackson.databind.ObjectMapper; -import com.project.rdb.batch.model.dto.NotificationSendTask; -import com.project.rdb.batch.model.dto.UsageNotificationEvent; import com.project.rdb.batch.model.repository.UsageNotificationOutboxRepository; +import com.project.rdb.batch.notificationsend.dto.NotificationMessage; +import com.project.rdb.batch.notificationsend.dto.NotificationSendTask; import lombok.RequiredArgsConstructor; @Component @RequiredArgsConstructor -public class NotificationSendWriter implements ItemWriter { +public class NotificationSendWriter implements ItemWriter { private final KafkaTemplate kafkaTemplate; private final UsageNotificationOutboxRepository repository; @@ -29,10 +29,10 @@ public class NotificationSendWriter implements ItemWriter items) { + public void write(Chunk items) { // usageNotificationOutboxId 추출 - List ids = items.getItems().stream().map(UsageNotificationEvent::id).toList(); + List ids = items.getItems().stream().map(NotificationMessage::id).toList(); // PROCESSING Status 먼저 DB 반영 repository.markProcessing(ids); @@ -42,12 +42,13 @@ public void write(Chunk items) { Map failedReasons = new HashMap<>(); - for (UsageNotificationEvent event : items) { + for (NotificationMessage event : items) { try { String payload = objectMapper.writeValueAsString(event); + String key = String.valueOf(event.subscriptionInfo().get("subId")); CompletableFuture> future = - kafkaTemplate.send("notification-usage", event.subId().toString(), payload); + kafkaTemplate.send("noti-tp", key, payload); tasks.add(new NotificationSendTask(event, future)); } catch (Exception e) { diff --git a/src/main/java/com/project/rdb/batch/orchestrator/BatchTimeWindowService.java b/src/main/java/com/project/rdb/batch/orchestrator/BatchTimeWindowService.java new file mode 100644 index 0000000..1bf5312 --- /dev/null +++ b/src/main/java/com/project/rdb/batch/orchestrator/BatchTimeWindowService.java @@ -0,0 +1,41 @@ +package com.project.rdb.batch.orchestrator; + +import java.time.LocalDateTime; +import java.util.Map; + +import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate; +import org.springframework.stereotype.Service; +import org.springframework.transaction.annotation.Transactional; + +import lombok.RequiredArgsConstructor; + +@Service +@RequiredArgsConstructor +public class BatchTimeWindowService { + + private final NamedParameterJdbcTemplate jdbc; + + public LocalDateTime resolve(String jobName) { + return jdbc.queryForObject( + """ + SELECT last_processed_at + FROM batch_watermark + WHERE job_name = :jobName + """, + Map.of("jobName", jobName), + LocalDateTime.class); + } + + @Transactional + public void update(String jobName, LocalDateTime newFrom) { + jdbc.update( + """ + UPDATE batch_watermark + SET last_processed_at = :newFrom + WHERE job_name = :jobName + """, + Map.of( + "jobName", jobName, + "newFrom", newFrom)); + } +} diff --git a/src/main/java/com/project/rdb/batch/orchestrator/UsageOrchestrator.java b/src/main/java/com/project/rdb/batch/orchestrator/UsageOrchestrator.java new file mode 100644 index 0000000..992e6ea --- /dev/null +++ b/src/main/java/com/project/rdb/batch/orchestrator/UsageOrchestrator.java @@ -0,0 +1,91 @@ +package com.project.rdb.batch.orchestrator; + +import java.time.LocalDateTime; + +import org.springframework.batch.core.Job; +import org.springframework.batch.core.JobExecution; +import org.springframework.batch.core.JobParameters; +import org.springframework.batch.core.JobParametersBuilder; +import org.springframework.batch.core.launch.JobLauncher; +import org.springframework.stereotype.Component; + +import lombok.RequiredArgsConstructor; + +@Component +@RequiredArgsConstructor +public class UsageOrchestrator { + + private final JobLauncher jobLauncher; + private final Job usageAggregationJob; + private final Job usageNotificationJob; + private final Job notificationSendJob; + + private final BatchTimeWindowService timeWindowService; + + public void run() throws Exception { + LocalDateTime aggregationStart = LocalDateTime.now(); + LocalDateTime aggFrom = timeWindowService.resolve("usage-aggregation"); + + JobParameters aggregationParams = + new JobParametersBuilder() + .addLocalDateTime("fromTime", aggFrom) + .addLocalDateTime("toTime", aggregationStart) + .addLong("run.id", System.currentTimeMillis()) + .toJobParameters(); + + JobExecution aggregationExec = jobLauncher.run(usageAggregationJob, aggregationParams); + + if (aggregationExec.getStatus().isUnsuccessful()) { + return; + } + + // ✅ 성공 시에만 watermark 이동 + timeWindowService.update("usage-aggregation", aggregationStart); + + /* =============================== + * 2️⃣ 알림 대상 추출 배치 + * =============================== */ + + LocalDateTime notificationStart = LocalDateTime.now(); + + LocalDateTime notificationFrom = timeWindowService.resolve("usage-notification"); + + JobParameters notificationParams = + new JobParametersBuilder() + .addLocalDateTime("fromTime", notificationFrom) + .addLocalDateTime("toTime", notificationStart) + .addLong("run.id", System.currentTimeMillis()) + .toJobParameters(); + + JobExecution notificationExec = jobLauncher.run(usageNotificationJob, notificationParams); + + if (notificationExec.getStatus().isUnsuccessful()) { + return; + } + + timeWindowService.update("usage-notification", notificationStart); + + /* =============================== + * 3️⃣ 알림 발송 배치 + * =============================== */ + + LocalDateTime sendStart = LocalDateTime.now(); + + LocalDateTime sendFrom = timeWindowService.resolve("notification-send"); + + JobParameters sendParams = + new JobParametersBuilder() + .addLocalDateTime("fromTime", sendFrom) + .addLocalDateTime("toTime", sendStart) + .addLong("run.id", System.currentTimeMillis()) + .toJobParameters(); + + JobExecution sendExec = jobLauncher.run(notificationSendJob, sendParams); + + if (sendExec.getStatus().isUnsuccessful()) { + return; + } + + timeWindowService.update("notification-send", sendStart); + } +} diff --git a/src/main/java/com/project/rdb/batch/orchestrator/dto/TimeWindow.java b/src/main/java/com/project/rdb/batch/orchestrator/dto/TimeWindow.java new file mode 100644 index 0000000..4a0be88 --- /dev/null +++ b/src/main/java/com/project/rdb/batch/orchestrator/dto/TimeWindow.java @@ -0,0 +1,5 @@ +package com.project.rdb.batch.orchestrator.dto; + +import java.time.LocalDateTime; + +public record TimeWindow(LocalDateTime from, LocalDateTime to) {} diff --git a/src/main/java/com/project/rdb/batch/usageaggregate/config/Sqls.java b/src/main/java/com/project/rdb/batch/usageaggregate/config/Sqls.java index 9a0bbc3..9ba6505 100644 --- a/src/main/java/com/project/rdb/batch/usageaggregate/config/Sqls.java +++ b/src/main/java/com/project/rdb/batch/usageaggregate/config/Sqls.java @@ -10,7 +10,7 @@ INSERT INTO usage_summary_daily ( sub_id, usage_date, total_used_bytes, updated_at ) VALUES ( - :subId, :period, :delta, NOW() + :subId, :usageDate, :delta, NOW() ) ON CONFLICT (sub_id, usage_date) DO UPDATE SET diff --git a/src/main/java/com/project/rdb/batch/usageaggregate/config/UsageAggregationJobConfig.java b/src/main/java/com/project/rdb/batch/usageaggregate/config/UsageAggregationJobConfig.java index 915b707..8e7cb2b 100644 --- a/src/main/java/com/project/rdb/batch/usageaggregate/config/UsageAggregationJobConfig.java +++ b/src/main/java/com/project/rdb/batch/usageaggregate/config/UsageAggregationJobConfig.java @@ -14,9 +14,9 @@ import org.springframework.transaction.PlatformTransactionManager; import com.project.rdb.batch.model.BatchStepMetricsListener; -import com.project.rdb.batch.model.dto.UsageDailyAggregation; -import com.project.rdb.batch.model.dto.UsageLogRow; -import com.project.rdb.batch.model.dto.UsageMonthlyAggregation; +import com.project.rdb.batch.usageaggregate.dto.UsageDailyAggregation; +import com.project.rdb.batch.usageaggregate.dto.UsageLogRow; +import com.project.rdb.batch.usageaggregate.dto.UsageMonthlyAggregation; import com.project.rdb.batch.usageaggregate.processor.UsageDailyProcessor; import com.project.rdb.batch.usageaggregate.processor.UsageMonthlyProcessor; import com.project.rdb.batch.usageaggregate.writer.UsageSummaryDailyWriter; diff --git a/src/main/java/com/project/rdb/batch/model/dto/UsageDailyAggregation.java b/src/main/java/com/project/rdb/batch/usageaggregate/dto/UsageDailyAggregation.java similarity index 63% rename from src/main/java/com/project/rdb/batch/model/dto/UsageDailyAggregation.java rename to src/main/java/com/project/rdb/batch/usageaggregate/dto/UsageDailyAggregation.java index af7697c..200ee75 100644 --- a/src/main/java/com/project/rdb/batch/model/dto/UsageDailyAggregation.java +++ b/src/main/java/com/project/rdb/batch/usageaggregate/dto/UsageDailyAggregation.java @@ -1,3 +1,3 @@ -package com.project.rdb.batch.model.dto; +package com.project.rdb.batch.usageaggregate.dto; public record UsageDailyAggregation(Long subId, String usageDate, long deltaBytes) {} diff --git a/src/main/java/com/project/rdb/batch/model/dto/UsageDailyKey.java b/src/main/java/com/project/rdb/batch/usageaggregate/dto/UsageDailyKey.java similarity index 88% rename from src/main/java/com/project/rdb/batch/model/dto/UsageDailyKey.java rename to src/main/java/com/project/rdb/batch/usageaggregate/dto/UsageDailyKey.java index 38de2ff..4c84c1e 100644 --- a/src/main/java/com/project/rdb/batch/model/dto/UsageDailyKey.java +++ b/src/main/java/com/project/rdb/batch/usageaggregate/dto/UsageDailyKey.java @@ -1,4 +1,4 @@ -package com.project.rdb.batch.model.dto; +package com.project.rdb.batch.usageaggregate.dto; import java.util.Objects; diff --git a/src/main/java/com/project/rdb/batch/model/dto/UsageLogRow.java b/src/main/java/com/project/rdb/batch/usageaggregate/dto/UsageLogRow.java similarity index 69% rename from src/main/java/com/project/rdb/batch/model/dto/UsageLogRow.java rename to src/main/java/com/project/rdb/batch/usageaggregate/dto/UsageLogRow.java index a45ea7d..10541e3 100644 --- a/src/main/java/com/project/rdb/batch/model/dto/UsageLogRow.java +++ b/src/main/java/com/project/rdb/batch/usageaggregate/dto/UsageLogRow.java @@ -1,4 +1,4 @@ -package com.project.rdb.batch.model.dto; +package com.project.rdb.batch.usageaggregate.dto; import java.time.LocalDateTime; diff --git a/src/main/java/com/project/rdb/batch/model/dto/UsageMonthlyAggregation.java b/src/main/java/com/project/rdb/batch/usageaggregate/dto/UsageMonthlyAggregation.java similarity index 63% rename from src/main/java/com/project/rdb/batch/model/dto/UsageMonthlyAggregation.java rename to src/main/java/com/project/rdb/batch/usageaggregate/dto/UsageMonthlyAggregation.java index e63dd9a..22d4387 100644 --- a/src/main/java/com/project/rdb/batch/model/dto/UsageMonthlyAggregation.java +++ b/src/main/java/com/project/rdb/batch/usageaggregate/dto/UsageMonthlyAggregation.java @@ -1,3 +1,3 @@ -package com.project.rdb.batch.model.dto; +package com.project.rdb.batch.usageaggregate.dto; public record UsageMonthlyAggregation(Long subId, String period, long deltaBytes) {} diff --git a/src/main/java/com/project/rdb/batch/model/dto/UsageMonthlyKey.java b/src/main/java/com/project/rdb/batch/usageaggregate/dto/UsageMonthlyKey.java similarity index 88% rename from src/main/java/com/project/rdb/batch/model/dto/UsageMonthlyKey.java rename to src/main/java/com/project/rdb/batch/usageaggregate/dto/UsageMonthlyKey.java index 8089e34..bf5be6f 100644 --- a/src/main/java/com/project/rdb/batch/model/dto/UsageMonthlyKey.java +++ b/src/main/java/com/project/rdb/batch/usageaggregate/dto/UsageMonthlyKey.java @@ -1,4 +1,4 @@ -package com.project.rdb.batch.model.dto; +package com.project.rdb.batch.usageaggregate.dto; import java.util.Objects; diff --git a/src/main/java/com/project/rdb/batch/usageaggregate/processor/UsageDailyProcessor.java b/src/main/java/com/project/rdb/batch/usageaggregate/processor/UsageDailyProcessor.java index b7a1b08..dd8f233 100644 --- a/src/main/java/com/project/rdb/batch/usageaggregate/processor/UsageDailyProcessor.java +++ b/src/main/java/com/project/rdb/batch/usageaggregate/processor/UsageDailyProcessor.java @@ -5,8 +5,8 @@ import org.springframework.batch.item.ItemProcessor; import org.springframework.stereotype.Component; -import com.project.rdb.batch.model.dto.UsageDailyAggregation; -import com.project.rdb.batch.model.dto.UsageLogRow; +import com.project.rdb.batch.usageaggregate.dto.UsageDailyAggregation; +import com.project.rdb.batch.usageaggregate.dto.UsageLogRow; @Component public class UsageDailyProcessor implements ItemProcessor { diff --git a/src/main/java/com/project/rdb/batch/usageaggregate/processor/UsageMonthlyProcessor.java b/src/main/java/com/project/rdb/batch/usageaggregate/processor/UsageMonthlyProcessor.java index 0af4e1e..d2ab8fa 100644 --- a/src/main/java/com/project/rdb/batch/usageaggregate/processor/UsageMonthlyProcessor.java +++ b/src/main/java/com/project/rdb/batch/usageaggregate/processor/UsageMonthlyProcessor.java @@ -5,8 +5,8 @@ import org.springframework.batch.item.ItemProcessor; import org.springframework.stereotype.Component; -import com.project.rdb.batch.model.dto.UsageLogRow; -import com.project.rdb.batch.model.dto.UsageMonthlyAggregation; +import com.project.rdb.batch.usageaggregate.dto.UsageLogRow; +import com.project.rdb.batch.usageaggregate.dto.UsageMonthlyAggregation; @Component public class UsageMonthlyProcessor implements ItemProcessor { diff --git a/src/main/java/com/project/rdb/batch/usageaggregate/reader/UsageLogDailyReaderConfig.java b/src/main/java/com/project/rdb/batch/usageaggregate/reader/UsageLogDailyReaderConfig.java index 7c7a889..fcb1eec 100644 --- a/src/main/java/com/project/rdb/batch/usageaggregate/reader/UsageLogDailyReaderConfig.java +++ b/src/main/java/com/project/rdb/batch/usageaggregate/reader/UsageLogDailyReaderConfig.java @@ -11,7 +11,7 @@ import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; -import com.project.rdb.batch.model.dto.UsageLogRow; +import com.project.rdb.batch.usageaggregate.dto.UsageLogRow; @Configuration public class UsageLogDailyReaderConfig { diff --git a/src/main/java/com/project/rdb/batch/usageaggregate/reader/UsageLogMonthlyReaderConfig.java b/src/main/java/com/project/rdb/batch/usageaggregate/reader/UsageLogMonthlyReaderConfig.java index 306a885..fa0c5c0 100644 --- a/src/main/java/com/project/rdb/batch/usageaggregate/reader/UsageLogMonthlyReaderConfig.java +++ b/src/main/java/com/project/rdb/batch/usageaggregate/reader/UsageLogMonthlyReaderConfig.java @@ -11,7 +11,7 @@ import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; -import com.project.rdb.batch.model.dto.UsageLogRow; +import com.project.rdb.batch.usageaggregate.dto.UsageLogRow; @Configuration public class UsageLogMonthlyReaderConfig { diff --git a/src/main/java/com/project/rdb/batch/usageaggregate/writer/UsageSummaryDailyWriter.java b/src/main/java/com/project/rdb/batch/usageaggregate/writer/UsageSummaryDailyWriter.java index d291f0f..7d11148 100644 --- a/src/main/java/com/project/rdb/batch/usageaggregate/writer/UsageSummaryDailyWriter.java +++ b/src/main/java/com/project/rdb/batch/usageaggregate/writer/UsageSummaryDailyWriter.java @@ -9,9 +9,9 @@ import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate; import org.springframework.stereotype.Component; -import com.project.rdb.batch.model.dto.UsageDailyAggregation; -import com.project.rdb.batch.model.dto.UsageDailyKey; import com.project.rdb.batch.usageaggregate.config.Sqls; +import com.project.rdb.batch.usageaggregate.dto.UsageDailyAggregation; +import com.project.rdb.batch.usageaggregate.dto.UsageDailyKey; import lombok.RequiredArgsConstructor; diff --git a/src/main/java/com/project/rdb/batch/usageaggregate/writer/UsageSummaryMonthlyWriter.java b/src/main/java/com/project/rdb/batch/usageaggregate/writer/UsageSummaryMonthlyWriter.java index e931e3b..8eefc74 100644 --- a/src/main/java/com/project/rdb/batch/usageaggregate/writer/UsageSummaryMonthlyWriter.java +++ b/src/main/java/com/project/rdb/batch/usageaggregate/writer/UsageSummaryMonthlyWriter.java @@ -9,9 +9,9 @@ import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate; import org.springframework.stereotype.Component; -import com.project.rdb.batch.model.dto.UsageMonthlyAggregation; -import com.project.rdb.batch.model.dto.UsageMonthlyKey; import com.project.rdb.batch.usageaggregate.config.Sqls; +import com.project.rdb.batch.usageaggregate.dto.UsageMonthlyAggregation; +import com.project.rdb.batch.usageaggregate.dto.UsageMonthlyKey; import lombok.RequiredArgsConstructor; diff --git a/src/main/java/com/project/rdb/batch/usagenotification/config/UsageNotificationJobConfig.java b/src/main/java/com/project/rdb/batch/usagenotification/config/UsageNotificationJobConfig.java index 7e3a8af..dfe5eda 100644 --- a/src/main/java/com/project/rdb/batch/usagenotification/config/UsageNotificationJobConfig.java +++ b/src/main/java/com/project/rdb/batch/usagenotification/config/UsageNotificationJobConfig.java @@ -14,8 +14,8 @@ import org.springframework.transaction.PlatformTransactionManager; import com.project.rdb.batch.model.BatchStepMetricsListener; -import com.project.rdb.batch.model.dto.UsageNotificationCandidate; -import com.project.rdb.batch.model.dto.UsageNotificationSource; +import com.project.rdb.batch.usagenotification.dto.UsageNotificationCandidate; +import com.project.rdb.batch.usagenotification.dto.UsageNotificationSource; import com.project.rdb.batch.usagenotification.processor.UsageNotificationProcessor; import com.project.rdb.batch.usagenotification.writer.UsageNotificationWriter; diff --git a/src/main/java/com/project/rdb/batch/usagenotification/config/util/UsageNotificationPolicy.java b/src/main/java/com/project/rdb/batch/usagenotification/config/util/UsageNotificationPolicy.java index d238204..75b044d 100644 --- a/src/main/java/com/project/rdb/batch/usagenotification/config/util/UsageNotificationPolicy.java +++ b/src/main/java/com/project/rdb/batch/usagenotification/config/util/UsageNotificationPolicy.java @@ -4,8 +4,8 @@ import org.springframework.stereotype.Component; -import com.project.rdb.batch.model.dto.UsageNotificationCandidate; -import com.project.rdb.batch.model.dto.UsageNotificationSource; +import com.project.rdb.batch.usagenotification.dto.UsageNotificationCandidate; +import com.project.rdb.batch.usagenotification.dto.UsageNotificationSource; @Component public class UsageNotificationPolicy { @@ -18,15 +18,16 @@ public Optional evaluate(UsageNotificationSource sou long totalUsedMb = source.totalUsedBytes() / (1024 * 1024); long allotmentMb = source.allotmentAmount(); - int percent = (int) ((totalUsedMb * 100L) / allotmentMb); + double percent = (double) (totalUsedMb * 100L) / allotmentMb; - return decideThreshold(percent) + return decideThreshold((int) percent) .map( threshold -> new UsageNotificationCandidate( source.subId(), source.period(), source.unit(), + source.planName(), threshold, percent, totalUsedMb, diff --git a/src/main/java/com/project/rdb/batch/model/dto/UsageNotificationCandidate.java b/src/main/java/com/project/rdb/batch/usagenotification/dto/UsageNotificationCandidate.java similarity index 64% rename from src/main/java/com/project/rdb/batch/model/dto/UsageNotificationCandidate.java rename to src/main/java/com/project/rdb/batch/usagenotification/dto/UsageNotificationCandidate.java index 89c20b5..fea5d23 100644 --- a/src/main/java/com/project/rdb/batch/model/dto/UsageNotificationCandidate.java +++ b/src/main/java/com/project/rdb/batch/usagenotification/dto/UsageNotificationCandidate.java @@ -1,10 +1,11 @@ -package com.project.rdb.batch.model.dto; +package com.project.rdb.batch.usagenotification.dto; public record UsageNotificationCandidate( Long subId, String period, String unit, + String planName, int threshold, - int percent, + double percent, long totalUsedMb, long allotmentMb) {} diff --git a/src/main/java/com/project/rdb/batch/usagenotification/dto/UsageNotificationOutboxRow.java b/src/main/java/com/project/rdb/batch/usagenotification/dto/UsageNotificationOutboxRow.java new file mode 100644 index 0000000..2d81818 --- /dev/null +++ b/src/main/java/com/project/rdb/batch/usagenotification/dto/UsageNotificationOutboxRow.java @@ -0,0 +1,16 @@ +package com.project.rdb.batch.usagenotification.dto; + +import java.time.LocalDateTime; + +public record UsageNotificationOutboxRow( + Long id, + Long subId, + String period, + String planName, + int threshold, + double percent, + long totalUsedMb, + long allotmentMb, + String phoneNumber, + String email, + LocalDateTime createdAt) {} diff --git a/src/main/java/com/project/rdb/batch/usagenotification/dto/UsageNotificationSource.java b/src/main/java/com/project/rdb/batch/usagenotification/dto/UsageNotificationSource.java new file mode 100644 index 0000000..4c6b178 --- /dev/null +++ b/src/main/java/com/project/rdb/batch/usagenotification/dto/UsageNotificationSource.java @@ -0,0 +1,9 @@ +package com.project.rdb.batch.usagenotification.dto; + +public record UsageNotificationSource( + Long subId, + String period, + String unit, + String planName, + long totalUsedBytes, + long allotmentAmount) {} diff --git a/src/main/java/com/project/rdb/batch/usagenotification/processor/UsageNotificationProcessor.java b/src/main/java/com/project/rdb/batch/usagenotification/processor/UsageNotificationProcessor.java index d13f601..3b54019 100644 --- a/src/main/java/com/project/rdb/batch/usagenotification/processor/UsageNotificationProcessor.java +++ b/src/main/java/com/project/rdb/batch/usagenotification/processor/UsageNotificationProcessor.java @@ -3,9 +3,9 @@ import org.springframework.batch.item.ItemProcessor; import org.springframework.stereotype.Component; -import com.project.rdb.batch.model.dto.UsageNotificationCandidate; -import com.project.rdb.batch.model.dto.UsageNotificationSource; import com.project.rdb.batch.usagenotification.config.util.UsageNotificationPolicy; +import com.project.rdb.batch.usagenotification.dto.UsageNotificationCandidate; +import com.project.rdb.batch.usagenotification.dto.UsageNotificationSource; import lombok.RequiredArgsConstructor; diff --git a/src/main/java/com/project/rdb/batch/usagenotification/reader/UsageNotificationDailyReaderConfig.java b/src/main/java/com/project/rdb/batch/usagenotification/reader/UsageNotificationDailyReaderConfig.java index 60f4cf8..b40b5f2 100644 --- a/src/main/java/com/project/rdb/batch/usagenotification/reader/UsageNotificationDailyReaderConfig.java +++ b/src/main/java/com/project/rdb/batch/usagenotification/reader/UsageNotificationDailyReaderConfig.java @@ -1,5 +1,7 @@ package com.project.rdb.batch.usagenotification.reader; +import java.time.LocalDateTime; + import javax.sql.DataSource; import org.springframework.batch.core.configuration.annotation.StepScope; @@ -9,7 +11,7 @@ import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; -import com.project.rdb.batch.model.dto.UsageNotificationSource; +import com.project.rdb.batch.usagenotification.dto.UsageNotificationSource; @Configuration public class UsageNotificationDailyReaderConfig { @@ -17,7 +19,9 @@ public class UsageNotificationDailyReaderConfig { @Bean(name = "usageNotificationDailyReader") @StepScope public JdbcCursorItemReader usageNotificationDailyReader( - DataSource dataSource, @Value("#{jobParameters['usageDate']}") String usageDate) { + DataSource dataSource, + @Value("#{jobParameters['fromTime']}") LocalDateTime fromTime, + @Value("#{jobParameters['toTime']}") LocalDateTime toTime) { String sql = """ @@ -25,12 +29,14 @@ public JdbcCursorItemReader usageNotificationDailyReade usd.sub_id, usd.usage_date AS period, 'DAY' AS unit, + sp.plan_name, usd.total_used_bytes, sp.allotment_amount FROM usage_summary_daily usd JOIN subscription_plan sp ON sp.sub_id = usd.sub_id - WHERE usd.usage_date = ? + WHERE usd.updated_at >= ? + AND usd.updated_at < ? AND sp.allotment_amount = 5120 ORDER BY usd.sub_id """; @@ -40,13 +46,18 @@ public JdbcCursorItemReader usageNotificationDailyReade .dataSource(dataSource) .sql(sql) .fetchSize(1000) - .preparedStatementSetter(ps -> ps.setString(1, usageDate)) + .preparedStatementSetter( + ps -> { + ps.setObject(1, fromTime); + ps.setObject(2, toTime); + }) .rowMapper( (rs, rowNum) -> new UsageNotificationSource( rs.getLong("sub_id"), rs.getString("period"), rs.getString("unit"), + rs.getString("plan_name"), rs.getLong("total_used_bytes"), rs.getLong("allotment_amount"))) .build(); diff --git a/src/main/java/com/project/rdb/batch/usagenotification/reader/UsageNotificationMonthlyReaderConfig.java b/src/main/java/com/project/rdb/batch/usagenotification/reader/UsageNotificationMonthlyReaderConfig.java index d87778e..15b725b 100644 --- a/src/main/java/com/project/rdb/batch/usagenotification/reader/UsageNotificationMonthlyReaderConfig.java +++ b/src/main/java/com/project/rdb/batch/usagenotification/reader/UsageNotificationMonthlyReaderConfig.java @@ -1,5 +1,7 @@ package com.project.rdb.batch.usagenotification.reader; +import java.time.LocalDateTime; + import javax.sql.DataSource; import org.springframework.batch.core.configuration.annotation.StepScope; @@ -9,7 +11,7 @@ import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; -import com.project.rdb.batch.model.dto.UsageNotificationSource; +import com.project.rdb.batch.usagenotification.dto.UsageNotificationSource; @Configuration public class UsageNotificationMonthlyReaderConfig { @@ -17,23 +19,27 @@ public class UsageNotificationMonthlyReaderConfig { @Bean(name = "usageNotificationMonthlyReader") @StepScope public JdbcCursorItemReader usageNotificationMonthlyReader( - DataSource dataSource, @Value("#{jobParameters['period']}") String period) { + DataSource dataSource, + @Value("#{jobParameters['fromTime']}") LocalDateTime fromTime, + @Value("#{jobParameters['toTime']}") LocalDateTime toTime) { String sql = """ SELECT - us.sub_id, - us.period, + usm.sub_id, + usm.period, 'MONTH' AS unit, - us.total_used_bytes, + sp.plan_name, + usm.total_used_bytes, sp.allotment_amount - FROM usage_summary_monthly us + FROM usage_summary_monthly usm JOIN subscription_plan sp - ON sp.sub_id = us.sub_id - WHERE us.period = ? + ON sp.sub_id = usm.sub_id + WHERE usm.updated_at >= ? + AND usm.updated_at < ? AND sp.allotment_amount > 0 AND sp.allotment_amount != 5120 - ORDER BY us.sub_id + ORDER BY usm.sub_id """; return new JdbcCursorItemReaderBuilder() @@ -41,13 +47,18 @@ public JdbcCursorItemReader usageNotificationMonthlyRea .dataSource(dataSource) .sql(sql) .fetchSize(1000) - .preparedStatementSetter(ps -> ps.setString(1, period)) + .preparedStatementSetter( + ps -> { + ps.setObject(1, fromTime); + ps.setObject(2, toTime); + }) .rowMapper( (rs, rowNum) -> new UsageNotificationSource( rs.getLong("sub_id"), rs.getString("period"), rs.getString("unit"), + rs.getString("plan_name"), rs.getLong("total_used_bytes"), rs.getLong("allotment_amount"))) .build(); diff --git a/src/main/java/com/project/rdb/batch/usagenotification/writer/UsageNotificationWriter.java b/src/main/java/com/project/rdb/batch/usagenotification/writer/UsageNotificationWriter.java index 5a980ab..021631c 100644 --- a/src/main/java/com/project/rdb/batch/usagenotification/writer/UsageNotificationWriter.java +++ b/src/main/java/com/project/rdb/batch/usagenotification/writer/UsageNotificationWriter.java @@ -10,7 +10,7 @@ import org.springframework.stereotype.Component; import org.springframework.transaction.annotation.Transactional; -import com.project.rdb.batch.model.dto.UsageNotificationCandidate; +import com.project.rdb.batch.usagenotification.dto.UsageNotificationCandidate; import lombok.RequiredArgsConstructor; @@ -31,9 +31,17 @@ public void write(Chunk chunk) { String sql = """ INSERT INTO usage_notification_outbox - (sub_id, period, unit, threshold, percent, total_used_mb, allotment_mb, status, created_at) + (sub_id, period, + plan_name, unit, + threshold, percent, + total_used_mb, allotment_mb, + status, created_at) VALUES - (:subId, :period, :unit, :threshold, :percent, :totalUsedMb, :allotmentMb, 'PENDING', NOW()) + (:subId, :period, + :planName, :unit, + :threshold, :percent, + :totalUsedMb, :allotmentMb, + 'PENDING', NOW()) ON CONFLICT (sub_id, period, unit, threshold) DO NOTHING """; @@ -45,6 +53,7 @@ ON CONFLICT (sub_id, period, unit, threshold) Map map = new HashMap<>(); map.put("subId", c.subId()); map.put("period", c.period()); + map.put("planName", c.planName()); map.put("unit", c.unit()); map.put("threshold", c.threshold()); map.put("percent", c.percent()); diff --git a/src/main/java/com/project/rdb/kafka/consumer/NotificationSendConsumer.java b/src/main/java/com/project/rdb/kafka/consumer/NotificationSendConsumer.java index a613d55..8b9030a 100644 --- a/src/main/java/com/project/rdb/kafka/consumer/NotificationSendConsumer.java +++ b/src/main/java/com/project/rdb/kafka/consumer/NotificationSendConsumer.java @@ -1,12 +1,13 @@ package com.project.rdb.kafka.consumer; import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.springframework.context.annotation.Profile; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.support.Acknowledgment; import org.springframework.stereotype.Component; import com.fasterxml.jackson.databind.ObjectMapper; -import com.project.rdb.batch.model.dto.UsageNotificationEvent; +import com.project.rdb.batch.notificationsend.dto.NotificationMessage; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; @@ -21,13 +22,15 @@ public class NotificationSendConsumer { @KafkaListener( topics = "notification-usage", - containerFactory = "kafkaListenerContainerFactory") + containerFactory = "kafkaListenerContainerFactory", + autoStartup = "false") + @Profile("notification-worker") public void consume(ConsumerRecord record, Acknowledgment ack) { log.info("🔥 CONSUME START offset={}, value={}", record.offset(), record.value()); try { - UsageNotificationEvent event = - objectMapper.readValue(record.value(), UsageNotificationEvent.class); + NotificationMessage event = + objectMapper.readValue(record.value(), NotificationMessage.class); String eventId = event.eventId().toString(); diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 22864ee..e3f90b2 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -16,12 +16,14 @@ spring: driver-class-name: org.postgresql.Driver batch: + jdbc: + initialize-schema: always job: enabled: false jpa: hibernate: - ddl-auto: validate + ddl-auto: none show-sql: true database-platform: org.hibernate.dialect.PostgreSQLDialect properties: @@ -37,15 +39,15 @@ spring: url: redis://${REDIS_USERNAME:default}:${REDIS_PASSWORD:}@${REDIS_ENDPOINT:localhost:6379} kafka: - bootstrap-servers: ${KAFKA_BOOTSTRAP_SERVERS} + bootstrap-servers: ${KAFKA_BOOTSTRAP_SERVERS:pkc-oxqxx9.us-east-1.aws.confluent.cloud:9092} properties: security.protocol: SASL_SSL sasl.mechanism: PLAIN sasl.jaas.config: > org.apache.kafka.common.security.plain.PlainLoginModule required - username='${KAFKA_API_KEY}' - password='${KAFKA_API_SECRET}'; + username='${KAFKA_API_KEY:EPO6DS6OXW7GYVEE}' + password='${KAFKA_API_SECRET:cfltJj+U/TvM2TfzfqxpXIu5xDE/bPGe5sfIff5mrLZ9Usx4K9LudKBWPLIGG3QQ}'; producer: retries: 2147483647