From db49aaf386c1fb0c1cf9a1c8d5a21b9a0060e565 Mon Sep 17 00:00:00 2001 From: Branimir Lambov Date: Mon, 20 Apr 2026 17:43:51 +0300 Subject: [PATCH 1/3] HCD-200: Incremental repair is unable to lock sstables under load (#2266) [HCD-200](https://datastax.jira.com/browse/HCD-200) Changes the tracking of compaction tasks to include scheduled in addition to active tasks by introducing a new queue where `AbstractCompactionTask`s register themselves. When the tasks execute, `CompactionTask`s move from the scheduled tasks to the active operations list. Other types of tasks are expected to finish quickly and are removed from the scheduled tasks when they complete. `runWithCompactionsDisabled` cancels tasks in both lists and should now be able to reliably cancel all compactions in flight. [HCD-200]: https://datastax.jira.com/browse/HCD-200 --- .../cassandra/db/ColumnFamilyStore.java | 20 +++++++++++--- .../db/compaction/AbstractCompactionTask.java | 14 +++++++--- .../db/compaction/ActiveOperations.java | 26 +++++++++++++++++++ .../db/compaction/CompactionManager.java | 4 +-- .../RandomizedCancelCompactionsTest.java | 8 ++++-- 5 files changed, 61 insertions(+), 11 deletions(-) diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java index 877f9736187e..b7f83746c615 100644 --- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java +++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java @@ -88,6 +88,7 @@ import org.apache.cassandra.db.commitlog.CommitLog; import org.apache.cassandra.db.commitlog.CommitLogPosition; import org.apache.cassandra.db.commitlog.IntervalSet; +import org.apache.cassandra.db.compaction.AbstractCompactionTask; import org.apache.cassandra.db.compaction.AbstractTableOperation; import org.apache.cassandra.db.compaction.CompactionManager; import org.apache.cassandra.db.compaction.CompactionRealm; @@ -3146,7 +3147,7 @@ public V runWithCompactionsDisabled(Callable callable, OperationType oper */ public V runWithCompactionsDisabled(Callable callable, Predicate sstablesPredicate, - OperationType operationType, + OperationType operationType, boolean interruptValidation, boolean interruptViews, boolean interruptIndexes, @@ -3165,12 +3166,13 @@ public V runWithCompactionsDisabled(Callable callable, try (CompactionManager.CompactionPauser pause = CompactionManager.instance.pauseGlobalCompaction(); CompactionManager.CompactionPauser pausedStrategies = pauseCompactionStrategies(toInterruptFor)) { - List uninterruptibleTasks = CompactionManager.instance.getCompactionsMatching(toInterruptForMetadata, + List uninterruptibleOps = CompactionManager.instance.getCompactionsMatching(toInterruptForMetadata, + sstablesPredicate, (progress) -> progress.operationType().priority <= operationType.priority); - if (!uninterruptibleTasks.isEmpty()) + if (!uninterruptibleOps.isEmpty()) { logger.info("Unable to cancel in-progress compactions, since they're running with higher or same priority: {}. You can abort these operations using `nodetool stop`.", - uninterruptibleTasks.stream().map((compaction) -> String.format("%s@%s (%s)", + uninterruptibleOps.stream().map((compaction) -> String.format("%s@%s (%s)", compaction.getProgress().operationType(), compaction.getProgress().metadata().name, compaction.getProgress().operationId())) @@ -3178,9 +3180,19 @@ public V runWithCompactionsDisabled(Callable callable, return null; } + Collection uninterruptibleTasks = CompactionManager.instance.active.getScheduledTasksMatching(toInterruptFor, + sstablesPredicate, + task -> task.getCompactionType().priority <= operationType.priority); + if (!uninterruptibleTasks.isEmpty()) + { + logger.info("Unable to cancel {} scheduled compactions with higher or same priority. You can abort these operations using `nodetool stop`.", uninterruptibleTasks.size()); + return null; + } + // Cancel scheduled compactions matching predicate. This must be done first because tasks progress from // scheduled to active. CompactionManager.instance.active.cancelScheduledTasksAffecting(toInterruptFor, sstablesPredicate); + // interrupt in-progress compactions CompactionManager.instance.interruptCompactionForCFs(toInterruptFor, sstablesPredicate, interruptValidation, trigger); diff --git a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionTask.java b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionTask.java index b7b02f783621..6dbb85be73b9 100644 --- a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionTask.java +++ b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionTask.java @@ -230,6 +230,17 @@ public boolean switchToActive() * Reject/cancel the task if it affects any sstable that satisfies the given predicate. */ public boolean cancelIfAffects(CompactionRealm realm, Predicate sstablePredicate) + { + boolean affects = affectsAny(realm, sstablePredicate); + if (affects) + Throwables.maybeFail(rejected(null)); + return affects; + } + + /** + * Returns true iff the task affects any sstable that satisfies the given predicate. + */ + public boolean affectsAny(CompactionRealm realm, Predicate sstablePredicate) { if (realm != this.realm) return false; @@ -237,10 +248,7 @@ public boolean cancelIfAffects(CompactionRealm realm, Predicate s for (SSTableReader r : transaction.originals()) { if (sstablePredicate.test(r)) - { - Throwables.maybeFail(rejected(null)); return true; - } } return false; } diff --git a/src/java/org/apache/cassandra/db/compaction/ActiveOperations.java b/src/java/org/apache/cassandra/db/compaction/ActiveOperations.java index 5fa92a4e7628..b177bae256f4 100644 --- a/src/java/org/apache/cassandra/db/compaction/ActiveOperations.java +++ b/src/java/org/apache/cassandra/db/compaction/ActiveOperations.java @@ -201,6 +201,32 @@ public void removeTaskFromScheduled(AbstractCompactionTask task) scheduledTasks.remove(task); } + public Collection getScheduledTasksMatching(Iterable cfss, Predicate predicate, Predicate taskPredicate) + { + List tasksCopy; + synchronized (scheduledTasks) + { + tasksCopy = new ArrayList<>(scheduledTasks); + } + + List matching = new ArrayList<>(tasksCopy.size()); + for (AbstractCompactionTask task : tasksCopy) + { + if (taskPredicate.test(task)) + { + for (ColumnFamilyStore cfs : cfss) + { + if (task.affectsAny(cfs, predicate)) + { + matching.add(task); + break; + } + } + } + } + return matching; + } + public void cancelScheduledTasksAffecting(Iterable cfss, Predicate predicate) { Iterable tasksCopy; diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java index 7a3920913e16..164198754d74 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java @@ -2516,7 +2516,7 @@ public void setMaxConcurrentAutoUpgradeTasks(int value) } } - public List getCompactionsMatching(Iterable columnFamilies, Predicate predicate) + public List getCompactionsMatching(Iterable columnFamilies, Predicate sstablePredicate, Predicate progressPredicate) { Preconditions.checkArgument(columnFamilies != null, "Attempted to getCompactionsMatching in CompactionManager with no columnFamilies specified."); @@ -2527,7 +2527,7 @@ public List getCompactionsMatching(Iterable colum TableOperation.Progress progress = holder.getProgress(); if (progress.metadata() == null || Iterables.contains(columnFamilies, progress.metadata())) { - if (predicate.test(progress)) + if (progressPredicate.test(progress) && holder.shouldStop(sstablePredicate)) matched.add(holder); } } diff --git a/test/unit/org/apache/cassandra/db/compaction/RandomizedCancelCompactionsTest.java b/test/unit/org/apache/cassandra/db/compaction/RandomizedCancelCompactionsTest.java index d17a11f648a8..17a8cf122c04 100644 --- a/test/unit/org/apache/cassandra/db/compaction/RandomizedCancelCompactionsTest.java +++ b/test/unit/org/apache/cassandra/db/compaction/RandomizedCancelCompactionsTest.java @@ -202,7 +202,7 @@ public void testRandomizedCancellation() throws Exception return true; }, predicate, - OperationType.P0, + OperationType.ANTICOMPACTION, false, false, false, @@ -415,7 +415,11 @@ public void runMayThrow() throws InterruptedException scanners = sstables.stream().map(SSTableReader::getScanner).collect(Collectors.toList()); controller = new CompactionController(cfs, sstables, Integer.MIN_VALUE); - ci = new CompactionIterator(transaction.opType(), scanners, controller, FBUtilities.nowInSeconds(), org.apache.cassandra.utils.TimeUUID.Generator.nextTimeUUID()); + ci = new CompactionIterator(transaction.opType(), + scanners, + controller, + FBUtilities.nowInSeconds(), + org.apache.cassandra.utils.TimeUUID.Generator.nextTimeUUID()); TableOperation op = ci.getOperation(); closeable = opObserver.onOperationStart(op); switchToActive(); From ecd4cf913cbe7782303ae615789c7a932f066684 Mon Sep 17 00:00:00 2001 From: blambov Date: Wed, 13 May 2026 12:17:57 +0300 Subject: [PATCH 2/3] Add a low-priority interrupt test --- .../cassandra/db/ColumnFamilyStore.java | 5 +++ .../RandomizedCancelCompactionsTest.java | 35 ++++++++++++++----- 2 files changed, 32 insertions(+), 8 deletions(-) diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java index b7f83746c615..12effee31c9d 100644 --- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java +++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java @@ -3189,6 +3189,11 @@ public V runWithCompactionsDisabled(Callable callable, return null; } + // We have checked that there are no operations with overriding priority and can now stop all the tasks + // we find satisfying the sstables predicate. If new higher-priority tasks happen to appear in-between, + // we will still stop them; any task that appears at this point is in violation of the compaction pause + // we are operating under and is okay to cancel. + // Cancel scheduled compactions matching predicate. This must be done first because tasks progress from // scheduled to active. CompactionManager.instance.active.cancelScheduledTasksAffecting(toInterruptFor, sstablesPredicate); diff --git a/test/unit/org/apache/cassandra/db/compaction/RandomizedCancelCompactionsTest.java b/test/unit/org/apache/cassandra/db/compaction/RandomizedCancelCompactionsTest.java index 17a8cf122c04..26af1f33c7cd 100644 --- a/test/unit/org/apache/cassandra/db/compaction/RandomizedCancelCompactionsTest.java +++ b/test/unit/org/apache/cassandra/db/compaction/RandomizedCancelCompactionsTest.java @@ -74,7 +74,6 @@ public static void setUpClass() private static final Logger logger = LoggerFactory.getLogger(RandomizedCancelCompactionsTest.class); - private static final int TEST_DURATION_SECONDS = 40; private static final int MAX_CONCURRENT_COMPACTIONS = 40; private static final int EXECUTOR_THREADS = 4; // Limited threads to keep tasks in queue private static final int SSTABLE_COUNT = 500; @@ -83,7 +82,19 @@ public static void setUpClass() private static final double QUICK_EXIT_CHANCE = 0.05; @Test - public void testRandomizedCancellation() throws Exception + public void testRandomizedCancellationAccepted() throws Exception + { + testRandomizedCancellation(OperationType.ANTICOMPACTION, 40); + } + + @Test + public void testRandomizedCancellationReject() throws Exception + { + testRandomizedCancellation(OperationType.STREAM, 10); + } + + + public void testRandomizedCancellation(OperationType cancellationType, int durationInSeconds) throws Exception { ColumnFamilyStore cfs = MockSchema.newCFS(); List sstables = createSSTables(cfs, SSTABLE_COUNT, 0); @@ -107,7 +118,7 @@ public void testRandomizedCancellation() throws Exception AtomicInteger cancellationSuccess = new AtomicInteger(0); long startTime = System.currentTimeMillis(); - long endTime = startTime + TimeUnit.SECONDS.toMillis(TEST_DURATION_SECONDS); + long endTime = startTime + TimeUnit.SECONDS.toMillis(durationInSeconds); try { @@ -142,6 +153,7 @@ public void testRandomizedCancellation() throws Exception taskId, random.nextDouble() < QUICK_EXIT_CHANCE ? (random.nextBoolean() ? 0 : -1) : random.nextInt(MAX_COMPACTION_SLEEP_MS - MIN_COMPACTION_SLEEP_MS) + MIN_COMPACTION_SLEEP_MS, + endTime, tasksCompleted, tasksCancelled, tasksCancelledBeforeStart, @@ -202,7 +214,7 @@ public void testRandomizedCancellation() throws Exception return true; }, predicate, - OperationType.ANTICOMPACTION, + cancellationType, false, false, false, @@ -223,8 +235,8 @@ public void testRandomizedCancellation() throws Exception }); // Wait for test duration - taskCreator.get(TEST_DURATION_SECONDS + 5, TimeUnit.SECONDS); - taskCanceller.get(TEST_DURATION_SECONDS + 5, TimeUnit.SECONDS); + taskCreator.get(durationInSeconds + 5, TimeUnit.SECONDS); + taskCanceller.get(durationInSeconds + 5, TimeUnit.SECONDS); // Stop test testRunning.set(false); @@ -296,7 +308,10 @@ public void testRandomizedCancellation() throws Exception tasksCancelledBeforeStart.get() + tasksCancelledAfterStart.get(), tasksCancelled.get()); - assertEquals("Cancellations should all succeed", cancellationAttempts.get(), cancellationSuccess.get()); + if (cancellationType.priority <= OperationType.COMPACTION.priority) + assertEquals("Cancellations should all succeed", cancellationAttempts.get(), cancellationSuccess.get()); + else + assertEquals("No tasks should be cancelled", 0, tasksCancelledBeforeStart.get() + tasksCancelledAfterStart.get()); } finally { @@ -342,6 +357,7 @@ private static class RandomizedCompactionTask extends AbstractCompactionTask private final Set sstables; private final int taskId; private final int sleepTimeMs; + private final long testEndTime; private final AtomicInteger completedCounter; private final AtomicInteger cancelledCounter; private final AtomicInteger cancelledBeforeStartCounter; @@ -363,6 +379,7 @@ public RandomizedCompactionTask(ColumnFamilyStore cfs, Set sstables, int taskId, int sleepTimeMs, + long testEndTime, AtomicInteger completedCounter, AtomicInteger cancelledCounter, AtomicInteger cancelledBeforeStartCounter, @@ -373,6 +390,7 @@ public RandomizedCompactionTask(ColumnFamilyStore cfs, this.sstables = sstables; this.taskId = taskId; this.sleepTimeMs = sleepTimeMs; + this.testEndTime = testEndTime; this.completedCounter = completedCounter; this.cancelledCounter = cancelledCounter; this.cancelledBeforeStartCounter = cancelledBeforeStartCounter; @@ -436,7 +454,8 @@ public void runMayThrow() throws InterruptedException logger.debug("Task {} stop requested after {}ms", taskId, slept); break; } - Thread.sleep(Math.min(100, sleepTimeMs - slept)); + if (System.currentTimeMillis() < testEndTime) // finish quickly if the test run time is done + Thread.sleep(Math.min(100, sleepTimeMs - slept)); slept += 100; } From 31cd6216afccfa22de7203a9c796ef7caf987027 Mon Sep 17 00:00:00 2001 From: blambov Date: Wed, 13 May 2026 15:10:02 +0300 Subject: [PATCH 3/3] Reject with an exception to notify observers task wasn't successful --- .../org/apache/cassandra/db/ColumnFamilyStore.java | 2 +- .../db/compaction/AbstractCompactionTask.java | 10 ++++++++-- .../cassandra/db/compaction/ActiveOperations.java | 4 ++-- .../db/compaction/CompositeCompactionTask.java | 2 +- .../cassandra/db/compaction/CancelCompactionsTest.java | 2 +- .../cassandra/db/compaction/CompactionTaskTest.java | 2 +- 6 files changed, 14 insertions(+), 8 deletions(-) diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java index 12effee31c9d..208e6cc38228 100644 --- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java +++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java @@ -3196,7 +3196,7 @@ public V runWithCompactionsDisabled(Callable callable, // Cancel scheduled compactions matching predicate. This must be done first because tasks progress from // scheduled to active. - CompactionManager.instance.active.cancelScheduledTasksAffecting(toInterruptFor, sstablesPredicate); + CompactionManager.instance.active.cancelScheduledTasksAffecting(toInterruptFor, sstablesPredicate, trigger); // interrupt in-progress compactions CompactionManager.instance.interruptCompactionForCFs(toInterruptFor, sstablesPredicate, interruptValidation, trigger); diff --git a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionTask.java b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionTask.java index 6dbb85be73b9..70c7c5ad64a3 100644 --- a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionTask.java +++ b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionTask.java @@ -229,11 +229,17 @@ public boolean switchToActive() /** * Reject/cancel the task if it affects any sstable that satisfies the given predicate. */ - public boolean cancelIfAffects(CompactionRealm realm, Predicate sstablePredicate) + public boolean cancelIfAffects(CompactionRealm realm, Predicate sstablePredicate, TableOperation.StopTrigger trigger) { boolean affects = affectsAny(realm, sstablePredicate); if (affects) - Throwables.maybeFail(rejected(null)); + { + // Reject with an exception to notify observers task wasn't successful. + TimeUUID id = getTransaction().opId(); + Throwable err = rejected(new CompactionInterruptedException(id, trigger)); + if (err != null && !(err instanceof CompactionInterruptedException)) + logger.warn("Failed to reject task with id={}", id, err); + } return affects; } diff --git a/src/java/org/apache/cassandra/db/compaction/ActiveOperations.java b/src/java/org/apache/cassandra/db/compaction/ActiveOperations.java index b177bae256f4..fbc7783185c4 100644 --- a/src/java/org/apache/cassandra/db/compaction/ActiveOperations.java +++ b/src/java/org/apache/cassandra/db/compaction/ActiveOperations.java @@ -227,7 +227,7 @@ public Collection getScheduledTasksMatching(Iterable cfss, Predicate predicate) + public void cancelScheduledTasksAffecting(Iterable cfss, Predicate predicate, TableOperation.StopTrigger trigger) { Iterable tasksCopy; synchronized (scheduledTasks) @@ -237,7 +237,7 @@ public void cancelScheduledTasksAffecting(Iterable cfss, Pred for (AbstractCompactionTask task : tasksCopy) for (ColumnFamilyStore cfs : cfss) - task.cancelIfAffects(cfs, predicate); + task.cancelIfAffects(cfs, predicate, trigger); } @VisibleForTesting diff --git a/src/java/org/apache/cassandra/db/compaction/CompositeCompactionTask.java b/src/java/org/apache/cassandra/db/compaction/CompositeCompactionTask.java index 0ef137f291ea..27620fa7128b 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompositeCompactionTask.java +++ b/src/java/org/apache/cassandra/db/compaction/CompositeCompactionTask.java @@ -79,7 +79,7 @@ public Throwable rejected(Throwable t) } @Override - public boolean cancelIfAffects(CompactionRealm realm, Predicate sstablePredicate) + public boolean cancelIfAffects(CompactionRealm realm, Predicate sstablePredicate, TableOperation.StopTrigger trigger) { // Leave cancellation to the individual tasks. return false; diff --git a/test/unit/org/apache/cassandra/db/compaction/CancelCompactionsTest.java b/test/unit/org/apache/cassandra/db/compaction/CancelCompactionsTest.java index 197c2ba5a5da..df2af9858a2f 100644 --- a/test/unit/org/apache/cassandra/db/compaction/CancelCompactionsTest.java +++ b/test/unit/org/apache/cassandra/db/compaction/CancelCompactionsTest.java @@ -598,7 +598,7 @@ public void testStandardCompactionTaskCancellation() throws Throwable Thread t = new Thread(() -> { Uninterruptibles.awaitUninterruptibly(waitForBeginCompaction); getCurrentColumnFamilyStore().getCompactionStrategyContainer().pause(); - CompactionManager.instance.active.cancelScheduledTasksAffecting(cfss, Predicates.alwaysTrue()); + CompactionManager.instance.active.cancelScheduledTasksAffecting(cfss, Predicates.alwaysTrue(), UNIT_TESTS); CompactionManager.instance.interruptCompactionFor(Iterables.transform(cfss, ColumnFamilyStore::metadata), Predicates.alwaysTrue(), false, UNIT_TESTS); waitForStart.countDown(); diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionTaskTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionTaskTest.java index 46f566440333..1759008be6c6 100644 --- a/test/unit/org/apache/cassandra/db/compaction/CompactionTaskTest.java +++ b/test/unit/org/apache/cassandra/db/compaction/CompactionTaskTest.java @@ -182,7 +182,7 @@ public void cancelledOnStart() } }; Assert.assertNotNull(task); - task.cancelIfAffects(cfs, Predicates.alwaysTrue()); + task.cancelIfAffects(cfs, Predicates.alwaysTrue(), TableOperation.StopTrigger.UNIT_TESTS); try { task.execute(CompactionManager.instance.active);