-
Notifications
You must be signed in to change notification settings - Fork 0
[UPLUS-152] Cloud Tasks Queue를 통한 Job 실행 직렬화 #3
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,54 +1,17 @@ | ||
| package com.telecom.fn.batchcontrol.service; | ||
|
|
||
| import com.google.cloud.run.v2.JobName; | ||
| import com.google.cloud.run.v2.JobsClient; | ||
| import com.google.cloud.run.v2.RunJobRequest; | ||
| import com.telecom.fn.batchcontrol.model.BatchJobType; | ||
|
|
||
| import java.util.List; | ||
|
|
||
| public class BatchRunService { | ||
|
|
||
| private static final String PROJECT_ID = System.getenv("GCP_PROJECT"); | ||
| private static final String REGION = System.getenv("GCP_REGION"); | ||
| private static final String CLOUD_RUN_JOB_NAME = "telecom-app-batch-core"; | ||
|
|
||
| private final CloudTasksService cloudTasksService = new CloudTasksService(); | ||
|
|
||
| public void runJob(BatchJobType jobType, String invMonth) throws Exception { | ||
|
|
||
| System.out.println("[BatchRunService] ENTER"); | ||
| System.out.println("[BatchRunService] jobType=" + jobType + ", invMonth=" + invMonth); | ||
| System.out.println("[BatchRunService] PROJECT_ID=" + PROJECT_ID + ", REGION=" + REGION); | ||
|
|
||
| List<String> args = List.of( | ||
| "--spring.batch.job.name=" + jobType.springBatchJobName(), | ||
| "--invMonth=" + invMonth, | ||
| "--spring.batch.job.enabled=true", | ||
| "--spring.main.web-application-type=none" | ||
| ); | ||
|
|
||
| System.out.println("[BatchRunService] args=" + args); | ||
|
|
||
| try (JobsClient client = JobsClient.create()) { | ||
|
|
||
| String jobName = JobName.of(PROJECT_ID, REGION, CLOUD_RUN_JOB_NAME).toString(); | ||
| System.out.println("[BatchRunService] CloudRunJob=" + jobName); | ||
|
|
||
| RunJobRequest request = RunJobRequest.newBuilder() | ||
| .setName(jobName) | ||
| .setOverrides( | ||
| RunJobRequest.Overrides.newBuilder() | ||
| .addContainerOverrides( | ||
| RunJobRequest.Overrides.ContainerOverride.newBuilder() | ||
| .addAllArgs(args) | ||
| .build() | ||
| ) | ||
| .build() | ||
| ) | ||
| .build(); | ||
| System.out.println("[BatchRunService] Delegating to CloudTasksService..."); | ||
|
|
||
| client.runJobAsync(request); | ||
| System.out.println("[BatchRunService] runJobAsync CALLED"); | ||
| } | ||
| String taskName = cloudTasksService.createTask(jobType, invMonth); | ||
| System.out.println("[BatchRunService] Task created: " + taskName); | ||
| } | ||
| } | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,168 @@ | ||
| package com.telecom.fn.batchcontrol.service; | ||
|
|
||
| import com.google.cloud.tasks.v2.CloudTasksClient; | ||
| import com.google.cloud.tasks.v2.HttpMethod; | ||
| import com.google.cloud.tasks.v2.HttpRequest; | ||
| import com.google.cloud.tasks.v2.OidcToken; | ||
| import com.google.cloud.tasks.v2.QueueName; | ||
| import com.google.cloud.tasks.v2.Task; | ||
| import com.google.cloud.tasks.v2.CreateTaskRequest; | ||
| import com.google.protobuf.ByteString; | ||
| import com.telecom.fn.batchcontrol.model.BatchJobType; | ||
|
|
||
| import java.nio.charset.StandardCharsets; | ||
| import java.util.Base64; | ||
|
|
||
| public class CloudTasksService { | ||
|
|
||
| private static final String PROJECT_ID = System.getenv("GCP_PROJECT"); | ||
| private static final String REGION = System.getenv("GCP_REGION"); | ||
| private static final String QUEUE_NAME = "telecom-app-batch-queue"; | ||
| private static final String QUEUE_LOCATION = "asia-northeast3"; | ||
| private static final String CLOUD_RUN_JOB_NAME = "telecom-app-batch-core"; | ||
|
|
||
| /** | ||
| * Cloud Task 생성 (즉시 실행용) | ||
| * Task가 Cloud Run Job :run 엔드포인트를 호출 | ||
| */ | ||
| public String createTask(BatchJobType jobType, String invMonth) throws Exception { | ||
| System.out.println("[CloudTasksService] ENTER createTask"); | ||
| System.out.println("[CloudTasksService] jobType=" + jobType + ", invMonth=" + invMonth); | ||
|
|
||
| try (CloudTasksClient client = CloudTasksClient.create()) { | ||
| QueueName queueName = QueueName.of(PROJECT_ID, QUEUE_LOCATION, QUEUE_NAME); | ||
| System.out.println("[CloudTasksService] Queue=" + queueName.toString()); | ||
|
|
||
| String cloudRunJobUrl = String.format( | ||
| "https://%s-run.googleapis.com/apis/run.googleapis.com/v1/namespaces/%s/jobs/%s:run", | ||
| REGION, PROJECT_ID, CLOUD_RUN_JOB_NAME | ||
| ); | ||
| System.out.println("[CloudTasksService] CloudRunJobUrl=" + cloudRunJobUrl); | ||
|
|
||
| String requestBody = buildCloudRunJobRequestBody(jobType, invMonth); | ||
| System.out.println("[CloudTasksService] RequestBody=" + requestBody); | ||
|
|
||
| String serviceAccountEmail = getDefaultServiceAccount(); | ||
|
|
||
| HttpRequest httpRequest = HttpRequest.newBuilder() | ||
| .setUrl(cloudRunJobUrl) | ||
| .setHttpMethod(HttpMethod.POST) | ||
| .putHeaders("Content-Type", "application/json") | ||
| .setBody(ByteString.copyFrom(requestBody, StandardCharsets.UTF_8)) | ||
| .setOidcToken(OidcToken.newBuilder() | ||
| .setServiceAccountEmail(serviceAccountEmail) | ||
| .setAudience(cloudRunJobUrl) | ||
| .build()) | ||
| .build(); | ||
|
|
||
| Task task = Task.newBuilder() | ||
| .setHttpRequest(httpRequest) | ||
| .build(); | ||
|
|
||
| CreateTaskRequest request = CreateTaskRequest.newBuilder() | ||
| .setParent(queueName.toString()) | ||
| .setTask(task) | ||
| .build(); | ||
|
|
||
| Task createdTask = client.createTask(request); | ||
| System.out.println("[CloudTasksService] Task created: " + createdTask.getName()); | ||
| return createdTask.getName(); | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Cloud Run Job 실행 요청 본문 생성 | ||
| */ | ||
| private String buildCloudRunJobRequestBody(BatchJobType jobType, String invMonth) { | ||
| String jobArg = "\"--spring.batch.job.name=" + jobType.springBatchJobName() + "\""; | ||
| String monthArg = "\"--invMonth=" + invMonth + "\""; | ||
| String enabledArg = "\"--spring.batch.job.enabled=true\""; | ||
| String webTypeArg = "\"--spring.main.web-application-type=none\""; | ||
|
|
||
| return "{\"overrides\": {\"containerOverrides\": [{\"args\": [" | ||
| + jobArg + ", " + monthArg + ", " + enabledArg + ", " + webTypeArg | ||
| + "]}]}}"; | ||
| } | ||
|
Comment on lines
+76
to
+85
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
private String buildCloudRunJobRequestBody(BatchJobType jobType, String invMonth) {
try {
com.fasterxml.jackson.databind.ObjectMapper objectMapper = new com.fasterxml.jackson.databind.ObjectMapper();
java.util.Map<String, Object> root = java.util.Map.of(
"overrides", java.util.Map.of(
"containerOverrides", java.util.List.of(
java.util.Map.of(
"args", java.util.List.of(
"--spring.batch.job.name=" + jobType.springBatchJobName(),
"--invMonth=" + invMonth,
"--spring.batch.job.enabled=true",
"--spring.main.web-application-type=none"
)
)
)
)
);
return objectMapper.writeValueAsString(root);
} catch (com.fasterxml.jackson.core.JsonProcessingException e) {
throw new RuntimeException("Failed to build Cloud Run job request body", e);
}
} |
||
|
|
||
| /** | ||
| * Cloud Tasks API URL 반환 (Scheduler 타겟용) | ||
| * https://cloudtasks.googleapis.com/v2/projects/{project}/locations/{location}/queues/{queue}/tasks | ||
| */ | ||
| public static String getCloudTasksApiUrl() { | ||
| return String.format( | ||
| "https://cloudtasks.googleapis.com/v2/projects/%s/locations/%s/queues/%s/tasks", | ||
| PROJECT_ID, QUEUE_LOCATION, QUEUE_NAME | ||
| ); | ||
| } | ||
|
|
||
| /** | ||
| * Scheduler가 Cloud Tasks API에 POST할 요청 본문 생성 | ||
| * Task 생성 요청 (내부에 Cloud Run Job 호출 정보 포함) | ||
| */ | ||
| public static String buildSchedulerTaskRequestBody(BatchJobType jobType, String invMonth) { | ||
| String cloudRunJobUrl = String.format( | ||
| "https://%s-run.googleapis.com/apis/run.googleapis.com/v1/namespaces/%s/jobs/%s:run", | ||
| REGION, PROJECT_ID, CLOUD_RUN_JOB_NAME | ||
| ); | ||
|
|
||
| String serviceAccountEmail = getDefaultServiceAccountStatic(); | ||
|
|
||
| String jobArg = "\"--spring.batch.job.name=" + jobType.springBatchJobName() + "\""; | ||
| String monthArg = (invMonth != null && !invMonth.isBlank()) | ||
| ? ", \"--invMonth=" + invMonth + "\"" : ""; | ||
|
|
||
| String innerBody = "{\"overrides\": {\"containerOverrides\": [{\"args\": [" | ||
| + jobArg + monthArg + "]}]}}"; | ||
| String encodedBody = Base64.getEncoder().encodeToString( | ||
| innerBody.getBytes(StandardCharsets.UTF_8)); | ||
|
|
||
| return String.format( | ||
| "{\"task\": {\"httpRequest\": {" + | ||
| "\"url\": \"%s\", " + | ||
| "\"httpMethod\": \"POST\", " + | ||
| "\"headers\": {\"Content-Type\": \"application/json\"}, " + | ||
| "\"body\": \"%s\", " + | ||
| "\"oidcToken\": {\"serviceAccountEmail\": \"%s\", \"audience\": \"%s\"}" + | ||
| "}}}", | ||
| cloudRunJobUrl, encodedBody, serviceAccountEmail, cloudRunJobUrl | ||
| ); | ||
| } | ||
|
Comment on lines
+102
to
+129
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
public static String buildSchedulerTaskRequestBody(BatchJobType jobType, String invMonth) {
try {
com.fasterxml.jackson.databind.ObjectMapper objectMapper = new com.fasterxml.jackson.databind.ObjectMapper();
java.util.List<String> args = new java.util.ArrayList<>();
args.add("--spring.batch.job.name=" + jobType.springBatchJobName());
if (invMonth != null && !invMonth.isBlank()) {
args.add("--invMonth=" + invMonth);
}
java.util.Map<String, Object> innerBodyMap = java.util.Map.of(
"overrides", java.util.Map.of(
"containerOverrides", java.util.List.of(
java.util.Map.of("args", args)
)
)
);
String innerBodyJson = objectMapper.writeValueAsString(innerBodyMap);
String encodedBody = java.util.Base64.getEncoder().encodeToString(
innerBodyJson.getBytes(java.nio.charset.StandardCharsets.UTF_8));
String cloudRunJobUrl = String.format(
"https://%s-run.googleapis.com/apis/run.googleapis.com/v1/namespaces/%s/jobs/%s:run",
REGION, PROJECT_ID, CLOUD_RUN_JOB_NAME
);
String serviceAccountEmail = getDefaultServiceAccountStatic();
java.util.Map<String, Object> taskMap = java.util.Map.of(
"task", java.util.Map.of(
"httpRequest", java.util.Map.of(
"url", cloudRunJobUrl,
"httpMethod", "POST",
"headers", java.util.Map.of("Content-Type", "application/json"),
"body", encodedBody,
"oidcToken", java.util.Map.of(
"serviceAccountEmail", serviceAccountEmail,
"audience", cloudRunJobUrl
)
)
)
);
return objectMapper.writeValueAsString(taskMap);
} catch (com.fasterxml.jackson.core.JsonProcessingException e) {
throw new RuntimeException("Error creating JSON for scheduler task request", e);
}
} |
||
|
|
||
| /** | ||
| * Scheduler 요청 본문에서 invMonth 추출 | ||
| */ | ||
| public static String extractInvMonthFromSchedulerBody(String body) { | ||
| try { | ||
| int bodyStart = body.indexOf("\"body\":"); | ||
| if (bodyStart == -1) return null; | ||
|
|
||
| int valueStart = body.indexOf("\"", bodyStart + 7) + 1; | ||
| int valueEnd = body.indexOf("\"", valueStart); | ||
| if (valueStart == 0 || valueEnd == -1) return null; | ||
|
|
||
| String base64Body = body.substring(valueStart, valueEnd); | ||
| String decodedBody = new String(Base64.getDecoder().decode(base64Body), StandardCharsets.UTF_8); | ||
|
|
||
| if (decodedBody.contains("--invMonth=")) { | ||
| int start = decodedBody.indexOf("--invMonth=") + 11; | ||
| int end = decodedBody.indexOf("\"", start); | ||
| if (end != -1) return decodedBody.substring(start, end); | ||
| } | ||
| } catch (Exception e) { | ||
| System.err.println("[CloudTasksService] Failed to extract invMonth: " + e.getMessage()); | ||
| } | ||
| return null; | ||
| } | ||
|
Comment on lines
+134
to
+155
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
public static String extractInvMonthFromSchedulerBody(String body) {
try {
com.fasterxml.jackson.databind.ObjectMapper objectMapper = new com.fasterxml.jackson.databind.ObjectMapper();
com.fasterxml.jackson.databind.JsonNode rootNode = objectMapper.readTree(body);
String base64Body = rootNode.path("task").path("httpRequest").path("body").asText(null);
if (base64Body == null) {
return null;
}
byte[] decodedBytes = java.util.Base64.getDecoder().decode(base64Body);
String decodedBody = new String(decodedBytes, java.nio.charset.StandardCharsets.UTF_8);
com.fasterxml.jackson.databind.JsonNode innerRootNode = objectMapper.readTree(decodedBody);
com.fasterxml.jackson.databind.JsonNode argsNode = innerRootNode.path("overrides").path("containerOverrides").get(0).path("args");
if (argsNode == null || argsNode.isMissingNode() || !argsNode.isArray()) {
return null;
}
for (com.fasterxml.jackson.databind.JsonNode argNode : argsNode) {
String arg = argNode.asText();
if (arg.startsWith("--invMonth=")) {
return arg.substring("--invMonth=".length());
}
}
} catch (Exception e) {
System.err.println("[CloudTasksService] Failed to extract invMonth: " + e.getMessage());
}
return null;
} |
||
|
|
||
| private String getDefaultServiceAccount() { | ||
| return getDefaultServiceAccountStatic(); | ||
| } | ||
|
|
||
| private static String getDefaultServiceAccountStatic() { | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 이 메소드는 public static String getDefaultServiceAccountStatic() { |
||
| String projectNumber = System.getenv("GCP_PROJECT_NUMBER"); | ||
| if (projectNumber == null || projectNumber.isBlank()) { | ||
| throw new IllegalStateException("Environment variable 'GCP_PROJECT_NUMBER' is required."); | ||
| } | ||
| return String.format("%s-compute@developer.gserviceaccount.com", projectNumber); | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
QUEUE_LOCATION이 "asia-northeast3"으로 하드코딩되어 있습니다.PROJECT_ID나REGION처럼 환경 변수에서 값을 읽어오도록 변경하면, 다른 환경(예: 개발, 스테이징)에 배포할 때 유연성이 높아집니다.GCP_QUEUE_LOCATION같은 환경 변수를 사용하고, 이 값이 없을 경우 애플리케이션이 시작되지 않도록 하여 설정 오류를 방지하는 것을 권장합니다.