-
Notifications
You must be signed in to change notification settings - Fork 1
[UPLUS-145] usage Batch 로직 batch-core repository로 이관 작업 진행 #13
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
e3e3c34
f4d7a37
213f62c
39b8715
fa2f63c
211168f
15ac803
a31ba15
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,83 @@ | ||
| package com.template.worker.batch.model; | ||
|
|
||
| import java.math.BigDecimal; | ||
| import java.math.RoundingMode; | ||
| import java.time.Duration; | ||
| import java.time.LocalDateTime; | ||
| import java.time.ZoneId; | ||
| import java.util.Objects; | ||
|
|
||
| import org.springframework.batch.core.ExitStatus; | ||
| import org.springframework.batch.core.JobExecution; | ||
| import org.springframework.batch.core.StepExecution; | ||
| import org.springframework.batch.core.StepExecutionListener; | ||
| import org.springframework.stereotype.Component; | ||
|
|
||
| import com.fasterxml.jackson.databind.ObjectMapper; | ||
| import com.template.worker.batch.model.entity.BatchExecutionReport; | ||
| import com.template.worker.batch.model.repository.BatchExecutionReportRepository; | ||
|
|
||
| import lombok.RequiredArgsConstructor; | ||
|
|
||
| @Component | ||
| @RequiredArgsConstructor | ||
| public class BatchStepMetricsListener implements StepExecutionListener { | ||
| private final BatchExecutionReportRepository reportRepository; | ||
| private final ObjectMapper objectMapper; | ||
|
|
||
| @Override | ||
| public ExitStatus afterStep(StepExecution stepExecution) { | ||
|
|
||
| JobExecution jobExecution = stepExecution.getJobExecution(); | ||
|
|
||
| LocalDateTime start = | ||
| Objects.requireNonNull(stepExecution.getStartTime()) | ||
| .atZone(ZoneId.of("Asia/Seoul")) | ||
| .toLocalDateTime(); | ||
|
|
||
| LocalDateTime end = | ||
| Objects.requireNonNull(stepExecution.getEndTime()) | ||
| .atZone(ZoneId.of("Asia/Seoul")) | ||
| .toLocalDateTime(); | ||
|
|
||
| long durationMs = Duration.between(start, end).toMillis(); | ||
|
|
||
| long readCount = stepExecution.getReadCount(); | ||
| BigDecimal tps = | ||
| durationMs > 0 | ||
| ? BigDecimal.valueOf(readCount) | ||
| .multiply(BigDecimal.valueOf(1000)) | ||
| .divide(BigDecimal.valueOf(durationMs), 2, RoundingMode.HALF_UP) | ||
| : BigDecimal.ZERO; | ||
|
|
||
| String paramsJson; | ||
| try { | ||
| paramsJson = | ||
| objectMapper.writeValueAsString( | ||
| jobExecution.getJobParameters().getParameters()); | ||
| } catch (Exception e) { | ||
| paramsJson = "{}"; | ||
|
Comment on lines
+58
to
+59
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
| } | ||
|
|
||
| reportRepository.save( | ||
| BatchExecutionReport.builder() | ||
| .jobName(jobExecution.getJobInstance().getJobName()) | ||
| .stepName(stepExecution.getStepName()) | ||
| .status(stepExecution.getStatus().toString()) | ||
| .startedAt(start) | ||
| .endedAt(end) | ||
| .durationMs(durationMs) | ||
| .readCount(readCount) | ||
| .writeCount(stepExecution.getWriteCount()) | ||
| .filterCount(stepExecution.getFilterCount()) | ||
| .skipCount(stepExecution.getSkipCount()) | ||
| .commitCount(stepExecution.getCommitCount()) | ||
| .rollbackCount(stepExecution.getRollbackCount()) | ||
| .tps(tps) | ||
| .jobParameters(paramsJson) | ||
| .createdAt(LocalDateTime.now()) | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
| .build()); | ||
|
|
||
| return stepExecution.getExitStatus(); | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,52 @@ | ||
| package com.template.worker.batch.model.entity; | ||
|
|
||
| import java.math.BigDecimal; | ||
| import java.time.LocalDateTime; | ||
|
|
||
| import jakarta.persistence.Column; | ||
| import jakarta.persistence.Entity; | ||
| import jakarta.persistence.GeneratedValue; | ||
| import jakarta.persistence.GenerationType; | ||
| import jakarta.persistence.Id; | ||
| import jakarta.persistence.Table; | ||
|
|
||
| import lombok.AccessLevel; | ||
| import lombok.AllArgsConstructor; | ||
| import lombok.Builder; | ||
| import lombok.Getter; | ||
| import lombok.NoArgsConstructor; | ||
|
|
||
| @Entity | ||
| @Table(name = "batch_execution_report") | ||
| @Getter | ||
| @NoArgsConstructor(access = AccessLevel.PROTECTED) | ||
| @AllArgsConstructor | ||
| @Builder | ||
| public class BatchExecutionReport { | ||
| @Id | ||
| @GeneratedValue(strategy = GenerationType.IDENTITY) | ||
| private Long id; | ||
|
|
||
| private String jobName; | ||
| private String stepName; | ||
|
|
||
| private String status; | ||
|
|
||
| private LocalDateTime startedAt; | ||
| private LocalDateTime endedAt; | ||
| private long durationMs; | ||
|
|
||
| private long readCount; | ||
| private long writeCount; | ||
| private long filterCount; | ||
| private long skipCount; | ||
| private long commitCount; | ||
| private long rollbackCount; | ||
|
|
||
| private BigDecimal tps; | ||
|
|
||
| @Column(columnDefinition = "jsonb") | ||
| private String jobParameters; | ||
|
|
||
| private LocalDateTime createdAt; | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,7 @@ | ||
| package com.template.worker.batch.model.repository; | ||
|
|
||
| import org.springframework.data.jpa.repository.JpaRepository; | ||
|
|
||
| import com.template.worker.batch.model.entity.BatchExecutionReport; | ||
|
|
||
| public interface BatchExecutionReportRepository extends JpaRepository<BatchExecutionReport, Long> {} |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,46 @@ | ||
| package com.template.worker.batch.model.repository; | ||
|
|
||
| import java.util.List; | ||
| import java.util.Map; | ||
|
|
||
| import org.springframework.jdbc.core.JdbcTemplate; | ||
| import org.springframework.stereotype.Repository; | ||
|
|
||
| import lombok.RequiredArgsConstructor; | ||
|
|
||
| @Repository | ||
| @RequiredArgsConstructor | ||
| public class UsageNotificationOutboxRepository { | ||
|
|
||
| private final JdbcTemplate jdbcTemplate; | ||
|
|
||
| public void markProcessing(List<Long> ids) { | ||
| jdbcTemplate.batchUpdate( | ||
| "UPDATE usage_notification_outbox SET status = 'PROCESSING' WHERE id = ?", | ||
| ids, | ||
| ids.size(), | ||
| (ps, id) -> ps.setLong(1, id)); | ||
| } | ||
|
|
||
| public void markSent(List<Long> ids) { | ||
| jdbcTemplate.batchUpdate( | ||
| "UPDATE usage_notification_outbox SET status = 'SENT', sent_at = now() WHERE id =" | ||
| + " ?", | ||
|
Comment on lines
+27
to
+28
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
| ids, | ||
| ids.size(), | ||
| (ps, id) -> ps.setLong(1, id)); | ||
| } | ||
|
|
||
| public void markFailedWithReasons(Map<Long, String> reasons) { | ||
| jdbcTemplate.batchUpdate( | ||
| "UPDATE usage_notification_outbox " | ||
| + "SET status = 'FAILED', failure_reason = ? " | ||
| + "WHERE id = ?", | ||
| reasons.entrySet(), | ||
| reasons.size(), | ||
| (ps, entry) -> { | ||
| ps.setString(1, entry.getValue()); | ||
| ps.setLong(2, entry.getKey()); | ||
| }); | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,55 @@ | ||
| package com.template.worker.batch.notificationsend.config; | ||
|
|
||
| import org.springframework.batch.core.Job; | ||
| import org.springframework.batch.core.Step; | ||
| import org.springframework.batch.core.configuration.annotation.JobScope; | ||
| import org.springframework.batch.core.job.builder.JobBuilder; | ||
| import org.springframework.batch.core.launch.support.RunIdIncrementer; | ||
| import org.springframework.batch.core.repository.JobRepository; | ||
| import org.springframework.batch.core.step.builder.StepBuilder; | ||
| import org.springframework.batch.item.database.JdbcCursorItemReader; | ||
| import org.springframework.beans.factory.annotation.Value; | ||
| import org.springframework.context.annotation.Bean; | ||
| import org.springframework.context.annotation.Configuration; | ||
| import org.springframework.transaction.PlatformTransactionManager; | ||
|
|
||
| import com.template.worker.batch.notificationsend.dto.NotificationMessage; | ||
| import com.template.worker.batch.notificationsend.processor.NotificationSendProcessor; | ||
| import com.template.worker.batch.notificationsend.writer.NotificationSendWriter; | ||
| import com.template.worker.batch.usagenotification.dto.UsageNotificationOutboxRow; | ||
|
|
||
| import lombok.RequiredArgsConstructor; | ||
|
|
||
| @Configuration | ||
| @RequiredArgsConstructor | ||
| public class NotificationSendJobConfig { | ||
|
|
||
| @Value("${spring.batch.jobs.usage.chunk-size}") | ||
| private int chunkSize; | ||
|
|
||
| private final JobRepository jobRepository; | ||
| private final PlatformTransactionManager txManager; | ||
|
|
||
| private final JdbcCursorItemReader<UsageNotificationOutboxRow> outboxReader; | ||
| private final NotificationSendProcessor notificationSendProcessor; | ||
| private final NotificationSendWriter notificationSendWriter; | ||
|
|
||
| @Bean | ||
| public Job notificationSendJob() { | ||
| return new JobBuilder("notificationSendJob", jobRepository) | ||
| .incrementer(new RunIdIncrementer()) | ||
| .start(notificationSendStep()) | ||
| .build(); | ||
| } | ||
|
|
||
| @JobScope | ||
| @Bean | ||
| public Step notificationSendStep() { | ||
| return new StepBuilder("notificationSendStep", jobRepository) | ||
| .<UsageNotificationOutboxRow, NotificationMessage>chunk(chunkSize, txManager) | ||
| .reader(outboxReader) | ||
| .processor(notificationSendProcessor) | ||
| .writer(notificationSendWriter) | ||
| .build(); | ||
| } | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,11 @@ | ||
| package com.template.worker.batch.notificationsend.dto; | ||
|
|
||
| import java.util.Map; | ||
| import java.util.UUID; | ||
|
|
||
| public record NotificationMessage( | ||
| UUID eventId, | ||
| Long id, | ||
| Long templateGroupId, | ||
| Map<String, Object> subscriptionInfo, | ||
| Map<String, Object> variables) {} |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,8 @@ | ||
| package com.template.worker.batch.notificationsend.dto; | ||
|
|
||
| import java.util.concurrent.CompletableFuture; | ||
|
|
||
| import org.springframework.kafka.support.SendResult; | ||
|
|
||
| public record NotificationSendTask( | ||
| NotificationMessage event, CompletableFuture<SendResult<String, String>> future) {} |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,40 @@ | ||
| package com.template.worker.batch.notificationsend.processor; | ||
|
|
||
| import java.util.Map; | ||
| import java.util.UUID; | ||
|
|
||
| import org.springframework.batch.item.ItemProcessor; | ||
| import org.springframework.stereotype.Component; | ||
|
|
||
| import com.template.worker.batch.notificationsend.dto.NotificationMessage; | ||
| import com.template.worker.batch.usagenotification.dto.UsageNotificationOutboxRow; | ||
|
|
||
| import lombok.extern.slf4j.Slf4j; | ||
|
|
||
| @Component | ||
| @Slf4j | ||
| public class NotificationSendProcessor | ||
| implements ItemProcessor<UsageNotificationOutboxRow, NotificationMessage> { | ||
|
|
||
| @Override | ||
| public NotificationMessage process(UsageNotificationOutboxRow item) { | ||
|
|
||
| return new NotificationMessage( | ||
| UUID.randomUUID(), | ||
| item.id(), | ||
| 101L, | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
| Map.of( | ||
| "subId", item.subId(), | ||
| "phoneNumber", item.phoneNumber(), | ||
| "email", item.email()), | ||
| Map.of( | ||
| "period", item.period(), | ||
| "threshold", item.threshold(), | ||
| "percent", item.percent(), | ||
| "totalUsedMb", item.totalUsedMb(), | ||
| "allotmentMb", item.allotmentMb(), | ||
| "phoneNumber", item.phoneNumber(), | ||
| "email", item.email(), | ||
| "createdAt", item.createdAt())); | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,67 @@ | ||
| package com.template.worker.batch.notificationsend.reader; | ||
|
|
||
| import javax.sql.DataSource; | ||
|
|
||
| import org.springframework.batch.item.database.JdbcCursorItemReader; | ||
| import org.springframework.batch.item.database.builder.JdbcCursorItemReaderBuilder; | ||
| import org.springframework.beans.factory.annotation.Value; | ||
| import org.springframework.context.annotation.Bean; | ||
| import org.springframework.context.annotation.Configuration; | ||
|
|
||
| import com.template.worker.batch.usagenotification.dto.UsageNotificationOutboxRow; | ||
|
|
||
| @Configuration | ||
| public class NotificationSendReaderConfig { | ||
|
|
||
| @Value("${spring.batch.jobs.usage.fetch-size}") | ||
| private int fetchSize; | ||
|
|
||
| @Bean(name = "notificationSendOutboxReader") | ||
| public JdbcCursorItemReader<UsageNotificationOutboxRow> notificationSendOutboxReader( | ||
| DataSource dataSource) { | ||
|
|
||
| String sql = | ||
| """ | ||
| SELECT | ||
| o.id, | ||
| o.sub_id, | ||
| o.period, | ||
| o.plan_name, | ||
| o.threshold, | ||
| o.percent, | ||
| o.total_used_mb, | ||
| o.allotment_mb, | ||
| s.phone_number, | ||
| c.email_enc, | ||
| o.created_at | ||
| FROM usage_notification_outbox o | ||
| JOIN subscription s | ||
| ON s.sub_id = o.sub_id | ||
| JOIN customer c | ||
| ON c.customer_id = s.customer_id | ||
| WHERE o.status = 'PENDING' | ||
| ORDER BY o.id | ||
| """; | ||
|
|
||
| return new JdbcCursorItemReaderBuilder<UsageNotificationOutboxRow>() | ||
| .name("notificationSendOutboxReader") | ||
| .dataSource(dataSource) | ||
| .sql(sql) | ||
| .fetchSize(fetchSize) | ||
| .rowMapper( | ||
| (rs, rowNum) -> | ||
| new UsageNotificationOutboxRow( | ||
| rs.getLong("id"), | ||
| rs.getLong("sub_id"), | ||
| rs.getString("period"), | ||
| rs.getString("plan_name"), | ||
| rs.getInt("threshold"), | ||
| rs.getDouble("percent"), | ||
| rs.getLong("total_used_mb"), | ||
| rs.getLong("allotment_mb"), | ||
| rs.getString("phone_number"), | ||
| rs.getString("email_enc"), | ||
| rs.getTimestamp("created_at").toLocalDateTime())) | ||
| .build(); | ||
| } | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
하드코딩된
Asia/Seoul시간대는 다른 환경이나 지역에서 애플리케이션을 실행할 때 문제를 일으킬 수 있습니다.ZoneId를 설정 파일에서 주입받거나,ZoneId.of("UTC")와 같이 표준 시간대를 사용하는 것을 권장합니다. 이를 통해 코드의 유연성과 이식성을 높일 수 있습니다.