Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 22 additions & 5 deletions src/java/org/apache/cassandra/db/ColumnFamilyStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@
import org.apache.cassandra.db.commitlog.CommitLog;
import org.apache.cassandra.db.commitlog.CommitLogPosition;
import org.apache.cassandra.db.commitlog.IntervalSet;
import org.apache.cassandra.db.compaction.AbstractCompactionTask;
import org.apache.cassandra.db.compaction.AbstractTableOperation;
import org.apache.cassandra.db.compaction.CompactionManager;
import org.apache.cassandra.db.compaction.CompactionRealm;
Expand Down Expand Up @@ -3146,7 +3147,7 @@ public <V> V runWithCompactionsDisabled(Callable<V> callable, OperationType oper
*/
public <V> V runWithCompactionsDisabled(Callable<V> callable,
Predicate<SSTableReader> sstablesPredicate,
OperationType operationType,
OperationType operationType,
boolean interruptValidation,
boolean interruptViews,
boolean interruptIndexes,
Expand All @@ -3165,22 +3166,38 @@ public <V> V runWithCompactionsDisabled(Callable<V> callable,
try (CompactionManager.CompactionPauser pause = CompactionManager.instance.pauseGlobalCompaction();
CompactionManager.CompactionPauser pausedStrategies = pauseCompactionStrategies(toInterruptFor))
{
List<TableOperation> uninterruptibleTasks = CompactionManager.instance.getCompactionsMatching(toInterruptForMetadata,
List<TableOperation> uninterruptibleOps = CompactionManager.instance.getCompactionsMatching(toInterruptForMetadata,
sstablesPredicate,
(progress) -> progress.operationType().priority <= operationType.priority);
if (!uninterruptibleTasks.isEmpty())
if (!uninterruptibleOps.isEmpty())
{
logger.info("Unable to cancel in-progress compactions, since they're running with higher or same priority: {}. You can abort these operations using `nodetool stop`.",
uninterruptibleTasks.stream().map((compaction) -> String.format("%s@%s (%s)",
uninterruptibleOps.stream().map((compaction) -> String.format("%s@%s (%s)",
compaction.getProgress().operationType(),
compaction.getProgress().metadata().name,
compaction.getProgress().operationId()))
.collect(Collectors.joining(",")));
return null;
}

Collection<AbstractCompactionTask> uninterruptibleTasks = CompactionManager.instance.active.getScheduledTasksMatching(toInterruptFor,
sstablesPredicate,
task -> task.getCompactionType().priority <= operationType.priority);
if (!uninterruptibleTasks.isEmpty())
{
logger.info("Unable to cancel {} scheduled compactions with higher or same priority. You can abort these operations using `nodetool stop`.", uninterruptibleTasks.size());
return null;
}

// We have checked that there are no operations with overriding priority and can now stop all the tasks
// we find satisfying the sstables predicate. If new higher-priority tasks happen to appear in-between,
// we will still stop them; any task that appears at this point is in violation of the compaction pause
// we are operating under and is okay to cancel.

// Cancel scheduled compactions matching predicate. This must be done first because tasks progress from
// scheduled to active.
CompactionManager.instance.active.cancelScheduledTasksAffecting(toInterruptFor, sstablesPredicate);
CompactionManager.instance.active.cancelScheduledTasksAffecting(toInterruptFor, sstablesPredicate, trigger);

// interrupt in-progress compactions
CompactionManager.instance.interruptCompactionForCFs(toInterruptFor, sstablesPredicate, interruptValidation, trigger);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -229,18 +229,32 @@ public boolean switchToActive()
/**
* Reject/cancel the task if it affects any sstable that satisfies the given predicate.
*/
public boolean cancelIfAffects(CompactionRealm realm, Predicate<SSTableReader> sstablePredicate)
public boolean cancelIfAffects(CompactionRealm realm, Predicate<SSTableReader> sstablePredicate, TableOperation.StopTrigger trigger)
{
boolean affects = affectsAny(realm, sstablePredicate);
if (affects)
{
// Reject with an exception to notify observers task wasn't successful.
TimeUUID id = getTransaction().opId();
Throwable err = rejected(new CompactionInterruptedException(id, trigger));
if (err != null && !(err instanceof CompactionInterruptedException))
logger.warn("Failed to reject task with id={}", id, err);
}
return affects;
}

/**
* Returns true iff the task affects any sstable that satisfies the given predicate.
*/
public boolean affectsAny(CompactionRealm realm, Predicate<SSTableReader> sstablePredicate)
{
if (realm != this.realm)
return false;

for (SSTableReader r : transaction.originals())
{
if (sstablePredicate.test(r))
{
Throwables.maybeFail(rejected(null));
return true;
}
}
return false;
}
Expand Down
30 changes: 28 additions & 2 deletions src/java/org/apache/cassandra/db/compaction/ActiveOperations.java
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,33 @@ public void removeTaskFromScheduled(AbstractCompactionTask task)
scheduledTasks.remove(task);
}

public void cancelScheduledTasksAffecting(Iterable<ColumnFamilyStore> cfss, Predicate<SSTableReader> predicate)
public Collection<AbstractCompactionTask> getScheduledTasksMatching(Iterable<ColumnFamilyStore> cfss, Predicate<SSTableReader> predicate, Predicate<AbstractCompactionTask> taskPredicate)
{
List<AbstractCompactionTask> tasksCopy;
synchronized (scheduledTasks)
{
tasksCopy = new ArrayList<>(scheduledTasks);
}

List<AbstractCompactionTask> matching = new ArrayList<>(tasksCopy.size());
for (AbstractCompactionTask task : tasksCopy)
{
if (taskPredicate.test(task))
{
for (ColumnFamilyStore cfs : cfss)
{
if (task.affectsAny(cfs, predicate))
{
matching.add(task);
break;
}
}
}
}
return matching;
}

public void cancelScheduledTasksAffecting(Iterable<ColumnFamilyStore> cfss, Predicate<SSTableReader> predicate, TableOperation.StopTrigger trigger)
{
Iterable<AbstractCompactionTask> tasksCopy;
synchronized (scheduledTasks)
Expand All @@ -211,7 +237,7 @@ public void cancelScheduledTasksAffecting(Iterable<ColumnFamilyStore> cfss, Pred

for (AbstractCompactionTask task : tasksCopy)
for (ColumnFamilyStore cfs : cfss)
task.cancelIfAffects(cfs, predicate);
task.cancelIfAffects(cfs, predicate, trigger);
}

@VisibleForTesting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2516,7 +2516,7 @@ public void setMaxConcurrentAutoUpgradeTasks(int value)
}
}

public List<TableOperation> getCompactionsMatching(Iterable<TableMetadata> columnFamilies, Predicate<TableOperation.Progress> predicate)
public List<TableOperation> getCompactionsMatching(Iterable<TableMetadata> columnFamilies, Predicate<SSTableReader> sstablePredicate, Predicate<TableOperation.Progress> progressPredicate)
{
Preconditions.checkArgument(columnFamilies != null, "Attempted to getCompactionsMatching in CompactionManager with no columnFamilies specified.");

Expand All @@ -2527,7 +2527,7 @@ public List<TableOperation> getCompactionsMatching(Iterable<TableMetadata> colum
TableOperation.Progress progress = holder.getProgress();
if (progress.metadata() == null || Iterables.contains(columnFamilies, progress.metadata()))
{
if (predicate.test(progress))
if (progressPredicate.test(progress) && holder.shouldStop(sstablePredicate))
matched.add(holder);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ public Throwable rejected(Throwable t)
}

@Override
public boolean cancelIfAffects(CompactionRealm realm, Predicate<SSTableReader> sstablePredicate)
public boolean cancelIfAffects(CompactionRealm realm, Predicate<SSTableReader> sstablePredicate, TableOperation.StopTrigger trigger)
{
// Leave cancellation to the individual tasks.
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -598,7 +598,7 @@ public void testStandardCompactionTaskCancellation() throws Throwable
Thread t = new Thread(() -> {
Uninterruptibles.awaitUninterruptibly(waitForBeginCompaction);
getCurrentColumnFamilyStore().getCompactionStrategyContainer().pause();
CompactionManager.instance.active.cancelScheduledTasksAffecting(cfss, Predicates.alwaysTrue());
CompactionManager.instance.active.cancelScheduledTasksAffecting(cfss, Predicates.alwaysTrue(), UNIT_TESTS);
CompactionManager.instance.interruptCompactionFor(Iterables.transform(cfss, ColumnFamilyStore::metadata),
Predicates.alwaysTrue(), false, UNIT_TESTS);
waitForStart.countDown();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ public void cancelledOnStart()
}
};
Assert.assertNotNull(task);
task.cancelIfAffects(cfs, Predicates.alwaysTrue());
task.cancelIfAffects(cfs, Predicates.alwaysTrue(), TableOperation.StopTrigger.UNIT_TESTS);
try
{
task.execute(CompactionManager.instance.active);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,6 @@ public static void setUpClass()

private static final Logger logger = LoggerFactory.getLogger(RandomizedCancelCompactionsTest.class);

private static final int TEST_DURATION_SECONDS = 40;
private static final int MAX_CONCURRENT_COMPACTIONS = 40;
private static final int EXECUTOR_THREADS = 4; // Limited threads to keep tasks in queue
private static final int SSTABLE_COUNT = 500;
Expand All @@ -83,7 +82,19 @@ public static void setUpClass()
private static final double QUICK_EXIT_CHANCE = 0.05;

@Test
public void testRandomizedCancellation() throws Exception
public void testRandomizedCancellationAccepted() throws Exception
{
testRandomizedCancellation(OperationType.ANTICOMPACTION, 40);
}

@Test
public void testRandomizedCancellationReject() throws Exception
{
testRandomizedCancellation(OperationType.STREAM, 10);
}


public void testRandomizedCancellation(OperationType cancellationType, int durationInSeconds) throws Exception
{
ColumnFamilyStore cfs = MockSchema.newCFS();
List<SSTableReader> sstables = createSSTables(cfs, SSTABLE_COUNT, 0);
Expand All @@ -107,7 +118,7 @@ public void testRandomizedCancellation() throws Exception
AtomicInteger cancellationSuccess = new AtomicInteger(0);

long startTime = System.currentTimeMillis();
long endTime = startTime + TimeUnit.SECONDS.toMillis(TEST_DURATION_SECONDS);
long endTime = startTime + TimeUnit.SECONDS.toMillis(durationInSeconds);

try
{
Expand Down Expand Up @@ -142,6 +153,7 @@ public void testRandomizedCancellation() throws Exception
taskId,
random.nextDouble() < QUICK_EXIT_CHANCE ? (random.nextBoolean() ? 0 : -1)
: random.nextInt(MAX_COMPACTION_SLEEP_MS - MIN_COMPACTION_SLEEP_MS) + MIN_COMPACTION_SLEEP_MS,
endTime,
tasksCompleted,
tasksCancelled,
tasksCancelledBeforeStart,
Expand Down Expand Up @@ -202,7 +214,7 @@ public void testRandomizedCancellation() throws Exception
return true;
},
predicate,
OperationType.P0,
cancellationType,
false,
false,
false,
Expand All @@ -223,8 +235,8 @@ public void testRandomizedCancellation() throws Exception
});

// Wait for test duration
taskCreator.get(TEST_DURATION_SECONDS + 5, TimeUnit.SECONDS);
taskCanceller.get(TEST_DURATION_SECONDS + 5, TimeUnit.SECONDS);
taskCreator.get(durationInSeconds + 5, TimeUnit.SECONDS);
taskCanceller.get(durationInSeconds + 5, TimeUnit.SECONDS);

// Stop test
testRunning.set(false);
Expand Down Expand Up @@ -296,7 +308,10 @@ public void testRandomizedCancellation() throws Exception
tasksCancelledBeforeStart.get() + tasksCancelledAfterStart.get(),
tasksCancelled.get());

assertEquals("Cancellations should all succeed", cancellationAttempts.get(), cancellationSuccess.get());
if (cancellationType.priority <= OperationType.COMPACTION.priority)
assertEquals("Cancellations should all succeed", cancellationAttempts.get(), cancellationSuccess.get());
else
assertEquals("No tasks should be cancelled", 0, tasksCancelledBeforeStart.get() + tasksCancelledAfterStart.get());
}
finally
{
Expand Down Expand Up @@ -342,6 +357,7 @@ private static class RandomizedCompactionTask extends AbstractCompactionTask
private final Set<SSTableReader> sstables;
private final int taskId;
private final int sleepTimeMs;
private final long testEndTime;
private final AtomicInteger completedCounter;
private final AtomicInteger cancelledCounter;
private final AtomicInteger cancelledBeforeStartCounter;
Expand All @@ -363,6 +379,7 @@ public RandomizedCompactionTask(ColumnFamilyStore cfs,
Set<SSTableReader> sstables,
int taskId,
int sleepTimeMs,
long testEndTime,
AtomicInteger completedCounter,
AtomicInteger cancelledCounter,
AtomicInteger cancelledBeforeStartCounter,
Expand All @@ -373,6 +390,7 @@ public RandomizedCompactionTask(ColumnFamilyStore cfs,
this.sstables = sstables;
this.taskId = taskId;
this.sleepTimeMs = sleepTimeMs;
this.testEndTime = testEndTime;
this.completedCounter = completedCounter;
this.cancelledCounter = cancelledCounter;
this.cancelledBeforeStartCounter = cancelledBeforeStartCounter;
Expand Down Expand Up @@ -415,7 +433,11 @@ public void runMayThrow() throws InterruptedException

scanners = sstables.stream().map(SSTableReader::getScanner).collect(Collectors.toList());
controller = new CompactionController(cfs, sstables, Integer.MIN_VALUE);
ci = new CompactionIterator(transaction.opType(), scanners, controller, FBUtilities.nowInSeconds(), org.apache.cassandra.utils.TimeUUID.Generator.nextTimeUUID());
ci = new CompactionIterator(transaction.opType(),
scanners,
controller,
FBUtilities.nowInSeconds(),
org.apache.cassandra.utils.TimeUUID.Generator.nextTimeUUID());
TableOperation op = ci.getOperation();
closeable = opObserver.onOperationStart(op);
switchToActive();
Expand All @@ -432,7 +454,8 @@ public void runMayThrow() throws InterruptedException
logger.debug("Task {} stop requested after {}ms", taskId, slept);
break;
}
Thread.sleep(Math.min(100, sleepTimeMs - slept));
if (System.currentTimeMillis() < testEndTime) // finish quickly if the test run time is done
Thread.sleep(Math.min(100, sleepTimeMs - slept));
slept += 100;
}

Expand Down
Loading