diff --git a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java index 120cb4663201..3482e561dc8d 100644 --- a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java +++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java @@ -95,12 +95,12 @@ public BigTableWriter(Descriptor descriptor, Set indexComponents) { super(descriptor, components(metadata.getLocal(), indexComponents), lifecycleNewTracker, WRITER_OPTION, keyCount, repairedAt, pendingRepair, isTransient, metadata, metadataCollector, header, observers); + operationType = lifecycleNewTracker.opType(); iwriter = new IndexWriter(keyCount); this.rowIndexEntrySerializer = new BigTableRowIndexEntry.Serializer(descriptor.version, header); columnIndexWriter = new ColumnIndex(this.header, dataFile, descriptor.version, this.observers, rowIndexEntrySerializer.indexInfoSerializer()); txnProxy = new TransactionalProxy(); - operationType = lifecycleNewTracker.opType(); } private static Set components(TableMetadata metadata, Collection indexComponents) @@ -310,7 +310,7 @@ class IndexWriter extends AbstractTransactional implements Transactional indexFile = new SequentialWriter(descriptor.fileFor(Component.PRIMARY_INDEX), WRITER_OPTION); builder = SSTableReaderBuilder.primaryIndexWriteTimeBuilder(descriptor, Component.PRIMARY_INDEX, operationType, false); summary = new IndexSummaryBuilder(keyCount, metadata().params.minIndexInterval, Downsampling.BASE_SAMPLING_LEVEL); - bf = FilterFactory.getFilter(keyCount, metadata().params.bloomFilterFpChance); + bf = FilterFactory.getFilterForWrite(keyCount, metadata().params.bloomFilterFpChance, operationType); // register listeners to be alerted when the data files are flushed indexFile.setPostFlushListener(() -> summary.markIndexSynced(indexFile.getLastFlushOffset())); dataFile.setPostFlushListener(() -> summary.markDataSynced(dataFile.getLastFlushOffset())); diff --git a/src/java/org/apache/cassandra/io/sstable/format/trieindex/TrieIndexSSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/format/trieindex/TrieIndexSSTableWriter.java index 427b378dc75c..f8de4d50dc60 100644 --- a/src/java/org/apache/cassandra/io/sstable/format/trieindex/TrieIndexSSTableWriter.java +++ b/src/java/org/apache/cassandra/io/sstable/format/trieindex/TrieIndexSSTableWriter.java @@ -347,7 +347,7 @@ class IndexWriter extends AbstractTransactional implements Transactional } partitionIndex = new PartitionIndexBuilder(partitionIndexFile, partitionIndexFHBuilder, descriptor.version.getByteComparableVersion()); - bf = FilterFactory.getFilter(keyCount, table.params.bloomFilterFpChance); + bf = FilterFactory.getFilterForWrite(keyCount, table.params.bloomFilterFpChance, operationType); // register listeners to be alerted when the data files are flushed partitionIndexFile.setPostFlushListener(() -> partitionIndex.markPartitionIndexSynced(partitionIndexFile.getLastFlushOffset())); diff --git a/src/java/org/apache/cassandra/utils/BloomFilter.java b/src/java/org/apache/cassandra/utils/BloomFilter.java index bb1fa87c5207..e017b7ffbad0 100644 --- a/src/java/org/apache/cassandra/utils/BloomFilter.java +++ b/src/java/org/apache/cassandra/utils/BloomFilter.java @@ -63,6 +63,12 @@ public class BloomFilter extends WrappedSharedCloseable implements IFilter @VisibleForTesting public static boolean recreateOnFPChanceChange = Boolean.getBoolean(RECREATE_ON_FP_CHANCE_CHANGE); + /** + * If true, Bloom filters ignore the memory limit during flush. CNDB uses this to avoid missing Bloom filters + * when reloading from remote storage. + */ + public static final String IGNORE_MEMORY_LIMIT_ON_FLUSH_PROP = Config.PROPERTY_PREFIX + "bf.ignore_memory_limit_on_flush"; + public static final MemoryLimiter memoryLimiter = new MemoryLimiter(maxMemory != 0 ? maxMemory : Long.MAX_VALUE, "Allocating %s for Bloom filter would reach max of %s (current %s)"); @@ -93,6 +99,14 @@ private BloomFilter(BloomFilter copy) this.bitset = copy.bitset; } + /** + * @return true to ignore bloom filter memory limit during flush + */ + public static boolean ignoreMemoryLimitOnFlush() + { + return Boolean.getBoolean(IGNORE_MEMORY_LIMIT_ON_FLUSH_PROP); + } + /** * @return true if sstable's bloom filter should be deserialized on read instead of when opening sstable. This * doesn't affect flushed sstable because there is bloom filter deserialization diff --git a/src/java/org/apache/cassandra/utils/FilterFactory.java b/src/java/org/apache/cassandra/utils/FilterFactory.java index 0888280fd98f..7b71769342b3 100644 --- a/src/java/org/apache/cassandra/utils/FilterFactory.java +++ b/src/java/org/apache/cassandra/utils/FilterFactory.java @@ -24,6 +24,7 @@ import io.micrometer.core.instrument.Counter; import io.micrometer.core.instrument.MeterRegistry; import io.micrometer.core.instrument.Tags; +import org.apache.cassandra.db.compaction.OperationType; import org.apache.cassandra.metrics.DefaultNameFactory; import org.apache.cassandra.metrics.MetricNameFactory; import org.apache.cassandra.metrics.MicrometerMetrics; @@ -63,7 +64,7 @@ public static IFilter getFilter(long numElements, int targetBucketsPerElem, Memo logger.warn("Cannot provide an optimal BloomFilter for {} elements ({}/{} buckets per element).", numElements, bucketsPerElement, targetBucketsPerElem); } BloomCalculations.BloomSpecification spec = BloomCalculations.computeBloomSpec(bucketsPerElement); - return createFilter(spec.K, numElements, spec.bucketsPerElement, memoryLimiter); + return createFilter(spec.K, numElements, spec.bucketsPerElement, memoryLimiter, true); } /** @@ -80,22 +81,39 @@ public static IFilter getFilter(long numElements, double maxFalsePosProbability) } public static IFilter getFilter(long numElements, double maxFalsePosProbability, MemoryLimiter memoryLimiter) + { + return createFilter(numElements, maxFalsePosProbability, memoryLimiter, true); + } + + public static IFilter getFilterForWrite(long numElements, double maxFalsePosProbability, OperationType operationType) + { + return getFilterForWrite(numElements, maxFalsePosProbability, operationType, BloomFilter.memoryLimiter); + } + + @VisibleForTesting + static IFilter getFilterForWrite(long numElements, double maxFalsePosProbability, OperationType operationType, MemoryLimiter memoryLimiter) + { + boolean ignoreMemoryLimit = operationType == OperationType.FLUSH && BloomFilter.ignoreMemoryLimitOnFlush(); + return createFilter(numElements, maxFalsePosProbability, memoryLimiter, !ignoreMemoryLimit); + } + + private static IFilter createFilter(long numElements, double maxFalsePosProbability, MemoryLimiter memoryLimiter, boolean failOnExceedingLimit) { assert maxFalsePosProbability <= 1.0 : "Invalid probability"; if (maxFalsePosProbability == 1.0) return AlwaysPresent; int bucketsPerElement = BloomCalculations.maxBucketsPerElement(numElements); BloomCalculations.BloomSpecification spec = BloomCalculations.computeBloomSpec(bucketsPerElement, maxFalsePosProbability); - return createFilter(spec.K, numElements, spec.bucketsPerElement, memoryLimiter); + return createFilter(spec.K, numElements, spec.bucketsPerElement, memoryLimiter, failOnExceedingLimit); } @SuppressWarnings("resource") - private static IFilter createFilter(int hash, long numElements, int bucketsPer, MemoryLimiter memoryLimiter) + private static IFilter createFilter(int hash, long numElements, int bucketsPer, MemoryLimiter memoryLimiter, boolean failOnExceedingLimit) { try { long numBits = (numElements * bucketsPer) + BITSET_EXCESS; - IBitSet bitset = new OffHeapBitSet(numBits, memoryLimiter); + IBitSet bitset = new OffHeapBitSet(numBits, memoryLimiter, failOnExceedingLimit); return new BloomFilter(hash, bitset); } catch (MemoryLimiter.ReachedMemoryLimitException | OutOfMemoryError e) diff --git a/src/java/org/apache/cassandra/utils/obs/MemoryLimiter.java b/src/java/org/apache/cassandra/utils/obs/MemoryLimiter.java index 295a223ae25e..0715829c5006 100644 --- a/src/java/org/apache/cassandra/utils/obs/MemoryLimiter.java +++ b/src/java/org/apache/cassandra/utils/obs/MemoryLimiter.java @@ -35,12 +35,12 @@ public MemoryLimiter(long maxMemory, String exceptionFormat) this.exceptionFormat = exceptionFormat; } - public void increment(long bytesCount) throws ReachedMemoryLimitException + public void increment(long bytesCount, boolean failOnExceedingLimit) throws ReachedMemoryLimitException { assert bytesCount >= 0; long bytesCountAfterAllocation = this.currentMemory.addAndGet(bytesCount); // if overflow or exceeded max memory - if (bytesCountAfterAllocation < 0 || bytesCountAfterAllocation >= maxMemory) + if (bytesCountAfterAllocation < 0 || (failOnExceedingLimit && bytesCountAfterAllocation >= maxMemory)) { this.currentMemory.addAndGet(-bytesCount); diff --git a/src/java/org/apache/cassandra/utils/obs/OffHeapBitSet.java b/src/java/org/apache/cassandra/utils/obs/OffHeapBitSet.java index 3826175b69df..f9320c098242 100644 --- a/src/java/org/apache/cassandra/utils/obs/OffHeapBitSet.java +++ b/src/java/org/apache/cassandra/utils/obs/OffHeapBitSet.java @@ -44,7 +44,7 @@ public class OffHeapBitSet implements IBitSet private final Memory bytes; private final MemoryLimiter memoryLimiter; - public OffHeapBitSet(long numBits, MemoryLimiter memoryLimiter) throws MemoryLimiter.ReachedMemoryLimitException + public OffHeapBitSet(long numBits, MemoryLimiter memoryLimiter, boolean failOnExceedingLimit) throws MemoryLimiter.ReachedMemoryLimitException { this.memoryLimiter = memoryLimiter; // returns the number of 64 bit words it would take to hold numBits @@ -53,7 +53,7 @@ public OffHeapBitSet(long numBits, MemoryLimiter memoryLimiter) throws MemoryLim throw new UnsupportedOperationException("Bloom filter size is > 16GB, reduce the bloom_filter_fp_chance"); long byteCount = wordCount * 8L; - bytes = allocate(byteCount, memoryLimiter); // Can possibly throw OOM, but we handle it in the caller + bytes = allocate(byteCount, memoryLimiter, failOnExceedingLimit); // Can possibly throw OOM, but we handle it in the caller // flush/clear the existing memory. clear(); } @@ -64,9 +64,9 @@ private OffHeapBitSet(Memory bytes, MemoryLimiter memoryLimiter) this.bytes = bytes; } - private static Memory allocate(long byteCount, MemoryLimiter memoryLimiter) throws MemoryLimiter.ReachedMemoryLimitException + private static Memory allocate(long byteCount, MemoryLimiter memoryLimiter, boolean failOnExceedingLimit) throws MemoryLimiter.ReachedMemoryLimitException { - memoryLimiter.increment(byteCount); + memoryLimiter.increment(byteCount, failOnExceedingLimit); try { return Memory.allocate(byteCount); @@ -170,7 +170,7 @@ public long serializedSize() public static OffHeapBitSet deserialize(I in, boolean oldBfFormat, MemoryLimiter memoryLimiter) throws IOException, MemoryLimiter.ReachedMemoryLimitException { long byteCount = in.readInt() * 8L; - Memory memory = allocate(byteCount, memoryLimiter); + Memory memory = allocate(byteCount, memoryLimiter, true); if (oldBfFormat) { for (long i = 0; i < byteCount; ) diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java index a6093590ddfa..e9c8642efe1d 100644 --- a/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java +++ b/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java @@ -70,12 +70,15 @@ import org.apache.cassandra.db.ReadCommand; import org.apache.cassandra.db.ReadExecutionController; import org.apache.cassandra.db.RowUpdateBuilder; +import org.apache.cassandra.db.SerializationHeader; import org.apache.cassandra.db.compaction.CompactionManager; import org.apache.cassandra.db.compaction.OperationType; import org.apache.cassandra.db.lifecycle.LifecycleTransaction; import org.apache.cassandra.db.lifecycle.SSTableSet; import org.apache.cassandra.db.lifecycle.View; +import org.apache.cassandra.db.partitions.PartitionUpdate; import org.apache.cassandra.db.partitions.UnfilteredPartitionIterators; +import org.apache.cassandra.db.rows.EncodingStats; import org.apache.cassandra.db.rows.Row; import org.apache.cassandra.db.rows.UnfilteredRowIterator; import org.apache.cassandra.dht.IPartitioner; @@ -183,6 +186,7 @@ public void Cleanup() { Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD).truncateBlocking(); Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD2).truncateBlocking(); BloomFilter.recreateOnFPChanceChange = false; + System.clearProperty(BloomFilter.IGNORE_MEMORY_LIMIT_ON_FLUSH_PROP); Throwable exceptions = null; for (Ref ref : refsToRelease) @@ -201,6 +205,9 @@ public void Cleanup() { fail("Unable to release all tracked references " + exceptions); refsToRelease.clear(); + + // wait for async cleanup completion to avoid impacting BF memory limiter + LifecycleTransaction.waitForDeletions(); } @Test @@ -1366,6 +1373,31 @@ private SSTableReader getNewSSTable(ColumnFamilyStore cfs, int numKeys, int step return Sets.difference(cfs.getLiveSSTables(), before).iterator().next(); } + private SSTableReader getNewSSTable(ColumnFamilyStore cfs, int numKeys, int step, OperationType operationType) + { + NavigableMap updates = new TreeMap<>(); + for (int j = 0; j < numKeys; j += step) + { + PartitionUpdate update = new RowUpdateBuilder(cfs.metadata(), j, String.valueOf(j)) + .clustering("0") + .add("val", ByteBufferUtil.EMPTY_BYTE_BUFFER) + .buildUpdate(); + updates.put(update.partitionKey(), update); + } + + SerializationHeader header = new SerializationHeader(true, cfs.metadata(), cfs.metadata().regularAndStaticColumns(), EncodingStats.NO_STATS); + LifecycleTransaction txn = LifecycleTransaction.offline(operationType, cfs.metadata); + SSTableMultiWriter writer = cfs.createSSTableMultiWriter(cfs.newSSTableDescriptor(cfs.getDirectories().getDirectoryForNewSSTables()), + numKeys, 0, null, false, header, txn); + try (SSTableTxnWriter txnWriter = new SSTableTxnWriter(txn, writer)) + { + for (PartitionUpdate update : updates.values()) + txnWriter.append(update.unfilteredIterator()); + + return txnWriter.finish(true, cfs.getStorageHandler()).iterator().next(); + } + } + @Test public void testGetApproximateKeyCount() throws Exception { @@ -1495,17 +1527,61 @@ public void testBloomFilterIsCreatedOnLoad() throws IOException @Test public void testSSTableFlushBloomFilterReachedLimit() throws Exception + { + testSSTableFlushBloomFilterMemoryLimit(false); + } + + @Test + public void testSSTableFlushBloomFilterIgnoreMemoryLimit() throws Exception + { + testSSTableFlushBloomFilterMemoryLimit(true); + } + + private void testSSTableFlushBloomFilterMemoryLimit(boolean ignoreMemoryLimitOnFlush) throws Exception { final int numKeys = 100; // will use about 128 bytes final Keyspace keyspace = Keyspace.open(KEYSPACE1); final ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF_STANDARD); SSTableReader sstable; - long bfSpace = BloomFilter.memoryLimiter.maxMemory - BloomFilter.memoryLimiter.memoryAllocated() - 100; + long bfSpace = BloomFilter.memoryLimiter.maxMemory - BloomFilter.memoryLimiter.memoryAllocated() - 100; try { - BloomFilter.memoryLimiter.increment(bfSpace); + System.setProperty(BloomFilter.IGNORE_MEMORY_LIMIT_ON_FLUSH_PROP, Boolean.toString(ignoreMemoryLimitOnFlush)); + BloomFilter.memoryLimiter.increment(bfSpace, true); sstable = getNewSSTable(cfs, numKeys, 1); + if (ignoreMemoryLimitOnFlush) + { + Assert.assertTrue(PathUtils.exists(sstable.descriptor.pathFor(Component.FILTER))); + Assert.assertNotSame(FilterFactory.AlwaysPresent, sstable.getBloomFilter()); + } + else + { + Assert.assertFalse(PathUtils.exists(sstable.descriptor.pathFor(Component.FILTER))); + Assert.assertSame(FilterFactory.AlwaysPresent, sstable.getBloomFilter()); + } + } + finally + { + // reset + BloomFilter.memoryLimiter.decrement(bfSpace); + } + } + + @Test + public void testSSTableNonFlushBloomFilterDoesNotIgnoreMemoryLimit() throws Exception + { + final int numKeys = 100; // will use about 128 bytes + final Keyspace keyspace = Keyspace.open(KEYSPACE1); + final ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF_STANDARD); + + SSTableReader sstable; + long bfSpace = BloomFilter.memoryLimiter.maxMemory - BloomFilter.memoryLimiter.memoryAllocated() - 100; + try + { + System.setProperty(BloomFilter.IGNORE_MEMORY_LIMIT_ON_FLUSH_PROP, "true"); + BloomFilter.memoryLimiter.increment(bfSpace, true); + sstable = trackReleaseableRef(() -> getNewSSTable(cfs, numKeys, 1, OperationType.WRITE)); Assert.assertFalse(PathUtils.exists(sstable.descriptor.pathFor(Component.FILTER))); Assert.assertSame(FilterFactory.AlwaysPresent, sstable.getBloomFilter()); } diff --git a/test/unit/org/apache/cassandra/utils/BloomFilterTest.java b/test/unit/org/apache/cassandra/utils/BloomFilterTest.java index 4b466e68cf4a..95f3b9366db1 100644 --- a/test/unit/org/apache/cassandra/utils/BloomFilterTest.java +++ b/test/unit/org/apache/cassandra/utils/BloomFilterTest.java @@ -37,6 +37,7 @@ import io.micrometer.core.instrument.Tags; import io.micrometer.core.instrument.simple.SimpleMeterRegistry; +import org.apache.cassandra.db.compaction.OperationType; import org.apache.cassandra.dht.IPartitioner; import org.apache.cassandra.dht.Murmur3Partitioner; import org.apache.cassandra.io.util.DataOutputBuffer; @@ -54,6 +55,8 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNotSame; +import static org.junit.Assert.assertSame; import static org.junit.Assert.assertTrue; public class BloomFilterTest @@ -102,6 +105,7 @@ public void setup() @After public void destroy() { + System.clearProperty(BloomFilter.IGNORE_MEMORY_LIMIT_ON_FLUSH_PROP); bfInvHashes.close(); assertEquals(0, memoryLimiter.memoryAllocated()); } @@ -297,6 +301,60 @@ public void testMaxMemoryExceeded() } } + @Test + public void testMaxMemoryExceededForNonFlush() + { + long allocSize = 2L * (1 << 20); + double fpChance = 0.01; + + long initialOOM = FilterFactory.metrics.oomErrors(); + MemoryLimiter memoryLimiter = new MemoryLimiter(1, "Allocating %s for bloom filter would reach max of %s (current %s)"); + try (IFilter filter = FilterFactory.getFilterForWrite(allocSize, fpChance, OperationType.COMPACTION, memoryLimiter)) + { + assertSame(FilterFactory.AlwaysPresent, filter); + assertEquals(initialOOM + 1, FilterFactory.metrics.oomErrors()); + } + + System.setProperty(BloomFilter.IGNORE_MEMORY_LIMIT_ON_FLUSH_PROP, "true"); + try (IFilter filter = FilterFactory.getFilterForWrite(allocSize, fpChance, OperationType.COMPACTION, memoryLimiter)) + { + assertSame(FilterFactory.AlwaysPresent, filter); + assertEquals(initialOOM + 2, FilterFactory.metrics.oomErrors()); + } + } + + @Test + public void testMaxMemoryExceededForFlush() + { + long allocSize = 2L * (1 << 20); + double fpChance = 0.01; + + long initialOOM = FilterFactory.metrics.oomErrors(); + MemoryLimiter memoryLimiter = new MemoryLimiter(1, "Allocating %s for bloom filter would reach max of %s (current %s)"); + try (IFilter filter = FilterFactory.getFilterForWrite(allocSize, fpChance, OperationType.FLUSH, memoryLimiter)) + { + assertSame(FilterFactory.AlwaysPresent, filter); + assertEquals(initialOOM + 1, FilterFactory.metrics.oomErrors()); + } + } + + @Test + public void testMaxMemoryExceededForFlushIgnoreMemoryLimit() + { + long allocSize = 2L * (1 << 20); + double fpChance = 0.01; + + long initialOOM = FilterFactory.metrics.oomErrors(); + MemoryLimiter memoryLimiter = new MemoryLimiter(1, "Allocating %s for bloom filter would reach max of %s (current %s)"); + System.setProperty(BloomFilter.IGNORE_MEMORY_LIMIT_ON_FLUSH_PROP, "true"); + try (IFilter filter = FilterFactory.getFilterForWrite(allocSize, fpChance, OperationType.FLUSH, memoryLimiter)) + { + assertNotSame(FilterFactory.AlwaysPresent, filter); + assertTrue(filter.offHeapSize() > 0); + assertEquals(initialOOM, FilterFactory.metrics.oomErrors()); + } + } + @Test public void testMaxMemoryExceededOnDeserialize() throws IOException { @@ -372,4 +430,4 @@ public void testBloomFilterSize() } } } -} \ No newline at end of file +} diff --git a/test/unit/org/apache/cassandra/utils/obs/OffHeapBitSetTest.java b/test/unit/org/apache/cassandra/utils/obs/OffHeapBitSetTest.java index 87dbd192f7bf..09c1594cf474 100644 --- a/test/unit/org/apache/cassandra/utils/obs/OffHeapBitSetTest.java +++ b/test/unit/org/apache/cassandra/utils/obs/OffHeapBitSetTest.java @@ -29,6 +29,8 @@ import org.apache.cassandra.io.util.DataOutputBuffer; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -47,7 +49,7 @@ static void compare(IBitSet bs, IBitSet newbs) private void testOffHeapSerialization(boolean oldBfFormat) throws Exception { - try (OffHeapBitSet bs = new OffHeapBitSet(100000, memoryLimiter)) + try (OffHeapBitSet bs = new OffHeapBitSet(100000, memoryLimiter, true)) { for (long i = 0; i < bs.capacity(); i++) if (random.nextBoolean()) @@ -78,7 +80,7 @@ public void testSerialization() throws Exception public void testBitSetGetClear() throws Exception { int size = Integer.MAX_VALUE / 4000; - try (OffHeapBitSet bs = new OffHeapBitSet(size, memoryLimiter)) + try (OffHeapBitSet bs = new OffHeapBitSet(size, memoryLimiter, true)) { List randomBits = Lists.newArrayList(); for (int i = 0; i < 10; i++) @@ -98,17 +100,38 @@ public void testBitSetGetClear() throws Exception } } + @Test + public void testDoNotFailOnExceedingLimit() throws MemoryLimiter.ReachedMemoryLimitException + { + MemoryLimiter memoryLimiter = new MemoryLimiter(1000L, "Allocating %s for bloom filter would reach max of %s (current %s)"); + long numBits = 10000000L; + try (OffHeapBitSet bs = new OffHeapBitSet(numBits, memoryLimiter, false)) + { + assertThat(bs).isNotNull(); + assertThat(memoryLimiter.memoryAllocated()).isGreaterThan(memoryLimiter.maxMemory); + } + } + + @Test + public void testFailOnExceedingLimit() + { + MemoryLimiter memoryLimiter = new MemoryLimiter(1000L, "Allocating %s for bloom filter would reach max of %s (current %s)"); + long numBits = 10000000L; + assertThatThrownBy(() -> new OffHeapBitSet(numBits, memoryLimiter, true)) + .isInstanceOf(MemoryLimiter.ReachedMemoryLimitException.class); + } + @Test(expected = UnsupportedOperationException.class) public void testUnsupportedLargeSize() throws Exception { long size = 64L * Integer.MAX_VALUE + 1; // Max size 16G * 8 bits - OffHeapBitSet bs = new OffHeapBitSet(size, memoryLimiter); + OffHeapBitSet bs = new OffHeapBitSet(size, memoryLimiter, true); } @Test public void testInvalidIndex() throws Exception { - OffHeapBitSet bs = new OffHeapBitSet(10, memoryLimiter); + OffHeapBitSet bs = new OffHeapBitSet(10, memoryLimiter, true); int invalidIdx[] = {-1, 64, 1000}; for (int i : invalidIdx)