From 3c9a04c2821edfdbe35d3f7005305ac58f05dffc Mon Sep 17 00:00:00 2001 From: Matteo Merli Date: Mon, 4 May 2026 17:58:09 -0700 Subject: [PATCH 1/2] [feat][broker] PIP-468: sealed-segment retention GC for scalable topics MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit After a split/merge the parent segment is sealed and accepts no further writes; eventually its data ages out. There was no mechanism to actually delete it. This adds: 1. Single owner of segment-topic deletion. The v4 inactive-topic GC would otherwise race the controller for whole-topic deletion of segment-backing topics. PersistentTopic#checkGC now early-returns when the topic is in the segment:// domain — the controller is the sole lifecycle owner. 2. Periodic GC tick on the ScalableTopicController leader. Each tick: - Resolves the effective retention from topic-policies → namespace policy → broker default. Negative ⇒ keep forever, tick is a no-op. - Picks sealed segments where (now - sealedAtMs) >= retentionMs. - For each candidate, polls every existing subscription's backlog on that segment via the existing /segments/.../backlog admin endpoint. All-zero ⇒ prunable. - CAS-prunes the layout (re-validating against the latest layout to handle a concurrent prune by a former leader gracefully), reloads, notifies subscriptions, then deletes the backing topic via admin.topics().deleteAsync(force=true). - Layout-prune is the point of no return; backing-topic delete is best-effort and retried on subsequent ticks. The clock is injectable (java.time.Clock) so tests can fast-forward past retention deterministically. splitSegment/mergeSegments now read the wall-clock through the same Clock so that test ticks compute consistent ages. Tests: - testGcTickPrunesDrainedSealedSegmentPastRetention — split, tick within retention (no prune), advance past retention, tick again, assert pruned + delete called. 
- testGcTickRespectsKeepForeverRetention — negative retention leaves the segment in place even after a year of clock advance. --- .../service/persistent/PersistentTopic.java | 7 + .../scalable/ScalableTopicController.java | 300 +++++++++++++++++- .../scalable/ScalableTopicControllerTest.java | 142 +++++++++ 3 files changed, 447 insertions(+), 2 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 2e76254dc97ab..bf75ff3209c30 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -3536,6 +3536,13 @@ public boolean isReplicationBacklogExist() { @Override public void checkGC() { + if (TopicName.get(topic).isSegment()) { + // Segment-backing topics are owned by ScalableTopicController; its GC tick + // decides when a sealed segment is drained + retention-expired and deletes + // the topic. Letting v4 inactive-topic-GC race against that would risk the + // broker quietly tearing down a segment the controller still considers live. 
+ return; + } if (!isDeleteWhileInactive()) { // This topic is not included in GC return; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicController.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicController.java index 4a7b15a1e152b..f6a7918a90e71 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicController.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicController.java @@ -19,20 +19,30 @@ package org.apache.pulsar.broker.service.scalable; import io.github.merlimat.slog.Logger; +import java.time.Clock; import java.time.Duration; +import java.util.ArrayList; import java.util.LinkedHashMap; +import java.util.List; import java.util.Map; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; import lombok.Getter; import org.apache.pulsar.broker.PulsarServerException; import org.apache.pulsar.broker.resources.ScalableTopicMetadata; import org.apache.pulsar.broker.resources.ScalableTopicResources; import org.apache.pulsar.broker.service.BrokerService; +import org.apache.pulsar.broker.service.TopicPoliciesService; import org.apache.pulsar.broker.service.TransportCnx; import org.apache.pulsar.common.api.proto.ScalableConsumerType; +import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.policies.data.RetentionPolicies; +import org.apache.pulsar.common.policies.data.TopicPolicies; import org.apache.pulsar.common.scalable.HashRange; import org.apache.pulsar.common.scalable.SegmentInfo; import org.apache.pulsar.common.scalable.SegmentTopicName; @@ -53,17 +63,27 @@ public class ScalableTopicController { private 
static final Logger LOG = Logger.get(ScalableTopicController.class); private final Logger log; + /** Default cadence for the sealed-segment GC tick on the leader. */ + static final Duration DEFAULT_GC_INTERVAL = Duration.ofSeconds(60); + @Getter private final TopicName topicName; private final ScalableTopicResources resources; private final BrokerService brokerService; private final LeaderElection leaderElection; + /** Wall-clock source used for sealed-segment retention math. Tests override. */ + private final Clock clock; + /** Cadence of the GC tick. Tests override to a small value. */ + private final Duration gcInterval; private volatile SegmentLayout currentLayout; /** Per-subscription consumer tracking. */ private final ConcurrentHashMap subscriptions = new ConcurrentHashMap<>(); + /** Sealed-segment GC scheduled task. Non-null only while this broker is leader. */ + private volatile ScheduledFuture gcTask; + @Getter private volatile LeaderElectionState leaderState = LeaderElectionState.NoLeader; @@ -73,9 +93,24 @@ public class ScalableTopicController { ScalableTopicResources resources, BrokerService brokerService, CoordinationService coordinationService) { + this(topicName, resources, brokerService, coordinationService, + Clock.systemUTC(), DEFAULT_GC_INTERVAL); + } + + /** + * Test constructor: overrides the wall-clock source and the GC tick cadence. 
+ */ + ScalableTopicController(TopicName topicName, + ScalableTopicResources resources, + BrokerService brokerService, + CoordinationService coordinationService, + Clock clock, + Duration gcInterval) { this.topicName = topicName; this.resources = resources; this.brokerService = brokerService; + this.clock = clock; + this.gcInterval = gcInterval; this.log = LOG.with().attr("topic", topicName).build(); this.leaderElection = coordinationService.getLeaderElection( String.class, @@ -90,6 +125,12 @@ public class ScalableTopicController { */ private void onLeaderStateChange(LeaderElectionState state) { log.info().attr("state", state).log("Leader state change for scalable topic"); + if (state != LeaderElectionState.Leading) { + // Stepped down (or never was leader). Stop the GC tick so the deposed leader + // doesn't race the new one on layout writes / backing-topic deletes. The new + // leader's initialize() will reschedule. + cancelGcTask(); + } if (state == LeaderElectionState.NoLeader && !closed) { initialize().exceptionally(ex -> { log.warn().exceptionMessage(ex).log("Failed to re-elect after NoLeader"); @@ -119,12 +160,52 @@ public CompletableFuture initialize() { }) .thenCompose(__ -> { if (isLeader()) { + scheduleGcTask(); return restoreSessionsFromStore(); } return CompletableFuture.completedFuture(null); }); } + /** + * Schedule the periodic sealed-segment GC tick. Only fires on the controller leader; + * idempotent (re-entry just no-ops). Cancelled on close / leader-loss. 
+ */ + private synchronized void scheduleGcTask() { + if (closed || gcTask != null) { + return; + } + gcTask = scheduler().scheduleAtFixedRate(this::runGcTickSafely, + gcInterval.toMillis(), gcInterval.toMillis(), TimeUnit.MILLISECONDS); + } + + private synchronized void cancelGcTask() { + if (gcTask != null) { + gcTask.cancel(false); + gcTask = null; + } + } + + private ScheduledExecutorService scheduler() { + return brokerService.getPulsar().getExecutor(); + } + + private void runGcTickSafely() { + if (!isLeader() || closed) { + return; + } + try { + runGcTickAsync().exceptionally(ex -> { + log.warn().exceptionMessage(ex).log("Scalable-topic GC tick failed"); + return null; + }); + } catch (Throwable t) { + // Defensive: scheduleAtFixedRate would suppress all subsequent firings if a tick + // throws synchronously, so log and swallow here. + log.warn().exception(t).log("Scalable-topic GC tick threw"); + } + } + /** * Load persisted subscriptions and consumer registrations from the metadata store and * install them into per-subscription {@link SubscriptionCoordinator} instances. Called @@ -288,7 +369,7 @@ public CompletableFuture splitSegment(long segmentId) { // Single timestamp shared by the local preview and the CAS-retried metadata update, // so the children's createdAtMs and the parent's sealedAtMs always agree even if the // CAS retries due to concurrent writers. - final long nowMs = System.currentTimeMillis(); + final long nowMs = clock.millis(); // Compute the new layout locally to derive child segment info SegmentLayout newLayout = currentLayout.splitSegment(segmentId, nowMs); @@ -337,7 +418,7 @@ public CompletableFuture mergeSegments(long segmentId1, long segm // Single timestamp shared by the local preview and the CAS-retried metadata // update — see splitSegment for the rationale. 
- final long nowMs = System.currentTimeMillis(); + final long nowMs = clock.millis(); // Compute the new layout locally to derive merged segment info SegmentLayout newLayout = currentLayout.mergeSegments(segmentId1, segmentId2, nowMs); @@ -619,10 +700,225 @@ public CompletableFutureThe retention window is resolved from topic-policies on the parent + * {@code topic://...} → namespace policy → broker default, the same precedence + * Pulsar uses for regular topics. + * + *

Visible for tests; in production it's invoked by the scheduled task. + */ + CompletableFuture runGcTickAsync() { + if (!isLeader() || closed) { + return CompletableFuture.completedFuture(null); + } + final SegmentLayout layout = currentLayout; + if (layout == null) { + return CompletableFuture.completedFuture(null); + } + + // Candidates: sealed segments past their retention horizon. We resolve + // retention once per tick — cheap, and avoids per-segment policy lookups. + return resolveRetentionMillisAsync() + .thenCompose(retentionMs -> { + if (retentionMs == null) { + // Negative / unset → retain forever. No GC this tick. + return CompletableFuture.completedFuture(null); + } + long now = clock.millis(); + List candidates = new ArrayList<>(); + for (SegmentInfo seg : layout.getAllSegments().values()) { + if (seg.isSealed() && seg.sealedAtMs() > 0 + && (now - seg.sealedAtMs()) >= retentionMs) { + candidates.add(seg); + } + } + if (candidates.isEmpty()) { + return CompletableFuture.completedFuture(null); + } + return pruneEligibleAsync(candidates); + }); + } + + /** + * For each candidate sealed segment, check that every existing subscription has + * drained it (backlog == 0); prune the ones that pass. Failures on individual + * segments are logged and skipped — the next tick retries. + */ + private CompletableFuture pruneEligibleAsync(List candidates) { + return resources.listSubscriptionsAsync(topicName) + .thenCompose(subs -> { + List> perSegment = new ArrayList<>(); + for (SegmentInfo seg : candidates) { + perSegment.add(prunable(seg, subs) + .thenCompose(prunable -> prunable + ? 
pruneSegmentAsync(seg) + : CompletableFuture.completedFuture(null)) + .exceptionally(ex -> { + log.warn().attr("segmentId", seg.segmentId()) + .exceptionMessage(ex) + .log("GC: failed to evaluate / prune segment;" + + " will retry on next tick"); + return null; + })); + } + return CompletableFuture.allOf( + perSegment.toArray(CompletableFuture[]::new)); + }); + } + + private CompletableFuture prunable(SegmentInfo seg, List subs) { + if (subs.isEmpty()) { + // No subscribers ever attached / all unsubscribed → nothing left to drain. + return CompletableFuture.completedFuture(true); + } + CompletableFuture[] checks = subs.stream() + .map(sub -> isSegmentDrained(seg, sub)) + .toArray(CompletableFuture[]::new); + return CompletableFuture.allOf(checks) + .thenApply(__ -> { + for (CompletableFuture c : checks) { + if (!c.join()) { + return false; + } + } + return true; + }); + } + + /** + * Apply a single segment's prune: CAS the layout to drop it, then delete the + * backing managed-ledger topic. Layout-prune is the point of no return — once it + * lands, no consumer will ever subscribe to the segment again. Backing-topic + * delete failures are logged and retried on subsequent ticks. + */ + private CompletableFuture pruneSegmentAsync(SegmentInfo seg) { + long segmentId = seg.segmentId(); + log.info().attr("segmentId", segmentId) + .attr("sealedAtMs", seg.sealedAtMs()) + .log("GC: pruning sealed segment past retention"); + + return resources.updateScalableTopicAsync(topicName, md -> { + SegmentLayout latest = SegmentLayout.fromMetadata(md); + // Re-validate against the latest layout: another writer may have already + // pruned this segment, or replaced it via a follow-up split / merge. If + // it's gone, leave the metadata untouched (no-op CAS). 
+ if (!latest.getAllSegments().containsKey(segmentId)) { + return md; + } + SegmentLayout updated = latest.pruneSegment(segmentId); + return updated.toMetadata(md.getProperties()); + }).thenCompose(__ -> resources.getScalableTopicMetadataAsync(topicName, true)) + .thenCompose(optMd -> { + currentLayout = SegmentLayout.fromMetadata(optMd.orElseThrow()); + return notifySubscriptions(currentLayout); + }) + .thenCompose(__ -> deleteSegmentBackingTopic(seg)) + .thenAccept(__ -> log.info().attr("segmentId", segmentId) + .log("GC: segment pruned + backing topic deleted")); + } + + private CompletableFuture deleteSegmentBackingTopic(SegmentInfo seg) { + String name = toSegmentUnderlyingPersistentName(seg); + try { + return brokerService.getPulsar().getAdminClient() + .topics().deleteAsync(name, /* force */ true) + .exceptionally(ex -> { + Throwable cause = + org.apache.pulsar.common.util.FutureUtil.unwrapCompletionException(ex); + if (cause instanceof org.apache.pulsar.client.admin.PulsarAdminException + .NotFoundException) { + // Already gone — fine. + return null; + } + log.warn().attr("segment", name).exceptionMessage(cause) + .log("GC: failed to delete backing segment topic;" + + " will retry on next tick"); + return null; + }); + } catch (PulsarServerException e) { + return CompletableFuture.failedFuture(e); + } + } + + /** + * Resolve the effective retention-time-in-millis for this scalable topic by + * layering: topic-policy on the parent {@code topic://...} → namespace policy → + * broker config default. Returns {@code null} if retention is unset or negative + * (= keep forever) — the GC tick treats that as "skip". + */ + private CompletableFuture resolveRetentionMillisAsync() { + TopicPoliciesService topicPoliciesService = + brokerService.getPulsar().getTopicPoliciesService(); + // Topic-level (override) layer. 
+ return topicPoliciesService.getTopicPoliciesAsync(topicName, + TopicPoliciesService.GetType.LOCAL_ONLY) + .thenCompose(localOpt -> { + Optional rp = localOpt + .map(TopicPolicies::getRetentionPolicies) + .filter(java.util.Objects::nonNull); + if (rp.isPresent()) { + return CompletableFuture.completedFuture(toRetentionMillis(rp.get())); + } + // Namespace layer. + NamespaceName ns = topicName.getNamespaceObject(); + return brokerService.getPulsar().getPulsarResources() + .getNamespaceResources() + .getPoliciesAsync(ns) + .thenApply(nsOpt -> { + RetentionPolicies nsRp = nsOpt + .map(p -> p.retention_policies) + .orElse(null); + if (nsRp != null) { + return toRetentionMillis(nsRp); + } + return defaultRetentionMillisFromBrokerConfig(); + }); + }); + } + + private static Long toRetentionMillis(RetentionPolicies rp) { + if (rp.getRetentionTimeInMinutes() < 0) { + return null; // keep forever + } + return TimeUnit.MINUTES.toMillis(rp.getRetentionTimeInMinutes()); + } + + private Long defaultRetentionMillisFromBrokerConfig() { + var conf = brokerService.getPulsar().getConfig(); + if (conf == null) { + return null; + } + int min = conf.getDefaultRetentionTimeInMinutes(); + return min < 0 ? null : TimeUnit.MINUTES.toMillis(min); + } + + /** Test hook: count of sealed segments currently in the layout. */ + int sealedSegmentCount() { + SegmentLayout layout = currentLayout; + if (layout == null) { + return 0; + } + int n = 0; + for (SegmentInfo s : layout.getAllSegments().values()) { + if (s.isSealed()) { + n++; + } + } + return n; + } + // --- Lifecycle --- public CompletableFuture close() { closed = true; + cancelGcTask(); // Stop each coordinator's drain poller before clearing — otherwise the scheduler // task keeps running after the controller goes away. 
subscriptions.values().forEach(SubscriptionCoordinator::close); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/ScalableTopicControllerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/ScalableTopicControllerTest.java index 1ebd3f13ed085..2416b27d97a8b 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/ScalableTopicControllerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/ScalableTopicControllerTest.java @@ -148,6 +148,13 @@ private ScalableTopicController newController(TopicName tn) { return new ScalableTopicController(tn, resources, brokerService, coordinationService); } + private ScalableTopicController newControllerWithClock(TopicName tn, + java.time.Clock clock, + java.time.Duration gcInterval) { + return new ScalableTopicController(tn, resources, brokerService, coordinationService, + clock, gcInterval); + } + // --- initialize() --- @Test @@ -449,6 +456,141 @@ public void testCloseReleasesLeadership() throws Exception { } } + // --- Sealed-segment GC --- + + /** + * After a split, the parent is sealed; with no live subscriptions and a small + * configured retention the GC tick should prune the parent from the layout and + * delete its backing topic. + */ + @Test + public void testGcTickPrunesDrainedSealedSegmentPastRetention() throws Exception { + // Inject GC-related mocks (topic-policies + namespace policies + delete). + installGcMocks(/* nsRetentionMinutes */ 1); + + // Use a controllable clock so we can fast-forward past the retention window. 
+ long startMs = 1_700_000_000_000L; + AdjustableClock clock = new AdjustableClock(startMs); + if (controller != null) { + controller.close().join(); + } + controller = newControllerWithClock(topicName, clock, + java.time.Duration.ofHours(1)); // GC interval irrelevant — we drive ticks manually + controller.initialize().get(); + + int sealedBefore = controller.sealedSegmentCount(); + // Split segment 0 → seg 0 sealed at startMs, children created at startMs. + controller.splitSegment(0).get(); + assertEquals(controller.sealedSegmentCount(), sealedBefore + 1); + + // Tick at the seal time — retention not yet elapsed; nothing pruned. + controller.runGcTickAsync().get(); + assertTrue(controller.getLayout().get().getAllSegments().containsKey(0L), + "tick within retention window must not prune"); + + // Fast-forward past 1 minute; tick should now prune segment 0. + clock.set(startMs + java.util.concurrent.TimeUnit.MINUTES.toMillis(1) + 1_000L); + controller.runGcTickAsync().get(); + assertFalse(controller.getLayout().get().getAllSegments().containsKey(0L), + "tick past retention must prune the sealed segment"); + // Backing topic delete was issued. + verify(topics).deleteAsync(anyString(), anyBoolean()); + } + + /** + * If retention is set to "keep forever" (negative value), the GC tick is a no-op + * even for sealed + drained segments. 
+ */ + @Test + public void testGcTickRespectsKeepForeverRetention() throws Exception { + installGcMocks(/* nsRetentionMinutes */ -1); + + long now = 1_700_000_000_000L; + if (controller != null) { + controller.close().join(); + } + java.time.Clock fixed = java.time.Clock.fixed( + java.time.Instant.ofEpochMilli(now + 365L * 86_400_000L), + java.time.ZoneOffset.UTC); + controller = newControllerWithClock(topicName, fixed, java.time.Duration.ofHours(1)); + controller.initialize().get(); + controller.splitSegment(0).get(); + + controller.runGcTickAsync().get(); + assertTrue(controller.getLayout().get().getAllSegments().containsKey(0L), + "negative retention must keep sealed segments forever"); + } + + /** Settable {@link java.time.Clock} for the GC tick tests. */ + private static final class AdjustableClock extends java.time.Clock { + private volatile long nowMs; + + AdjustableClock(long initialMs) { + this.nowMs = initialMs; + } + + void set(long nowMs) { + this.nowMs = nowMs; + } + + @Override + public java.time.ZoneId getZone() { + return java.time.ZoneOffset.UTC; + } + + @Override + public java.time.Clock withZone(java.time.ZoneId zone) { + return this; // tests don't care about zone + } + + @Override + public java.time.Instant instant() { + return java.time.Instant.ofEpochMilli(nowMs); + } + + @Override + public long millis() { + return nowMs; + } + } + + /** + * Wire up the mocks the GC tick needs: empty topic-policies (so retention falls + * through to namespace), a namespace policy with the requested retention, a + * "drained" segment-backlog response (cursor at end), and a successful topic + * delete admin call. + */ + @SuppressWarnings("unchecked") + private void installGcMocks(int nsRetentionMinutes) { + // Topic-policies service: no policies set on the topic. 
+ var tps = mock(org.apache.pulsar.broker.service.TopicPoliciesService.class); + when(pulsar.getTopicPoliciesService()).thenReturn(tps); + when(tps.getTopicPoliciesAsync(any(), + any(org.apache.pulsar.broker.service.TopicPoliciesService.GetType.class))) + .thenReturn(CompletableFuture.completedFuture(Optional.empty())); + + // Namespace policy with the requested retention. Wired through PulsarResources. + var pulsarResources = mock(org.apache.pulsar.broker.resources.PulsarResources.class); + var namespaceResources = mock(org.apache.pulsar.broker.resources.NamespaceResources.class); + when(pulsar.getPulsarResources()).thenReturn(pulsarResources); + when(pulsarResources.getNamespaceResources()).thenReturn(namespaceResources); + org.apache.pulsar.common.policies.data.Policies nsPolicies = + new org.apache.pulsar.common.policies.data.Policies(); + nsPolicies.retention_policies = + new org.apache.pulsar.common.policies.data.RetentionPolicies( + nsRetentionMinutes, /* sizeMB */ -1); + when(namespaceResources.getPoliciesAsync(any())) + .thenReturn(CompletableFuture.completedFuture(Optional.of(nsPolicies))); + + // No active subscriptions on the parent → nothing to drain → segment is + // immediately considered prunable on retention expiry. + // (resources.listSubscriptionsAsync returns [] from the in-memory store by default.) + + // Backing-topic delete succeeds. 
+ when(topics.deleteAsync(anyString(), anyBoolean())) + .thenReturn(CompletableFuture.completedFuture(null)); + } + // --- ConsumerRegistration record sanity --- @Test From 3b1dc1a5ccbaf7ea538c0e29eff760720aae1ac8 Mon Sep 17 00:00:00 2001 From: Matteo Merli Date: Tue, 5 May 2026 14:59:50 -0700 Subject: [PATCH 2/2] [improve][broker] PIP-468: address review on segment GC MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Coalesce all eligible prunes for a tick into a single CAS write on the metadata znode (was: one CAS per segment, with retry storms when more than one or two were eligible). The metadata-reload + subscription notify also collapse to a single round-trip. - Re-check isLeader() (and closed) just before the CAS, since drain checks can take seconds and leadership may have flipped meanwhile. - Use the segment-aware scalableTopics().deleteSegmentAsync admin call instead of a hand-built persistent:// URL — this is the same primitive ScalableTopicService uses internally and routes correctly to the segment's owning broker. - Rename the lambda parameter that shadowed the prunable() method. - Add docstring on pruneEligibleAsync explaining behaviour for STREAM / QUEUE / CHECKPOINT subscriptions and parent-vs-child pruning order: CHECKPOINT subs have no broker-side cursor so the backlog endpoint returns NotFound → false, keeping the segment pinned (safe default). 
--- .../scalable/ScalableTopicController.java | 135 ++++++++++++------ .../scalable/ScalableTopicControllerTest.java | 8 +- 2 files changed, 96 insertions(+), 47 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicController.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicController.java index f6a7918a90e71..552ca321d8478 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicController.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicController.java @@ -748,29 +748,103 @@ CompletableFuture runGcTickAsync() { /** * For each candidate sealed segment, check that every existing subscription has - * drained it (backlog == 0); prune the ones that pass. Failures on individual - * segments are logged and skipped — the next tick retries. + * drained it (backlog == 0); prune the ones that pass. The drain checks fan out + * concurrently, but the resulting layout mutation is coalesced into a single + * CAS write so multiple eligible segments don't compete on the same metadata znode. + * + *

Subscription-type behaviour. The drain check is the per-segment backlog + * admin endpoint — Pulsar's standard cursor-position view, which works the same way + * for STREAM (Exclusive) and QUEUE (Shared) subscriptions: a sealed segment with + * cursor at the end reports backlog 0. For CHECKPOINT subscriptions there is no + * broker-side cursor, the endpoint returns {@code NotFoundException}, and + * {@code isSegmentDrained} reports {@code false} — the segment is treated as + * "still in use" and never pruned while a CHECKPOINT subscription is registered. + * + *

Parent-vs-child ordering. Sealed segments form a DAG; pruning is allowed + * in any order because the active leaves always cover the full hash range, and the + * managed-ledger storage of each segment is independent. {@link SegmentLayout#pruneSegment} + * rewrites the parent/child edges, so consumers using the post-prune layout see the + * pruned segment as "no longer present" — equivalent to "drained" for parent-drain + * ordering. */ private CompletableFuture pruneEligibleAsync(List candidates) { return resources.listSubscriptionsAsync(topicName) .thenCompose(subs -> { - List> perSegment = new ArrayList<>(); + // Fan out drain checks; collect the survivors. + List> filtered = new ArrayList<>(); for (SegmentInfo seg : candidates) { - perSegment.add(prunable(seg, subs) - .thenCompose(prunable -> prunable - ? pruneSegmentAsync(seg) - : CompletableFuture.completedFuture(null)) + filtered.add(prunable(seg, subs) + .thenApply(ok -> ok ? seg : null) .exceptionally(ex -> { log.warn().attr("segmentId", seg.segmentId()) .exceptionMessage(ex) - .log("GC: failed to evaluate / prune segment;" + .log("GC: failed to evaluate prunability;" + " will retry on next tick"); return null; })); } - return CompletableFuture.allOf( - perSegment.toArray(CompletableFuture[]::new)); - }); + return CompletableFuture.allOf(filtered.toArray(CompletableFuture[]::new)) + .thenApply(__ -> { + List drained = new ArrayList<>(); + for (var f : filtered) { + SegmentInfo s = f.join(); + if (s != null) { + drained.add(s); + } + } + return drained; + }); + }) + .thenCompose(this::pruneAllAsync); + } + + /** + * Coalesce all drained-and-eligible segments into a single layout-mutation CAS, + * then fan out the per-segment backing-topic deletes. This is the path that + * actually mutates state. Re-validates leadership before the CAS — drain checks + * can take seconds, leadership may have flipped in the meantime, and we don't + * want a deposed leader writing layout updates. 
+ */ + private CompletableFuture pruneAllAsync(List drained) { + if (drained.isEmpty()) { + return CompletableFuture.completedFuture(null); + } + if (!isLeader() || closed) { + return CompletableFuture.completedFuture(null); + } + for (SegmentInfo s : drained) { + log.info().attr("segmentId", s.segmentId()) + .attr("sealedAtMs", s.sealedAtMs()) + .log("GC: pruning sealed segment past retention"); + } + return resources.updateScalableTopicAsync(topicName, md -> { + SegmentLayout latest = SegmentLayout.fromMetadata(md); + SegmentLayout updated = latest; + for (SegmentInfo s : drained) { + // Re-validate per segment: another writer (or a previous failed + // tick of this same loop) may have already pruned it. + if (updated.getAllSegments().containsKey(s.segmentId())) { + updated = updated.pruneSegment(s.segmentId()); + } + } + return updated == latest ? md : updated.toMetadata(md.getProperties()); + }).thenCompose(__ -> resources.getScalableTopicMetadataAsync(topicName, true)) + .thenCompose(optMd -> { + currentLayout = SegmentLayout.fromMetadata(optMd.orElseThrow()); + return notifySubscriptions(currentLayout); + }) + .thenCompose(__ -> { + CompletableFuture[] deletes = drained.stream() + .map(this::deleteSegmentBackingTopic) + .toArray(CompletableFuture[]::new); + return CompletableFuture.allOf(deletes); + }) + .thenAccept(__ -> { + for (SegmentInfo s : drained) { + log.info().attr("segmentId", s.segmentId()) + .log("GC: segment pruned + backing topic deleted"); + } + }); } private CompletableFuture prunable(SegmentInfo seg, List subs) { @@ -793,42 +867,17 @@ private CompletableFuture prunable(SegmentInfo seg, List subs) } /** - * Apply a single segment's prune: CAS the layout to drop it, then delete the - * backing managed-ledger topic. Layout-prune is the point of no return — once it - * lands, no consumer will ever subscribe to the segment again. Backing-topic - * delete failures are logged and retried on subsequent ticks. 
+ * Delete the segment's backing storage via the {@code scalableTopics} admin + * endpoint, which understands the {@code segment://} naming scheme and routes + * to the segment's owning broker. Failures are best-effort: the controller + * has already pruned the segment from the layout (the point of no return), + * so a failed delete is just leaked storage that the next tick will retry. */ - private CompletableFuture pruneSegmentAsync(SegmentInfo seg) { - long segmentId = seg.segmentId(); - log.info().attr("segmentId", segmentId) - .attr("sealedAtMs", seg.sealedAtMs()) - .log("GC: pruning sealed segment past retention"); - - return resources.updateScalableTopicAsync(topicName, md -> { - SegmentLayout latest = SegmentLayout.fromMetadata(md); - // Re-validate against the latest layout: another writer may have already - // pruned this segment, or replaced it via a follow-up split / merge. If - // it's gone, leave the metadata untouched (no-op CAS). - if (!latest.getAllSegments().containsKey(segmentId)) { - return md; - } - SegmentLayout updated = latest.pruneSegment(segmentId); - return updated.toMetadata(md.getProperties()); - }).thenCompose(__ -> resources.getScalableTopicMetadataAsync(topicName, true)) - .thenCompose(optMd -> { - currentLayout = SegmentLayout.fromMetadata(optMd.orElseThrow()); - return notifySubscriptions(currentLayout); - }) - .thenCompose(__ -> deleteSegmentBackingTopic(seg)) - .thenAccept(__ -> log.info().attr("segmentId", segmentId) - .log("GC: segment pruned + backing topic deleted")); - } - private CompletableFuture deleteSegmentBackingTopic(SegmentInfo seg) { - String name = toSegmentUnderlyingPersistentName(seg); + String name = toSegmentPersistentName(seg); try { return brokerService.getPulsar().getAdminClient() - .topics().deleteAsync(name, /* force */ true) + .scalableTopics().deleteSegmentAsync(name, /* force */ true) .exceptionally(ex -> { Throwable cause = org.apache.pulsar.common.util.FutureUtil.unwrapCompletionException(ex); diff 
--git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/ScalableTopicControllerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/ScalableTopicControllerTest.java index 2416b27d97a8b..c00b192350458 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/ScalableTopicControllerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/ScalableTopicControllerTest.java @@ -493,8 +493,8 @@ public void testGcTickPrunesDrainedSealedSegmentPastRetention() throws Exception controller.runGcTickAsync().get(); assertFalse(controller.getLayout().get().getAllSegments().containsKey(0L), "tick past retention must prune the sealed segment"); - // Backing topic delete was issued. - verify(topics).deleteAsync(anyString(), anyBoolean()); + // Backing topic delete was issued via the segment-aware admin call. + verify(scalableTopics).deleteSegmentAsync(anyString(), anyBoolean()); } /** @@ -586,8 +586,8 @@ private void installGcMocks(int nsRetentionMinutes) { // immediately considered prunable on retention expiry. // (resources.listSubscriptionsAsync returns [] from the in-memory store by default.) - // Backing-topic delete succeeds. - when(topics.deleteAsync(anyString(), anyBoolean())) + // Backing-topic delete succeeds (segment-aware admin call). + when(scalableTopics.deleteSegmentAsync(anyString(), anyBoolean())) .thenReturn(CompletableFuture.completedFuture(null)); }