Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import java.util.List;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.event.EventListener;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;

@Slf4j
Expand All @@ -26,14 +27,19 @@ public PipelineNotificationListener(ExternalMessageService externalMessageServic
this.userService = userService;
}

@Async("notificationTaskExecutor")
@EventListener
public void onPipelineNotification(PipelineNotificationEvent event) {
if (event == null || event.operatorId() == null || event.operatorId().isBlank()) {
log.debug("Skip pipeline notification because operatorId is empty");
return;
}

externalMessageService.sendToUser(event.operatorId(), buildMessage(event));
try {
externalMessageService.sendToUser(event.operatorId(), buildMessage(event));
} catch (Exception exception) {
log.warn("Failed to send pipeline notification for pipeline {}", event.pipelineId(), exception);
}
}

private ExternalUserMessage buildMessage(PipelineNotificationEvent event) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package com.github.wellch4n.oops.infrastructure.config;

import java.util.concurrent.ThreadPoolExecutor;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.AsyncTaskExecutor;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

@Configuration
@EnableAsync
public class AsyncConfiguration {

@Bean
public AsyncTaskExecutor notificationTaskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setThreadNamePrefix("notification-");
executor.setCorePoolSize(2);
executor.setMaxPoolSize(4);
executor.setQueueCapacity(200);
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.setWaitForTasksToCompleteOnShutdown(true);
executor.setAwaitTerminationSeconds(10);
return executor;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.lang3.StringUtils;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.scheduling.annotation.Scheduled;
Expand All @@ -34,6 +35,8 @@
public class PipelineInstanceScanJob {
private static final Duration ROLLOUT_TIMEOUT = Duration.ofMinutes(5);

private final AtomicBoolean pipelineJobScanInProgress = new AtomicBoolean(false);
private final AtomicBoolean rolloutScanInProgress = new AtomicBoolean(false);
private final ApplicationRepository applicationRepository;
private final PipelineRepository pipelineRepository;
private final EnvironmentService environmentService;
Expand All @@ -60,24 +63,28 @@ public PipelineInstanceScanJob(ApplicationRepository applicationRepository,
this.applicationRuntimeGateway = applicationRuntimeGateway;
}

@Scheduled(fixedRate = 5000)
@Scheduled(fixedDelay = 5000)
public void scanPipelineJobs() {
List<Pipeline> runningPipelines = pipelineRepository.findAllByStatus(PipelineStatus.RUNNING);
for (Pipeline pipeline : runningPipelines) {
try {
if (!pipelineJobScanInProgress.compareAndSet(false, true)) {
return;
}
try {
List<Pipeline> runningPipelines = pipelineRepository.findAllByStatus(PipelineStatus.RUNNING);
for (Pipeline pipeline : runningPipelines) {
try {

if (pipelineStateMachine.isTerminal(pipeline.getStatus())) {
continue;
}
if (pipelineStateMachine.isTerminal(pipeline.getStatus())) {
continue;
}

String environmentName = pipeline.getEnvironment();
Environment environment = environmentService.getEnvironment(environmentName);
if (environment == null) {
throw new IllegalStateException("Environment not found: " + environmentName);
}
String environmentName = pipeline.getEnvironment();
Environment environment = environmentService.getEnvironment(environmentName);
if (environment == null) {
throw new IllegalStateException("Environment not found: " + environmentName);
}

PipelineJobStatus jobStatus = pipelineJobGateway.getStatus(environment, pipeline.getName());
if (jobStatus == PipelineJobStatus.SUCCEEDED) {
PipelineJobStatus jobStatus = pipelineJobGateway.getStatus(environment, pipeline.getName());
if (jobStatus == PipelineJobStatus.SUCCEEDED) {
if (DeployMode.MANUAL.equals(pipeline.getDeployMode())) {
pipelineStateMachine.ensureCanTransition(PipelineStatus.RUNNING, PipelineStatus.BUILD_SUCCEEDED);
int updated = pipelineRepository.updateStatusIfMatch(
Expand Down Expand Up @@ -136,22 +143,25 @@ public void scanPipelineJobs() {
));
}
}
} catch (Exception e) {
System.out.println("Error scanning pipeline instance: " + e.getMessage());
String message = StringUtils.defaultIfBlank(e.getMessage(), "发布任务执行失败,请查看日志。");
int deployingUpdated = pipelineRepository.updateStatusAndMessageIfMatch(
pipeline.getId(), PipelineStatus.DEPLOYING, PipelineStatus.ERROR, message
);
int runningUpdated = pipelineRepository.updateStatusAndMessageIfMatch(
pipeline.getId(), PipelineStatus.RUNNING, PipelineStatus.ERROR, message
);
if (deployingUpdated > 0 || runningUpdated > 0) {
pipeline.markFailed(message);
eventPublisher.publishEvent(PipelineNotificationEvent.of(
pipeline, PipelineNotificationType.FAILED, message
));
} catch (Exception exception) {
System.out.println("Error scanning pipeline instance: " + exception.getMessage());
String message = StringUtils.defaultIfBlank(exception.getMessage(), "发布任务执行失败,请查看日志。");
int deployingUpdated = pipelineRepository.updateStatusAndMessageIfMatch(
pipeline.getId(), PipelineStatus.DEPLOYING, PipelineStatus.ERROR, message
);
int runningUpdated = pipelineRepository.updateStatusAndMessageIfMatch(
pipeline.getId(), PipelineStatus.RUNNING, PipelineStatus.ERROR, message
);
if (deployingUpdated > 0 || runningUpdated > 0) {
pipeline.markFailed(message);
eventPublisher.publishEvent(PipelineNotificationEvent.of(
pipeline, PipelineNotificationType.FAILED, message
));
}
}
}
} finally {
pipelineJobScanInProgress.set(false);
}
}

Expand All @@ -160,32 +170,39 @@ public void scanPipelineJobs() {
* live StatefulSet rollout: a converged rollout marks it SUCCEEDED, a missing workload, fatal pod state, or
* prolonged not-ready state marks it ERROR, and anything in between leaves it ROLLING_OUT for the next tick.
*/
@Scheduled(fixedRate = 5000)
@Scheduled(fixedDelay = 5000)
public void scanRollingOutPipelines() {
List<Pipeline> rollingOutPipelines = pipelineRepository.findAllByStatus(PipelineStatus.ROLLING_OUT);
for (Pipeline pipeline : rollingOutPipelines) {
try {
Environment environment = environmentService.getEnvironment(pipeline.getEnvironment());
if (environment == null) {
throw new IllegalStateException("Environment not found: " + pipeline.getEnvironment());
}
if (!rolloutScanInProgress.compareAndSet(false, true)) {
return;
}
try {
List<Pipeline> rollingOutPipelines = pipelineRepository.findAllByStatus(PipelineStatus.ROLLING_OUT);
for (Pipeline pipeline : rollingOutPipelines) {
try {
Environment environment = environmentService.getEnvironment(pipeline.getEnvironment());
if (environment == null) {
throw new IllegalStateException("Environment not found: " + pipeline.getEnvironment());
}

DeploymentHealth health = applicationRuntimeGateway.getDeploymentHealth(
environment, pipeline.getNamespace(), pipeline.getApplicationName());

if (health.workloadMissing()) {
failRollout(pipeline, "新版本部署失败:StatefulSet 不存在。");
} else if (health.hasFailure()) {
failRollout(pipeline, "新版本部署失败:" + health.failureReason());
} else if (health.rolloutComplete()) {
succeedRollout(pipeline);
} else if (health.notReadyLongerThan(Instant.now(), ROLLOUT_TIMEOUT)) {
failRollout(pipeline, "发布生效超时,新版本未在规定时间内就绪。");
DeploymentHealth health = applicationRuntimeGateway.getDeploymentHealth(
environment, pipeline.getNamespace(), pipeline.getApplicationName());

if (health.workloadMissing()) {
failRollout(pipeline, "新版本部署失败:StatefulSet 不存在。");
} else if (health.hasFailure()) {
failRollout(pipeline, "新版本部署失败:" + health.failureReason());
} else if (health.rolloutComplete()) {
succeedRollout(pipeline);
} else if (health.notReadyLongerThan(Instant.now(), ROLLOUT_TIMEOUT)) {
failRollout(pipeline, "发布生效超时,新版本未在规定时间内就绪。");
}
// otherwise: still rolling out, leave ROLLING_OUT for the next tick
} catch (Exception exception) {
System.out.println("Error rollingOut pipeline instance: " + exception.getMessage());
}
// otherwise: still rolling out, leave ROLLING_OUT for the next tick
} catch (Exception exception) {
System.out.println("Error rollingOut pipeline instance: " + exception.getMessage());
}
} finally {
rolloutScanInProgress.set(false);
}
}

Expand Down
Loading