diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 2e76254dc97ab..bf75ff3209c30 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -3536,6 +3536,13 @@ public boolean isReplicationBacklogExist() {
 
     @Override
     public void checkGC() {
+        if (TopicName.get(topic).isSegment()) {
+            // Segment-backing topics are owned by ScalableTopicController; its GC tick
+            // decides when a sealed segment is drained + retention-expired and deletes
+            // the topic. Letting v4 inactive-topic GC race against that would risk the
+            // broker quietly tearing down a segment the controller still considers live.
+            return;
+        }
         if (!isDeleteWhileInactive()) {
             // This topic is not included in GC
             return;
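Why the guard matters, reduced to a toy: the generic inactive-topic GC and the controller GC answer "safe to delete?" with different predicates, and a sealed-but-undrained segment is exactly where they disagree. A minimal, self-contained sketch (every name below is illustrative, not from the patch):

    public class GcOwnershipSketch {
        record Segment(boolean inactive, boolean sealed, boolean drained, boolean retentionExpired) {}

        static boolean inactiveTopicGcWouldDelete(Segment s) {
            return s.inactive();                       // generic rule: no clients => deletable
        }

        static boolean controllerGcWouldDelete(Segment s) {
            return s.sealed() && s.drained() && s.retentionExpired();
        }

        public static void main(String[] args) {
            // Sealed, no clients connected, but a subscription still has backlog:
            // "inactive" to the generic GC, yet the controller must keep its data.
            Segment sealedUndrained = new Segment(true, true, false, false);
            System.out.println(inactiveTopicGcWouldDelete(sealedUndrained));  // true  -> would delete
            System.out.println(controllerGcWouldDelete(sealedUndrained));     // false -> must keep
        }
    }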
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicController.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicController.java
index 4a7b15a1e152b..552ca321d8478 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicController.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicController.java
@@ -19,20 +19,30 @@ package org.apache.pulsar.broker.service.scalable;
 
 import io.github.merlimat.slog.Logger;
+import java.time.Clock;
 import java.time.Duration;
+import java.util.ArrayList;
 import java.util.LinkedHashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
 import lombok.Getter;
 import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.resources.ScalableTopicMetadata;
 import org.apache.pulsar.broker.resources.ScalableTopicResources;
 import org.apache.pulsar.broker.service.BrokerService;
+import org.apache.pulsar.broker.service.TopicPoliciesService;
 import org.apache.pulsar.broker.service.TransportCnx;
 import org.apache.pulsar.common.api.proto.ScalableConsumerType;
+import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.RetentionPolicies;
+import org.apache.pulsar.common.policies.data.TopicPolicies;
 import org.apache.pulsar.common.scalable.HashRange;
 import org.apache.pulsar.common.scalable.SegmentInfo;
 import org.apache.pulsar.common.scalable.SegmentTopicName;
@@ -53,17 +63,27 @@ public class ScalableTopicController {
 
     private static final Logger LOG = Logger.get(ScalableTopicController.class);
     private final Logger log;
 
+    /** Default cadence for the sealed-segment GC tick on the leader. */
+    static final Duration DEFAULT_GC_INTERVAL = Duration.ofSeconds(60);
+
     @Getter
     private final TopicName topicName;
     private final ScalableTopicResources resources;
     private final BrokerService brokerService;
     private final LeaderElection leaderElection;
+    /** Wall-clock source used for sealed-segment retention math. Tests override. */
+    private final Clock clock;
+    /** Cadence of the GC tick. Tests override to a small value. */
+    private final Duration gcInterval;
 
     private volatile SegmentLayout currentLayout;
 
     /** Per-subscription consumer tracking. */
     private final ConcurrentHashMap<String, SubscriptionCoordinator> subscriptions = new ConcurrentHashMap<>();
 
+    /** Sealed-segment GC scheduled task. Non-null only while this broker is the leader. */
+    private volatile ScheduledFuture<?> gcTask;
+
     @Getter
     private volatile LeaderElectionState leaderState = LeaderElectionState.NoLeader;
 
@@ -73,9 +93,24 @@ public class ScalableTopicController {
                                     ScalableTopicResources resources,
                                     BrokerService brokerService,
                                     CoordinationService coordinationService) {
+        this(topicName, resources, brokerService, coordinationService,
+                Clock.systemUTC(), DEFAULT_GC_INTERVAL);
+    }
+
+    /**
+     * Test constructor: overrides the wall-clock source and the GC tick cadence.
+     */
+    ScalableTopicController(TopicName topicName,
+                            ScalableTopicResources resources,
+                            BrokerService brokerService,
+                            CoordinationService coordinationService,
+                            Clock clock,
+                            Duration gcInterval) {
         this.topicName = topicName;
         this.resources = resources;
         this.brokerService = brokerService;
+        this.clock = clock;
+        this.gcInterval = gcInterval;
         this.log = LOG.with().attr("topic", topicName).build();
         this.leaderElection = coordinationService.getLeaderElection(
                 String.class,
@@ -90,6 +125,12 @@ public class ScalableTopicController {
      */
     private void onLeaderStateChange(LeaderElectionState state) {
         log.info().attr("state", state).log("Leader state change for scalable topic");
+        if (state != LeaderElectionState.Leading) {
+            // Stepped down (or never was leader). Stop the GC tick so the deposed leader
+            // doesn't race the new one on layout writes / backing-topic deletes. The new
+            // leader's initialize() will reschedule.
+            cancelGcTask();
+        }
         if (state == LeaderElectionState.NoLeader && !closed) {
             initialize().exceptionally(ex -> {
                 log.warn().exceptionMessage(ex).log("Failed to re-elect after NoLeader");
@@ -119,12 +160,52 @@ public CompletableFuture<Void> initialize() {
             })
             .thenCompose(__ -> {
                 if (isLeader()) {
+                    scheduleGcTask();
                     return restoreSessionsFromStore();
                 }
                 return CompletableFuture.completedFuture(null);
             });
     }
 
+    /**
+     * Schedule the periodic sealed-segment GC tick. Only fires on the controller leader;
+     * idempotent (re-entry just no-ops). Cancelled on close / leader loss.
+     */
+    private synchronized void scheduleGcTask() {
+        if (closed || gcTask != null) {
+            return;
+        }
+        gcTask = scheduler().scheduleAtFixedRate(this::runGcTickSafely,
+                gcInterval.toMillis(), gcInterval.toMillis(), TimeUnit.MILLISECONDS);
+    }
+
+    private synchronized void cancelGcTask() {
+        if (gcTask != null) {
+            gcTask.cancel(false);
+            gcTask = null;
+        }
+    }
+
+    private ScheduledExecutorService scheduler() {
+        return brokerService.getPulsar().getExecutor();
+    }
+
+    private void runGcTickSafely() {
+        if (!isLeader() || closed) {
+            return;
+        }
+        try {
+            runGcTickAsync().exceptionally(ex -> {
+                log.warn().exceptionMessage(ex).log("Scalable-topic GC tick failed");
+                return null;
+            });
+        } catch (Throwable t) {
+            // Defensive: scheduleAtFixedRate suppresses all subsequent runs if a tick
+            // throws synchronously, so log and swallow here.
+            log.warn().exception(t).log("Scalable-topic GC tick threw");
+        }
+    }
+
     /**
      * Load persisted subscriptions and consumer registrations from the metadata store and
     * install them into per-subscription {@link SubscriptionCoordinator} instances. Called
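The catch-Throwable in runGcTickSafely above guards against documented ScheduledExecutorService behaviour: if a fixed-rate task throws, the executor suppresses every subsequent run, silently killing the GC. A small JDK-only demo of that failure mode (nothing here is part of the patch):

    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;

    public class FixedRateSuppressionDemo {
        public static void main(String[] args) throws InterruptedException {
            ScheduledExecutorService pool = Executors.newSingleThreadScheduledExecutor();
            pool.scheduleAtFixedRate(() -> {
                System.out.println("tick");
                throw new RuntimeException("boom"); // kills every later run, not just this one
            }, 0, 100, TimeUnit.MILLISECONDS);
            Thread.sleep(500);
            pool.shutdownNow(); // "tick" was printed exactly once
        }
    }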
@@ -288,7 +369,7 @@ public CompletableFuture<Void> splitSegment(long segmentId) {
         // Single timestamp shared by the local preview and the CAS-retried metadata update,
         // so the children's createdAtMs and the parent's sealedAtMs always agree even if the
         // CAS retries due to concurrent writers.
-        final long nowMs = System.currentTimeMillis();
+        final long nowMs = clock.millis();
 
         // Compute the new layout locally to derive child segment info
         SegmentLayout newLayout = currentLayout.splitSegment(segmentId, nowMs);
@@ -337,7 +418,7 @@ public CompletableFuture<Void> mergeSegments(long segmentId1, long segm
         // Single timestamp shared by the local preview and the CAS-retried metadata
         // update — see splitSegment for the rationale.
-        final long nowMs = System.currentTimeMillis();
+        final long nowMs = clock.millis();
 
         // Compute the new layout locally to derive merged segment info
         SegmentLayout newLayout = currentLayout.mergeSegments(segmentId1, segmentId2, nowMs);
@@ -619,10 +700,274 @@ public CompletableFuture
 
+    /**
+     * One sealed-segment GC pass: collect sealed segments whose retention window has
+     * expired, verify every subscription has drained them, then prune them from the
+     * layout and delete their backing topics.
+     *
+     * <p>The retention window is resolved from topic-policies on the parent
+     * {@code topic://...} → namespace policy → broker default, the same precedence
+     * Pulsar uses for regular topics.
+     *
+     * <p>Visible for tests; in production it's invoked by the scheduled task.
+     */
+    CompletableFuture<Void> runGcTickAsync() {
+        if (!isLeader() || closed) {
+            return CompletableFuture.completedFuture(null);
+        }
+        final SegmentLayout layout = currentLayout;
+        if (layout == null) {
+            return CompletableFuture.completedFuture(null);
+        }
+
+        // Candidates: sealed segments past their retention horizon. We resolve
+        // retention once per tick — cheap, and avoids per-segment policy lookups.
+        return resolveRetentionMillisAsync()
+                .thenCompose(retentionMs -> {
+                    if (retentionMs == null) {
+                        // Negative / unset → retain forever. No GC this tick.
+                        return CompletableFuture.completedFuture(null);
+                    }
+                    long now = clock.millis();
+                    List<SegmentInfo> candidates = new ArrayList<>();
+                    for (SegmentInfo seg : layout.getAllSegments().values()) {
+                        if (seg.isSealed() && seg.sealedAtMs() > 0
+                                && (now - seg.sealedAtMs()) >= retentionMs) {
+                            candidates.add(seg);
+                        }
+                    }
+                    if (candidates.isEmpty()) {
+                        return CompletableFuture.completedFuture(null);
+                    }
+                    return pruneEligibleAsync(candidates);
+                });
+    }
+
+    /**
+     * For each candidate sealed segment, check that every existing subscription has
+     * drained it (backlog == 0); prune the ones that pass. The drain checks fan out
+     * concurrently, but the resulting layout mutation is coalesced into a single
+     * CAS write so multiple eligible segments don't compete on the same metadata znode.
+     *
+     * <p>Subscription-type behaviour. The drain check is the per-segment backlog
+     * admin endpoint — Pulsar's standard cursor-position view, which works the same way
+     * for STREAM (Exclusive) and QUEUE (Shared) subscriptions: a sealed segment with
+     * the cursor at the end reports backlog 0. For CHECKPOINT subscriptions there is no
+     * broker-side cursor, the endpoint returns {@code NotFoundException}, and
+     * {@code isSegmentDrained} reports {@code false} — the segment is treated as
+     * "still in use" and never pruned while a CHECKPOINT subscription is registered.
+     *
+     * <p>Parent-vs-child ordering. Sealed segments form a DAG; pruning is allowed
+     * in any order because the active leaves always cover the full hash range, and the
+     * managed-ledger storage of each segment is independent. {@link SegmentLayout#pruneSegment}
+     * rewrites the parent/child edges, so consumers using the post-prune layout see the
+     * pruned segment as "no longer present" — equivalent to "drained" for parent-drain
+     * ordering.
+     */
+    private CompletableFuture<Void> pruneEligibleAsync(List<SegmentInfo> candidates) {
+        return resources.listSubscriptionsAsync(topicName)
+                .thenCompose(subs -> {
+                    // Fan out drain checks; collect the survivors.
+                    List<CompletableFuture<SegmentInfo>> filtered = new ArrayList<>();
+                    for (SegmentInfo seg : candidates) {
+                        filtered.add(prunable(seg, subs)
+                                .thenApply(ok -> ok ? seg : null)
+                                .exceptionally(ex -> {
+                                    log.warn().attr("segmentId", seg.segmentId())
+                                            .exceptionMessage(ex)
+                                            .log("GC: failed to evaluate prunability;"
+                                                    + " will retry on next tick");
+                                    return null;
+                                }));
+                    }
+                    return CompletableFuture.allOf(filtered.toArray(CompletableFuture[]::new))
+                            .thenApply(__ -> {
+                                List<SegmentInfo> drained = new ArrayList<>();
+                                for (var f : filtered) {
+                                    SegmentInfo s = f.join();
+                                    if (s != null) {
+                                        drained.add(s);
+                                    }
+                                }
+                                return drained;
+                            });
+                })
+                .thenCompose(this::pruneAllAsync);
+    }
+
+    /**
+     * Coalesce all drained-and-eligible segments into a single layout-mutation CAS,
+     * then fan out the per-segment backing-topic deletes. This is the path that
+     * actually mutates state. Re-validates leadership before the CAS — drain checks
+     * can take seconds, leadership may have flipped in the meantime, and we don't
+     * want a deposed leader writing layout updates.
+     */
+    private CompletableFuture<Void> pruneAllAsync(List<SegmentInfo> drained) {
+        if (drained.isEmpty()) {
+            return CompletableFuture.completedFuture(null);
+        }
+        if (!isLeader() || closed) {
+            return CompletableFuture.completedFuture(null);
+        }
+        for (SegmentInfo s : drained) {
+            log.info().attr("segmentId", s.segmentId())
+                    .attr("sealedAtMs", s.sealedAtMs())
+                    .log("GC: pruning sealed segment past retention");
+        }
+        return resources.updateScalableTopicAsync(topicName, md -> {
+            SegmentLayout latest = SegmentLayout.fromMetadata(md);
+            SegmentLayout updated = latest;
+            for (SegmentInfo s : drained) {
+                // Re-validate per segment: another writer (or a previous failed
+                // tick of this same loop) may have already pruned it.
+                if (updated.getAllSegments().containsKey(s.segmentId())) {
+                    updated = updated.pruneSegment(s.segmentId());
+                }
+            }
+            return updated == latest ? md : updated.toMetadata(md.getProperties());
+        }).thenCompose(__ -> resources.getScalableTopicMetadataAsync(topicName, true))
+                .thenCompose(optMd -> {
+                    currentLayout = SegmentLayout.fromMetadata(optMd.orElseThrow());
+                    return notifySubscriptions(currentLayout);
+                })
+                .thenCompose(__ -> {
+                    CompletableFuture<?>[] deletes = drained.stream()
+                            .map(this::deleteSegmentBackingTopic)
+                            .toArray(CompletableFuture[]::new);
+                    return CompletableFuture.allOf(deletes);
+                })
+                .thenAccept(__ -> {
+                    for (SegmentInfo s : drained) {
+                        log.info().attr("segmentId", s.segmentId())
+                                .log("GC: segment pruned + backing topic deleted");
+                    }
+                });
+    }
+
+    private CompletableFuture<Boolean> prunable(SegmentInfo seg, List<String> subs) {
+        if (subs.isEmpty()) {
+            // No subscribers ever attached / all unsubscribed → nothing left to drain.
+            return CompletableFuture.completedFuture(true);
+        }
+        CompletableFuture<Boolean>[] checks = subs.stream()
+                .map(sub -> isSegmentDrained(seg, sub))
+                .toArray(CompletableFuture[]::new);
+        return CompletableFuture.allOf(checks)
+                .thenApply(__ -> {
+                    for (CompletableFuture<Boolean> c : checks) {
+                        if (!c.join()) {
+                            return false;
+                        }
+                    }
+                    return true;
+                });
+    }
+
+    /**
+     * Delete the segment's backing storage via the {@code scalableTopics} admin
+     * endpoint, which understands the {@code segment://} naming scheme and routes
+     * to the segment's owning broker. Deletion is best-effort: the controller
+     * has already pruned the segment from the layout (the point of no return),
+     * so a failed delete is just leaked storage that the next tick will retry.
+     */
+    private CompletableFuture<Void> deleteSegmentBackingTopic(SegmentInfo seg) {
+        String name = toSegmentPersistentName(seg);
+        try {
+            return brokerService.getPulsar().getAdminClient()
+                    .scalableTopics().deleteSegmentAsync(name, /* force */ true)
+                    .exceptionally(ex -> {
+                        Throwable cause =
+                                org.apache.pulsar.common.util.FutureUtil.unwrapCompletionException(ex);
+                        if (cause instanceof org.apache.pulsar.client.admin.PulsarAdminException
+                                .NotFoundException) {
+                            // Already gone — fine.
+                            return null;
+                        }
+                        log.warn().attr("segment", name).exceptionMessage(cause)
+                                .log("GC: failed to delete backing segment topic;"
+                                        + " will retry on next tick");
+                        return null;
+                    });
+        } catch (PulsarServerException e) {
+            return CompletableFuture.failedFuture(e);
+        }
+    }
+
+    /**
+     * Resolve the effective retention time in millis for this scalable topic by
+     * layering: topic policy on the parent {@code topic://...} → namespace policy →
+     * broker config default. Returns {@code null} if retention is unset or negative
+     * (= keep forever) — the GC tick treats that as "skip".
+     */
+    private CompletableFuture<Long> resolveRetentionMillisAsync() {
+        TopicPoliciesService topicPoliciesService =
+                brokerService.getPulsar().getTopicPoliciesService();
+        // Topic-level (override) layer.
+        return topicPoliciesService.getTopicPoliciesAsync(topicName,
+                        TopicPoliciesService.GetType.LOCAL_ONLY)
+                .thenCompose(localOpt -> {
+                    Optional<RetentionPolicies> rp = localOpt
+                            .map(TopicPolicies::getRetentionPolicies)
+                            .filter(java.util.Objects::nonNull);
+                    if (rp.isPresent()) {
+                        return CompletableFuture.completedFuture(toRetentionMillis(rp.get()));
+                    }
+                    // Namespace layer.
+                    NamespaceName ns = topicName.getNamespaceObject();
+                    return brokerService.getPulsar().getPulsarResources()
+                            .getNamespaceResources()
+                            .getPoliciesAsync(ns)
+                            .thenApply(nsOpt -> {
+                                RetentionPolicies nsRp = nsOpt
+                                        .map(p -> p.retention_policies)
+                                        .orElse(null);
+                                if (nsRp != null) {
+                                    return toRetentionMillis(nsRp);
+                                }
+                                return defaultRetentionMillisFromBrokerConfig();
+                            });
+                });
+    }
+
+    private static Long toRetentionMillis(RetentionPolicies rp) {
+        if (rp.getRetentionTimeInMinutes() < 0) {
+            return null; // keep forever
+        }
+        return TimeUnit.MINUTES.toMillis(rp.getRetentionTimeInMinutes());
+    }
+
+    private Long defaultRetentionMillisFromBrokerConfig() {
+        var conf = brokerService.getPulsar().getConfig();
+        if (conf == null) {
+            return null;
+        }
+        int min = conf.getDefaultRetentionTimeInMinutes();
+        return min < 0 ? null : TimeUnit.MINUTES.toMillis(min);
+    }
+
+    /** Test hook: count of sealed segments currently in the layout. */
+    int sealedSegmentCount() {
+        SegmentLayout layout = currentLayout;
+        if (layout == null) {
+            return 0;
+        }
+        int n = 0;
+        for (SegmentInfo s : layout.getAllSegments().values()) {
+            if (s.isSealed()) {
+                n++;
+            }
+        }
+        return n;
+    }
+
     // --- Lifecycle ---
 
     public CompletableFuture<Void> close() {
         closed = true;
+        cancelGcTask();
         // Stop each coordinator's drain poller before clearing — otherwise the scheduler
         // task keeps running after the controller goes away.
         subscriptions.values().forEach(SubscriptionCoordinator::close);
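The single-CAS coalescing described in pruneAllAsync, reduced to its essence: read once, fold every prune into one candidate value, and retry the whole batch if another writer won. A runnable sketch with an AtomicReference standing in for the versioned metadata znode (illustrative only; the real retry loop is assumed to live behind updateScalableTopicAsync):

    import java.util.List;
    import java.util.Set;
    import java.util.TreeSet;
    import java.util.concurrent.atomic.AtomicReference;

    public class CoalescedCasSketch {
        public static void main(String[] args) {
            AtomicReference<Set<Long>> layout =              // stands in for the metadata znode
                    new AtomicReference<>(new TreeSet<>(List.of(0L, 1L, 2L, 3L)));
            List<Long> drained = List.of(0L, 2L);            // two eligible segments, one write

            Set<Long> before, after;
            do {                                             // one retry loop for the whole batch
                before = layout.get();
                after = new TreeSet<>(before);
                after.removeAll(drained);                    // re-validated against the latest read
            } while (!layout.compareAndSet(before, after));

            System.out.println(layout.get());                // [1, 3]
        }
    }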
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/ScalableTopicControllerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/ScalableTopicControllerTest.java
index 1ebd3f13ed085..c00b192350458 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/ScalableTopicControllerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/ScalableTopicControllerTest.java
@@ -148,6 +148,13 @@ private ScalableTopicController newController(TopicName tn) {
         return new ScalableTopicController(tn, resources, brokerService, coordinationService);
     }
 
+    private ScalableTopicController newControllerWithClock(TopicName tn,
+                                                           java.time.Clock clock,
+                                                           java.time.Duration gcInterval) {
+        return new ScalableTopicController(tn, resources, brokerService, coordinationService,
+                clock, gcInterval);
+    }
+
     // --- initialize() ---
 
     @Test
@@ -449,6 +456,141 @@ public void testCloseReleasesLeadership() throws Exception {
         }
     }
 
+    // --- Sealed-segment GC ---
+
+    /**
+     * After a split, the parent is sealed; with no live subscriptions and a small
+     * configured retention the GC tick should prune the parent from the layout and
+     * delete its backing topic.
+     */
+    @Test
+    public void testGcTickPrunesDrainedSealedSegmentPastRetention() throws Exception {
+        // Inject GC-related mocks (topic-policies + namespace policies + delete).
+        installGcMocks(/* nsRetentionMinutes */ 1);
+
+        // Use a controllable clock so we can fast-forward past the retention window.
+        long startMs = 1_700_000_000_000L;
+        AdjustableClock clock = new AdjustableClock(startMs);
+        if (controller != null) {
+            controller.close().join();
+        }
+        controller = newControllerWithClock(topicName, clock,
+                java.time.Duration.ofHours(1)); // GC interval irrelevant — we drive ticks manually
+        controller.initialize().get();
+
+        int sealedBefore = controller.sealedSegmentCount();
+        // Split segment 0 → seg 0 sealed at startMs, children created at startMs.
+        controller.splitSegment(0).get();
+        assertEquals(controller.sealedSegmentCount(), sealedBefore + 1);
+
+        // Tick at the seal time — retention not yet elapsed; nothing pruned.
+        controller.runGcTickAsync().get();
+        assertTrue(controller.getLayout().get().getAllSegments().containsKey(0L),
+                "tick within retention window must not prune");
+
+        // Fast-forward past 1 minute; the tick should now prune segment 0.
+        clock.set(startMs + java.util.concurrent.TimeUnit.MINUTES.toMillis(1) + 1_000L);
+        controller.runGcTickAsync().get();
+        assertFalse(controller.getLayout().get().getAllSegments().containsKey(0L),
+                "tick past retention must prune the sealed segment");
+        // Backing-topic delete was issued via the segment-aware admin call.
+        verify(scalableTopics).deleteSegmentAsync(anyString(), anyBoolean());
+    }
+
+    /**
+     * If retention is set to "keep forever" (negative value), the GC tick is a no-op
+     * even for sealed + drained segments.
+     */
+    @Test
+    public void testGcTickRespectsKeepForeverRetention() throws Exception {
+        installGcMocks(/* nsRetentionMinutes */ -1);
+
+        long now = 1_700_000_000_000L;
+        if (controller != null) {
+            controller.close().join();
+        }
+        java.time.Clock fixed = java.time.Clock.fixed(
+                java.time.Instant.ofEpochMilli(now + 365L * 86_400_000L),
+                java.time.ZoneOffset.UTC);
+        controller = newControllerWithClock(topicName, fixed, java.time.Duration.ofHours(1));
+        controller.initialize().get();
+        controller.splitSegment(0).get();
+
+        controller.runGcTickAsync().get();
+        assertTrue(controller.getLayout().get().getAllSegments().containsKey(0L),
+                "negative retention must keep sealed segments forever");
+    }
+
+    /** Settable {@link java.time.Clock} for the GC tick tests. */
+    private static final class AdjustableClock extends java.time.Clock {
+        private volatile long nowMs;
+
+        AdjustableClock(long initialMs) {
+            this.nowMs = initialMs;
+        }
+
+        void set(long nowMs) {
+            this.nowMs = nowMs;
+        }
+
+        @Override
+        public java.time.ZoneId getZone() {
+            return java.time.ZoneOffset.UTC;
+        }
+
+        @Override
+        public java.time.Clock withZone(java.time.ZoneId zone) {
+            return this; // tests don't care about the zone
+        }
+
+        @Override
+        public java.time.Instant instant() {
+            return java.time.Instant.ofEpochMilli(nowMs);
+        }
+
+        @Override
+        public long millis() {
+            return nowMs;
+        }
+    }
+
+    /**
+     * Wire up the mocks the GC tick needs: empty topic-policies (so retention falls
+     * through to the namespace), a namespace policy with the requested retention, and
+     * a successful backing-topic delete admin call. No subscriptions are registered,
+     * so the drain check passes without a backlog mock.
+     */
+    @SuppressWarnings("unchecked")
+    private void installGcMocks(int nsRetentionMinutes) {
+        // Topic-policies service: no policies set on the topic.
+        var tps = mock(org.apache.pulsar.broker.service.TopicPoliciesService.class);
+        when(pulsar.getTopicPoliciesService()).thenReturn(tps);
+        when(tps.getTopicPoliciesAsync(any(),
+                any(org.apache.pulsar.broker.service.TopicPoliciesService.GetType.class)))
+                .thenReturn(CompletableFuture.completedFuture(Optional.empty()));
+
+        // Namespace policy with the requested retention. Wired through PulsarResources.
+        var pulsarResources = mock(org.apache.pulsar.broker.resources.PulsarResources.class);
+        var namespaceResources = mock(org.apache.pulsar.broker.resources.NamespaceResources.class);
+        when(pulsar.getPulsarResources()).thenReturn(pulsarResources);
+        when(pulsarResources.getNamespaceResources()).thenReturn(namespaceResources);
+        org.apache.pulsar.common.policies.data.Policies nsPolicies =
+                new org.apache.pulsar.common.policies.data.Policies();
+        nsPolicies.retention_policies =
+                new org.apache.pulsar.common.policies.data.RetentionPolicies(
+                        nsRetentionMinutes, /* sizeMB */ -1);
+        when(namespaceResources.getPoliciesAsync(any()))
+                .thenReturn(CompletableFuture.completedFuture(Optional.of(nsPolicies)));
+
+        // No active subscriptions on the parent → nothing to drain → segment is
+        // immediately considered prunable on retention expiry.
+        // (resources.listSubscriptionsAsync returns [] from the in-memory store by default.)
+
+        // Backing-topic delete succeeds (segment-aware admin call).
+        when(scalableTopics.deleteSegmentAsync(anyString(), anyBoolean()))
+                .thenReturn(CompletableFuture.completedFuture(null));
+    }
+
     // --- ConsumerRegistration record sanity ---
 
     @Test
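For reference, the retention precedence that resolveRetentionMillisAsync implements, collapsed into synchronous Optionals (types and names here are a sketch, not the patch's API): topic policy if set, else namespace policy, else broker default, with any negative value meaning "keep forever" and therefore no GC.

    import java.util.Optional;
    import java.util.concurrent.TimeUnit;

    public class RetentionPrecedenceSketch {
        static Long resolve(Optional<Integer> topicMinutes,
                            Optional<Integer> namespaceMinutes,
                            int brokerDefaultMinutes) {
            int minutes = topicMinutes                     // topic policy wins if present,
                    .or(() -> namespaceMinutes)            // else namespace policy,
                    .orElse(brokerDefaultMinutes);         // else broker config default
            return minutes < 0 ? null : TimeUnit.MINUTES.toMillis(minutes); // null = keep forever
        }

        public static void main(String[] args) {
            System.out.println(resolve(Optional.of(10), Optional.of(1440), 0)); // 600000 (topic wins)
            System.out.println(resolve(Optional.empty(), Optional.of(-1), 0));  // null (keep forever)
            System.out.println(resolve(Optional.empty(), Optional.empty(), 5)); // 300000 (broker default)
        }
    }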