Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package com.bethibande.repository.cache;

import com.bethibande.repository.k8s.KubernetesSupport;
import com.github.benmanes.caffeine.cache.Cache;
import io.vertx.core.Future;
import io.vertx.core.buffer.Buffer;
import io.vertx.ext.web.client.HttpResponse;
import jakarta.enterprise.context.ApplicationScoped;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

@ApplicationScoped
public class DistributedCacheRegistry {

private static final Logger LOGGER = LoggerFactory.getLogger(DistributedCacheRegistry.class);

private final Map<String, Cache<String, ?>> caches = new HashMap<>();

private final KubernetesSupport kubernetesSupport;

public DistributedCacheRegistry(final KubernetesSupport kubernetesSupport) {
this.kubernetesSupport = kubernetesSupport;
}

public void register(final String name, final Cache<String, ?> cache) {
this.caches.put(name, cache);
}

/**
* Will invalidate the given key in the given cache only locally. This request will not propagate to other instances.
* It's used internally by the {@link #invalidateAll(String, String)} logic
* @param cache The registered name of the cache
* @param key The key to invalidate
*/
public void invalidateLocal(final String cache, final String key) {
final Cache<String, ?> localCache = this.caches.get(cache);
if (localCache != null) localCache.invalidate(key);
}

/**
* Will invalidate the given key in the given cache across all known instances.
* @param cache The registered name of the cache
* @param key The key you wish to invalidate
*/
public void invalidateAll(final String cache, final String key) {
if (!this.kubernetesSupport.isServiceDiscoveryEnabled()) return;

final List<Future<HttpResponse<Buffer>>> futures = this.kubernetesSupport.broadcastHttp(
(baseUrl, webClient) -> webClient.delete(baseUrl + "/api/v1/cache/" + cache + "/" + key).send()
);

for (int i = 0; i < futures.size(); i++) {
futures.get(i).onFailure(ex -> LOGGER.error("Failed to invalidate key {} for cache {}", key, cache, ex));
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,17 @@
import com.bethibande.repository.jobs.impl.JobTask;
import com.bethibande.repository.k8s.KubernetesLeaderService;
import com.bethibande.repository.k8s.KubernetesSupport;
import com.bethibande.repository.util.HttpClientUtil;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.quarkus.narayana.jta.QuarkusTransaction;
import io.quarkus.virtual.threads.VirtualThreads;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;

import java.net.http.HttpResponse;
import java.time.Instant;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.function.Function;

@ApplicationScoped
public class BuiltinJobScheduler {
Expand All @@ -36,15 +34,15 @@ public class BuiltinJobScheduler {
@VirtualThreads
protected Executor executor;

public <C> CompletableFuture<Void> runOnce(final JobTask<C> task, final C config) {
public <C> CompletionStage<Void> runOnce(final JobTask<C> task, final C config) {
return this.schedule(task, config, "* * * * *", true, true);
}

public <C> CompletableFuture<Void> schedule(final JobTask<C> task,
final C config,
final String cron,
final boolean immediate,
final boolean deleteAfterRun) {
public <C> CompletionStage<Void> schedule(final JobTask<C> task,
final C config,
final String cron,
final boolean immediate,
final boolean deleteAfterRun) {
final String configJson;
try {
configJson = this.objectMapper.writeValueAsString(config);
Expand All @@ -53,39 +51,54 @@ public <C> CompletableFuture<Void> schedule(final JobTask<C> task,
}

if (this.kubernetesSupport.isEnabled() && this.kubernetesSupport.isServiceDiscoveryEnabled()) {
final ScheduledJobDTOWithoutId body = new ScheduledJobDTOWithoutId(
task.getJobType(),
configJson,
cron,
deleteAfterRun
);

return CompletableFuture.supplyAsync(
() -> this.kubernetesLeaderService.sendHTTPRequestToLeader(
"/api/v1/job/schedule?now=%s".formatted(immediate),
HttpResponse.BodyHandlers.discarding(),
builder -> builder.method("POST", HttpClientUtil.jsonBodyPublisher(body))
.header("Content-Type", "application/json")
).<Void>thenApply(response -> {
if (response.statusCode() != 200) {
throw new RuntimeException("Failed to schedule job on leader node, status: " + response.statusCode() + " - " + response.body());
}
return null;
}), this.executor).thenCompose(Function.identity());
return scheduleRemote(configJson, task, cron, immediate, deleteAfterRun);
} else {
final ScheduledJob job = new ScheduledJob();
job.type = task.getJobType();
job.settings = configJson;
job.cronSchedule = cron;
job.deleteAfterRun = deleteAfterRun;
job.nextRunAt = immediate ? Instant.now() : null;

return CompletableFuture.runAsync(() -> {
QuarkusTransaction.requiringNew().run(job::persist);

scheduler.schedule(job, Instant.now());
}, this.executor);
return scheduleLocal(configJson, task, cron, immediate, deleteAfterRun);
}
}

private CompletionStage<Void> scheduleRemote(final String configJson,
final JobTask<?> task,
final String cron,
final boolean immediate,
final boolean deleteAfterRun) {
final ScheduledJobDTOWithoutId body = new ScheduledJobDTOWithoutId(
task.getJobType(),
configJson,
cron,
deleteAfterRun
);

return this.kubernetesLeaderService.sendHTTPRequestToLeader(
(baseUrl, webClient) -> webClient.post(baseUrl + "/api/v1/job/schedule?now=%s".formatted(immediate))
.putHeader("Content-Type", "application/json")
.sendJson(body)
).<Void>map(response -> {
if (response.statusCode() != 200) {
throw new RuntimeException("Failed to schedule job on leader node, status: " + response.statusCode() + " - " + response.bodyAsString());
}

return null;
}).toCompletionStage();
}

private CompletableFuture<Void> scheduleLocal(final String configJson,
final JobTask<?> task,
final String cron,
final boolean immediate,
final boolean deleteAfterRun) {
final ScheduledJob job = new ScheduledJob();
job.type = task.getJobType();
job.settings = configJson;
job.cronSchedule = cron;
job.deleteAfterRun = deleteAfterRun;
job.nextRunAt = immediate ? Instant.now() : null;

return CompletableFuture.runAsync(() -> {
QuarkusTransaction.requiringNew().run(job::persist);

scheduler.schedule(job, Instant.now());
}, this.executor);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@
import io.fabric8.kubernetes.client.extended.leaderelection.resourcelock.LeaseLock;
import io.quarkus.runtime.StartupEvent;
import io.quarkus.virtual.threads.VirtualThreads;
import io.vertx.core.Future;
import io.vertx.core.buffer.Buffer;
import io.vertx.ext.web.client.HttpResponse;
import io.vertx.ext.web.client.WebClient;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.event.Observes;
import jakarta.inject.Inject;
Expand All @@ -16,16 +20,14 @@

import java.net.InetAddress;
import java.net.URI;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;
import java.util.function.BiFunction;
import java.util.function.Consumer;

@ApplicationScoped
Expand Down Expand Up @@ -152,19 +154,13 @@ private void onNewLeader(final String leader) {
post(callback -> callback.onNewLeader(leader));
}

public <T> CompletableFuture<HttpResponse<T>> sendHTTPRequestToLeader(final String path,
final HttpResponse.BodyHandler<T> bodyHandler,
final Consumer<HttpRequest.Builder> customizer) {
public Future<HttpResponse<Buffer>> sendHTTPRequestToLeader(final BiFunction<String, WebClient, Future<HttpResponse<Buffer>>> fn) {
final InetAddress address = this.kubernetesSupport.podNameToClusterIP(this.leader);
final String host = this.kubernetesSupport.addressToHostname(address);

final URI uri = URI.create("http://%s:%d%s".formatted(host, this.managementPort, path));
final HttpRequest.Builder builder = HttpRequest.newBuilder()
.uri(uri);
final String baseUrl = "http://%s:%d".formatted(host, this.managementPort);

customizer.accept(builder);

return this.kubernetesSupport.httpClient.sendAsync(builder.build(), bodyHandler);
return fn.apply(baseUrl, this.kubernetesSupport.webClient);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@
import io.fabric8.kubernetes.client.KubernetesClientBuilder;
import io.fabric8.kubernetes.client.readiness.Readiness;
import io.quarkus.runtime.Startup;
import io.vertx.core.Future;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.ext.web.client.WebClient;
import jakarta.annotation.PostConstruct;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
Expand All @@ -25,17 +29,12 @@

import java.net.Inet6Address;
import java.net.InetAddress;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.BiFunction;

@Startup
@ApplicationScoped
Expand All @@ -52,6 +51,9 @@ public class KubernetesSupport {
@Inject
protected KubernetesClient client;

@Inject
protected Vertx vertx;

private String namespace;

@ConfigProperty(name = "repository.scheduler.discovery-service")
Expand All @@ -66,7 +68,7 @@ public class KubernetesSupport {
@ConfigProperty(name = "repository.distributed")
protected boolean distributedAllowed;

protected final HttpClient httpClient = HttpClient.newHttpClient();
protected WebClient webClient;

protected boolean kubernetesSupport = true;
protected boolean serviceDiscovery = false;
Expand Down Expand Up @@ -127,11 +129,13 @@ && hasPermission("watch", LEASE_NAME, COORDINATION_API_GROUP)

this.canInspectServices = hasPermission("get", "services", "");

this.webClient = WebClient.create(this.vertx);

initDiscovery();
}

protected void initDiscovery() {
if (!this.canListPods() || !this.canInspectServices() || !this.distributedAllowed) return;
if (!this.canListPods() || !this.canInspectServices() || !this.distributedAllowed) return;

final Service service = this.client.services()
.inNamespace(getNamespace())
Expand Down Expand Up @@ -310,24 +314,21 @@ public String addressToHostname(final InetAddress address) {
}

/**
* Broadcasts an HTTP request to the management port of all replicas
* @param path The request path i. e. /api/v1...
* Broadcasts an HTTP request to the management port of all replicas.
* The function will supply the base URL for each endpoint and the webclient that should be used to send the request.
*/
public List<CompletableFuture<HttpResponse<Void>>> broadcastHttp(final String path, final Consumer<HttpRequest.Builder> requestBuilder) {
public List<Future<io.vertx.ext.web.client.HttpResponse<Buffer>>> broadcastHttp(final BiFunction<String, WebClient, Future<io.vertx.ext.web.client.HttpResponse<Buffer>>> fn) {
if (!this.serviceDiscovery) return Collections.emptyList();

final List<CompletableFuture<HttpResponse<Void>>> futures = new ArrayList<>();
final List<Future<io.vertx.ext.web.client.HttpResponse<Buffer>>> futures = new ArrayList<>();
final List<InetAddress> addresses = getAllPodClusterIPs();
for (int i = 0; i < addresses.size(); i++) {
final InetAddress address = addresses.get(i);
final String hostname = addressToHostname(address);

final URI uri = URI.create("http://%s:%d%s".formatted(hostname, this.managementPort, path));
final HttpRequest.Builder builder = HttpRequest.newBuilder()
.uri(uri);
final String baseUrl = "http://%s:%d".formatted(hostname, this.managementPort);

requestBuilder.accept(builder);
futures.add(this.httpClient.sendAsync(builder.build(), HttpResponse.BodyHandlers.discarding()));
futures.add(fn.apply(baseUrl, this.webClient));
}

return futures;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
import jakarta.inject.Inject;
import jakarta.transaction.Transactional;

import java.net.http.HttpRequest;
import java.util.Comparator;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
Expand Down Expand Up @@ -158,8 +157,7 @@ public void updateConfig(final SMTPConfig config) {

if (this.kubernetesSupport.isServiceDiscoveryEnabled()) {
this.kubernetesSupport.broadcastHttp(
"/api/v1/mail/config/update",
builder -> builder.method("PUT", HttpRequest.BodyPublishers.noBody())
(baseURL, webClient) -> webClient.put(baseURL + "/api/v1/mail/config/update").send()
);
}
}
Expand Down
Loading
Loading