diff --git a/src/main/java/com/github/wellch4n/oops/application/event/PipelineNotificationListener.java b/src/main/java/com/github/wellch4n/oops/application/event/PipelineNotificationListener.java index 0b268bd..32a22ef 100644 --- a/src/main/java/com/github/wellch4n/oops/application/event/PipelineNotificationListener.java +++ b/src/main/java/com/github/wellch4n/oops/application/event/PipelineNotificationListener.java @@ -9,6 +9,7 @@ import java.util.List; import lombok.extern.slf4j.Slf4j; import org.springframework.context.event.EventListener; +import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Component; @Slf4j @@ -26,6 +27,7 @@ public PipelineNotificationListener(ExternalMessageService externalMessageServic this.userService = userService; } + @Async("notificationTaskExecutor") @EventListener public void onPipelineNotification(PipelineNotificationEvent event) { if (event == null || event.operatorId() == null || event.operatorId().isBlank()) { @@ -33,7 +35,11 @@ public void onPipelineNotification(PipelineNotificationEvent event) { return; } - externalMessageService.sendToUser(event.operatorId(), buildMessage(event)); + try { + externalMessageService.sendToUser(event.operatorId(), buildMessage(event)); + } catch (Exception exception) { + log.warn("Failed to send pipeline notification for pipeline {}", event.pipelineId(), exception); + } } private ExternalUserMessage buildMessage(PipelineNotificationEvent event) { diff --git a/src/main/java/com/github/wellch4n/oops/infrastructure/config/AsyncConfiguration.java b/src/main/java/com/github/wellch4n/oops/infrastructure/config/AsyncConfiguration.java new file mode 100644 index 0000000..dac07d4 --- /dev/null +++ b/src/main/java/com/github/wellch4n/oops/infrastructure/config/AsyncConfiguration.java @@ -0,0 +1,26 @@ +package com.github.wellch4n.oops.infrastructure.config; + +import java.util.concurrent.ThreadPoolExecutor; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.core.task.AsyncTaskExecutor; +import org.springframework.scheduling.annotation.EnableAsync; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; + +@Configuration +@EnableAsync +public class AsyncConfiguration { + + @Bean + public AsyncTaskExecutor notificationTaskExecutor() { + ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); + executor.setThreadNamePrefix("notification-"); + executor.setCorePoolSize(2); + executor.setMaxPoolSize(4); + executor.setQueueCapacity(200); + executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); + executor.setWaitForTasksToCompleteOnShutdown(true); + executor.setAwaitTerminationSeconds(10); + return executor; + } +} diff --git a/src/main/java/com/github/wellch4n/oops/infrastructure/scheduler/PipelineInstanceScanJob.java b/src/main/java/com/github/wellch4n/oops/infrastructure/scheduler/PipelineInstanceScanJob.java index 2818894..97830e2 100644 --- a/src/main/java/com/github/wellch4n/oops/infrastructure/scheduler/PipelineInstanceScanJob.java +++ b/src/main/java/com/github/wellch4n/oops/infrastructure/scheduler/PipelineInstanceScanJob.java @@ -20,6 +20,7 @@ import java.time.Duration; import java.time.Instant; import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.commons.lang3.StringUtils; import org.springframework.context.ApplicationEventPublisher; import org.springframework.scheduling.annotation.Scheduled; @@ -34,6 +35,8 @@ public class PipelineInstanceScanJob { private static final Duration ROLLOUT_TIMEOUT = Duration.ofMinutes(5); + private final AtomicBoolean pipelineJobScanInProgress = new AtomicBoolean(false); + private final AtomicBoolean rolloutScanInProgress = new AtomicBoolean(false); private final ApplicationRepository applicationRepository; private final PipelineRepository pipelineRepository; private final EnvironmentService environmentService; @@ -60,24 +63,28 @@ public PipelineInstanceScanJob(ApplicationRepository applicationRepository, this.applicationRuntimeGateway = applicationRuntimeGateway; } - @Scheduled(fixedRate = 5000) + @Scheduled(fixedDelay = 5000) public void scanPipelineJobs() { - List runningPipelines = pipelineRepository.findAllByStatus(PipelineStatus.RUNNING); - for (Pipeline pipeline : runningPipelines) { - try { + if (!pipelineJobScanInProgress.compareAndSet(false, true)) { + return; + } + try { + List runningPipelines = pipelineRepository.findAllByStatus(PipelineStatus.RUNNING); + for (Pipeline pipeline : runningPipelines) { + try { - if (pipelineStateMachine.isTerminal(pipeline.getStatus())) { - continue; - } + if (pipelineStateMachine.isTerminal(pipeline.getStatus())) { + continue; + } - String environmentName = pipeline.getEnvironment(); - Environment environment = environmentService.getEnvironment(environmentName); - if (environment == null) { - throw new IllegalStateException("Environment not found: " + environmentName); - } + String environmentName = pipeline.getEnvironment(); + Environment environment = environmentService.getEnvironment(environmentName); + if (environment == null) { + throw new IllegalStateException("Environment not found: " + environmentName); + } - PipelineJobStatus jobStatus = pipelineJobGateway.getStatus(environment, pipeline.getName()); - if (jobStatus == PipelineJobStatus.SUCCEEDED) { + PipelineJobStatus jobStatus = pipelineJobGateway.getStatus(environment, pipeline.getName()); + if (jobStatus == PipelineJobStatus.SUCCEEDED) { if (DeployMode.MANUAL.equals(pipeline.getDeployMode())) { pipelineStateMachine.ensureCanTransition(PipelineStatus.RUNNING, PipelineStatus.BUILD_SUCCEEDED); int updated = pipelineRepository.updateStatusIfMatch( @@ -136,22 +143,25 @@ public void scanPipelineJobs() { )); } } - } catch (Exception e) { - System.out.println("Error scanning pipeline instance: " + e.getMessage()); - String message = StringUtils.defaultIfBlank(e.getMessage(), "发布任务执行失败,请查看日志。"); - int deployingUpdated = pipelineRepository.updateStatusAndMessageIfMatch( - pipeline.getId(), PipelineStatus.DEPLOYING, PipelineStatus.ERROR, message - ); - int runningUpdated = pipelineRepository.updateStatusAndMessageIfMatch( - pipeline.getId(), PipelineStatus.RUNNING, PipelineStatus.ERROR, message - ); - if (deployingUpdated > 0 || runningUpdated > 0) { - pipeline.markFailed(message); - eventPublisher.publishEvent(PipelineNotificationEvent.of( - pipeline, PipelineNotificationType.FAILED, message - )); + } catch (Exception exception) { + System.out.println("Error scanning pipeline instance: " + exception.getMessage()); + String message = StringUtils.defaultIfBlank(exception.getMessage(), "发布任务执行失败,请查看日志。"); + int deployingUpdated = pipelineRepository.updateStatusAndMessageIfMatch( + pipeline.getId(), PipelineStatus.DEPLOYING, PipelineStatus.ERROR, message + ); + int runningUpdated = pipelineRepository.updateStatusAndMessageIfMatch( + pipeline.getId(), PipelineStatus.RUNNING, PipelineStatus.ERROR, message + ); + if (deployingUpdated > 0 || runningUpdated > 0) { + pipeline.markFailed(message); + eventPublisher.publishEvent(PipelineNotificationEvent.of( + pipeline, PipelineNotificationType.FAILED, message + )); + } } } + } finally { + pipelineJobScanInProgress.set(false); } } @@ -160,32 +170,39 @@ public void scanPipelineJobs() { * live StatefulSet rollout: a converged rollout marks it SUCCEEDED, a missing workload, fatal pod state, or * prolonged not-ready state marks it ERROR, and anything in between leaves it ROLLING_OUT for the next tick. */ - @Scheduled(fixedRate = 5000) + @Scheduled(fixedDelay = 5000) public void scanRollingOutPipelines() { - List rollingOutPipelines = pipelineRepository.findAllByStatus(PipelineStatus.ROLLING_OUT); - for (Pipeline pipeline : rollingOutPipelines) { - try { - Environment environment = environmentService.getEnvironment(pipeline.getEnvironment()); - if (environment == null) { - throw new IllegalStateException("Environment not found: " + pipeline.getEnvironment()); - } + if (!rolloutScanInProgress.compareAndSet(false, true)) { + return; + } + try { + List rollingOutPipelines = pipelineRepository.findAllByStatus(PipelineStatus.ROLLING_OUT); + for (Pipeline pipeline : rollingOutPipelines) { + try { + Environment environment = environmentService.getEnvironment(pipeline.getEnvironment()); + if (environment == null) { + throw new IllegalStateException("Environment not found: " + pipeline.getEnvironment()); + } - DeploymentHealth health = applicationRuntimeGateway.getDeploymentHealth( - environment, pipeline.getNamespace(), pipeline.getApplicationName()); - - if (health.workloadMissing()) { - failRollout(pipeline, "新版本部署失败:StatefulSet 不存在。"); - } else if (health.hasFailure()) { - failRollout(pipeline, "新版本部署失败:" + health.failureReason()); - } else if (health.rolloutComplete()) { - succeedRollout(pipeline); - } else if (health.notReadyLongerThan(Instant.now(), ROLLOUT_TIMEOUT)) { - failRollout(pipeline, "发布生效超时,新版本未在规定时间内就绪。"); + DeploymentHealth health = applicationRuntimeGateway.getDeploymentHealth( + environment, pipeline.getNamespace(), pipeline.getApplicationName()); + + if (health.workloadMissing()) { + failRollout(pipeline, "新版本部署失败:StatefulSet 不存在。"); + } else if (health.hasFailure()) { + failRollout(pipeline, "新版本部署失败:" + health.failureReason()); + } else if (health.rolloutComplete()) { + succeedRollout(pipeline); + } else if (health.notReadyLongerThan(Instant.now(), ROLLOUT_TIMEOUT)) { + failRollout(pipeline, "发布生效超时,新版本未在规定时间内就绪。"); + } + // otherwise: still rolling out, leave ROLLING_OUT for the next tick + } catch (Exception exception) { + System.out.println("Error rollingOut pipeline instance: " + exception.getMessage()); } - // otherwise: still rolling out, leave ROLLING_OUT for the next tick - } catch (Exception exception) { - System.out.println("Error rollingOut pipeline instance: " + exception.getMessage()); } + } finally { + rolloutScanInProgress.set(false); } }