From 37d70bee9f8543f6d73003826fa2a94b069c32d9 Mon Sep 17 00:00:00 2001 From: wellCh4n Date: Wed, 10 Jun 2026 00:26:00 +0800 Subject: [PATCH] fix(pipeline): reduce scanner connection pressure Move pipeline notifications onto a bounded async executor so external messaging cannot hold up publish or scanner threads. Run pipeline scanner ticks with fixed delay and lightweight overlap guards to avoid piling up work when Kubernetes calls are slow. Co-authored-by: Codex --- .../event/PipelineNotificationListener.java | 8 +- .../config/AsyncConfiguration.java | 26 ++++ .../scheduler/PipelineInstanceScanJob.java | 117 ++++++++++-------- 3 files changed, 100 insertions(+), 51 deletions(-) create mode 100644 src/main/java/com/github/wellch4n/oops/infrastructure/config/AsyncConfiguration.java diff --git a/src/main/java/com/github/wellch4n/oops/application/event/PipelineNotificationListener.java b/src/main/java/com/github/wellch4n/oops/application/event/PipelineNotificationListener.java index 0b268bd..32a22ef 100644 --- a/src/main/java/com/github/wellch4n/oops/application/event/PipelineNotificationListener.java +++ b/src/main/java/com/github/wellch4n/oops/application/event/PipelineNotificationListener.java @@ -9,6 +9,7 @@ import java.util.List; import lombok.extern.slf4j.Slf4j; import org.springframework.context.event.EventListener; +import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Component; @Slf4j @@ -26,6 +27,7 @@ public PipelineNotificationListener(ExternalMessageService externalMessageServic this.userService = userService; } + @Async("notificationTaskExecutor") @EventListener public void onPipelineNotification(PipelineNotificationEvent event) { if (event == null || event.operatorId() == null || event.operatorId().isBlank()) { @@ -33,7 +35,11 @@ public void onPipelineNotification(PipelineNotificationEvent event) { return; } - externalMessageService.sendToUser(event.operatorId(), buildMessage(event)); + try { + 
externalMessageService.sendToUser(event.operatorId(), buildMessage(event)); + } catch (Exception exception) { + log.warn("Failed to send pipeline notification for pipeline {}", event.pipelineId(), exception); + } } private ExternalUserMessage buildMessage(PipelineNotificationEvent event) { diff --git a/src/main/java/com/github/wellch4n/oops/infrastructure/config/AsyncConfiguration.java b/src/main/java/com/github/wellch4n/oops/infrastructure/config/AsyncConfiguration.java new file mode 100644 index 0000000..dac07d4 --- /dev/null +++ b/src/main/java/com/github/wellch4n/oops/infrastructure/config/AsyncConfiguration.java @@ -0,0 +1,26 @@ +package com.github.wellch4n.oops.infrastructure.config; + +import java.util.concurrent.ThreadPoolExecutor; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.core.task.AsyncTaskExecutor; +import org.springframework.scheduling.annotation.EnableAsync; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; + +@Configuration +@EnableAsync +public class AsyncConfiguration { + + @Bean + public AsyncTaskExecutor notificationTaskExecutor() { + ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); + executor.setThreadNamePrefix("notification-"); + executor.setCorePoolSize(2); + executor.setMaxPoolSize(4); + executor.setQueueCapacity(200); + executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); + executor.setWaitForTasksToCompleteOnShutdown(true); + executor.setAwaitTerminationSeconds(10); + return executor; + } +} diff --git a/src/main/java/com/github/wellch4n/oops/infrastructure/scheduler/PipelineInstanceScanJob.java b/src/main/java/com/github/wellch4n/oops/infrastructure/scheduler/PipelineInstanceScanJob.java index 2818894..97830e2 100644 --- a/src/main/java/com/github/wellch4n/oops/infrastructure/scheduler/PipelineInstanceScanJob.java +++ 
b/src/main/java/com/github/wellch4n/oops/infrastructure/scheduler/PipelineInstanceScanJob.java @@ -20,6 +20,7 @@ import java.time.Duration; import java.time.Instant; import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.commons.lang3.StringUtils; import org.springframework.context.ApplicationEventPublisher; import org.springframework.scheduling.annotation.Scheduled; @@ -34,6 +35,8 @@ public class PipelineInstanceScanJob { private static final Duration ROLLOUT_TIMEOUT = Duration.ofMinutes(5); + private final AtomicBoolean pipelineJobScanInProgress = new AtomicBoolean(false); + private final AtomicBoolean rolloutScanInProgress = new AtomicBoolean(false); private final ApplicationRepository applicationRepository; private final PipelineRepository pipelineRepository; private final EnvironmentService environmentService; @@ -60,24 +63,28 @@ public PipelineInstanceScanJob(ApplicationRepository applicationRepository, this.applicationRuntimeGateway = applicationRuntimeGateway; } - @Scheduled(fixedRate = 5000) + @Scheduled(fixedDelay = 5000) public void scanPipelineJobs() { - List<Pipeline> runningPipelines = pipelineRepository.findAllByStatus(PipelineStatus.RUNNING); - for (Pipeline pipeline : runningPipelines) { - try { + if (!pipelineJobScanInProgress.compareAndSet(false, true)) { + return; + } + try { + List<Pipeline> runningPipelines = pipelineRepository.findAllByStatus(PipelineStatus.RUNNING); + for (Pipeline pipeline : runningPipelines) { + try { - if (pipelineStateMachine.isTerminal(pipeline.getStatus())) { - continue; - } + if (pipelineStateMachine.isTerminal(pipeline.getStatus())) { + continue; + } - String environmentName = pipeline.getEnvironment(); - Environment environment = environmentService.getEnvironment(environmentName); - if (environment == null) { - throw new IllegalStateException("Environment not found: " + environmentName); - } + String environmentName = pipeline.getEnvironment(); + Environment environment = 
environmentService.getEnvironment(environmentName); + if (environment == null) { + throw new IllegalStateException("Environment not found: " + environmentName); + } - PipelineJobStatus jobStatus = pipelineJobGateway.getStatus(environment, pipeline.getName()); - if (jobStatus == PipelineJobStatus.SUCCEEDED) { + PipelineJobStatus jobStatus = pipelineJobGateway.getStatus(environment, pipeline.getName()); + if (jobStatus == PipelineJobStatus.SUCCEEDED) { if (DeployMode.MANUAL.equals(pipeline.getDeployMode())) { pipelineStateMachine.ensureCanTransition(PipelineStatus.RUNNING, PipelineStatus.BUILD_SUCCEEDED); int updated = pipelineRepository.updateStatusIfMatch( @@ -136,22 +143,25 @@ public void scanPipelineJobs() { )); } } - } catch (Exception e) { - System.out.println("Error scanning pipeline instance: " + e.getMessage()); - String message = StringUtils.defaultIfBlank(e.getMessage(), "发布任务执行失败,请查看日志。"); - int deployingUpdated = pipelineRepository.updateStatusAndMessageIfMatch( - pipeline.getId(), PipelineStatus.DEPLOYING, PipelineStatus.ERROR, message - ); - int runningUpdated = pipelineRepository.updateStatusAndMessageIfMatch( - pipeline.getId(), PipelineStatus.RUNNING, PipelineStatus.ERROR, message - ); - if (deployingUpdated > 0 || runningUpdated > 0) { - pipeline.markFailed(message); - eventPublisher.publishEvent(PipelineNotificationEvent.of( - pipeline, PipelineNotificationType.FAILED, message - )); + } catch (Exception exception) { + System.out.println("Error scanning pipeline instance: " + exception.getMessage()); + String message = StringUtils.defaultIfBlank(exception.getMessage(), "发布任务执行失败,请查看日志。"); + int deployingUpdated = pipelineRepository.updateStatusAndMessageIfMatch( + pipeline.getId(), PipelineStatus.DEPLOYING, PipelineStatus.ERROR, message + ); + int runningUpdated = pipelineRepository.updateStatusAndMessageIfMatch( + pipeline.getId(), PipelineStatus.RUNNING, PipelineStatus.ERROR, message + ); + if (deployingUpdated > 0 || runningUpdated > 0) { + 
pipeline.markFailed(message); + eventPublisher.publishEvent(PipelineNotificationEvent.of( + pipeline, PipelineNotificationType.FAILED, message + )); + } } } + } finally { + pipelineJobScanInProgress.set(false); } } @@ -160,32 +170,39 @@ public void scanPipelineJobs() { * live StatefulSet rollout: a converged rollout marks it SUCCEEDED, a missing workload, fatal pod state, or * prolonged not-ready state marks it ERROR, and anything in between leaves it ROLLING_OUT for the next tick. */ - @Scheduled(fixedRate = 5000) + @Scheduled(fixedDelay = 5000) public void scanRollingOutPipelines() { - List<Pipeline> rollingOutPipelines = pipelineRepository.findAllByStatus(PipelineStatus.ROLLING_OUT); - for (Pipeline pipeline : rollingOutPipelines) { - try { - Environment environment = environmentService.getEnvironment(pipeline.getEnvironment()); - if (environment == null) { - throw new IllegalStateException("Environment not found: " + pipeline.getEnvironment()); - } + if (!rolloutScanInProgress.compareAndSet(false, true)) { + return; + } + try { + List<Pipeline> rollingOutPipelines = pipelineRepository.findAllByStatus(PipelineStatus.ROLLING_OUT); + for (Pipeline pipeline : rollingOutPipelines) { + try { + Environment environment = environmentService.getEnvironment(pipeline.getEnvironment()); + if (environment == null) { + throw new IllegalStateException("Environment not found: " + pipeline.getEnvironment()); + } - DeploymentHealth health = applicationRuntimeGateway.getDeploymentHealth( - environment, pipeline.getNamespace(), pipeline.getApplicationName()); - - if (health.workloadMissing()) { - failRollout(pipeline, "新版本部署失败:StatefulSet 不存在。"); - } else if (health.hasFailure()) { - failRollout(pipeline, "新版本部署失败:" + health.failureReason()); - } else if (health.rolloutComplete()) { - succeedRollout(pipeline); - } else if (health.notReadyLongerThan(Instant.now(), ROLLOUT_TIMEOUT)) { - failRollout(pipeline, "发布生效超时,新版本未在规定时间内就绪。"); + DeploymentHealth health = 
applicationRuntimeGateway.getDeploymentHealth( + environment, pipeline.getNamespace(), pipeline.getApplicationName()); + + if (health.workloadMissing()) { + failRollout(pipeline, "新版本部署失败:StatefulSet 不存在。"); + } else if (health.hasFailure()) { + failRollout(pipeline, "新版本部署失败:" + health.failureReason()); + } else if (health.rolloutComplete()) { + succeedRollout(pipeline); + } else if (health.notReadyLongerThan(Instant.now(), ROLLOUT_TIMEOUT)) { + failRollout(pipeline, "发布生效超时,新版本未在规定时间内就绪。"); + } + // otherwise: still rolling out, leave ROLLING_OUT for the next tick + } catch (Exception exception) { + System.out.println("Error rollingOut pipeline instance: " + exception.getMessage()); } - // otherwise: still rolling out, leave ROLLING_OUT for the next tick - } catch (Exception exception) { - System.out.println("Error rollingOut pipeline instance: " + exception.getMessage()); } + } finally { + rolloutScanInProgress.set(false); } }