diff --git a/build.gradle b/build.gradle index b8f353d..a02b561 100644 --- a/build.gradle +++ b/build.gradle @@ -51,6 +51,7 @@ dependencies { implementation 'org.springframework.boot:spring-boot-starter-data-jdbc' implementation 'org.springframework.boot:spring-boot-starter-web' implementation 'org.springframework.boot:spring-boot-starter-actuator' + implementation 'org.springframework.boot:spring-boot-starter-data-jpa' // Monitoring & Observability - OpenTelemetry (traces, logs) implementation 'io.opentelemetry.instrumentation:opentelemetry-spring-boot-starter' diff --git a/gradlew b/gradlew old mode 100644 new mode 100755 diff --git a/src/main/java/com/template/worker/batch/model/BatchStepMetricsListener.java b/src/main/java/com/template/worker/batch/model/BatchStepMetricsListener.java new file mode 100644 index 0000000..69e869e --- /dev/null +++ b/src/main/java/com/template/worker/batch/model/BatchStepMetricsListener.java @@ -0,0 +1,83 @@ +package com.template.worker.batch.model; + +import java.math.BigDecimal; +import java.math.RoundingMode; +import java.time.Duration; +import java.time.LocalDateTime; +import java.time.ZoneId; +import java.util.Objects; + +import org.springframework.batch.core.ExitStatus; +import org.springframework.batch.core.JobExecution; +import org.springframework.batch.core.StepExecution; +import org.springframework.batch.core.StepExecutionListener; +import org.springframework.stereotype.Component; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.template.worker.batch.model.entity.BatchExecutionReport; +import com.template.worker.batch.model.repository.BatchExecutionReportRepository; + +import lombok.RequiredArgsConstructor; + +@Component +@RequiredArgsConstructor +public class BatchStepMetricsListener implements StepExecutionListener { + private final BatchExecutionReportRepository reportRepository; + private final ObjectMapper objectMapper; + + @Override + public ExitStatus afterStep(StepExecution stepExecution) { + + JobExecution jobExecution = stepExecution.getJobExecution(); + + LocalDateTime start = + Objects.requireNonNull(stepExecution.getStartTime()) + .atZone(ZoneId.of("Asia/Seoul")) + .toLocalDateTime(); + + LocalDateTime end = + Objects.requireNonNull(stepExecution.getEndTime()) + .atZone(ZoneId.of("Asia/Seoul")) + .toLocalDateTime(); + + long durationMs = Duration.between(start, end).toMillis(); + + long readCount = stepExecution.getReadCount(); + BigDecimal tps = + durationMs > 0 + ? BigDecimal.valueOf(readCount) + .multiply(BigDecimal.valueOf(1000)) + .divide(BigDecimal.valueOf(durationMs), 2, RoundingMode.HALF_UP) + : BigDecimal.ZERO; + + String paramsJson; + try { + paramsJson = + objectMapper.writeValueAsString( + jobExecution.getJobParameters().getParameters()); + } catch (Exception e) { + paramsJson = "{}"; + } + + reportRepository.save( + BatchExecutionReport.builder() + .jobName(jobExecution.getJobInstance().getJobName()) + .stepName(stepExecution.getStepName()) + .status(stepExecution.getStatus().toString()) + .startedAt(start) + .endedAt(end) + .durationMs(durationMs) + .readCount(readCount) + .writeCount(stepExecution.getWriteCount()) + .filterCount(stepExecution.getFilterCount()) + .skipCount(stepExecution.getSkipCount()) + .commitCount(stepExecution.getCommitCount()) + .rollbackCount(stepExecution.getRollbackCount()) + .tps(tps) + .jobParameters(paramsJson) + .createdAt(LocalDateTime.now()) + .build()); + + return stepExecution.getExitStatus(); + } +} diff --git a/src/main/java/com/template/worker/batch/model/entity/BatchExecutionReport.java b/src/main/java/com/template/worker/batch/model/entity/BatchExecutionReport.java new file mode 100644 index 0000000..1f563bf --- /dev/null +++ b/src/main/java/com/template/worker/batch/model/entity/BatchExecutionReport.java @@ -0,0 +1,52 @@ +package com.template.worker.batch.model.entity; + +import java.math.BigDecimal; +import java.time.LocalDateTime; + +import jakarta.persistence.Column; +import jakarta.persistence.Entity; +import jakarta.persistence.GeneratedValue; +import jakarta.persistence.GenerationType; +import jakarta.persistence.Id; +import jakarta.persistence.Table; + +import lombok.AccessLevel; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Getter; +import lombok.NoArgsConstructor; + +@Entity +@Table(name = "batch_execution_report") +@Getter +@NoArgsConstructor(access = AccessLevel.PROTECTED) +@AllArgsConstructor +@Builder +public class BatchExecutionReport { + @Id + @GeneratedValue(strategy = GenerationType.IDENTITY) + private Long id; + + private String jobName; + private String stepName; + + private String status; + + private LocalDateTime startedAt; + private LocalDateTime endedAt; + private long durationMs; + + private long readCount; + private long writeCount; + private long filterCount; + private long skipCount; + private long commitCount; + private long rollbackCount; + + private BigDecimal tps; + + @Column(columnDefinition = "jsonb") + private String jobParameters; + + private LocalDateTime createdAt; +} diff --git a/src/main/java/com/template/worker/batch/model/repository/BatchExecutionReportRepository.java b/src/main/java/com/template/worker/batch/model/repository/BatchExecutionReportRepository.java new file mode 100644 index 0000000..9e11fdf --- /dev/null +++ b/src/main/java/com/template/worker/batch/model/repository/BatchExecutionReportRepository.java @@ -0,0 +1,7 @@ +package com.template.worker.batch.model.repository; + +import org.springframework.data.jpa.repository.JpaRepository; + +import com.template.worker.batch.model.entity.BatchExecutionReport; + +public interface BatchExecutionReportRepository extends JpaRepository {} diff --git a/src/main/java/com/template/worker/batch/model/repository/UsageNotificationOutboxRepository.java b/src/main/java/com/template/worker/batch/model/repository/UsageNotificationOutboxRepository.java new file mode 100644 index 0000000..1064ab5 --- /dev/null +++ b/src/main/java/com/template/worker/batch/model/repository/UsageNotificationOutboxRepository.java @@ -0,0 +1,46 @@ +package com.template.worker.batch.model.repository; + +import java.util.List; +import java.util.Map; + +import org.springframework.jdbc.core.JdbcTemplate; +import org.springframework.stereotype.Repository; + +import lombok.RequiredArgsConstructor; + +@Repository +@RequiredArgsConstructor +public class UsageNotificationOutboxRepository { + + private final JdbcTemplate jdbcTemplate; + + public void markProcessing(List ids) { + jdbcTemplate.batchUpdate( + "UPDATE usage_notification_outbox SET status = 'PROCESSING' WHERE id = ?", + ids, + ids.size(), + (ps, id) -> ps.setLong(1, id)); + } + + public void markSent(List ids) { + jdbcTemplate.batchUpdate( + "UPDATE usage_notification_outbox SET status = 'SENT', sent_at = now() WHERE id =" + + " ?", + ids, + ids.size(), + (ps, id) -> ps.setLong(1, id)); + } + + public void markFailedWithReasons(Map reasons) { + jdbcTemplate.batchUpdate( + "UPDATE usage_notification_outbox " + + "SET status = 'FAILED', failure_reason = ? " + + "WHERE id = ?", + reasons.entrySet(), + reasons.size(), + (ps, entry) -> { + ps.setString(1, entry.getValue()); + ps.setLong(2, entry.getKey()); + }); + } +} diff --git a/src/main/java/com/template/worker/batch/notificationsend/config/NotificationSendJobConfig.java b/src/main/java/com/template/worker/batch/notificationsend/config/NotificationSendJobConfig.java new file mode 100644 index 0000000..e1532ec --- /dev/null +++ b/src/main/java/com/template/worker/batch/notificationsend/config/NotificationSendJobConfig.java @@ -0,0 +1,55 @@ +package com.template.worker.batch.notificationsend.config; + +import org.springframework.batch.core.Job; +import org.springframework.batch.core.Step; +import org.springframework.batch.core.configuration.annotation.JobScope; +import org.springframework.batch.core.job.builder.JobBuilder; +import org.springframework.batch.core.launch.support.RunIdIncrementer; +import org.springframework.batch.core.repository.JobRepository; +import org.springframework.batch.core.step.builder.StepBuilder; +import org.springframework.batch.item.database.JdbcCursorItemReader; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.transaction.PlatformTransactionManager; + +import com.template.worker.batch.notificationsend.dto.NotificationMessage; +import com.template.worker.batch.notificationsend.processor.NotificationSendProcessor; +import com.template.worker.batch.notificationsend.writer.NotificationSendWriter; +import com.template.worker.batch.usagenotification.dto.UsageNotificationOutboxRow; + +import lombok.RequiredArgsConstructor; + +@Configuration +@RequiredArgsConstructor +public class NotificationSendJobConfig { + + @Value("${spring.batch.jobs.usage.chunk-size}") + private int chunkSize; + + private final JobRepository jobRepository; + private final PlatformTransactionManager txManager; + + private final JdbcCursorItemReader outboxReader; + private final NotificationSendProcessor notificationSendProcessor; + private final NotificationSendWriter notificationSendWriter; + + @Bean + public Job notificationSendJob() { + return new JobBuilder("notificationSendJob", jobRepository) + .incrementer(new RunIdIncrementer()) + .start(notificationSendStep()) + .build(); + } + + @JobScope + @Bean + public Step notificationSendStep() { + return new StepBuilder("notificationSendStep", jobRepository) + .chunk(chunkSize, txManager) + .reader(outboxReader) + .processor(notificationSendProcessor) + .writer(notificationSendWriter) + .build(); + } +} diff --git a/src/main/java/com/template/worker/batch/notificationsend/dto/NotificationMessage.java b/src/main/java/com/template/worker/batch/notificationsend/dto/NotificationMessage.java new file mode 100644 index 0000000..c7790a2 --- /dev/null +++ b/src/main/java/com/template/worker/batch/notificationsend/dto/NotificationMessage.java @@ -0,0 +1,11 @@ +package com.template.worker.batch.notificationsend.dto; + +import java.util.Map; +import java.util.UUID; + +public record NotificationMessage( + UUID eventId, + Long id, + Long templateGroupId, + Map subscriptionInfo, + Map variables) {} diff --git a/src/main/java/com/template/worker/batch/notificationsend/dto/NotificationSendTask.java b/src/main/java/com/template/worker/batch/notificationsend/dto/NotificationSendTask.java new file mode 100644 index 0000000..03b72b4 --- /dev/null +++ b/src/main/java/com/template/worker/batch/notificationsend/dto/NotificationSendTask.java @@ -0,0 +1,8 @@ +package com.template.worker.batch.notificationsend.dto; + +import java.util.concurrent.CompletableFuture; + +import org.springframework.kafka.support.SendResult; + +public record NotificationSendTask( + NotificationMessage event, CompletableFuture> future) {} diff --git a/src/main/java/com/template/worker/batch/notificationsend/processor/NotificationSendProcessor.java b/src/main/java/com/template/worker/batch/notificationsend/processor/NotificationSendProcessor.java new file mode 100644 index 0000000..450a8b5 --- /dev/null +++ b/src/main/java/com/template/worker/batch/notificationsend/processor/NotificationSendProcessor.java @@ -0,0 +1,40 @@ +package com.template.worker.batch.notificationsend.processor; + +import java.util.Map; +import java.util.UUID; + +import org.springframework.batch.item.ItemProcessor; +import org.springframework.stereotype.Component; + +import com.template.worker.batch.notificationsend.dto.NotificationMessage; +import com.template.worker.batch.usagenotification.dto.UsageNotificationOutboxRow; + +import lombok.extern.slf4j.Slf4j; + +@Component +@Slf4j +public class NotificationSendProcessor + implements ItemProcessor { + + @Override + public NotificationMessage process(UsageNotificationOutboxRow item) { + + return new NotificationMessage( + UUID.randomUUID(), + item.id(), + 101L, + Map.of( + "subId", item.subId(), + "phoneNumber", item.phoneNumber(), + "email", item.email()), + Map.of( + "period", item.period(), + "threshold", item.threshold(), + "percent", item.percent(), + "totalUsedMb", item.totalUsedMb(), + "allotmentMb", item.allotmentMb(), + "phoneNumber", item.phoneNumber(), + "email", item.email(), + "createdAt", item.createdAt())); + } +} diff --git a/src/main/java/com/template/worker/batch/notificationsend/reader/NotificationSendReaderConfig.java b/src/main/java/com/template/worker/batch/notificationsend/reader/NotificationSendReaderConfig.java new file mode 100644 index 0000000..be52ba8 --- /dev/null +++ b/src/main/java/com/template/worker/batch/notificationsend/reader/NotificationSendReaderConfig.java @@ -0,0 +1,67 @@ +package com.template.worker.batch.notificationsend.reader; + +import javax.sql.DataSource; + +import org.springframework.batch.item.database.JdbcCursorItemReader; +import org.springframework.batch.item.database.builder.JdbcCursorItemReaderBuilder; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +import com.template.worker.batch.usagenotification.dto.UsageNotificationOutboxRow; + +@Configuration +public class NotificationSendReaderConfig { + + @Value("${spring.batch.jobs.usage.fetch-size}") + private int fetchSize; + + @Bean(name = "notificationSendOutboxReader") + public JdbcCursorItemReader notificationSendOutboxReader( + DataSource dataSource) { + + String sql = + """ + SELECT + o.id, + o.sub_id, + o.period, + o.plan_name, + o.threshold, + o.percent, + o.total_used_mb, + o.allotment_mb, + s.phone_number, + c.email_enc, + o.created_at + FROM usage_notification_outbox o + JOIN subscription s + ON s.sub_id = o.sub_id + JOIN customer c + ON c.customer_id = s.customer_id + WHERE o.status = 'PENDING' + ORDER BY o.id + """; + + return new JdbcCursorItemReaderBuilder() + .name("notificationSendOutboxReader") + .dataSource(dataSource) + .sql(sql) + .fetchSize(fetchSize) + .rowMapper( + (rs, rowNum) -> + new UsageNotificationOutboxRow( + rs.getLong("id"), + rs.getLong("sub_id"), + rs.getString("period"), + rs.getString("plan_name"), + rs.getInt("threshold"), + rs.getDouble("percent"), + rs.getLong("total_used_mb"), + rs.getLong("allotment_mb"), + rs.getString("phone_number"), + rs.getString("email_enc"), + rs.getTimestamp("created_at").toLocalDateTime())) + .build(); + } +} diff --git a/src/main/java/com/template/worker/batch/notificationsend/writer/NotificationSendWriter.java b/src/main/java/com/template/worker/batch/notificationsend/writer/NotificationSendWriter.java new file mode 100644 index 0000000..e0ebfcf --- /dev/null +++ b/src/main/java/com/template/worker/batch/notificationsend/writer/NotificationSendWriter.java @@ -0,0 +1,109 @@ +package com.template.worker.batch.notificationsend.writer; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; + +import org.springframework.batch.item.Chunk; +import org.springframework.batch.item.ItemWriter; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.support.SendResult; +import org.springframework.stereotype.Component; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.template.worker.batch.model.repository.UsageNotificationOutboxRepository; +import com.template.worker.batch.notificationsend.dto.NotificationMessage; +import com.template.worker.batch.notificationsend.dto.NotificationSendTask; + +import lombok.RequiredArgsConstructor; + +@Component +@RequiredArgsConstructor +public class NotificationSendWriter implements ItemWriter { + + private final KafkaTemplate kafkaTemplate; + private final UsageNotificationOutboxRepository repository; + private final ObjectMapper objectMapper; + private static final int MAX_FAILURE_REASON_LENGTH = 255; + + @Override + public void write(Chunk items) { + + // usageNotificationOutboxId 추출 + List ids = items.getItems().stream().map(NotificationMessage::id).toList(); + + // PROCESSING Status 먼저 DB 반영 + repository.markProcessing(ids); + + // Kafka 비동기 발행 + List tasks = new ArrayList<>(); + + Map failedReasons = new HashMap<>(); + + for (NotificationMessage event : items) { + try { + String payload = objectMapper.writeValueAsString(event); + String key = String.valueOf(event.subscriptionInfo().get("subId")); + + CompletableFuture> future = + kafkaTemplate.send("usage-noti", key, payload); + + tasks.add(new NotificationSendTask(event, future)); + } catch (Exception e) { + failedReasons.put(event.id(), safeFailureReason(e)); + } + } + + // Chunk 단위 ACK 동기화 + List successIds = new ArrayList<>(); + + for (NotificationSendTask task : tasks) { + try { + task.future().join(); + successIds.add(task.event().id()); + } catch (Exception ex) { + Throwable cause = ex.getCause() != null ? ex.getCause() : ex; + + failedReasons.put(task.event().id(), safeFailureReason(cause)); + } + } + + // 최종 상태 DB 반영 + if (!successIds.isEmpty()) { + repository.markSent(successIds); + } + + if (!failedReasons.isEmpty()) { + repository.markFailedWithReasons(failedReasons); + } + } + + private String safeFailureReason(Throwable ex) { + if (ex == null) { + return "UNKNOWN_ERROR"; + } + + Throwable root = ex; + while (root.getCause() != null) { + root = root.getCause(); + } + + String exceptionName = root.getClass().getSimpleName(); + String message = root.getMessage(); + + String result; + if (message == null || message.isBlank()) { + result = exceptionName; + } else { + result = exceptionName + ": " + message; + } + + if (result.length() > MAX_FAILURE_REASON_LENGTH) { + result = result.substring(0, MAX_FAILURE_REASON_LENGTH); + } + + return result; + } +} diff --git a/src/main/java/com/template/worker/batch/orchestrator/BatchTimeWindowService.java b/src/main/java/com/template/worker/batch/orchestrator/BatchTimeWindowService.java new file mode 100644 index 0000000..f87d49f --- /dev/null +++ b/src/main/java/com/template/worker/batch/orchestrator/BatchTimeWindowService.java @@ -0,0 +1,41 @@ +package com.template.worker.batch.orchestrator; + +import java.time.LocalDateTime; +import java.util.Map; + +import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate; +import org.springframework.stereotype.Service; +import org.springframework.transaction.annotation.Transactional; + +import lombok.RequiredArgsConstructor; + +@Service +@RequiredArgsConstructor +public class BatchTimeWindowService { + + private final NamedParameterJdbcTemplate jdbc; + + public LocalDateTime resolve(String jobName) { + return jdbc.queryForObject( + """ + SELECT last_processed_at + FROM batch_watermark + WHERE job_name = :jobName + """, + Map.of("jobName", jobName), + LocalDateTime.class); + } + + @Transactional + public void update(String jobName, LocalDateTime newFrom) { + jdbc.update( + """ + UPDATE batch_watermark + SET last_processed_at = :newFrom + WHERE job_name = :jobName + """, + Map.of( + "jobName", jobName, + "newFrom", newFrom)); + } +} diff --git a/src/main/java/com/template/worker/batch/orchestrator/UsageOrchestrator.java b/src/main/java/com/template/worker/batch/orchestrator/UsageOrchestrator.java new file mode 100644 index 0000000..dbac5f0 --- /dev/null +++ b/src/main/java/com/template/worker/batch/orchestrator/UsageOrchestrator.java @@ -0,0 +1,81 @@ +package com.template.worker.batch.orchestrator; + +import java.time.LocalDateTime; + +import org.springframework.batch.core.Job; +import org.springframework.batch.core.JobExecution; +import org.springframework.batch.core.JobParameters; +import org.springframework.batch.core.JobParametersBuilder; +import org.springframework.batch.core.launch.JobLauncher; +import org.springframework.stereotype.Component; + +import lombok.RequiredArgsConstructor; + +@Component +@RequiredArgsConstructor +public class UsageOrchestrator { + + private final JobLauncher jobLauncher; + private final Job usageAggregationJob; + private final Job usageNotificationJob; + private final Job notificationSendJob; + + private final BatchTimeWindowService timeWindowService; + + public void run() throws Exception { + LocalDateTime aggregationStart = LocalDateTime.now(); + LocalDateTime aggFrom = timeWindowService.resolve("usage-aggregation"); + + JobParameters aggregationParams = + new JobParametersBuilder() + .addLocalDateTime("fromTime", aggFrom) + .addLocalDateTime("toTime", aggregationStart) + .addLong("run.id", System.currentTimeMillis()) + .toJobParameters(); + + JobExecution aggregationExec = jobLauncher.run(usageAggregationJob, aggregationParams); + + if (aggregationExec.getStatus().isUnsuccessful()) { + return; + } + timeWindowService.update("usage-aggregation", aggregationStart); + + LocalDateTime notificationStart = LocalDateTime.now(); + + LocalDateTime notificationFrom = timeWindowService.resolve("usage-notification"); + + JobParameters notificationParams = + new JobParametersBuilder() + .addLocalDateTime("fromTime", notificationFrom) + .addLocalDateTime("toTime", notificationStart) + .addLong("run.id", System.currentTimeMillis()) + .toJobParameters(); + + JobExecution notificationExec = jobLauncher.run(usageNotificationJob, notificationParams); + + if (notificationExec.getStatus().isUnsuccessful()) { + return; + } + + timeWindowService.update("usage-notification", notificationStart); + + LocalDateTime sendStart = LocalDateTime.now(); + + LocalDateTime sendFrom = timeWindowService.resolve("notification-send"); + + JobParameters sendParams = + new JobParametersBuilder() + .addLocalDateTime("fromTime", sendFrom) + .addLocalDateTime("toTime", sendStart) + .addLong("run.id", System.currentTimeMillis()) + .toJobParameters(); + + JobExecution sendExec = jobLauncher.run(notificationSendJob, sendParams); + + if (sendExec.getStatus().isUnsuccessful()) { + return; + } + + timeWindowService.update("notification-send", sendStart); + } +} diff --git a/src/main/java/com/template/worker/batch/orchestrator/UsageOrchestratorJobConfig.java b/src/main/java/com/template/worker/batch/orchestrator/UsageOrchestratorJobConfig.java new file mode 100644 index 0000000..8384f0f --- /dev/null +++ b/src/main/java/com/template/worker/batch/orchestrator/UsageOrchestratorJobConfig.java @@ -0,0 +1,41 @@ +package com.template.worker.batch.orchestrator; + +import org.springframework.batch.core.Job; +import org.springframework.batch.core.Step; +import org.springframework.batch.core.job.builder.JobBuilder; +import org.springframework.batch.core.repository.JobRepository; +import org.springframework.batch.core.step.builder.StepBuilder; +import org.springframework.batch.repeat.RepeatStatus; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.transaction.PlatformTransactionManager; + +import lombok.RequiredArgsConstructor; + +@Configuration +@RequiredArgsConstructor +public class UsageOrchestratorJobConfig { + + private final JobRepository jobRepository; + private final PlatformTransactionManager transactionManager; + private final UsageOrchestrator usageOrchestrator; + + @Bean + public Job usageOrchestratorJob() { + return new JobBuilder("usageOrchestratorJob", jobRepository) + .start(usageOrchestratorStep()) + .build(); + } + + @Bean + public Step usageOrchestratorStep() { + return new StepBuilder("usageOrchestratorStep", jobRepository) + .tasklet( + (contribution, chunkContext) -> { + usageOrchestrator.run(); + return RepeatStatus.FINISHED; + }, + transactionManager) + .build(); + } +} diff --git a/src/main/java/com/template/worker/batch/usageaggregate/config/Sqls.java b/src/main/java/com/template/worker/batch/usageaggregate/config/Sqls.java new file mode 100644 index 0000000..c9523ad --- /dev/null +++ b/src/main/java/com/template/worker/batch/usageaggregate/config/Sqls.java @@ -0,0 +1,38 @@ +package com.template.worker.batch.usageaggregate.config; + +public class Sqls { + + private Sqls() {} + + public static final String DAILY_UPSERT = + """ + INSERT INTO usage_summary_daily ( + sub_id, usage_date, total_used_bytes, updated_at + ) + VALUES ( + :subId, :usageDate, :delta, NOW() + ) + ON CONFLICT (sub_id, usage_date) + DO UPDATE SET + total_used_bytes = + usage_summary_daily.total_used_bytes + + EXCLUDED.total_used_bytes, + updated_at = NOW() + """; + + public static final String MONTHLY_UPSERT = + """ + INSERT INTO usage_summary_monthly ( + sub_id, period, total_used_bytes, updated_at + ) + VALUES ( + :subId, :period, :delta, NOW() + ) + ON CONFLICT (sub_id, period) + DO UPDATE SET + total_used_bytes = + usage_summary_monthly.total_used_bytes + + EXCLUDED.total_used_bytes, + updated_at = NOW() + """; +} diff --git a/src/main/java/com/template/worker/batch/usageaggregate/config/UsageAggregationJobConfig.java b/src/main/java/com/template/worker/batch/usageaggregate/config/UsageAggregationJobConfig.java new file mode 100644 index 0000000..23466ba --- /dev/null +++ b/src/main/java/com/template/worker/batch/usageaggregate/config/UsageAggregationJobConfig.java @@ -0,0 +1,84 @@ +package com.template.worker.batch.usageaggregate.config; + +import org.springframework.batch.core.Job; +import org.springframework.batch.core.Step; +import org.springframework.batch.core.configuration.annotation.JobScope; +import org.springframework.batch.core.job.builder.JobBuilder; +import org.springframework.batch.core.launch.support.RunIdIncrementer; +import org.springframework.batch.core.repository.JobRepository; +import org.springframework.batch.core.step.builder.StepBuilder; +import org.springframework.batch.item.database.JdbcCursorItemReader; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.transaction.PlatformTransactionManager; + +import com.template.worker.batch.model.BatchStepMetricsListener; +import com.template.worker.batch.usageaggregate.dto.UsageDailyAggregation; +import com.template.worker.batch.usageaggregate.dto.UsageLogRow; +import com.template.worker.batch.usageaggregate.dto.UsageMonthlyAggregation; +import com.template.worker.batch.usageaggregate.processor.UsageDailyProcessor; +import com.template.worker.batch.usageaggregate.processor.UsageMonthlyProcessor; +import com.template.worker.batch.usageaggregate.writer.UsageSummaryDailyWriter; +import com.template.worker.batch.usageaggregate.writer.UsageSummaryMonthlyWriter; + +import lombok.RequiredArgsConstructor; + +@Configuration +@RequiredArgsConstructor +public class UsageAggregationJobConfig { + + @Value("${spring.batch.jobs.usage.chunk-size}") + private int chunkSize; + + private final JobRepository jobRepository; + private final PlatformTransactionManager txManager; + + @Qualifier("usageLogDailyReader") + private final JdbcCursorItemReader usageLogDailyReader; + + @Qualifier("usageLogMonthlyReader") + private final JdbcCursorItemReader usageLogMonthlyReader; + + private final UsageDailyProcessor usageDailyProcessor; + private final UsageMonthlyProcessor usageMonthlyProcessor; + + private final UsageSummaryDailyWriter usageSummaryDailyWriter; + private final UsageSummaryMonthlyWriter usageSummaryMonthlyWriter; + + private final BatchStepMetricsListener batchStepMetricsListener; + + @Bean + public Job usageAggregationJob() { + return new JobBuilder("usageAggregationJob", jobRepository) + .incrementer(new RunIdIncrementer()) + .start(dailyAggregationStep()) + .next(monthlyAggregationStep()) + .build(); + } + + @JobScope + @Bean + public Step dailyAggregationStep() { + return new StepBuilder("dailyAggregationStep", jobRepository) + .chunk(chunkSize, txManager) + .reader(usageLogDailyReader) + .processor(usageDailyProcessor) + .writer(usageSummaryDailyWriter) + .listener(batchStepMetricsListener) + .build(); + } + + @JobScope + @Bean + public Step monthlyAggregationStep() { + return new StepBuilder("monthlyAggregationStep", jobRepository) + .chunk(chunkSize, txManager) + .reader(usageLogMonthlyReader) + .processor(usageMonthlyProcessor) + .writer(usageSummaryMonthlyWriter) + .listener(batchStepMetricsListener) + .build(); + } +} diff --git a/src/main/java/com/template/worker/batch/usageaggregate/dto/UsageDailyAggregation.java b/src/main/java/com/template/worker/batch/usageaggregate/dto/UsageDailyAggregation.java new file mode 100644 index 0000000..459a791 --- /dev/null +++ b/src/main/java/com/template/worker/batch/usageaggregate/dto/UsageDailyAggregation.java @@ -0,0 +1,3 @@ +package com.template.worker.batch.usageaggregate.dto; + +public record UsageDailyAggregation(Long subId, String usageDate, long deltaBytes) {} diff --git a/src/main/java/com/template/worker/batch/usageaggregate/dto/UsageDailyKey.java b/src/main/java/com/template/worker/batch/usageaggregate/dto/UsageDailyKey.java new file mode 100644 index 0000000..dcdc0bf --- /dev/null +++ b/src/main/java/com/template/worker/batch/usageaggregate/dto/UsageDailyKey.java @@ -0,0 +1,16 @@ +package com.template.worker.batch.usageaggregate.dto; + +import java.util.Objects; + +public record UsageDailyKey(Long subId, String usageDate) { + @Override + public boolean equals(Object oj) { + if (this == oj) { + return true; + } + if (!(oj instanceof UsageDailyKey that)) { + return false; + } + return Objects.equals(subId, that.subId) && Objects.equals(usageDate, that.usageDate); + } +} diff --git a/src/main/java/com/template/worker/batch/usageaggregate/dto/UsageLogRow.java b/src/main/java/com/template/worker/batch/usageaggregate/dto/UsageLogRow.java new file mode 100644 index 0000000..a40f6af --- /dev/null +++ b/src/main/java/com/template/worker/batch/usageaggregate/dto/UsageLogRow.java @@ -0,0 +1,5 @@ +package com.template.worker.batch.usageaggregate.dto; + +import java.time.LocalDateTime; + +public record UsageLogRow(Long subId, Long usedBytes, LocalDateTime eventTime) {} diff --git a/src/main/java/com/template/worker/batch/usageaggregate/dto/UsageMonthlyAggregation.java b/src/main/java/com/template/worker/batch/usageaggregate/dto/UsageMonthlyAggregation.java new file mode 100644 index 0000000..af83c27 --- /dev/null +++ b/src/main/java/com/template/worker/batch/usageaggregate/dto/UsageMonthlyAggregation.java @@ -0,0 +1,3 @@ +package com.template.worker.batch.usageaggregate.dto; + +public record UsageMonthlyAggregation(Long subId, String period, long deltaBytes) {} diff --git a/src/main/java/com/template/worker/batch/usageaggregate/dto/UsageMonthlyKey.java b/src/main/java/com/template/worker/batch/usageaggregate/dto/UsageMonthlyKey.java new file mode 100644 index 0000000..77043a4 --- /dev/null +++ b/src/main/java/com/template/worker/batch/usageaggregate/dto/UsageMonthlyKey.java @@ -0,0 +1,17 @@ +package com.template.worker.batch.usageaggregate.dto; + +import java.util.Objects; + +public record UsageMonthlyKey(Long subId, String period) { + + @Override + public boolean equals(Object oj) { + if (this == oj) { + return true; + } + if (!(oj instanceof UsageMonthlyKey that)) { + return false; + } + return Objects.equals(subId, that.subId) && Objects.equals(period, that.period); + } +} diff --git a/src/main/java/com/template/worker/batch/usageaggregate/processor/UsageDailyProcessor.java b/src/main/java/com/template/worker/batch/usageaggregate/processor/UsageDailyProcessor.java new file mode 100644 index 0000000..c0e5e77 --- /dev/null +++ b/src/main/java/com/template/worker/batch/usageaggregate/processor/UsageDailyProcessor.java @@ -0,0 +1,21 @@ +package com.template.worker.batch.usageaggregate.processor; + +import java.time.format.DateTimeFormatter; + +import org.springframework.batch.item.ItemProcessor; +import org.springframework.stereotype.Component; + +import com.template.worker.batch.usageaggregate.dto.UsageDailyAggregation; +import com.template.worker.batch.usageaggregate.dto.UsageLogRow; + +@Component +public class UsageDailyProcessor implements ItemProcessor { + + private static final DateTimeFormatter DAY_FMT = DateTimeFormatter.ofPattern("yyyyMMdd"); + + @Override + public UsageDailyAggregation process(UsageLogRow log) { + return new UsageDailyAggregation( + log.subId(), log.eventTime().format(DAY_FMT), log.usedBytes()); + } +} diff --git a/src/main/java/com/template/worker/batch/usageaggregate/processor/UsageMonthlyProcessor.java b/src/main/java/com/template/worker/batch/usageaggregate/processor/UsageMonthlyProcessor.java new file mode 100644 index 0000000..d9fb8e1 --- /dev/null +++ b/src/main/java/com/template/worker/batch/usageaggregate/processor/UsageMonthlyProcessor.java @@ -0,0 +1,21 @@ +package com.template.worker.batch.usageaggregate.processor; + +import java.time.format.DateTimeFormatter; + +import org.springframework.batch.item.ItemProcessor; +import org.springframework.stereotype.Component; + +import com.template.worker.batch.usageaggregate.dto.UsageLogRow; +import com.template.worker.batch.usageaggregate.dto.UsageMonthlyAggregation; + +@Component +public class UsageMonthlyProcessor implements ItemProcessor { + + private static final DateTimeFormatter MONTH_FMT = DateTimeFormatter.ofPattern("yyyyMM"); + + @Override + public UsageMonthlyAggregation process(UsageLogRow log) { + return new UsageMonthlyAggregation( + log.subId(), log.eventTime().format(MONTH_FMT), log.usedBytes()); + } +} diff --git a/src/main/java/com/template/worker/batch/usageaggregate/reader/UsageLogDailyReaderConfig.java b/src/main/java/com/template/worker/batch/usageaggregate/reader/UsageLogDailyReaderConfig.java new file mode 100644 index 0000000..37d78fa --- /dev/null +++ b/src/main/java/com/template/worker/batch/usageaggregate/reader/UsageLogDailyReaderConfig.java @@ -0,0 +1,61 @@ +package com.template.worker.batch.usageaggregate.reader; + +import java.time.LocalDateTime; + +import javax.sql.DataSource; + +import org.springframework.batch.core.configuration.annotation.StepScope; +import org.springframework.batch.item.database.JdbcCursorItemReader; +import org.springframework.batch.item.database.builder.JdbcCursorItemReaderBuilder; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +import com.template.worker.batch.usageaggregate.dto.UsageLogRow; + +@Configuration +public class UsageLogDailyReaderConfig { + + @Value("${spring.batch.jobs.usage.fetch-size}") + private int fetchSize; + + @Bean(name = "usageLogDailyReader") + @StepScope + public JdbcCursorItemReader usageLogDailyReader( + DataSource dataSource, + @Value("#{jobParameters['fromTime']}") LocalDateTime fromTime, + @Value("#{jobParameters['toTime']}") LocalDateTime toTime) { + String sql = + """ + SELECT + ul.sub_id, + ul.used_bytes, + ul.event_time + FROM usage_log ul + JOIN subscription_plan sp + ON sp.sub_id = ul.sub_id + WHERE ul.event_time >= ? + AND ul.event_time < ? + AND sp.allotment_amount = 5120 + ORDER BY ul.id + """; + + return new JdbcCursorItemReaderBuilder() + .name("usageLogDailyReader") + .dataSource(dataSource) + .sql(sql) + .fetchSize(fetchSize) + .preparedStatementSetter( + ps -> { + ps.setObject(1, fromTime); + ps.setObject(2, toTime); + }) + .rowMapper( + (rs, rowNum) -> + new UsageLogRow( + rs.getLong("sub_id"), + rs.getLong("used_bytes"), + rs.getTimestamp("event_time").toLocalDateTime())) + .build(); + } +} diff --git a/src/main/java/com/template/worker/batch/usageaggregate/reader/UsageLogMonthlyReaderConfig.java b/src/main/java/com/template/worker/batch/usageaggregate/reader/UsageLogMonthlyReaderConfig.java new file mode 100644 index 0000000..69339d5 --- /dev/null +++ b/src/main/java/com/template/worker/batch/usageaggregate/reader/UsageLogMonthlyReaderConfig.java @@ -0,0 +1,62 @@ +package com.template.worker.batch.usageaggregate.reader; + +import java.time.LocalDateTime; + +import javax.sql.DataSource; + +import org.springframework.batch.core.configuration.annotation.StepScope; +import org.springframework.batch.item.database.JdbcCursorItemReader; +import org.springframework.batch.item.database.builder.JdbcCursorItemReaderBuilder; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +import com.template.worker.batch.usageaggregate.dto.UsageLogRow; + +@Configuration +public class UsageLogMonthlyReaderConfig { + + @Value("${spring.batch.jobs.usage.fetch-size}") + private int fetchSize; + + @Bean(name = "usageLogMonthlyReader") + @StepScope + public JdbcCursorItemReader usageLogMonthlyReader( + DataSource dataSource, + @Value("#{jobParameters['fromTime']}") LocalDateTime fromTime, + @Value("#{jobParameters['toTime']}") LocalDateTime toTime) { + String sql = + """ + SELECT + ul.sub_id, + ul.used_bytes, + ul.event_time + FROM usage_log ul + JOIN subscription_plan sp + ON sp.sub_id = ul.sub_id + WHERE ul.event_time >= ? + AND ul.event_time < ? + AND sp.allotment_amount > 0 + AND sp.allotment_amount != 5120 + ORDER BY ul.id + """; + + return new JdbcCursorItemReaderBuilder() + .name("usageLogMonthlyReader") + .dataSource(dataSource) + .sql(sql) + .fetchSize(fetchSize) + .preparedStatementSetter( + ps -> { + ps.setObject(1, fromTime); + ps.setObject(2, toTime); + }) + .rowMapper( + (rs, rowNum) -> + new UsageLogRow( + rs.getLong("sub_id"), + rs.getLong("used_bytes"), + rs.getTimestamp("event_time").toLocalDateTime())) + .build(); + } +} diff --git a/src/main/java/com/template/worker/batch/usageaggregate/writer/UsageSummaryDailyWriter.java b/src/main/java/com/template/worker/batch/usageaggregate/writer/UsageSummaryDailyWriter.java new file mode 100644 index 0000000..2424e1e --- /dev/null +++ b/src/main/java/com/template/worker/batch/usageaggregate/writer/UsageSummaryDailyWriter.java @@ -0,0 +1,54 @@ +package com.template.worker.batch.usageaggregate.writer; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.springframework.batch.item.Chunk; +import org.springframework.batch.item.ItemWriter; +import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate; +import org.springframework.stereotype.Component; + +import com.template.worker.batch.usageaggregate.config.Sqls; +import com.template.worker.batch.usageaggregate.dto.UsageDailyAggregation; +import com.template.worker.batch.usageaggregate.dto.UsageDailyKey; + +import lombok.RequiredArgsConstructor; + +@Component +@RequiredArgsConstructor +public class UsageSummaryDailyWriter implements ItemWriter { + + private final NamedParameterJdbcTemplate jdbcTemplate; + + @Override + public void write(Chunk chunk) { + Map aggregated = new HashMap<>(); + + for (UsageDailyAggregation item : chunk) { + UsageDailyKey key = new UsageDailyKey(item.subId(), item.usageDate()); + + aggregated.merge(key, item.deltaBytes(), Long::sum); + } + + if (aggregated.isEmpty()) { + return; + } + + String sql = Sqls.DAILY_UPSERT; + + List> params = + aggregated.entrySet().stream() + .map( + e -> { + Map map = new HashMap<>(); + map.put("subId", e.getKey().subId()); + map.put("usageDate", e.getKey().usageDate()); + map.put("delta", e.getValue()); + return map; + }) + .toList(); + + jdbcTemplate.batchUpdate(sql, params.toArray(new Map[0])); + } +} diff --git a/src/main/java/com/template/worker/batch/usageaggregate/writer/UsageSummaryMonthlyWriter.java b/src/main/java/com/template/worker/batch/usageaggregate/writer/UsageSummaryMonthlyWriter.java new file mode 100644 index 0000000..d04f61e --- /dev/null +++ b/src/main/java/com/template/worker/batch/usageaggregate/writer/UsageSummaryMonthlyWriter.java @@ -0,0 +1,55 @@ +package com.template.worker.batch.usageaggregate.writer; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.springframework.batch.item.Chunk; +import org.springframework.batch.item.ItemWriter; +import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate; +import org.springframework.stereotype.Component; + +import com.template.worker.batch.usageaggregate.config.Sqls; +import com.template.worker.batch.usageaggregate.dto.UsageMonthlyAggregation; +import com.template.worker.batch.usageaggregate.dto.UsageMonthlyKey; + +import lombok.RequiredArgsConstructor; + +@Component +@RequiredArgsConstructor +public class UsageSummaryMonthlyWriter implements ItemWriter { + + private final NamedParameterJdbcTemplate jdbcTemplate; + + @Override + public void write(Chunk chunk) { + + Map aggregated = new HashMap<>(); + + for (UsageMonthlyAggregation item : chunk) { + UsageMonthlyKey key = new UsageMonthlyKey(item.subId(), item.period()); + + aggregated.merge(key, item.deltaBytes(), Long::sum); + } + + if (aggregated.isEmpty()) { + return; + } + + String sql = Sqls.MONTHLY_UPSERT; + + List> params = + aggregated.entrySet().stream() + .map( + entry -> { + Map map = new HashMap<>(); + map.put("subId", entry.getKey().subId()); + map.put("period", entry.getKey().period()); + map.put("delta", entry.getValue()); + return map; + }) + .toList(); + + jdbcTemplate.batchUpdate(sql, params.toArray(new Map[0])); + } +} diff --git a/src/main/java/com/template/worker/batch/usagenotification/config/UsageNotificationJobConfig.java b/src/main/java/com/template/worker/batch/usagenotification/config/UsageNotificationJobConfig.java new file mode 100644 index 0000000..883e663 --- /dev/null +++ b/src/main/java/com/template/worker/batch/usagenotification/config/UsageNotificationJobConfig.java @@ -0,0 +1,77 @@ +package com.template.worker.batch.usagenotification.config; + +import org.springframework.batch.core.Job; +import org.springframework.batch.core.Step; +import org.springframework.batch.core.configuration.annotation.JobScope; +import org.springframework.batch.core.job.builder.JobBuilder; +import org.springframework.batch.core.launch.support.RunIdIncrementer; +import org.springframework.batch.core.repository.JobRepository; +import org.springframework.batch.core.step.builder.StepBuilder; +import org.springframework.batch.item.database.JdbcCursorItemReader; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.transaction.PlatformTransactionManager; + +import com.template.worker.batch.model.BatchStepMetricsListener; +import com.template.worker.batch.usagenotification.dto.UsageNotificationCandidate; +import com.template.worker.batch.usagenotification.dto.UsageNotificationSource; +import com.template.worker.batch.usagenotification.processor.UsageNotificationProcessor; +import com.template.worker.batch.usagenotification.writer.UsageNotificationWriter; + +import lombok.RequiredArgsConstructor; + +@Configuration +@RequiredArgsConstructor +public class UsageNotificationJobConfig { + + @Value("${spring.batch.jobs.usage.chunk-size}") + private int chunkSize; + + private final JobRepository jobRepository; + private final PlatformTransactionManager txManager; + + @Qualifier("usageNotificationMonthlyReader") + private final JdbcCursorItemReader usageNotificationMonthlyReader; + + @Qualifier("usageNotificationDailyReader") + private final JdbcCursorItemReader usageNotificationDailyReader; + + private final UsageNotificationProcessor usageNotificationProcessor; + private final UsageNotificationWriter usageNotificationWriter; + private final BatchStepMetricsListener batchStepMetricsListener; + + @Bean + public Job usageNotificationJob() { + return new JobBuilder("usageNotificationJob", jobRepository) + .incrementer(new RunIdIncrementer()) + .start(usageNotificationMonthlyStep()) + .next(usageNotificationDailyStep()) + .build(); + } + + @Bean + @JobScope + public Step usageNotificationMonthlyStep() { + return new StepBuilder("usageNotificationMonthlyStep", jobRepository) + .chunk(chunkSize, txManager) + .reader(usageNotificationMonthlyReader) + .processor(usageNotificationProcessor) + .writer(usageNotificationWriter) + .listener(batchStepMetricsListener) + .build(); + } + + @Bean + @JobScope + public Step usageNotificationDailyStep() { + return new StepBuilder("usageNotificationDailyStep", jobRepository) + .chunk(chunkSize, txManager) + .reader(usageNotificationDailyReader) + .processor(usageNotificationProcessor) + .writer(usageNotificationWriter) + .listener(batchStepMetricsListener) + .build(); + } +} diff --git a/src/main/java/com/template/worker/batch/usagenotification/config/util/UsageNotificationPolicy.java b/src/main/java/com/template/worker/batch/usagenotification/config/util/UsageNotificationPolicy.java new file mode 100644 index 0000000..119d043 --- /dev/null +++ b/src/main/java/com/template/worker/batch/usagenotification/config/util/UsageNotificationPolicy.java @@ -0,0 +1,49 @@ +package com.template.worker.batch.usagenotification.config.util; + +import java.util.Optional; + +import org.springframework.stereotype.Component; + +import com.template.worker.batch.usagenotification.dto.UsageNotificationCandidate; +import com.template.worker.batch.usagenotification.dto.UsageNotificationSource; + +@Component +public class UsageNotificationPolicy { + + public Optional evaluate(UsageNotificationSource source) { + if (source.allotmentAmount() <= 0) { + return Optional.empty(); + } + + long totalUsedMb = source.totalUsedBytes() / (1024 * 1024); + long allotmentMb = source.allotmentAmount(); + + double percent = (double) (totalUsedMb * 100L) / allotmentMb; + + return decideThreshold((int) percent) + .map( + threshold -> + new UsageNotificationCandidate( + source.subId(), + source.period(), + source.unit(), + source.planName(), + threshold, + percent, + totalUsedMb, + allotmentMb)); + } + + private Optional decideThreshold(int percent) { + if (percent >= 100) { + return Optional.of(100); + } + if (percent >= 80) { + return Optional.of(80); + } + if (percent >= 50) { + return Optional.of(50); + } + return Optional.empty(); + } +} diff --git a/src/main/java/com/template/worker/batch/usagenotification/dto/UsageNotificationCandidate.java b/src/main/java/com/template/worker/batch/usagenotification/dto/UsageNotificationCandidate.java new file mode 100644 index 0000000..379d835 --- /dev/null +++ b/src/main/java/com/template/worker/batch/usagenotification/dto/UsageNotificationCandidate.java @@ -0,0 +1,11 @@ +package com.template.worker.batch.usagenotification.dto; + +public record UsageNotificationCandidate( + Long subId, + String period, + String unit, + String planName, + int threshold, + double percent, + long totalUsedMb, + long allotmentMb) {} diff --git a/src/main/java/com/template/worker/batch/usagenotification/dto/UsageNotificationOutboxRow.java b/src/main/java/com/template/worker/batch/usagenotification/dto/UsageNotificationOutboxRow.java new file mode 100644 index 0000000..fbaf299 --- /dev/null +++ b/src/main/java/com/template/worker/batch/usagenotification/dto/UsageNotificationOutboxRow.java @@ -0,0 +1,16 @@ +package com.template.worker.batch.usagenotification.dto; + +import java.time.LocalDateTime; + +public record UsageNotificationOutboxRow( + Long id, + Long subId, + String period, + String planName, + int threshold, + double percent, + long totalUsedMb, + long allotmentMb, + String phoneNumber, + String email, + LocalDateTime createdAt) {} diff --git a/src/main/java/com/template/worker/batch/usagenotification/dto/UsageNotificationSource.java b/src/main/java/com/template/worker/batch/usagenotification/dto/UsageNotificationSource.java new file mode 100644 index 0000000..173d99c --- /dev/null +++ b/src/main/java/com/template/worker/batch/usagenotification/dto/UsageNotificationSource.java @@ -0,0 +1,9 @@ +package com.template.worker.batch.usagenotification.dto; + +public record UsageNotificationSource( + Long subId, + String period, + String unit, + String planName, + long totalUsedBytes, + long allotmentAmount) {} diff --git a/src/main/java/com/template/worker/batch/usagenotification/processor/UsageNotificationProcessor.java b/src/main/java/com/template/worker/batch/usagenotification/processor/UsageNotificationProcessor.java new file mode 100644 index 0000000..91a5ac5 --- /dev/null +++ b/src/main/java/com/template/worker/batch/usagenotification/processor/UsageNotificationProcessor.java @@ -0,0 +1,24 @@ +package com.template.worker.batch.usagenotification.processor; + +import org.springframework.batch.item.ItemProcessor; +import org.springframework.stereotype.Component; + +import com.template.worker.batch.usagenotification.config.util.UsageNotificationPolicy; +import com.template.worker.batch.usagenotification.dto.UsageNotificationCandidate; +import com.template.worker.batch.usagenotification.dto.UsageNotificationSource; + +import lombok.RequiredArgsConstructor; + +@Component +@RequiredArgsConstructor +public class UsageNotificationProcessor + implements ItemProcessor { + + private final UsageNotificationPolicy policy; + + @Override + public UsageNotificationCandidate process(UsageNotificationSource source) { + + return policy.evaluate(source).orElse(null); + } +} diff --git a/src/main/java/com/template/worker/batch/usagenotification/reader/UsageNotificationDailyReaderConfig.java b/src/main/java/com/template/worker/batch/usagenotification/reader/UsageNotificationDailyReaderConfig.java new file mode 100644 index 0000000..605d715 --- /dev/null +++ b/src/main/java/com/template/worker/batch/usagenotification/reader/UsageNotificationDailyReaderConfig.java @@ -0,0 +1,68 @@ +package com.template.worker.batch.usagenotification.reader; + +import java.time.LocalDateTime; + +import javax.sql.DataSource; + +import org.springframework.batch.core.configuration.annotation.StepScope; +import org.springframework.batch.item.database.JdbcCursorItemReader; +import org.springframework.batch.item.database.builder.JdbcCursorItemReaderBuilder; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +import com.template.worker.batch.usagenotification.dto.UsageNotificationSource; + +@Configuration +public class UsageNotificationDailyReaderConfig { + + @Value("${spring.batch.jobs.usage.fetch-size}") + private int fetchSize; + + @Bean(name = "usageNotificationDailyReader") + @StepScope + public JdbcCursorItemReader usageNotificationDailyReader( + DataSource dataSource, + @Value("#{jobParameters['fromTime']}") LocalDateTime fromTime, + @Value("#{jobParameters['toTime']}") LocalDateTime toTime) { + + String sql = + """ + SELECT + usd.sub_id, + usd.usage_date AS period, + 'DAY' AS unit, + sp.plan_name, + usd.total_used_bytes, + sp.allotment_amount + FROM usage_summary_daily usd + JOIN subscription_plan sp + ON sp.sub_id = usd.sub_id + WHERE usd.updated_at >= ? + AND usd.updated_at < ? + AND sp.allotment_amount = 5120 + ORDER BY usd.sub_id + """; + + return new JdbcCursorItemReaderBuilder() + .name("usageNotificationDailyReader") + .dataSource(dataSource) + .sql(sql) + .fetchSize(fetchSize) + .preparedStatementSetter( + ps -> { + ps.setObject(1, fromTime); + ps.setObject(2, toTime); + }) + .rowMapper( + (rs, rowNum) -> + new UsageNotificationSource( + rs.getLong("sub_id"), + rs.getString("period"), + rs.getString("unit"), + rs.getString("plan_name"), + rs.getLong("total_used_bytes"), + rs.getLong("allotment_amount"))) + .build(); + } +} diff --git a/src/main/java/com/template/worker/batch/usagenotification/reader/UsageNotificationMonthlyReaderConfig.java b/src/main/java/com/template/worker/batch/usagenotification/reader/UsageNotificationMonthlyReaderConfig.java new file mode 100644 index 0000000..bcc63c7 --- /dev/null +++ b/src/main/java/com/template/worker/batch/usagenotification/reader/UsageNotificationMonthlyReaderConfig.java @@ -0,0 +1,69 @@ +package com.template.worker.batch.usagenotification.reader; + +import java.time.LocalDateTime; + +import javax.sql.DataSource; + +import org.springframework.batch.core.configuration.annotation.StepScope; +import org.springframework.batch.item.database.JdbcCursorItemReader; +import org.springframework.batch.item.database.builder.JdbcCursorItemReaderBuilder; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +import com.template.worker.batch.usagenotification.dto.UsageNotificationSource; + +@Configuration +public class UsageNotificationMonthlyReaderConfig { + + @Value("${spring.batch.jobs.usage.fetch-size}") + private int fetchSize; + + @Bean(name = "usageNotificationMonthlyReader") + @StepScope + public JdbcCursorItemReader usageNotificationMonthlyReader( + DataSource dataSource, + @Value("#{jobParameters['fromTime']}") LocalDateTime fromTime, + @Value("#{jobParameters['toTime']}") LocalDateTime toTime) { + + String sql = + """ + SELECT + usm.sub_id, + usm.period, + 'MONTH' AS unit, + sp.plan_name, + usm.total_used_bytes, + sp.allotment_amount + FROM usage_summary_monthly usm + JOIN subscription_plan sp + ON sp.sub_id = usm.sub_id + WHERE usm.updated_at >= ? + AND usm.updated_at < ? + AND sp.allotment_amount > 0 + AND sp.allotment_amount != 5120 + ORDER BY usm.sub_id + """; + + return new JdbcCursorItemReaderBuilder() + .name("usageNotificationMonthlyReader") + .dataSource(dataSource) + .sql(sql) + .fetchSize(fetchSize) + .preparedStatementSetter( + ps -> { + ps.setObject(1, fromTime); + ps.setObject(2, toTime); + }) + .rowMapper( + (rs, rowNum) -> + new UsageNotificationSource( + rs.getLong("sub_id"), + rs.getString("period"), + rs.getString("unit"), + rs.getString("plan_name"), + rs.getLong("total_used_bytes"), + rs.getLong("allotment_amount"))) + .build(); + } +} diff --git a/src/main/java/com/template/worker/batch/usagenotification/writer/UsageNotificationWriter.java b/src/main/java/com/template/worker/batch/usagenotification/writer/UsageNotificationWriter.java new file mode 100644 index 0000000..b75e0b2 --- /dev/null +++ b/src/main/java/com/template/worker/batch/usagenotification/writer/UsageNotificationWriter.java @@ -0,0 +1,68 @@ +package com.template.worker.batch.usagenotification.writer; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.springframework.batch.item.Chunk; +import org.springframework.batch.item.ItemWriter; +import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate; +import org.springframework.stereotype.Component; +import org.springframework.transaction.annotation.Transactional; + +import com.template.worker.batch.usagenotification.dto.UsageNotificationCandidate; + +import lombok.RequiredArgsConstructor; + +@Component +@RequiredArgsConstructor +public class UsageNotificationWriter implements ItemWriter { + + private final NamedParameterJdbcTemplate jdbcTemplate; + + @Transactional + @Override + public void write(Chunk chunk) { + + if (chunk.isEmpty()) { + return; + } + + String sql = + """ + INSERT INTO usage_notification_outbox + (sub_id, period, + plan_name, unit, + threshold, percent, + total_used_mb, allotment_mb, + status, created_at) + VALUES + (:subId, :period, + :planName, :unit, + :threshold, :percent, + :totalUsedMb, :allotmentMb, + 'PENDING', NOW()) + ON CONFLICT (sub_id, period, unit, threshold) + DO NOTHING + """; + + List> paramList = + chunk.getItems().stream() + .map( + c -> { + Map map = new HashMap<>(); + map.put("subId", c.subId()); + map.put("period", c.period()); + map.put("planName", c.planName()); + map.put("unit", c.unit()); + map.put("threshold", c.threshold()); + map.put("percent", c.percent()); + map.put("totalUsedMb", c.totalUsedMb()); + map.put("allotmentMb", c.allotmentMb()); + return map; + }) + .toList(); + + jdbcTemplate.batchUpdate(sql, paramList.toArray(new Map[0])); + } +} diff --git a/src/main/java/com/template/worker/global/runner/BatchJobRunner.java b/src/main/java/com/template/worker/global/runner/BatchJobRunner.java index c92cae3..65e948b 100644 --- a/src/main/java/com/template/worker/global/runner/BatchJobRunner.java +++ b/src/main/java/com/template/worker/global/runner/BatchJobRunner.java @@ -5,7 +5,6 @@ import java.util.Optional; import org.springframework.batch.core.Job; -import org.springframework.batch.core.JobParameters; import org.springframework.batch.core.JobParametersBuilder; import org.springframework.batch.core.configuration.JobRegistry; import org.springframework.batch.core.explore.JobExplorer; @@ -13,6 +12,7 @@ import org.springframework.boot.ApplicationRunner; import org.springframework.stereotype.Component; +import com.template.worker.batch.orchestrator.UsageOrchestrator; import com.template.worker.global.launcher.BatchJobLauncher; import lombok.RequiredArgsConstructor; @@ -24,6 +24,7 @@ public class BatchJobRunner implements ApplicationRunner { private final BatchJobLauncher batchJobLauncher; + private final UsageOrchestrator usageOrchestrator; private final JobRegistry jobRegistry; private final JobExplorer jobExplorer; @@ -49,13 +50,21 @@ public void run(ApplicationArguments args) throws Exception { Job job = jobRegistry.getJob(jobName); - JobParameters params = - new JobParametersBuilder(jobExplorer) - .addString("invMonth", invMonth) - .addLong("run.id", System.currentTimeMillis()) - .toJobParameters(); + JobParametersBuilder builder = + new JobParametersBuilder(jobExplorer).addLong("run.id", System.currentTimeMillis()); - log.info("▶ BATCH START job={} invMonth={}", jobName, invMonth); - batchJobLauncher.launch(job, params); + // 1️⃣ Orchestrator Job (파라미터 없음) + if ("usageOrchestratorJob".equals(jobName)) { + + log.info("▶ BATCH START (orchestrator) job={}", jobName); + usageOrchestrator.run(); + + } else { + + builder.addString("invMonth", invMonth); + + log.info("▶ BATCH START (invMonth) job={} invMonth={}", jobName, invMonth); + batchJobLauncher.launch(job, builder.toJobParameters()); + } } } diff --git a/src/main/java/com/template/worker/jobs/invoicesend/job/InvoiceSendJobConfig.java b/src/main/java/com/template/worker/jobs/invoicesend/job/InvoiceSendJobConfig.java index 0009988..8122f8b 100644 --- a/src/main/java/com/template/worker/jobs/invoicesend/job/InvoiceSendJobConfig.java +++ b/src/main/java/com/template/worker/jobs/invoicesend/job/InvoiceSendJobConfig.java @@ -7,10 +7,12 @@ import org.springframework.batch.core.repository.JobRepository; import org.springframework.batch.core.step.builder.StepBuilder; import org.springframework.batch.item.database.JdbcCursorItemReader; +import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.transaction.PlatformTransactionManager; +import com.template.worker.global.listener.JobResultListener; import com.template.worker.jobs.invoicesend.model.InvoiceAggregateRecord; import com.template.worker.jobs.invoicesend.model.InvoiceSendRecord; import com.template.worker.jobs.invoicesend.processor.InvoiceSendProcessor; @@ -22,13 +24,15 @@ @RequiredArgsConstructor public class InvoiceSendJobConfig { - private static final int CHUNK_SIZE = 10000; + @Value("${spring.batch.jobs.invoice-send.chunk-size}") + private int chunkSize; private final JobRepository jobRepository; private final PlatformTransactionManager transactionManager; private final InvoiceSendProcessor invoiceSendProcessor; private final InvoiceSendWriter invoiceSendWriter; + private final JobResultListener jobResultListener; @Bean public Job invoiceSendJob(Step invoiceSendStep) { @@ -41,10 +45,11 @@ public Job invoiceSendJob(Step invoiceSendStep) { @Bean public Step invoiceSendStep(JdbcCursorItemReader invoiceJoinReader) { return new StepBuilder("invoiceSendStep", jobRepository) - .chunk(CHUNK_SIZE, transactionManager) + .chunk(chunkSize, transactionManager) .reader(invoiceJoinReader) .processor(invoiceSendProcessor) .writer(invoiceSendWriter) + .listener(jobResultListener) .build(); } } diff --git a/src/main/java/com/template/worker/jobs/invoicesend/model/InvoiceAggregateRecord.java b/src/main/java/com/template/worker/jobs/invoicesend/model/InvoiceAggregateRecord.java index 81b5cb5..4cf6723 100644 --- a/src/main/java/com/template/worker/jobs/invoicesend/model/InvoiceAggregateRecord.java +++ b/src/main/java/com/template/worker/jobs/invoicesend/model/InvoiceAggregateRecord.java @@ -4,8 +4,9 @@ import java.util.List; public record InvoiceAggregateRecord( + Long templateGroupId, + Long eventId, Long invId, - Long invNo, Long subId, String invMonth, String phoneEnc, diff --git a/src/main/java/com/template/worker/jobs/invoicesend/processor/InvoiceSendProcessor.java b/src/main/java/com/template/worker/jobs/invoicesend/processor/InvoiceSendProcessor.java index 7d3c976..9805d9e 100644 --- a/src/main/java/com/template/worker/jobs/invoicesend/processor/InvoiceSendProcessor.java +++ b/src/main/java/com/template/worker/jobs/invoicesend/processor/InvoiceSendProcessor.java @@ -65,8 +65,9 @@ public InvoiceAggregateRecord flushLast() { private InvoiceAggregateRecord buildAggregate() { return new InvoiceAggregateRecord( - currentInvoice.invId(), + 2L, currentInvoice.invNo(), + currentInvoice.invId(), currentInvoice.subId(), currentInvoice.invMonth(), currentInvoice.phone_enc(), diff --git a/src/main/java/com/template/worker/jobs/invoicesend/reader/InvoiceSendReader.java b/src/main/java/com/template/worker/jobs/invoicesend/reader/InvoiceSendReader.java index b359dbc..5ae8fe8 100644 --- a/src/main/java/com/template/worker/jobs/invoicesend/reader/InvoiceSendReader.java +++ b/src/main/java/com/template/worker/jobs/invoicesend/reader/InvoiceSendReader.java @@ -17,6 +17,9 @@ @RequiredArgsConstructor public class InvoiceSendReader { + @Value("${spring.batch.jobs.invoice-send.fetch-size}") + private int fetchSize; + @Bean @StepScope public JdbcCursorItemReader invoiceJoinReader( @@ -55,7 +58,7 @@ public JdbcCursorItemReader invoiceJoinReader( ps.setString(1, invMonth); ps.setString(2, invMonth); }); - reader.setFetchSize(1000); + reader.setFetchSize(fetchSize); reader.setRowMapper(new DataClassRowMapper<>(InvoiceSendRecord.class)); diff --git a/src/main/resources/batch.yml b/src/main/resources/batch.yml index 2a738d6..7aa5a9f 100644 --- a/src/main/resources/batch.yml +++ b/src/main/resources/batch.yml @@ -33,4 +33,12 @@ spring: thread: core-pool-size: 4 max-pool-size: 6 - queue-capacity: 1000 \ No newline at end of file + queue-capacity: 1000 + + invoice-send: + chunk-size: 10000 + fetch-size: 1000 + + usage: + chunk-size: 10000 + fetch-size: 1000 \ No newline at end of file