diff --git a/server/src/main/java/com/bethibande/repository/cache/DistributedCacheRegistry.java b/server/src/main/java/com/bethibande/repository/cache/DistributedCacheRegistry.java new file mode 100644 index 0000000..5158c27 --- /dev/null +++ b/server/src/main/java/com/bethibande/repository/cache/DistributedCacheRegistry.java @@ -0,0 +1,61 @@ +package com.bethibande.repository.cache; + +import com.bethibande.repository.k8s.KubernetesSupport; +import com.github.benmanes.caffeine.cache.Cache; +import io.vertx.core.Future; +import io.vertx.core.buffer.Buffer; +import io.vertx.ext.web.client.HttpResponse; +import jakarta.enterprise.context.ApplicationScoped; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +@ApplicationScoped +public class DistributedCacheRegistry { + + private static final Logger LOGGER = LoggerFactory.getLogger(DistributedCacheRegistry.class); + + private final Map> caches = new HashMap<>(); + + private final KubernetesSupport kubernetesSupport; + + public DistributedCacheRegistry(final KubernetesSupport kubernetesSupport) { + this.kubernetesSupport = kubernetesSupport; + } + + public void register(final String name, final Cache cache) { + this.caches.put(name, cache); + } + + /** + * Will invalidate the given key in the given cache only locally. This request will not propagate to other instances. + * It's used internally by the {@link #invalidateAll(String, String)} logic + * @param cache The registered name of the cache + * @param key The key to invalidate + */ + public void invalidateLocal(final String cache, final String key) { + final Cache localCache = this.caches.get(cache); + if (localCache != null) localCache.invalidate(key); + } + + /** + * Will invalidate the given key in the given cache across all known instances. + * @param cache The registered name of the cache + * @param key The key you wish to invalidate + */ + public void invalidateAll(final String cache, final String key) { + if (!this.kubernetesSupport.isServiceDiscoveryEnabled()) return; + + final List>> futures = this.kubernetesSupport.broadcastHttp( + (baseUrl, webClient) -> webClient.delete(baseUrl + "/api/v1/cache/" + cache + "/" + key).send() + ); + + for (int i = 0; i < futures.size(); i++) { + futures.get(i).onFailure(ex -> LOGGER.error("Failed to invalidate key {} for cache {}", key, cache, ex)); + } + } + +} diff --git a/server/src/main/java/com/bethibande/repository/jobs/BuiltinJobScheduler.java b/server/src/main/java/com/bethibande/repository/jobs/BuiltinJobScheduler.java index 644a647..1da4809 100644 --- a/server/src/main/java/com/bethibande/repository/jobs/BuiltinJobScheduler.java +++ b/server/src/main/java/com/bethibande/repository/jobs/BuiltinJobScheduler.java @@ -3,7 +3,6 @@ import com.bethibande.repository.jobs.impl.JobTask; import com.bethibande.repository.k8s.KubernetesLeaderService; import com.bethibande.repository.k8s.KubernetesSupport; -import com.bethibande.repository.util.HttpClientUtil; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import io.quarkus.narayana.jta.QuarkusTransaction; @@ -11,11 +10,10 @@ import jakarta.enterprise.context.ApplicationScoped; import jakarta.inject.Inject; -import java.net.http.HttpResponse; import java.time.Instant; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; import java.util.concurrent.Executor; -import java.util.function.Function; @ApplicationScoped public class BuiltinJobScheduler { @@ -36,15 +34,15 @@ public class BuiltinJobScheduler { @VirtualThreads protected Executor executor; - public CompletableFuture runOnce(final JobTask task, final C config) { + public CompletionStage runOnce(final JobTask task, final C config) { return this.schedule(task, config, "* * * * *", true, true); } - public CompletableFuture schedule(final JobTask task, - final C config, - final String cron, - final boolean immediate, - final boolean deleteAfterRun) { + public CompletionStage schedule(final JobTask task, + final C config, + final String cron, + final boolean immediate, + final boolean deleteAfterRun) { final String configJson; try { configJson = this.objectMapper.writeValueAsString(config); @@ -53,39 +51,54 @@ public CompletableFuture schedule(final JobTask task, } if (this.kubernetesSupport.isEnabled() && this.kubernetesSupport.isServiceDiscoveryEnabled()) { - final ScheduledJobDTOWithoutId body = new ScheduledJobDTOWithoutId( - task.getJobType(), - configJson, - cron, - deleteAfterRun - ); - - return CompletableFuture.supplyAsync( - () -> this.kubernetesLeaderService.sendHTTPRequestToLeader( - "/api/v1/job/schedule?now=%s".formatted(immediate), - HttpResponse.BodyHandlers.discarding(), - builder -> builder.method("POST", HttpClientUtil.jsonBodyPublisher(body)) - .header("Content-Type", "application/json") - ).thenApply(response -> { - if (response.statusCode() != 200) { - throw new RuntimeException("Failed to schedule job on leader node, status: " + response.statusCode() + " - " + response.body()); - } - return null; - }), this.executor).thenCompose(Function.identity()); + return scheduleRemote(configJson, task, cron, immediate, deleteAfterRun); } else { - final ScheduledJob job = new ScheduledJob(); - job.type = task.getJobType(); - job.settings = configJson; - job.cronSchedule = cron; - job.deleteAfterRun = deleteAfterRun; - job.nextRunAt = immediate ? Instant.now() : null; - - return CompletableFuture.runAsync(() -> { - QuarkusTransaction.requiringNew().run(job::persist); - - scheduler.schedule(job, Instant.now()); - }, this.executor); + return scheduleLocal(configJson, task, cron, immediate, deleteAfterRun); } } + private CompletionStage scheduleRemote(final String configJson, + final JobTask task, + final String cron, + final boolean immediate, + final boolean deleteAfterRun) { + final ScheduledJobDTOWithoutId body = new ScheduledJobDTOWithoutId( + task.getJobType(), + configJson, + cron, + deleteAfterRun + ); + + return this.kubernetesLeaderService.sendHTTPRequestToLeader( + (baseUrl, webClient) -> webClient.post(baseUrl + "/api/v1/job/schedule?now=%s".formatted(immediate)) + .putHeader("Content-Type", "application/json") + .sendJson(body) + ).map(response -> { + if (response.statusCode() != 200) { + throw new RuntimeException("Failed to schedule job on leader node, status: " + response.statusCode() + " - " + response.bodyAsString()); + } + + return null; + }).toCompletionStage(); + } + + private CompletableFuture scheduleLocal(final String configJson, + final JobTask task, + final String cron, + final boolean immediate, + final boolean deleteAfterRun) { + final ScheduledJob job = new ScheduledJob(); + job.type = task.getJobType(); + job.settings = configJson; + job.cronSchedule = cron; + job.deleteAfterRun = deleteAfterRun; + job.nextRunAt = immediate ? Instant.now() : null; + + return CompletableFuture.runAsync(() -> { + QuarkusTransaction.requiringNew().run(job::persist); + + scheduler.schedule(job, Instant.now()); + }, this.executor); + } + } diff --git a/server/src/main/java/com/bethibande/repository/k8s/KubernetesLeaderService.java b/server/src/main/java/com/bethibande/repository/k8s/KubernetesLeaderService.java index e447fe8..5a41a24 100644 --- a/server/src/main/java/com/bethibande/repository/k8s/KubernetesLeaderService.java +++ b/server/src/main/java/com/bethibande/repository/k8s/KubernetesLeaderService.java @@ -7,6 +7,10 @@ import io.fabric8.kubernetes.client.extended.leaderelection.resourcelock.LeaseLock; import io.quarkus.runtime.StartupEvent; import io.quarkus.virtual.threads.VirtualThreads; +import io.vertx.core.Future; +import io.vertx.core.buffer.Buffer; +import io.vertx.ext.web.client.HttpResponse; +import io.vertx.ext.web.client.WebClient; import jakarta.enterprise.context.ApplicationScoped; import jakarta.enterprise.event.Observes; import jakarta.inject.Inject; @@ -16,16 +20,14 @@ import java.net.InetAddress; import java.net.URI; -import java.net.http.HttpRequest; -import java.net.http.HttpResponse; import java.time.Duration; import java.util.ArrayList; import java.util.List; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.LockSupport; +import java.util.function.BiFunction; import java.util.function.Consumer; @ApplicationScoped @@ -152,19 +154,13 @@ private void onNewLeader(final String leader) { post(callback -> callback.onNewLeader(leader)); } - public CompletableFuture> sendHTTPRequestToLeader(final String path, - final HttpResponse.BodyHandler bodyHandler, - final Consumer customizer) { + public Future> sendHTTPRequestToLeader(final BiFunction>> fn) { final InetAddress address = this.kubernetesSupport.podNameToClusterIP(this.leader); final String host = this.kubernetesSupport.addressToHostname(address); - final URI uri = URI.create("http://%s:%d%s".formatted(host, this.managementPort, path)); - final HttpRequest.Builder builder = HttpRequest.newBuilder() - .uri(uri); + final String baseUrl = "http://%s:%d".formatted(host, this.managementPort); - customizer.accept(builder); - - return this.kubernetesSupport.httpClient.sendAsync(builder.build(), bodyHandler); + return fn.apply(baseUrl, this.kubernetesSupport.webClient); } } diff --git a/server/src/main/java/com/bethibande/repository/k8s/KubernetesSupport.java b/server/src/main/java/com/bethibande/repository/k8s/KubernetesSupport.java index 78407f8..8ba86f2 100644 --- a/server/src/main/java/com/bethibande/repository/k8s/KubernetesSupport.java +++ b/server/src/main/java/com/bethibande/repository/k8s/KubernetesSupport.java @@ -16,6 +16,10 @@ import io.fabric8.kubernetes.client.KubernetesClientBuilder; import io.fabric8.kubernetes.client.readiness.Readiness; import io.quarkus.runtime.Startup; +import io.vertx.core.Future; +import io.vertx.core.Vertx; +import io.vertx.core.buffer.Buffer; +import io.vertx.ext.web.client.WebClient; import jakarta.annotation.PostConstruct; import jakarta.enterprise.context.ApplicationScoped; import jakarta.inject.Inject; @@ -25,17 +29,12 @@ import java.net.Inet6Address; import java.net.InetAddress; -import java.net.URI; -import java.net.http.HttpClient; -import java.net.http.HttpRequest; -import java.net.http.HttpResponse; import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Map; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; -import java.util.function.Consumer; +import java.util.function.BiFunction; @Startup @ApplicationScoped @@ -52,6 +51,9 @@ public class KubernetesSupport { @Inject protected KubernetesClient client; + @Inject + protected Vertx vertx; + private String namespace; @ConfigProperty(name = "repository.scheduler.discovery-service") @@ -66,7 +68,7 @@ public class KubernetesSupport { @ConfigProperty(name = "repository.distributed") protected boolean distributedAllowed; - protected final HttpClient httpClient = HttpClient.newHttpClient(); + protected WebClient webClient; protected boolean kubernetesSupport = true; protected boolean serviceDiscovery = false; @@ -127,11 +129,13 @@ && hasPermission("watch", LEASE_NAME, COORDINATION_API_GROUP) this.canInspectServices = hasPermission("get", "services", ""); + this.webClient = WebClient.create(this.vertx); + initDiscovery(); } protected void initDiscovery() { - if (!this.canListPods() || !this.canInspectServices() || !this.distributedAllowed) return; + if (!this.canListPods() || !this.canInspectServices() || !this.distributedAllowed) return; final Service service = this.client.services() .inNamespace(getNamespace()) @@ -310,24 +314,21 @@ public String addressToHostname(final InetAddress address) { } /** - * Broadcasts an HTTP request to the management port of all replicas - * @param path The request path i. e. /api/v1... + * Broadcasts an HTTP request to the management port of all replicas. + * The function will supply the base URL for each endpoint and the webclient that should be used to send the request. */ - public List>> broadcastHttp(final String path, final Consumer requestBuilder) { + public List>> broadcastHttp(final BiFunction>> fn) { if (!this.serviceDiscovery) return Collections.emptyList(); - final List>> futures = new ArrayList<>(); + final List>> futures = new ArrayList<>(); final List addresses = getAllPodClusterIPs(); for (int i = 0; i < addresses.size(); i++) { final InetAddress address = addresses.get(i); final String hostname = addressToHostname(address); - final URI uri = URI.create("http://%s:%d%s".formatted(hostname, this.managementPort, path)); - final HttpRequest.Builder builder = HttpRequest.newBuilder() - .uri(uri); + final String baseUrl = "http://%s:%d".formatted(hostname, this.managementPort); - requestBuilder.accept(builder); - futures.add(this.httpClient.sendAsync(builder.build(), HttpResponse.BodyHandlers.discarding())); + futures.add(fn.apply(baseUrl, this.webClient)); } return futures; diff --git a/server/src/main/java/com/bethibande/repository/mail/MailerService.java b/server/src/main/java/com/bethibande/repository/mail/MailerService.java index a58cdef..5627918 100644 --- a/server/src/main/java/com/bethibande/repository/mail/MailerService.java +++ b/server/src/main/java/com/bethibande/repository/mail/MailerService.java @@ -14,7 +14,6 @@ import jakarta.inject.Inject; import jakarta.transaction.Transactional; -import java.net.http.HttpRequest; import java.util.Comparator; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; @@ -158,8 +157,7 @@ public void updateConfig(final SMTPConfig config) { if (this.kubernetesSupport.isServiceDiscoveryEnabled()) { this.kubernetesSupport.broadcastHttp( - "/api/v1/mail/config/update", - builder -> builder.method("PUT", HttpRequest.BodyPublishers.noBody()) + (baseURL, webClient) -> webClient.put(baseURL + "/api/v1/mail/config/update").send() ); } } diff --git a/server/src/main/java/com/bethibande/repository/security/UserSessionService.java b/server/src/main/java/com/bethibande/repository/security/UserSessionService.java index f5de4e5..39c174c 100644 --- a/server/src/main/java/com/bethibande/repository/security/UserSessionService.java +++ b/server/src/main/java/com/bethibande/repository/security/UserSessionService.java @@ -1,8 +1,11 @@ package com.bethibande.repository.security; +import com.bethibande.repository.cache.DistributedCacheRegistry; import com.bethibande.repository.jpa.security.RefreshToken; import com.bethibande.repository.jpa.security.UserSession; import com.bethibande.repository.jpa.user.User; +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; import io.quarkus.cache.CacheInvalidate; import io.quarkus.cache.CacheResult; import jakarta.enterprise.context.ApplicationScoped; @@ -21,16 +24,29 @@ public class UserSessionService { public static final Duration SESSION_LIFETIME = Duration.ofHours(1); public static final Duration REFRESH_TOKEN_LIFETIME = Duration.ofDays(7); + private final Cache userSessionCache = Caffeine.newBuilder() + .expireAfterWrite(SESSION_LIFETIME) + .maximumSize(1_000) + .build(); + + private final DistributedCacheRegistry cacheRegistry; + + public UserSessionService(final DistributedCacheRegistry cacheRegistry) { + this.cacheRegistry = cacheRegistry; + + cacheRegistry.register(USER_SESSION_CACHE, this.userSessionCache); + } + @Transactional - @CacheResult(cacheName = USER_SESSION_CACHE) public UserSession getSessionByToken(final String token) { - return UserSession.find("token = ?1", token).firstResult(); + return this.userSessionCache.get(token, _ -> UserSession.find("token = ?1", token).firstResult()); } @Transactional - @CacheInvalidate(cacheName = USER_SESSION_CACHE) public void invalidateSession(final String token) { UserSession.delete("token = ?1", token); + + this.cacheRegistry.invalidateAll(USER_SESSION_CACHE, token); } public boolean isValid(final UserSession session) { @@ -49,7 +65,6 @@ public UserSession createSessionForUser(final User user) { return session; } - @Transactional public RefreshToken createRefreshTokenForUser(final User user) { final RefreshToken token = new RefreshToken(); diff --git a/server/src/main/java/com/bethibande/repository/web/api/AuthenticationEndpoint.java b/server/src/main/java/com/bethibande/repository/web/api/AuthenticationEndpoint.java index 38f2f03..e561d0e 100644 --- a/server/src/main/java/com/bethibande/repository/web/api/AuthenticationEndpoint.java +++ b/server/src/main/java/com/bethibande/repository/web/api/AuthenticationEndpoint.java @@ -31,7 +31,7 @@ import java.time.Duration; import java.time.Instant; import java.util.Objects; -import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; @ApplicationScoped @Path("/api/v1/auth") @@ -70,7 +70,7 @@ protected boolean isDevMode() { @PermitAll @Path("/reset-request") @RateLimited(bucket = "password-reset", identityResolver = IpResolver.class) - public CompletableFuture resetPassword(final @QueryParam("email") String email) { + public CompletionStage resetPassword(final @QueryParam("email") String email) { return this.builtinJobScheduler.runOnce( this.passwordResetTask, new PasswordResetTask.Config(email) @@ -95,8 +95,10 @@ public record PasswordResetCredentials( @RateLimited(bucket = "password-reset", identityResolver = IpResolver.class) public Response resetPassword(final PasswordResetCredentials credentials) { final PasswordResetToken token = PasswordResetToken.find("token = ?1", credentials.token).firstResult(); - if (token == null || token.isExpired(Instant.now())) return Response.status(Response.Status.BAD_REQUEST).build(); - if (!token.user.email.equalsIgnoreCase(credentials.email)) return Response.status(Response.Status.BAD_REQUEST).build(); + if (token == null || token.isExpired(Instant.now())) + return Response.status(Response.Status.BAD_REQUEST).build(); + if (!token.user.email.equalsIgnoreCase(credentials.email)) + return Response.status(Response.Status.BAD_REQUEST).build(); final User user = token.user; user.password = BcryptUtil.bcryptHash(credentials.newPassword); @@ -153,8 +155,11 @@ public Response refresh(final @CookieParam(REFRESH_TOKEN_COOKIE_NAME) String ref final Instant now = Instant.now(); if (token == null || token.isExpired(now)) throw new BadRequestException("Invalid refresh token"); - token.delete(); + + final UserSession session = identity.getAttribute(SecurityAttributes.SESSION); + if (session != null) userSessionService.invalidateSession(session.token); // Invalidate current session + return doLogin(token.user); } diff --git a/server/src/main/java/com/bethibande/repository/web/api/CacheEndpoint.java b/server/src/main/java/com/bethibande/repository/web/api/CacheEndpoint.java new file mode 100644 index 0000000..b697eee --- /dev/null +++ b/server/src/main/java/com/bethibande/repository/web/api/CacheEndpoint.java @@ -0,0 +1,23 @@ +package com.bethibande.repository.web.api; + +import com.bethibande.repository.cache.DistributedCacheRegistry; +import jakarta.annotation.security.RolesAllowed; +import jakarta.inject.Inject; +import jakarta.ws.rs.DELETE; +import jakarta.ws.rs.Path; +import jakarta.ws.rs.PathParam; + +@Path("/api/v1/cache") +@RolesAllowed("SYSTEM") +public class CacheEndpoint { + + @Inject + protected DistributedCacheRegistry distributedCacheRegistry; + + @DELETE + @Path("/{cache}/{key}") + public void delete(final @PathParam("cache") String cache, final @PathParam("key") String key) { + this.distributedCacheRegistry.invalidateLocal(cache, key); + } + +}