Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -95,12 +95,12 @@ public BigTableWriter(Descriptor descriptor,
Set<Component> indexComponents)
{
super(descriptor, components(metadata.getLocal(), indexComponents), lifecycleNewTracker, WRITER_OPTION, keyCount, repairedAt, pendingRepair, isTransient, metadata, metadataCollector, header, observers);
operationType = lifecycleNewTracker.opType();
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why this change?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

operationType is used by IndexWriter#constructor in the next line

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Silly me, missed it somehow.

iwriter = new IndexWriter(keyCount);

this.rowIndexEntrySerializer = new BigTableRowIndexEntry.Serializer(descriptor.version, header);
columnIndexWriter = new ColumnIndex(this.header, dataFile, descriptor.version, this.observers, rowIndexEntrySerializer.indexInfoSerializer());
txnProxy = new TransactionalProxy();
operationType = lifecycleNewTracker.opType();
}

private static Set<Component> components(TableMetadata metadata, Collection<Component> indexComponents)
Expand Down Expand Up @@ -310,7 +310,7 @@ class IndexWriter extends AbstractTransactional implements Transactional
indexFile = new SequentialWriter(descriptor.fileFor(Component.PRIMARY_INDEX), WRITER_OPTION);
builder = SSTableReaderBuilder.primaryIndexWriteTimeBuilder(descriptor, Component.PRIMARY_INDEX, operationType, false);
summary = new IndexSummaryBuilder(keyCount, metadata().params.minIndexInterval, Downsampling.BASE_SAMPLING_LEVEL);
bf = FilterFactory.getFilter(keyCount, metadata().params.bloomFilterFpChance);
bf = FilterFactory.getFilterForWrite(keyCount, metadata().params.bloomFilterFpChance, operationType);
// register listeners to be alerted when the data files are flushed
indexFile.setPostFlushListener(() -> summary.markIndexSynced(indexFile.getLastFlushOffset()));
dataFile.setPostFlushListener(() -> summary.markDataSynced(dataFile.getLastFlushOffset()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,7 @@ class IndexWriter extends AbstractTransactional implements Transactional
}

partitionIndex = new PartitionIndexBuilder(partitionIndexFile, partitionIndexFHBuilder, descriptor.version.getByteComparableVersion());
bf = FilterFactory.getFilter(keyCount, table.params.bloomFilterFpChance);
bf = FilterFactory.getFilterForWrite(keyCount, table.params.bloomFilterFpChance, operationType);

// register listeners to be alerted when the data files are flushed
partitionIndexFile.setPostFlushListener(() -> partitionIndex.markPartitionIndexSynced(partitionIndexFile.getLastFlushOffset()));
Expand Down
14 changes: 14 additions & 0 deletions src/java/org/apache/cassandra/utils/BloomFilter.java
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,12 @@ public class BloomFilter extends WrappedSharedCloseable implements IFilter
@VisibleForTesting
public static boolean recreateOnFPChanceChange = Boolean.getBoolean(RECREATE_ON_FP_CHANCE_CHANGE);

/**
* If true, Bloom filters ignore the memory limit during flush. CNDB uses this to avoid missing Bloom filters
* when reloading from remote storage.
*/
public static final String IGNORE_MEMORY_LIMIT_ON_FLUSH_PROP = Config.PROPERTY_PREFIX + "bf.ignore_memory_limit_on_flush";

public static final MemoryLimiter memoryLimiter = new MemoryLimiter(maxMemory != 0 ? maxMemory : Long.MAX_VALUE,
"Allocating %s for Bloom filter would reach max of %s (current %s)");

Expand Down Expand Up @@ -93,6 +99,14 @@ private BloomFilter(BloomFilter copy)
this.bitset = copy.bitset;
}

/**
* @return true to ignore bloom filter memory limit during flush
*/
public static boolean ignoreMemoryLimitOnFlush()
{
return Boolean.getBoolean(IGNORE_MEMORY_LIMIT_ON_FLUSH_PROP);
}

/**
* @return true if sstable's bloom filter should be deserialized on read instead of when opening sstable. This
* doesn't affect flushed sstable because there is bloom filter deserialization
Expand Down
26 changes: 22 additions & 4 deletions src/java/org/apache/cassandra/utils/FilterFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Tags;
import org.apache.cassandra.db.compaction.OperationType;
import org.apache.cassandra.metrics.DefaultNameFactory;
import org.apache.cassandra.metrics.MetricNameFactory;
import org.apache.cassandra.metrics.MicrometerMetrics;
Expand Down Expand Up @@ -63,7 +64,7 @@ public static IFilter getFilter(long numElements, int targetBucketsPerElem, Memo
logger.warn("Cannot provide an optimal BloomFilter for {} elements ({}/{} buckets per element).", numElements, bucketsPerElement, targetBucketsPerElem);
}
BloomCalculations.BloomSpecification spec = BloomCalculations.computeBloomSpec(bucketsPerElement);
return createFilter(spec.K, numElements, spec.bucketsPerElement, memoryLimiter);
return createFilter(spec.K, numElements, spec.bucketsPerElement, memoryLimiter, true);
}

/**
Expand All @@ -80,22 +81,39 @@ public static IFilter getFilter(long numElements, double maxFalsePosProbability)
}

public static IFilter getFilter(long numElements, double maxFalsePosProbability, MemoryLimiter memoryLimiter)
{
return createFilter(numElements, maxFalsePosProbability, memoryLimiter, true);
}

public static IFilter getFilterForWrite(long numElements, double maxFalsePosProbability, OperationType operationType)
{
return getFilterForWrite(numElements, maxFalsePosProbability, operationType, BloomFilter.memoryLimiter);
}

@VisibleForTesting
static IFilter getFilterForWrite(long numElements, double maxFalsePosProbability, OperationType operationType, MemoryLimiter memoryLimiter)
{
boolean ignoreMemoryLimit = operationType == OperationType.FLUSH && BloomFilter.ignoreMemoryLimitOnFlush();
return createFilter(numElements, maxFalsePosProbability, memoryLimiter, !ignoreMemoryLimit);
}

private static IFilter createFilter(long numElements, double maxFalsePosProbability, MemoryLimiter memoryLimiter, boolean failOnExceedingLimit)
{
assert maxFalsePosProbability <= 1.0 : "Invalid probability";
if (maxFalsePosProbability == 1.0)
return AlwaysPresent;
int bucketsPerElement = BloomCalculations.maxBucketsPerElement(numElements);
BloomCalculations.BloomSpecification spec = BloomCalculations.computeBloomSpec(bucketsPerElement, maxFalsePosProbability);
return createFilter(spec.K, numElements, spec.bucketsPerElement, memoryLimiter);
return createFilter(spec.K, numElements, spec.bucketsPerElement, memoryLimiter, failOnExceedingLimit);
}

@SuppressWarnings("resource")
private static IFilter createFilter(int hash, long numElements, int bucketsPer, MemoryLimiter memoryLimiter)
private static IFilter createFilter(int hash, long numElements, int bucketsPer, MemoryLimiter memoryLimiter, boolean failOnExceedingLimit)
{
try
{
long numBits = (numElements * bucketsPer) + BITSET_EXCESS;
IBitSet bitset = new OffHeapBitSet(numBits, memoryLimiter);
IBitSet bitset = new OffHeapBitSet(numBits, memoryLimiter, failOnExceedingLimit);
return new BloomFilter(hash, bitset);
}
catch (MemoryLimiter.ReachedMemoryLimitException | OutOfMemoryError e)
Expand Down
4 changes: 2 additions & 2 deletions src/java/org/apache/cassandra/utils/obs/MemoryLimiter.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,12 @@ public MemoryLimiter(long maxMemory, String exceptionFormat)
this.exceptionFormat = exceptionFormat;
}

public void increment(long bytesCount) throws ReachedMemoryLimitException
public void increment(long bytesCount, boolean failOnExceedingLimit) throws ReachedMemoryLimitException
{
assert bytesCount >= 0;
long bytesCountAfterAllocation = this.currentMemory.addAndGet(bytesCount);
// if overflow or exceeded max memory
if (bytesCountAfterAllocation < 0 || bytesCountAfterAllocation >= maxMemory)
if (bytesCountAfterAllocation < 0 || (failOnExceedingLimit && bytesCountAfterAllocation >= maxMemory))
{
this.currentMemory.addAndGet(-bytesCount);

Expand Down
10 changes: 5 additions & 5 deletions src/java/org/apache/cassandra/utils/obs/OffHeapBitSet.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public class OffHeapBitSet implements IBitSet
private final Memory bytes;
private final MemoryLimiter memoryLimiter;

public OffHeapBitSet(long numBits, MemoryLimiter memoryLimiter) throws MemoryLimiter.ReachedMemoryLimitException
public OffHeapBitSet(long numBits, MemoryLimiter memoryLimiter, boolean failOnExceedingLimit) throws MemoryLimiter.ReachedMemoryLimitException
{
this.memoryLimiter = memoryLimiter;
// returns the number of 64 bit words it would take to hold numBits
Expand All @@ -53,7 +53,7 @@ public OffHeapBitSet(long numBits, MemoryLimiter memoryLimiter) throws MemoryLim
throw new UnsupportedOperationException("Bloom filter size is > 16GB, reduce the bloom_filter_fp_chance");

long byteCount = wordCount * 8L;
bytes = allocate(byteCount, memoryLimiter); // Can possibly throw OOM, but we handle it in the caller
bytes = allocate(byteCount, memoryLimiter, failOnExceedingLimit); // Can possibly throw OOM, but we handle it in the caller
// flush/clear the existing memory.
clear();
}
Expand All @@ -64,9 +64,9 @@ private OffHeapBitSet(Memory bytes, MemoryLimiter memoryLimiter)
this.bytes = bytes;
}

private static Memory allocate(long byteCount, MemoryLimiter memoryLimiter) throws MemoryLimiter.ReachedMemoryLimitException
private static Memory allocate(long byteCount, MemoryLimiter memoryLimiter, boolean failOnExceedingLimit) throws MemoryLimiter.ReachedMemoryLimitException
{
memoryLimiter.increment(byteCount);
memoryLimiter.increment(byteCount, failOnExceedingLimit);
try
{
return Memory.allocate(byteCount);
Expand Down Expand Up @@ -170,7 +170,7 @@ public long serializedSize()
public static <I extends InputStream & DataInput> OffHeapBitSet deserialize(I in, boolean oldBfFormat, MemoryLimiter memoryLimiter) throws IOException, MemoryLimiter.ReachedMemoryLimitException
{
long byteCount = in.readInt() * 8L;
Memory memory = allocate(byteCount, memoryLimiter);
Memory memory = allocate(byteCount, memoryLimiter, true);
if (oldBfFormat)
{
for (long i = 0; i < byteCount; )
Expand Down
80 changes: 78 additions & 2 deletions test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -70,12 +70,15 @@
import org.apache.cassandra.db.ReadCommand;
import org.apache.cassandra.db.ReadExecutionController;
import org.apache.cassandra.db.RowUpdateBuilder;
import org.apache.cassandra.db.SerializationHeader;
import org.apache.cassandra.db.compaction.CompactionManager;
import org.apache.cassandra.db.compaction.OperationType;
import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
import org.apache.cassandra.db.lifecycle.SSTableSet;
import org.apache.cassandra.db.lifecycle.View;
import org.apache.cassandra.db.partitions.PartitionUpdate;
import org.apache.cassandra.db.partitions.UnfilteredPartitionIterators;
import org.apache.cassandra.db.rows.EncodingStats;
import org.apache.cassandra.db.rows.Row;
import org.apache.cassandra.db.rows.UnfilteredRowIterator;
import org.apache.cassandra.dht.IPartitioner;
Expand Down Expand Up @@ -183,6 +186,7 @@ public void Cleanup() {
Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD).truncateBlocking();
Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD2).truncateBlocking();
BloomFilter.recreateOnFPChanceChange = false;
System.clearProperty(BloomFilter.IGNORE_MEMORY_LIMIT_ON_FLUSH_PROP);

Throwable exceptions = null;
for (Ref<?> ref : refsToRelease)
Expand All @@ -201,6 +205,9 @@ public void Cleanup() {
fail("Unable to release all tracked references " + exceptions);

refsToRelease.clear();

// wait for async cleanup completion to avoid impacting BF memory limiter
LifecycleTransaction.waitForDeletions();
}

@Test
Expand Down Expand Up @@ -1366,6 +1373,31 @@ private SSTableReader getNewSSTable(ColumnFamilyStore cfs, int numKeys, int step
return Sets.difference(cfs.getLiveSSTables(), before).iterator().next();
}

private SSTableReader getNewSSTable(ColumnFamilyStore cfs, int numKeys, int step, OperationType operationType)
{
NavigableMap<DecoratedKey, PartitionUpdate> updates = new TreeMap<>();
for (int j = 0; j < numKeys; j += step)
{
PartitionUpdate update = new RowUpdateBuilder(cfs.metadata(), j, String.valueOf(j))
.clustering("0")
.add("val", ByteBufferUtil.EMPTY_BYTE_BUFFER)
.buildUpdate();
updates.put(update.partitionKey(), update);
}

SerializationHeader header = new SerializationHeader(true, cfs.metadata(), cfs.metadata().regularAndStaticColumns(), EncodingStats.NO_STATS);
LifecycleTransaction txn = LifecycleTransaction.offline(operationType, cfs.metadata);
SSTableMultiWriter writer = cfs.createSSTableMultiWriter(cfs.newSSTableDescriptor(cfs.getDirectories().getDirectoryForNewSSTables()),
numKeys, 0, null, false, header, txn);
try (SSTableTxnWriter txnWriter = new SSTableTxnWriter(txn, writer))
{
for (PartitionUpdate update : updates.values())
txnWriter.append(update.unfilteredIterator());

return txnWriter.finish(true, cfs.getStorageHandler()).iterator().next();
}
}

@Test
public void testGetApproximateKeyCount() throws Exception
{
Expand Down Expand Up @@ -1495,17 +1527,61 @@ public void testBloomFilterIsCreatedOnLoad() throws IOException

@Test
public void testSSTableFlushBloomFilterReachedLimit() throws Exception
{
testSSTableFlushBloomFilterMemoryLimit(false);
}

@Test
public void testSSTableFlushBloomFilterIgnoreMemoryLimit() throws Exception
{
testSSTableFlushBloomFilterMemoryLimit(true);
}

private void testSSTableFlushBloomFilterMemoryLimit(boolean ignoreMemoryLimitOnFlush) throws Exception
{
final int numKeys = 100; // will use about 128 bytes
final Keyspace keyspace = Keyspace.open(KEYSPACE1);
final ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF_STANDARD);

SSTableReader sstable;
long bfSpace = BloomFilter.memoryLimiter.maxMemory - BloomFilter.memoryLimiter.memoryAllocated() - 100;
long bfSpace = BloomFilter.memoryLimiter.maxMemory - BloomFilter.memoryLimiter.memoryAllocated() - 100;
try
{
BloomFilter.memoryLimiter.increment(bfSpace);
System.setProperty(BloomFilter.IGNORE_MEMORY_LIMIT_ON_FLUSH_PROP, Boolean.toString(ignoreMemoryLimitOnFlush));
BloomFilter.memoryLimiter.increment(bfSpace, true);
sstable = getNewSSTable(cfs, numKeys, 1);
if (ignoreMemoryLimitOnFlush)
{
Assert.assertTrue(PathUtils.exists(sstable.descriptor.pathFor(Component.FILTER)));
Assert.assertNotSame(FilterFactory.AlwaysPresent, sstable.getBloomFilter());
}
else
{
Assert.assertFalse(PathUtils.exists(sstable.descriptor.pathFor(Component.FILTER)));
Assert.assertSame(FilterFactory.AlwaysPresent, sstable.getBloomFilter());
}
}
finally
{
// reset
BloomFilter.memoryLimiter.decrement(bfSpace);
}
}

@Test
public void testSSTableNonFlushBloomFilterDoesNotIgnoreMemoryLimit() throws Exception
{
final int numKeys = 100; // will use about 128 bytes
final Keyspace keyspace = Keyspace.open(KEYSPACE1);
final ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF_STANDARD);

SSTableReader sstable;
long bfSpace = BloomFilter.memoryLimiter.maxMemory - BloomFilter.memoryLimiter.memoryAllocated() - 100;
try
{
System.setProperty(BloomFilter.IGNORE_MEMORY_LIMIT_ON_FLUSH_PROP, "true");
BloomFilter.memoryLimiter.increment(bfSpace, true);
sstable = trackReleaseableRef(() -> getNewSSTable(cfs, numKeys, 1, OperationType.WRITE));
Assert.assertFalse(PathUtils.exists(sstable.descriptor.pathFor(Component.FILTER)));
Assert.assertSame(FilterFactory.AlwaysPresent, sstable.getBloomFilter());
}
Expand Down
60 changes: 59 additions & 1 deletion test/unit/org/apache/cassandra/utils/BloomFilterTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@

import io.micrometer.core.instrument.Tags;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
import org.apache.cassandra.db.compaction.OperationType;
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.dht.Murmur3Partitioner;
import org.apache.cassandra.io.util.DataOutputBuffer;
Expand All @@ -54,6 +55,8 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNotSame;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;

public class BloomFilterTest
Expand Down Expand Up @@ -102,6 +105,7 @@ public void setup()
@After
public void destroy()
{
System.clearProperty(BloomFilter.IGNORE_MEMORY_LIMIT_ON_FLUSH_PROP);
bfInvHashes.close();
assertEquals(0, memoryLimiter.memoryAllocated());
}
Expand Down Expand Up @@ -297,6 +301,60 @@ public void testMaxMemoryExceeded()
}
}

@Test
public void testMaxMemoryExceededForNonFlush()
{
long allocSize = 2L * (1 << 20);
double fpChance = 0.01;

long initialOOM = FilterFactory.metrics.oomErrors();
MemoryLimiter memoryLimiter = new MemoryLimiter(1, "Allocating %s for bloom filter would reach max of %s (current %s)");
try (IFilter filter = FilterFactory.getFilterForWrite(allocSize, fpChance, OperationType.COMPACTION, memoryLimiter))
{
assertSame(FilterFactory.AlwaysPresent, filter);
assertEquals(initialOOM + 1, FilterFactory.metrics.oomErrors());
}

System.setProperty(BloomFilter.IGNORE_MEMORY_LIMIT_ON_FLUSH_PROP, "true");
try (IFilter filter = FilterFactory.getFilterForWrite(allocSize, fpChance, OperationType.COMPACTION, memoryLimiter))
{
assertSame(FilterFactory.AlwaysPresent, filter);
assertEquals(initialOOM + 2, FilterFactory.metrics.oomErrors());
}
}

@Test
public void testMaxMemoryExceededForFlush()
{
long allocSize = 2L * (1 << 20);
double fpChance = 0.01;

long initialOOM = FilterFactory.metrics.oomErrors();
MemoryLimiter memoryLimiter = new MemoryLimiter(1, "Allocating %s for bloom filter would reach max of %s (current %s)");
try (IFilter filter = FilterFactory.getFilterForWrite(allocSize, fpChance, OperationType.FLUSH, memoryLimiter))
{
assertSame(FilterFactory.AlwaysPresent, filter);
assertEquals(initialOOM + 1, FilterFactory.metrics.oomErrors());
}
}

@Test
public void testMaxMemoryExceededForFlushIgnoreMemoryLimit()
{
long allocSize = 2L * (1 << 20);
double fpChance = 0.01;

long initialOOM = FilterFactory.metrics.oomErrors();
MemoryLimiter memoryLimiter = new MemoryLimiter(1, "Allocating %s for bloom filter would reach max of %s (current %s)");
System.setProperty(BloomFilter.IGNORE_MEMORY_LIMIT_ON_FLUSH_PROP, "true");
try (IFilter filter = FilterFactory.getFilterForWrite(allocSize, fpChance, OperationType.FLUSH, memoryLimiter))
{
assertNotSame(FilterFactory.AlwaysPresent, filter);
assertTrue(filter.offHeapSize() > 0);
assertEquals(initialOOM, FilterFactory.metrics.oomErrors());
}
}

@Test
public void testMaxMemoryExceededOnDeserialize() throws IOException
{
Expand Down Expand Up @@ -372,4 +430,4 @@ public void testBloomFilterSize()
}
}
}
}
}
Loading