Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ dependencies {
implementation 'org.springframework.boot:spring-boot-starter-data-jdbc'
implementation 'org.springframework.boot:spring-boot-starter-web'
implementation 'org.springframework.boot:spring-boot-starter-actuator'
implementation 'org.springframework.boot:spring-boot-starter-data-jpa'

// Monitoring & Observability - OpenTelemetry (traces, logs)
implementation 'io.opentelemetry.instrumentation:opentelemetry-spring-boot-starter'
Expand Down
Empty file modified gradlew
100644 → 100755
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
package com.template.worker.batch.model;

import java.math.BigDecimal;
import java.math.RoundingMode;
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.Objects;

import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.StepExecutionListener;
import org.springframework.stereotype.Component;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.template.worker.batch.model.entity.BatchExecutionReport;
import com.template.worker.batch.model.repository.BatchExecutionReportRepository;

import lombok.RequiredArgsConstructor;

@Component
@RequiredArgsConstructor
public class BatchStepMetricsListener implements StepExecutionListener {
private final BatchExecutionReportRepository reportRepository;
private final ObjectMapper objectMapper;

@Override
public ExitStatus afterStep(StepExecution stepExecution) {

JobExecution jobExecution = stepExecution.getJobExecution();

LocalDateTime start =
Objects.requireNonNull(stepExecution.getStartTime())
.atZone(ZoneId.of("Asia/Seoul"))
.toLocalDateTime();

LocalDateTime end =
Objects.requireNonNull(stepExecution.getEndTime())
.atZone(ZoneId.of("Asia/Seoul"))
.toLocalDateTime();
Comment on lines +33 to +41
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

하드코딩된 Asia/Seoul 시간대는 다른 환경이나 지역에서 애플리케이션을 실행할 때 문제를 일으킬 수 있습니다. ZoneId를 설정 파일에서 주입받거나, ZoneId.of("UTC")와 같이 표준 시간대를 사용하는 것을 권장합니다. 이를 통해 코드의 유연성과 이식성을 높일 수 있습니다.


long durationMs = Duration.between(start, end).toMillis();

long readCount = stepExecution.getReadCount();
BigDecimal tps =
durationMs > 0
? BigDecimal.valueOf(readCount)
.multiply(BigDecimal.valueOf(1000))
.divide(BigDecimal.valueOf(durationMs), 2, RoundingMode.HALF_UP)
: BigDecimal.ZERO;

String paramsJson;
try {
paramsJson =
objectMapper.writeValueAsString(
jobExecution.getJobParameters().getParameters());
} catch (Exception e) {
paramsJson = "{}";
Comment on lines +58 to +59
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Exception으로 모든 예외를 잡는 것은 잠재적인 버그를 숨길 수 있습니다. objectMapper.writeValueAsStringJsonProcessingException을 발생시키므로, 더 구체적인 예외를 처리하는 것이 좋습니다.

Suggested change
} catch (Exception e) {
paramsJson = "{}";
} catch (com.fasterxml.jackson.core.JsonProcessingException e) {
paramsJson = "{}";

}

reportRepository.save(
BatchExecutionReport.builder()
.jobName(jobExecution.getJobInstance().getJobName())
.stepName(stepExecution.getStepName())
.status(stepExecution.getStatus().toString())
.startedAt(start)
.endedAt(end)
.durationMs(durationMs)
.readCount(readCount)
.writeCount(stepExecution.getWriteCount())
.filterCount(stepExecution.getFilterCount())
.skipCount(stepExecution.getSkipCount())
.commitCount(stepExecution.getCommitCount())
.rollbackCount(stepExecution.getRollbackCount())
.tps(tps)
.jobParameters(paramsJson)
.createdAt(LocalDateTime.now())
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

LocalDateTime.now()는 서버의 기본 시간대를 사용합니다. 이는 서버 설정에 따라 예기치 않은 동작을 유발할 수 있습니다. LocalDateTime.now(ZoneId.of("UTC"))와 같이 명시적으로 시간대를 지정하여 일관성을 보장하는 것이 좋습니다.

.build());

return stepExecution.getExitStatus();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package com.template.worker.batch.model.entity;

import java.math.BigDecimal;
import java.time.LocalDateTime;

import jakarta.persistence.Column;
import jakarta.persistence.Entity;
import jakarta.persistence.GeneratedValue;
import jakarta.persistence.GenerationType;
import jakarta.persistence.Id;
import jakarta.persistence.Table;

import lombok.AccessLevel;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Getter;
import lombok.NoArgsConstructor;

@Entity
@Table(name = "batch_execution_report")
@Getter
@NoArgsConstructor(access = AccessLevel.PROTECTED)
@AllArgsConstructor
@Builder
public class BatchExecutionReport {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;

private String jobName;
private String stepName;

private String status;

private LocalDateTime startedAt;
private LocalDateTime endedAt;
private long durationMs;

private long readCount;
private long writeCount;
private long filterCount;
private long skipCount;
private long commitCount;
private long rollbackCount;

private BigDecimal tps;

@Column(columnDefinition = "jsonb")
private String jobParameters;

private LocalDateTime createdAt;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package com.template.worker.batch.model.repository;

import org.springframework.data.jpa.repository.JpaRepository;

import com.template.worker.batch.model.entity.BatchExecutionReport;

public interface BatchExecutionReportRepository extends JpaRepository<BatchExecutionReport, Long> {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package com.template.worker.batch.model.repository;

import java.util.List;
import java.util.Map;

import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Repository;

import lombok.RequiredArgsConstructor;

@Repository
@RequiredArgsConstructor
public class UsageNotificationOutboxRepository {

private final JdbcTemplate jdbcTemplate;

public void markProcessing(List<Long> ids) {
jdbcTemplate.batchUpdate(
"UPDATE usage_notification_outbox SET status = 'PROCESSING' WHERE id = ?",
ids,
ids.size(),
(ps, id) -> ps.setLong(1, id));
}

public void markSent(List<Long> ids) {
jdbcTemplate.batchUpdate(
"UPDATE usage_notification_outbox SET status = 'SENT', sent_at = now() WHERE id ="
+ " ?",
Comment on lines +27 to +28
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

SQL 쿼리 문자열을 + 연산자로 연결하는 대신, 파라미터 마커(?)를 문자열 내에 직접 포함하는 것이 가독성 측면에서 더 좋습니다.

Suggested change
"UPDATE usage_notification_outbox SET status = 'SENT', sent_at = now() WHERE id ="
+ " ?",
"UPDATE usage_notification_outbox SET status = 'SENT', sent_at = now() WHERE id = ?",

ids,
ids.size(),
(ps, id) -> ps.setLong(1, id));
}

public void markFailedWithReasons(Map<Long, String> reasons) {
jdbcTemplate.batchUpdate(
"UPDATE usage_notification_outbox "
+ "SET status = 'FAILED', failure_reason = ? "
+ "WHERE id = ?",
reasons.entrySet(),
reasons.size(),
(ps, entry) -> {
ps.setString(1, entry.getValue());
ps.setLong(2, entry.getKey());
});
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package com.template.worker.batch.notificationsend.config;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobScope;
import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.batch.item.database.JdbcCursorItemReader;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.transaction.PlatformTransactionManager;

import com.template.worker.batch.notificationsend.dto.NotificationMessage;
import com.template.worker.batch.notificationsend.processor.NotificationSendProcessor;
import com.template.worker.batch.notificationsend.writer.NotificationSendWriter;
import com.template.worker.batch.usagenotification.dto.UsageNotificationOutboxRow;

import lombok.RequiredArgsConstructor;

@Configuration
@RequiredArgsConstructor
public class NotificationSendJobConfig {

@Value("${spring.batch.jobs.usage.chunk-size}")
private int chunkSize;

private final JobRepository jobRepository;
private final PlatformTransactionManager txManager;

private final JdbcCursorItemReader<UsageNotificationOutboxRow> outboxReader;
private final NotificationSendProcessor notificationSendProcessor;
private final NotificationSendWriter notificationSendWriter;

@Bean
public Job notificationSendJob() {
return new JobBuilder("notificationSendJob", jobRepository)
.incrementer(new RunIdIncrementer())
.start(notificationSendStep())
.build();
}

@JobScope
@Bean
public Step notificationSendStep() {
return new StepBuilder("notificationSendStep", jobRepository)
.<UsageNotificationOutboxRow, NotificationMessage>chunk(chunkSize, txManager)
.reader(outboxReader)
.processor(notificationSendProcessor)
.writer(notificationSendWriter)
.build();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package com.template.worker.batch.notificationsend.dto;

import java.util.Map;
import java.util.UUID;

public record NotificationMessage(
UUID eventId,
Long id,
Long templateGroupId,
Map<String, Object> subscriptionInfo,
Map<String, Object> variables) {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package com.template.worker.batch.notificationsend.dto;

import java.util.concurrent.CompletableFuture;

import org.springframework.kafka.support.SendResult;

public record NotificationSendTask(
NotificationMessage event, CompletableFuture<SendResult<String, String>> future) {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package com.template.worker.batch.notificationsend.processor;

import java.util.Map;
import java.util.UUID;

import org.springframework.batch.item.ItemProcessor;
import org.springframework.stereotype.Component;

import com.template.worker.batch.notificationsend.dto.NotificationMessage;
import com.template.worker.batch.usagenotification.dto.UsageNotificationOutboxRow;

import lombok.extern.slf4j.Slf4j;

@Component
@Slf4j
public class NotificationSendProcessor
implements ItemProcessor<UsageNotificationOutboxRow, NotificationMessage> {

@Override
public NotificationMessage process(UsageNotificationOutboxRow item) {

return new NotificationMessage(
UUID.randomUUID(),
item.id(),
101L,
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

코드에 매직 넘버 101L이 사용되었습니다. 이 숫자가 무엇을 의미하는지 알기 어렵습니다. private static final Long USAGE_NOTIFICATION_TEMPLATE_GROUP_ID = 101L;과 같이 의미 있는 이름을 가진 상수로 선언하여 사용하면 코드의 가독성과 유지보수성이 향상됩니다.

Map.of(
"subId", item.subId(),
"phoneNumber", item.phoneNumber(),
"email", item.email()),
Map.of(
"period", item.period(),
"threshold", item.threshold(),
"percent", item.percent(),
"totalUsedMb", item.totalUsedMb(),
"allotmentMb", item.allotmentMb(),
"phoneNumber", item.phoneNumber(),
"email", item.email(),
"createdAt", item.createdAt()));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package com.template.worker.batch.notificationsend.reader;

import javax.sql.DataSource;

import org.springframework.batch.item.database.JdbcCursorItemReader;
import org.springframework.batch.item.database.builder.JdbcCursorItemReaderBuilder;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import com.template.worker.batch.usagenotification.dto.UsageNotificationOutboxRow;

@Configuration
public class NotificationSendReaderConfig {

@Value("${spring.batch.jobs.usage.fetch-size}")
private int fetchSize;

@Bean(name = "notificationSendOutboxReader")
public JdbcCursorItemReader<UsageNotificationOutboxRow> notificationSendOutboxReader(
DataSource dataSource) {

String sql =
"""
SELECT
o.id,
o.sub_id,
o.period,
o.plan_name,
o.threshold,
o.percent,
o.total_used_mb,
o.allotment_mb,
s.phone_number,
c.email_enc,
o.created_at
FROM usage_notification_outbox o
JOIN subscription s
ON s.sub_id = o.sub_id
JOIN customer c
ON c.customer_id = s.customer_id
WHERE o.status = 'PENDING'
ORDER BY o.id
""";

return new JdbcCursorItemReaderBuilder<UsageNotificationOutboxRow>()
.name("notificationSendOutboxReader")
.dataSource(dataSource)
.sql(sql)
.fetchSize(fetchSize)
.rowMapper(
(rs, rowNum) ->
new UsageNotificationOutboxRow(
rs.getLong("id"),
rs.getLong("sub_id"),
rs.getString("period"),
rs.getString("plan_name"),
rs.getInt("threshold"),
rs.getDouble("percent"),
rs.getLong("total_used_mb"),
rs.getLong("allotment_mb"),
rs.getString("phone_number"),
rs.getString("email_enc"),
rs.getTimestamp("created_at").toLocalDateTime()))
.build();
}
}
Loading
Loading