diff --git a/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/FDBRawRecord.java b/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/FDBRawRecord.java
index 552aa4660f..a393d84bf0 100644
--- a/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/FDBRawRecord.java
+++ b/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/FDBRawRecord.java
@@ -20,6 +20,7 @@
package com.apple.foundationdb.record.provider.foundationdb;
+import com.apple.foundationdb.annotation.API;
import com.apple.foundationdb.tuple.Tuple;
import javax.annotation.Nonnull;
@@ -33,7 +34,8 @@
* any splits have been removed), and its version. It also includes sizing information describing
* the record's on-disk footprint.
*/
-class FDBRawRecord implements FDBStoredSizes {
+@API(API.Status.INTERNAL)
+public class FDBRawRecord implements FDBStoredSizes {
@Nonnull private final Tuple primaryKey;
@Nonnull private final byte[] rawRecord;
@Nullable private final FDBRecordVersion version;
diff --git a/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/SplitHelper.java b/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/SplitHelper.java
index 19e7151c30..d4861f7e60 100644
--- a/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/SplitHelper.java
+++ b/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/SplitHelper.java
@@ -33,6 +33,7 @@
import com.apple.foundationdb.record.FDBRecordStoreProperties;
import com.apple.foundationdb.record.RecordCoreArgumentException;
import com.apple.foundationdb.record.RecordCoreException;
+import com.apple.foundationdb.record.RecordCoreInternalException;
import com.apple.foundationdb.record.RecordCoreStorageException;
import com.apple.foundationdb.record.RecordCursor;
import com.apple.foundationdb.record.RecordCursorContinuation;
@@ -132,7 +133,11 @@ public static void saveWithSplit(@Nonnull final FDBRecordContext context, @Nonnu
.addLogInfo(LogMessageKeys.SUBSPACE, ByteArrayUtil2.loggable(subspace.pack()))
.addLogInfo(LogMessageKeys.VERSION, version);
}
- final Transaction tr = context.ensureActive();
+ boolean hasVersionInKey = key.hasIncompleteVersionstamp();
+ if ((version != null) && hasVersionInKey) {
+ // Cannot have versionStamps in BOTH key and value
+ throw new RecordCoreArgumentException("Cannot save versionStamp in both key and value");
+ }
if (serialized.length > SplitHelper.SPLIT_RECORD_SIZE) {
if (!splitLongRecords) {
throw new RecordCoreException("Record is too long to be stored in a single value; consider split_long_records")
@@ -140,9 +145,14 @@ public static void saveWithSplit(@Nonnull final FDBRecordContext context, @Nonnu
.addLogInfo(LogMessageKeys.SUBSPACE, ByteArrayUtil2.loggable(subspace.pack()))
.addLogInfo(LogMessageKeys.VALUE_SIZE, serialized.length);
}
- writeSplitRecord(context, subspace, key, serialized, clearBasedOnPreviousSizeInfo, previousSizeInfo, sizeInfo);
+ writeSplitRecord(context, subspace, key, serialized, hasVersionInKey, clearBasedOnPreviousSizeInfo, previousSizeInfo, sizeInfo);
} else {
- if (splitLongRecords || previousSizeInfo == null || previousSizeInfo.isVersionedInline()) {
+        // An incomplete version in the key means that we shouldn't delete previous k/v pairs using these keys since,
+        // in the DB, the versions from previous transactions would already have been completed (and so wouldn't match)
+ if (!hasVersionInKey && (splitLongRecords || previousSizeInfo == null || previousSizeInfo.isVersionedInline())) {
+ // Note that the clearPreviousSplitRecords also removes version splits from the context cache
+ // This is not currently supported for the case where we have versions in the keys since we can't trace the old values down
+ // Will be improved on in a separate PR
clearPreviousSplitRecord(context, subspace, key, clearBasedOnPreviousSizeInfo, previousSizeInfo);
}
final Tuple recordKey;
@@ -151,8 +161,7 @@ public static void saveWithSplit(@Nonnull final FDBRecordContext context, @Nonnu
} else {
recordKey = key;
}
- final byte[] keyBytes = subspace.pack(recordKey);
- tr.set(keyBytes, serialized);
+ final byte[] keyBytes = writeSplitValue(context, subspace, recordKey, serialized);
if (sizeInfo != null) {
sizeInfo.set(keyBytes, serialized);
sizeInfo.setSplit(false);
@@ -164,11 +173,14 @@ public static void saveWithSplit(@Nonnull final FDBRecordContext context, @Nonnu
@SuppressWarnings("PMD.CloseResource")
private static void writeSplitRecord(@Nonnull final FDBRecordContext context, @Nonnull final Subspace subspace,
@Nonnull final Tuple key, @Nonnull final byte[] serialized,
+ boolean hasVersionInKey,
final boolean clearBasedOnPreviousSizeInfo, @Nullable final FDBStoredSizes previousSizeInfo,
@Nullable SizeInfo sizeInfo) {
- final Transaction tr = context.ensureActive();
- final Subspace keySplitSubspace = subspace.subspace(key);
- clearPreviousSplitRecord(context, subspace, key, clearBasedOnPreviousSizeInfo, previousSizeInfo);
+ if (!hasVersionInKey) {
+ // Note that the clearPreviousSplitRecords also removes version splits from the context cache
+ // This is not currently supported for the case where we have versions in the keys since we can't trace the old values down
+ clearPreviousSplitRecord(context, subspace, key, clearBasedOnPreviousSizeInfo, previousSizeInfo);
+ }
long index = SplitHelper.START_SPLIT_RECORD;
int offset = 0;
while (offset < serialized.length) {
@@ -176,9 +188,8 @@ private static void writeSplitRecord(@Nonnull final FDBRecordContext context, @N
if (nextOffset > serialized.length) {
nextOffset = serialized.length;
}
- final byte[] keyBytes = keySplitSubspace.pack(index);
final byte[] valueBytes = Arrays.copyOfRange(serialized, offset, nextOffset);
- tr.set(keyBytes, valueBytes);
+ final byte[] keyBytes = writeSplitValue(context, subspace, key.add(index), valueBytes);
if (sizeInfo != null) {
if (offset == 0) {
sizeInfo.set(keyBytes, valueBytes);
@@ -192,6 +203,28 @@ private static void writeSplitRecord(@Nonnull final FDBRecordContext context, @N
}
}
+ private static byte[] writeSplitValue(FDBRecordContext context, Subspace subspace, Tuple recordKey, byte[] serialized) {
+ byte[] keyBytes;
+ if (recordKey.hasIncompleteVersionstamp()) {
+ keyBytes = subspace.packWithVersionstamp(recordKey);
+ byte[] current = context.addVersionMutation(
+ MutationType.SET_VERSIONSTAMPED_KEY,
+ keyBytes,
+ serialized);
+ if (current != null) {
+            // This should never happen. It means that the same key (and suffix) and local version were used for a subsequent
+            // write. It is most likely not an intended flow and this check will protect against that.
+            // It is an incomplete check since the same record can have different suffixes to the primary key that would not collide.
+            // Namely, if one is split and one is not split, they will not overlap. Anything else and they would.
+ throw new RecordCoreInternalException("Key with version overwritten");
+ }
+ } else {
+ keyBytes = subspace.pack(recordKey);
+ context.ensureActive().set(keyBytes, serialized);
+ }
+ return keyBytes;
+ }
+
@SuppressWarnings("PMD.CloseResource")
private static void writeVersion(@Nonnull final FDBRecordContext context, @Nonnull final Subspace subspace, @Nonnull final Tuple key,
@Nullable final FDBRecordVersion version, @Nullable final SizeInfo sizeInfo) {
@@ -201,11 +234,11 @@ private static void writeVersion(@Nonnull final FDBRecordContext context, @Nonnu
}
return;
}
- final Transaction tr = context.ensureActive();
+ // At this point we know the key does not have a version
final byte[] keyBytes = subspace.pack(key.add(RECORD_VERSION));
final byte[] valueBytes = packVersion(version);
if (version.isComplete()) {
- tr.set(keyBytes, valueBytes);
+ context.ensureActive().set(keyBytes, valueBytes);
} else {
context.addVersionMutation(MutationType.SET_VERSIONSTAMPED_VALUE, keyBytes, valueBytes);
context.addToLocalVersionCache(keyBytes, version.getLocalVersion());
diff --git a/fdb-record-layer-core/src/test/java/com/apple/foundationdb/record/provider/foundationdb/SplitHelperMultipleTransactionsTest.java b/fdb-record-layer-core/src/test/java/com/apple/foundationdb/record/provider/foundationdb/SplitHelperMultipleTransactionsTest.java
new file mode 100644
index 0000000000..e45d80d680
--- /dev/null
+++ b/fdb-record-layer-core/src/test/java/com/apple/foundationdb/record/provider/foundationdb/SplitHelperMultipleTransactionsTest.java
@@ -0,0 +1,1186 @@
+/*
+ * SplitHelperMultipleTransactionsTest.java
+ *
+ * This source file is part of the FoundationDB open source project
+ *
+ * Copyright 2015-2026 Apple Inc. and the FoundationDB project authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.apple.foundationdb.record.provider.foundationdb;
+
+import com.apple.foundationdb.KeyValue;
+import com.apple.foundationdb.MutationType;
+import com.apple.foundationdb.ReadTransaction;
+import com.apple.foundationdb.record.ExecuteProperties;
+import com.apple.foundationdb.record.FDBRecordStoreProperties;
+import com.apple.foundationdb.record.RecordCoreArgumentException;
+import com.apple.foundationdb.record.RecordCoreException;
+import com.apple.foundationdb.record.RecordCoreInternalException;
+import com.apple.foundationdb.record.RecordCursor;
+import com.apple.foundationdb.record.RecordCursorIterator;
+import com.apple.foundationdb.record.RecordCursorResult;
+import com.apple.foundationdb.record.ScanProperties;
+import com.apple.foundationdb.record.TupleRange;
+import com.apple.foundationdb.record.provider.foundationdb.properties.RecordLayerPropertyStorage;
+import com.apple.foundationdb.subspace.Subspace;
+import com.apple.foundationdb.tuple.ByteArrayUtil;
+import com.apple.foundationdb.tuple.Tuple;
+import com.apple.foundationdb.tuple.Versionstamp;
+import com.apple.test.BooleanSource;
+import com.apple.test.ParameterizedTestUtils;
+import com.apple.test.Tags;
+import com.google.common.collect.Lists;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.LongStream;
+import java.util.stream.Stream;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.anyOf;
+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.lessThan;
+import static org.hamcrest.Matchers.lessThanOrEqualTo;
+import static org.hamcrest.Matchers.oneOf;
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * A test that uses the same test cases as in {@link SplitHelperTest}, with the added behavior of multiple transactions.
+ * Tests here are named similarly to the ones in {@link SplitHelperTest} for traceability,
+ * so that they can be traced back to their origin.
+ * For tests that require the "versionInKey" to be TRUE, this is the only way to get them to run, as the
+ * verification of the key content can only be done after a commit.
+ * The pattern used in this test normally splits each test flow in two:
+ *
+ * - Save record and record global and local version
+ * - Verify the saved content using the recorded version
+ *
+ * In addition to testing {@code useVersionInKey}
+ * this way, other configurations are also tested
+ * to extend coverage beyond the single-transaction tests.
+ *
+ */
+@Tag(Tags.RequiresFDB)
+public class SplitHelperMultipleTransactionsTest extends FDBRecordStoreTestBase {
+
+ // From the traditional nursery rhyme
+ private static final byte[] HUMPTY_DUMPTY =
+ ("Humpty Dumpty sat on a wall,\n"
+ + "Humpty Dumpty had a great fall\n"
+ + "All the king's horses and all the king's men\n"
+ + "Couldn't put Humpty Dumpty together again.\n").getBytes(StandardCharsets.UTF_8);
+
+ private static final int MEDIUM_COPIES = 5;
+ private static final int MEDIUM_LEGNTH = HUMPTY_DUMPTY.length * MEDIUM_COPIES;
+ private static final int LONG_LENGTH = HUMPTY_DUMPTY.length * 1_000; // requires 1 split
+ private static final int VERY_LONG_LENGTH = HUMPTY_DUMPTY.length * 2_000; // requires 2 splits
+ private static final byte[] SHORT_STRING = HUMPTY_DUMPTY;
+ private static final byte[] MEDIUM_STRING;
+ private static final byte[] LONG_STRING;
+ private static final byte[] VERY_LONG_STRING;
+
+ private Subspace subspace;
+ private SplitHelperTestConfig testConfig = SplitHelperTestConfig.getDefault();
+
+ static {
+ ByteBuffer mediumBuffer = ByteBuffer.allocate(MEDIUM_LEGNTH);
+ for (int i = 0; i < MEDIUM_COPIES; i++) {
+ mediumBuffer.put(HUMPTY_DUMPTY);
+ }
+ MEDIUM_STRING = mediumBuffer.array();
+ ByteBuffer longBuffer = ByteBuffer.allocate(LONG_LENGTH);
+ while (longBuffer.position() < LONG_LENGTH - HUMPTY_DUMPTY.length) {
+ longBuffer.put(HUMPTY_DUMPTY);
+ }
+ LONG_STRING = longBuffer.array();
+ ByteBuffer veryLongBuffer = ByteBuffer.allocate(VERY_LONG_LENGTH);
+ while (veryLongBuffer.position() < VERY_LONG_LENGTH - HUMPTY_DUMPTY.length) {
+ veryLongBuffer.put(HUMPTY_DUMPTY);
+ }
+ VERY_LONG_STRING = veryLongBuffer.array();
+ }
+
+ @BeforeEach
+ public void setSubspace() {
+ try (FDBRecordContext context = openContext()) {
+ subspace = path.toSubspace(context);
+ }
+ }
+
+    public static Stream<Arguments> testConfigsNoDryRun() {
+ return SplitHelperTestConfig.getConfigsNoDryRun().map(Arguments::of);
+ }
+
+ @Override
+ protected RecordLayerPropertyStorage.Builder addDefaultProps(final RecordLayerPropertyStorage.Builder props) {
+ return testConfig.setProps(super.addDefaultProps(props));
+ }
+
+    private <E extends Throwable> SplitHelper.SizeInfo saveUnsuccessfully(@Nonnull FDBRecordContext context, @Nonnull Tuple key, byte[] serialized,
+                                                                          @Nullable FDBRecordVersion version,
+                                                                          @Nonnull SplitHelperTestConfig testConfig,
+                                                                          @Nullable FDBStoredSizes previousSizeInfo,
+                                                                          @Nonnull Class<E> errClazz, @Nonnull String errMessage) {
+ final SplitHelper.SizeInfo sizeInfo = new SplitHelper.SizeInfo();
+ E e = assertThrows(errClazz,
+ () -> SplitHelper.saveWithSplit(context, subspace, key, serialized, version, testConfig.splitLongRecords, testConfig.omitUnsplitSuffix,
+ previousSizeInfo != null, previousSizeInfo, sizeInfo));
+ assertThat(e.getMessage(), containsString(errMessage));
+
+ assertEquals(0, sizeInfo.getKeyCount());
+ assertEquals(0, sizeInfo.getKeySize());
+ assertEquals(0, sizeInfo.getValueSize());
+ assertThat(sizeInfo.isVersionedInline(), is(false));
+
+ assertEquals(0, previousSizeInfo == null ? 0 : previousSizeInfo.getKeyCount());
+
+ return sizeInfo;
+ }
+
+ /**
+ * This represents the first part of {@link SplitHelperTest#saveSuccessfully}.
+ * The other part is located in {@link #verifySuccessfullySaved(FDBRecordContext, Tuple, byte[], FDBRecordVersion, SplitHelperTestConfig)}.
+ */
+ private SplitHelper.SizeInfo saveOnly(@Nonnull FDBRecordContext context, @Nonnull Tuple key, byte[] serialized,
+ @Nullable FDBRecordVersion version,
+ @Nonnull SplitHelperTestConfig testConfig,
+ @Nullable FDBStoredSizes previousSizeInfo) {
+ final SplitHelper.SizeInfo sizeInfo = new SplitHelper.SizeInfo();
+ SplitHelper.saveWithSplit(context, subspace, key, serialized, version, testConfig.splitLongRecords, testConfig.omitUnsplitSuffix,
+ previousSizeInfo != null, previousSizeInfo, sizeInfo);
+ int dataKeyCount = (serialized.length - 1) / SplitHelper.SPLIT_RECORD_SIZE + 1;
+ boolean isSplit = dataKeyCount > 1;
+ int keyCount = dataKeyCount;
+ if (version != null) {
+ keyCount += 1;
+ }
+ assertEquals(isSplit, sizeInfo.isSplit());
+ assertEquals(keyCount, sizeInfo.getKeyCount());
+ int valueSize = serialized.length + (version != null ? 1 + FDBRecordVersion.VERSION_LENGTH : 0);
+ assertEquals(valueSize, sizeInfo.getValueSize());
+ assertEquals(version != null, sizeInfo.isVersionedInline());
+ if (!testConfig.useVersionInKey) {
+ // Key size can only be asserted when the key is fully known at write time (not the case for versioning keys,
+ // since the versionstamp in the key is not yet resolved until after commit)
+ int keySize = (subspace.pack().length + key.pack().length) * keyCount;
+ if (testConfig.hasSplitPoints()) {
+ // Add in the counters the split points.
+ if (!isSplit) {
+ keySize += 1; // As 0 requires 1 byte when Tuple packed
+ } else {
+ keySize += dataKeyCount * 2; // As each split point is two bytes when tuple packed
+ }
+ }
+ if (version != null) {
+ keySize += 2;
+ }
+ assertEquals(keySize, sizeInfo.getKeySize());
+ }
+ return sizeInfo;
+ }
+
+ /**
+ * This represents the second part of {@link SplitHelperTest#saveSuccessfully}.
+ */
+ private void verifySuccessfullySaved(@Nonnull FDBRecordContext context, @Nonnull Tuple key, byte[] serialized,
+ @Nullable FDBRecordVersion version,
+ @Nonnull SplitHelperTestConfig testConfig) {
+ // Match the logic of "saveWithSplit" and do nothing here if it called "saveUnsuccessfully"
+ // since in these cases no record was saved
+ if (testConfig.omitUnsplitSuffix && version != null) {
+ // cannot include version
+ return;
+ } else if (testConfig.useVersionInKey && (version != null)) {
+ // cannot use version in both key and value
+ return;
+ } else if (!testConfig.splitLongRecords && serialized.length > SplitHelper.SPLIT_RECORD_SIZE) {
+ // Record is too long
+ return;
+ }
+
+ // Similar to the calculation in saveOnly
+ int dataKeyCount = (serialized.length - 1) / SplitHelper.SPLIT_RECORD_SIZE + 1;
+ boolean isSplit = dataKeyCount > 1;
+ int keyCount = dataKeyCount;
+ if (version != null) {
+ keyCount += 1;
+ }
+ final Subspace keySubspace = subspace.subspace(key);
+        RecordCursorIterator<KeyValue> kvCursor = KeyValueCursor.Builder.withSubspace(keySubspace)
+ .setContext(context)
+ .setScanProperties(ScanProperties.FORWARD_SCAN)
+ .build()
+ .asIterator();
+        List<Long> indexes = new ArrayList<>(keyCount);
+ byte[] versionBytes = null;
+ byte[] valueBytes = null;
+ while (kvCursor.hasNext()) {
+ KeyValue kv = kvCursor.next();
+ Tuple suffix = keySubspace.unpack(kv.getKey());
+ if (testConfig.omitUnsplitSuffix) {
+ assertThat(suffix.isEmpty(), is(true));
+ valueBytes = kv.getValue();
+ } else {
+ Long index = suffix.getLong(0);
+ indexes.add(index);
+ if (index == SplitHelper.RECORD_VERSION) {
+ versionBytes = kv.getValue();
+ } else {
+ if (valueBytes == null) {
+ valueBytes = kv.getValue();
+ } else {
+ valueBytes = ByteArrayUtil.join(valueBytes, kv.getValue());
+ }
+ }
+ }
+ }
+        List<Long> expectedIndexes;
+ if (testConfig.omitUnsplitSuffix) {
+ expectedIndexes = Collections.emptyList();
+ } else {
+ expectedIndexes = new ArrayList<>(keyCount);
+ if (version != null && version.isComplete()) {
+ expectedIndexes.add(SplitHelper.RECORD_VERSION);
+ }
+ if (!isSplit) {
+ expectedIndexes.add(SplitHelper.UNSPLIT_RECORD);
+ } else {
+ LongStream.range(SplitHelper.START_SPLIT_RECORD, SplitHelper.START_SPLIT_RECORD + dataKeyCount)
+ .forEach(expectedIndexes::add);
+ }
+ }
+ assertEquals(expectedIndexes, indexes);
+
+ assertNotNull(valueBytes);
+ assertArrayEquals(serialized, valueBytes);
+
+ if (version != null) {
+ if (!version.isComplete()) {
+ assertNull(versionBytes);
+ } else {
+ assertNotNull(versionBytes);
+ assertEquals(version, FDBRecordVersion.fromVersionstamp(Tuple.fromBytes(versionBytes).getVersionstamp(0)));
+ }
+ } else {
+ assertNull(versionBytes);
+ }
+ }
+
+ private Tuple toIncompleteKey(Tuple key, int localVersion, boolean versionInKey) {
+ if (versionInKey) {
+ return Tuple.from(Versionstamp.incomplete(localVersion)).addAll(key);
+ } else {
+ return key;
+ }
+ }
+
+ private Tuple toCompleteKey(Tuple key, byte[] versionStamp, int localVersion, boolean versionInKey) {
+ if (versionInKey) {
+ return Tuple.from(Versionstamp.complete(versionStamp, localVersion)).addAll(key);
+ } else {
+ return key;
+ }
+ }
+
+ private SplitHelper.SizeInfo saveWithSplit(@Nonnull FDBRecordContext context, @Nonnull Tuple key,
+ byte[] serialized,
+ @Nullable FDBRecordVersion version,
+ @Nonnull SplitHelperTestConfig testConfig,
+ @Nullable FDBStoredSizes previousSizeInfo) {
+ if (testConfig.omitUnsplitSuffix && version != null) {
+ return saveUnsuccessfully(context, key, serialized, version, testConfig, previousSizeInfo,
+ RecordCoreArgumentException.class, "Cannot include version");
+ } else if (testConfig.useVersionInKey && version != null) {
+ return saveUnsuccessfully(context, key, serialized, version, testConfig, previousSizeInfo,
+ RecordCoreException.class, "Cannot save versionStamp in both key and value");
+ } else if (!testConfig.splitLongRecords && serialized.length > SplitHelper.SPLIT_RECORD_SIZE) {
+ return saveUnsuccessfully(context, key, serialized, version, testConfig, previousSizeInfo,
+ RecordCoreException.class, "Record is too long");
+ } else {
+ return saveOnly(context, key, serialized, version, testConfig, previousSizeInfo);
+ }
+ }
+
+ @MethodSource("testConfigsNoDryRun")
+ @ParameterizedTest(name = "saveWithSplit[{0}]")
+ void saveWithSplit(@Nonnull SplitHelperTestConfig testConfig) {
+ this.testConfig = testConfig;
+
+ final Tuple key1 = Tuple.from(1066L);
+ final Tuple key2 = Tuple.from(1415L);
+ final Tuple key3 = Tuple.from(1776L);
+ final int localVersion1;
+ final int localVersion2;
+ final int localVersion3;
+ final SplitHelper.SizeInfo sizes1;
+ final SplitHelper.SizeInfo sizes2;
+ final SplitHelper.SizeInfo sizes3;
+ byte[] globalVersionstamp;
+ // Transaction #1: save some values
+ try (FDBRecordContext context = openContext()) {
+ // save with no version and no previousSizeInfo
+ localVersion1 = context.claimLocalVersion();
+ sizes1 = saveWithSplit(context, toIncompleteKey(key1, localVersion1, testConfig.useVersionInKey), SHORT_STRING, null, testConfig, null);
+ localVersion2 = context.claimLocalVersion();
+ sizes2 = saveWithSplit(context, toIncompleteKey(key2, localVersion2, testConfig.useVersionInKey), LONG_STRING, null, testConfig, null);
+ localVersion3 = context.claimLocalVersion();
+ sizes3 = saveWithSplit(context, toIncompleteKey(key3, localVersion3, testConfig.useVersionInKey), VERY_LONG_STRING, null, testConfig, null);
+
+ commit(context);
+ globalVersionstamp = context.getVersionStamp();
+ }
+
+ // Transaction #2: verify saved values
+ try (FDBRecordContext context = openContext()) {
+ verifySuccessfullySaved(context, toCompleteKey(key1, globalVersionstamp, localVersion1, testConfig.useVersionInKey), SHORT_STRING, null, testConfig);
+ verifySuccessfullySaved(context, toCompleteKey(key2, globalVersionstamp, localVersion2, testConfig.useVersionInKey), LONG_STRING, null, testConfig);
+ verifySuccessfullySaved(context, toCompleteKey(key3, globalVersionstamp, localVersion3, testConfig.useVersionInKey), VERY_LONG_STRING, null, testConfig);
+ }
+
+ int localVersion4 = 0;
+ int localVersion5 = 0;
+ int localVersion6 = 0;
+ // Transaction #3: modify saved values
+ // When there is a version in the key we do not override the previous records, so there is nothing to do here
+ if (!testConfig.useVersionInKey) {
+ try (FDBRecordContext context = openContext()) {
+ // Save over some things using the previous split points
+ if (testConfig.splitLongRecords) {
+ localVersion4 = context.claimLocalVersion();
+ saveWithSplit(context, toIncompleteKey(key1, localVersion4, testConfig.useVersionInKey), VERY_LONG_STRING, null, testConfig, sizes1);
+ localVersion5 = context.claimLocalVersion();
+ saveWithSplit(context, toIncompleteKey(key3, localVersion5, testConfig.useVersionInKey), LONG_STRING, null, testConfig, sizes3);
+ }
+ localVersion6 = context.claimLocalVersion();
+ saveWithSplit(context, toIncompleteKey(key2, localVersion6, testConfig.useVersionInKey), SHORT_STRING, null, testConfig, sizes2);
+ commit(context);
+ globalVersionstamp = context.getVersionStamp();
+ }
+ // Transaction #4: verify changes
+ try (FDBRecordContext context = openContext()) {
+ if (testConfig.splitLongRecords) {
+ verifySuccessfullySaved(context, toCompleteKey(key1, globalVersionstamp, localVersion4, testConfig.useVersionInKey), VERY_LONG_STRING, null, testConfig);
+ verifySuccessfullySaved(context, toCompleteKey(key3, globalVersionstamp, localVersion5, testConfig.useVersionInKey), LONG_STRING, null, testConfig);
+ }
+ verifySuccessfullySaved(context, toCompleteKey(key2, globalVersionstamp, localVersion6, testConfig.useVersionInKey), SHORT_STRING, null, testConfig);
+ }
+ }
+ }
+
+ @MethodSource("testConfigsNoDryRun")
+ @ParameterizedTest(name = "saveWithSplitAndCompleteVersion[{0}]")
+ public void saveWithSplitAndCompleteVersions(SplitHelperTestConfig testConfig) {
+ this.testConfig = testConfig;
+ byte[] globalValueVersion = "karlgrosse".getBytes(StandardCharsets.US_ASCII);
+ byte[] globalKeyVersion;
+ final Tuple key1 = Tuple.from(800L);
+ final Tuple key2 = Tuple.from(813L);
+ final Tuple key3 = Tuple.from(823L);
+ int localVersion1;
+ int localVersion2;
+ int localVersion3;
+
+ // save records with version in the value
+ saveInitialRecords(testConfig, globalValueVersion, key1, key2, key3, testConfig.useVersionInKey);
+
+ // Save over the records *without* using the previous size info and with no version in the value
+ try (FDBRecordContext context = openContext()) {
+ localVersion1 = context.claimLocalVersion();
+ saveWithSplit(context, toIncompleteKey(key1, localVersion1, testConfig.useVersionInKey), SHORT_STRING, null, testConfig, null);
+ localVersion2 = context.claimLocalVersion();
+ saveWithSplit(context, toIncompleteKey(key2, localVersion2, testConfig.useVersionInKey), LONG_STRING, null, testConfig, null);
+ localVersion3 = context.claimLocalVersion();
+ saveWithSplit(context, toIncompleteKey(key3, localVersion3, testConfig.useVersionInKey), VERY_LONG_STRING, null, testConfig, null);
+ commit(context);
+ globalKeyVersion = context.getVersionStamp();
+ }
+ try (FDBRecordContext context = openContext()) {
+ verifySuccessfullySaved(context, toCompleteKey(key1, globalKeyVersion, localVersion1, testConfig.useVersionInKey), SHORT_STRING, null, testConfig);
+ verifySuccessfullySaved(context, toCompleteKey(key2, globalKeyVersion, localVersion2, testConfig.useVersionInKey), LONG_STRING, null, testConfig);
+ verifySuccessfullySaved(context, toCompleteKey(key3, globalKeyVersion, localVersion3, testConfig.useVersionInKey), VERY_LONG_STRING, null, testConfig);
+ commit(context);
+ }
+
+ FDBStoredSizes sizes4;
+ FDBStoredSizes sizes5;
+ FDBStoredSizes sizes6;
+ FDBRecordVersion version4;
+ FDBRecordVersion version5;
+ FDBRecordVersion version6;
+ // save again with complete version in key and no previous size info
+ try (FDBRecordContext context = openContext()) {
+ localVersion1 = context.claimLocalVersion();
+ version4 = FDBRecordVersion.complete(globalValueVersion, context.claimLocalVersion());
+ sizes4 = saveWithSplit(context, toIncompleteKey(key1, localVersion1, testConfig.useVersionInKey), SHORT_STRING, version4, testConfig, null);
+ localVersion2 = context.claimLocalVersion();
+ version5 = FDBRecordVersion.complete(globalValueVersion, context.claimLocalVersion());
+ sizes5 = saveWithSplit(context, toIncompleteKey(key2, localVersion2, testConfig.useVersionInKey), LONG_STRING, version5, testConfig, null);
+ localVersion3 = context.claimLocalVersion();
+ version6 = FDBRecordVersion.complete(globalValueVersion, context.claimLocalVersion());
+ sizes6 = saveWithSplit(context, toIncompleteKey(key3, localVersion3, testConfig.useVersionInKey), VERY_LONG_STRING, version6, testConfig, null);
+ commit(context);
+ globalKeyVersion = context.getVersionStamp();
+ }
+        // in some cases nothing gets written (all saves are "saveUnsuccessfully") so the transaction is read-only
+ // and there is no global version
+ if (globalKeyVersion != null) {
+ try (FDBRecordContext context = openContext()) {
+ verifySuccessfullySaved(context, toCompleteKey(key1, globalKeyVersion, localVersion1, testConfig.useVersionInKey), SHORT_STRING, version4, testConfig);
+ verifySuccessfullySaved(context, toCompleteKey(key2, globalKeyVersion, localVersion2, testConfig.useVersionInKey), LONG_STRING, version5, testConfig);
+ verifySuccessfullySaved(context, toCompleteKey(key3, globalKeyVersion, localVersion3, testConfig.useVersionInKey), VERY_LONG_STRING, version6, testConfig);
+ commit(context);
+ }
+ }
+
+ // Save over the records *with* using the previous size info
+ try (FDBRecordContext context = openContext()) {
+ localVersion1 = context.claimLocalVersion();
+ saveWithSplit(context, toIncompleteKey(key1, localVersion1, testConfig.useVersionInKey), SHORT_STRING, null, testConfig, sizes4);
+ localVersion2 = context.claimLocalVersion();
+ saveWithSplit(context, toIncompleteKey(key2, localVersion2, testConfig.useVersionInKey), LONG_STRING, null, testConfig, sizes5);
+ localVersion3 = context.claimLocalVersion();
+ saveWithSplit(context, toIncompleteKey(key3, localVersion3, testConfig.useVersionInKey), VERY_LONG_STRING, null, testConfig, sizes6);
+ commit(context);
+ globalKeyVersion = context.getVersionStamp();
+ }
+ try (FDBRecordContext context = openContext()) {
+ verifySuccessfullySaved(context, toCompleteKey(key1, globalKeyVersion, localVersion1, testConfig.useVersionInKey), SHORT_STRING, null, testConfig);
+ verifySuccessfullySaved(context, toCompleteKey(key2, globalKeyVersion, localVersion2, testConfig.useVersionInKey), LONG_STRING, null, testConfig);
+ verifySuccessfullySaved(context, toCompleteKey(key3, globalKeyVersion, localVersion3, testConfig.useVersionInKey), VERY_LONG_STRING, null, testConfig);
+ commit(context);
+ }
+ }
+
+ private void saveInitialRecords(final SplitHelperTestConfig testConfig, final byte[] globalValueVersion, final Tuple key1, final Tuple key2, final Tuple key3, final boolean versionInKey) {
+ byte[] globalKeyVersion;
+ int localVersion1;
+ int localVersion2;
+ int localVersion3;
+ FDBRecordVersion version1;
+ FDBRecordVersion version2;
+ FDBRecordVersion version3;
+
+ // Save records with completed version in the value
+ try (FDBRecordContext context = openContext()) {
+ version1 = FDBRecordVersion.complete(globalValueVersion, context.claimLocalVersion());
+ localVersion1 = context.claimLocalVersion();
+ saveWithSplit(context, toIncompleteKey(key1, localVersion1, versionInKey), SHORT_STRING, version1, testConfig, null);
+ localVersion2 = context.claimLocalVersion();
+ version2 = FDBRecordVersion.complete(globalValueVersion, context.claimLocalVersion());
+ saveWithSplit(context, toIncompleteKey(key2, localVersion2, versionInKey), LONG_STRING, version2, testConfig, null);
+ localVersion3 = context.claimLocalVersion();
+ version3 = FDBRecordVersion.complete(globalValueVersion, context.claimLocalVersion());
+ saveWithSplit(context, toIncompleteKey(key3, localVersion3, versionInKey), VERY_LONG_STRING, version3, testConfig, null);
+ commit(context);
+ globalKeyVersion = context.getVersionStamp();
+ }
+ // in some cases nothing gets written (all saves are "saveUnsuccessfully") so the transaction is read-only
+ // and there is no global version
+ if (globalKeyVersion != null) {
+ try (FDBRecordContext context = openContext()) {
+ verifySuccessfullySaved(context, toCompleteKey(key1, globalKeyVersion, localVersion1, testConfig.useVersionInKey), SHORT_STRING, version1, testConfig);
+ verifySuccessfullySaved(context, toCompleteKey(key2, globalKeyVersion, localVersion2, testConfig.useVersionInKey), LONG_STRING, version2, testConfig);
+ verifySuccessfullySaved(context, toCompleteKey(key3, globalKeyVersion, localVersion3, testConfig.useVersionInKey), VERY_LONG_STRING, version3, testConfig);
+ }
+ }
+ }
+
+ @Nonnull
+ private FDBStoredSizes writeDummyRecord(@Nonnull FDBRecordContext context, @Nonnull Tuple key,
+ @Nullable FDBRecordVersion version, int splits,
+ boolean omitUnsplitSuffix, boolean useVersionInKey, int localVersion) {
+ SplitHelper.SizeInfo sizeInfo = new SplitHelper.SizeInfo();
+ if (version != null) {
+ assertThat(omitUnsplitSuffix, is(false));
+ assertThat(useVersionInKey, is(false));
+ sizeInfo.setVersionedInline(true);
+ Tuple keyTuple = key.add(SplitHelper.RECORD_VERSION);
+ byte[] valueBytes = SplitHelper.packVersion(version);
+ // Note that this will not mutate the version in the value
+ writeDummyKV(context, keyTuple, valueBytes, sizeInfo, useVersionInKey, localVersion);
+ }
+ if (splits == 1) {
+ if (omitUnsplitSuffix) {
+ Tuple keyTuple = key;
+ byte[] valueBytes = SHORT_STRING;
+ writeDummyKV(context, keyTuple, valueBytes, sizeInfo, useVersionInKey, localVersion);
+ } else {
+ Tuple keyTuple = key.add(SplitHelper.UNSPLIT_RECORD);
+ byte[] valueBytes = SHORT_STRING;
+ writeDummyKV(context, keyTuple, valueBytes, sizeInfo, useVersionInKey, localVersion);
+ }
+ sizeInfo.setSplit(false);
+ } else {
+ for (int i = 0; i < splits; i++) {
+ Tuple keyTuple = key.add(SplitHelper.START_SPLIT_RECORD + i);
+ byte[] valueBytes = SHORT_STRING;
+ writeDummyKV(context, keyTuple, valueBytes, sizeInfo, useVersionInKey, localVersion);
+ }
+ sizeInfo.setSplit(true);
+ }
+ return sizeInfo;
+ }
+
+ private void writeDummyKV(@Nonnull FDBRecordContext context, @Nonnull Tuple keyTuple,
+ byte[] valueBytes, @Nullable SplitHelper.SizeInfo sizeInfo, boolean useVersionInKey, int localVersion) {
+
+ byte[] keyBytes;
+ // Mimic the work done in SplitHelper.writeSplitValue
+ if (useVersionInKey) {
+ Tuple versionedKeyTuple = Tuple.from(Versionstamp.incomplete(localVersion)).addAll(keyTuple);
+ keyBytes = subspace.packWithVersionstamp(versionedKeyTuple);
+ context.addVersionMutation(MutationType.SET_VERSIONSTAMPED_KEY, keyBytes, valueBytes);
+ } else {
+ keyBytes = subspace.pack(keyTuple);
+ context.ensureActive().set(keyBytes, valueBytes);
+ }
+ if (sizeInfo != null) {
+ sizeInfo.add(keyBytes, valueBytes);
+ }
+ }
+
+ private void deleteSplit(@Nonnull FDBRecordContext context, @Nonnull Tuple key,
+ @Nonnull SplitHelperTestConfig testConfig,
+ @Nullable FDBStoredSizes sizeInfo) {
+ int count = KeyValueCursor.Builder.withSubspace(subspace.subspace(key))
+ .setContext(context)
+ .setScanProperties(ScanProperties.FORWARD_SCAN)
+ .build()
+ .getCount()
+ .join();
+ assertTrue(count > 0);
+
+ SplitHelper.deleteSplit(context, subspace, key, testConfig.splitLongRecords, testConfig.omitUnsplitSuffix, sizeInfo != null, sizeInfo);
+
+ count = KeyValueCursor.Builder.withSubspace(subspace.subspace(key))
+ .setContext(context)
+ .setScanProperties(ScanProperties.FORWARD_SCAN)
+ .build()
+ .getCount()
+ .join();
+ assertEquals(0, count);
+ }
+
+ @MethodSource("testConfigsNoDryRun")
+ @ParameterizedTest(name = "deleteWithSplit[{0}]")
+ public void deleteWithSplit(@Nonnull SplitHelperTestConfig testConfig) {
+ this.testConfig = testConfig;
+
+ final Tuple key1 = Tuple.from(-660L);
+ final Tuple key2 = Tuple.from(-581L);
+ final Tuple key3 = Tuple.from(-549L);
+ final Tuple key4 = Tuple.from(-510L);
+
+ // tx1: write records
+ int localVersion1;
+ int localVersion2;
+ int localVersion3 = 0;
+ int localVersion4 = 0;
+ final FDBStoredSizes sizes1;
+ final FDBStoredSizes sizes2;
+ FDBStoredSizes sizes3 = null;
+ FDBStoredSizes sizes4 = null;
+ byte[] globalVersionStamp;
+ try (FDBRecordContext context = openContext()) {
+ localVersion1 = context.claimLocalVersion();
+ sizes1 = writeDummyRecord(context, key1, null, 1, testConfig.omitUnsplitSuffix, testConfig.useVersionInKey, localVersion1);
+ localVersion2 = context.claimLocalVersion();
+ sizes2 = writeDummyRecord(context, key2, null, 1, testConfig.omitUnsplitSuffix, testConfig.useVersionInKey, localVersion2);
+ if (testConfig.splitLongRecords) {
+ localVersion3 = context.claimLocalVersion();
+ sizes3 = writeDummyRecord(context, key3, null, 5, testConfig.omitUnsplitSuffix, testConfig.useVersionInKey, localVersion3);
+ localVersion4 = context.claimLocalVersion();
+ sizes4 = writeDummyRecord(context, key4, null, 5, testConfig.omitUnsplitSuffix, testConfig.useVersionInKey, localVersion4);
+ }
+ commit(context);
+ globalVersionStamp = context.getVersionStamp();
+ }
+
+ // tx2: delete records
+ try (FDBRecordContext context = openContext()) {
+ deleteSplit(context, toCompleteKey(key1, globalVersionStamp, localVersion1, testConfig.useVersionInKey), testConfig, sizes1);
+ deleteSplit(context, toCompleteKey(key2, globalVersionStamp, localVersion2, testConfig.useVersionInKey), testConfig, sizes2);
+ if (testConfig.splitLongRecords) {
+ deleteSplit(context, toCompleteKey(key3, globalVersionStamp, localVersion3, testConfig.useVersionInKey), testConfig, sizes3);
+ deleteSplit(context, toCompleteKey(key4, globalVersionStamp, localVersion4, testConfig.useVersionInKey), testConfig, sizes4);
+ }
+ commit(context);
+ }
+ }
+
+ @FunctionalInterface
+ private interface LoadRecordFunction {
+ FDBRawRecord load(@Nonnull FDBRecordContext context, @Nonnull Tuple key, @Nullable FDBStoredSizes sizes, @Nullable byte[] expectedContents, @Nullable FDBRecordVersion version);
+ }
+
+ private void loadSingleRecords(SplitHelperTestConfig testConfig, @Nonnull LoadRecordFunction loadRecordFunction) {
+ final Tuple key1 = Tuple.from(1042L);
+ final Tuple key2 = Tuple.from(1066L);
+ final Tuple key3 = Tuple.from(1087L);
+ final Tuple key4 = Tuple.from(1100L);
+ final Tuple key5 = Tuple.from(1135L);
+ final Tuple key6 = Tuple.from(1189L);
+ final Tuple key7 = Tuple.from(1199L);
+ final Tuple key8 = Tuple.from(1216L);
+ final Tuple key9 = Tuple.from(1272L);
+ int localVersion2;
+ int localVersion3 = 0;
+ int localVersion4 = 0;
+ int localVersion5 = 0;
+ int localVersion6 = 0;
+ int localVersion7 = 0;
+ int localVersion8 = 0;
+ int localVersion9 = 0;
+ FDBRecordVersion version3 = null;
+ FDBRecordVersion version4 = null;
+ FDBRecordVersion version9 = null;
+ FDBStoredSizes sizes2;
+ FDBStoredSizes sizes3 = null;
+ FDBStoredSizes sizes5 = null;
+ final byte[] valueGlobalVersion = "-hastings-".getBytes(StandardCharsets.US_ASCII);
+ byte[] keyGlobalVersion;
+ try (FDBRecordContext context = openContext()) {
+ // One unsplit record
+ localVersion2 = context.claimLocalVersion();
+ sizes2 = writeDummyRecord(context, key2, null, 1, testConfig.omitUnsplitSuffix, testConfig.useVersionInKey, localVersion2);
+ assertThat(sizes2.isSplit(), is(false));
+
+ if ((!testConfig.omitUnsplitSuffix) && (!testConfig.useVersionInKey)) {
+ // One record with version
+ localVersion3 = context.claimLocalVersion();
+ version3 = FDBRecordVersion.complete(valueGlobalVersion, localVersion3);
+ sizes3 = writeDummyRecord(context, key3, version3, 1, false, false, 0);
+ assertThat(sizes3.isVersionedInline(), is(true));
+
+ // One version but missing record
+ localVersion4 = context.claimLocalVersion();
+ version4 = FDBRecordVersion.complete(valueGlobalVersion, localVersion4);
+ writeDummyRecord(context, key4, version4, 1, false, false, 0);
+ context.ensureActive().clear(subspace.pack(key4.add(SplitHelper.UNSPLIT_RECORD)));
+ }
+
+ if (testConfig.splitLongRecords) {
+ // One split record
+ localVersion5 = context.claimLocalVersion();
+ sizes5 = writeDummyRecord(context, key5, null, MEDIUM_COPIES, false, testConfig.useVersionInKey, localVersion5);
+ assertEquals(MEDIUM_COPIES, sizes5.getKeyCount());
+
+ // One split record but then delete the last split point (no way to distinguish this from just inserting one fewer split)
+ writeDummyRecord(context, key5, null, MEDIUM_COPIES + 1, false, testConfig.useVersionInKey, localVersion5);
+
+ // One split record then delete the first split point
+ localVersion6 = context.claimLocalVersion();
+ writeDummyRecord(context, key6, null, MEDIUM_COPIES, false, testConfig.useVersionInKey, localVersion6);
+
+ // One split record then delete the middle split point
+ localVersion7 = context.claimLocalVersion();
+ writeDummyRecord(context, key7, null, MEDIUM_COPIES, false, testConfig.useVersionInKey, localVersion7);
+
+ // One split record then add an extra key in the middle
+ localVersion8 = context.claimLocalVersion();
+ writeDummyRecord(context, key8, null, MEDIUM_COPIES, false, testConfig.useVersionInKey, localVersion8);
+ writeDummyKV(context, key8.add(SplitHelper.START_SPLIT_RECORD + 2).add( 0L), HUMPTY_DUMPTY, null, testConfig.useVersionInKey, localVersion8);
+
+ // One split record with version then delete the first split point
+ if (!testConfig.useVersionInKey) {
+ localVersion9 = context.claimLocalVersion();
+ version9 = FDBRecordVersion.complete(valueGlobalVersion, context.claimLocalVersion());
+ writeDummyRecord(context, key9, version9, MEDIUM_COPIES, false, false, 0);
+ }
+ }
+
+ commit(context);
+ keyGlobalVersion = context.getVersionStamp();
+ }
+
+ // transaction 2 - delete any items needing deleting
+ try (FDBRecordContext context = openContext()) {
+ // One split record but then delete the last split point (no way to distinguish this from just inserting one fewer split)
+ if (testConfig.splitLongRecords) {
+ final Tuple deleteKey5 = toCompleteKey(key5.add(SplitHelper.START_SPLIT_RECORD + MEDIUM_COPIES), keyGlobalVersion, localVersion5, testConfig.useVersionInKey);
+ context.ensureActive().clear(subspace.pack(deleteKey5));
+ // One split record then delete the first split point
+ final Tuple deleteKey6 = toCompleteKey(key6.add(SplitHelper.START_SPLIT_RECORD), keyGlobalVersion, localVersion6, testConfig.useVersionInKey);
+ context.ensureActive().clear(subspace.pack(deleteKey6));
+ // One split record then delete the middle split point
+ final Tuple deleteKey7 = toCompleteKey(key7.add(SplitHelper.START_SPLIT_RECORD + 2), keyGlobalVersion, localVersion7, testConfig.useVersionInKey);
+ context.ensureActive().clear(subspace.pack(deleteKey7));
+ if (!testConfig.useVersionInKey) {
+ // One split record with version then delete the first split point
+ final Tuple DeleteKey9 = toCompleteKey(key9.add(SplitHelper.START_SPLIT_RECORD), keyGlobalVersion, localVersion9, testConfig.useVersionInKey);
+ context.ensureActive().clear(subspace.pack(DeleteKey9));
+ }
+ }
+ commit(context);
+ }
+ List completedKeys = List.of(
+ toCompleteKey(key1, keyGlobalVersion, 0, testConfig.useVersionInKey),
+ toCompleteKey(key2, keyGlobalVersion, localVersion2, testConfig.useVersionInKey),
+ toCompleteKey(key3, keyGlobalVersion, localVersion3, testConfig.useVersionInKey),
+ toCompleteKey(key4, keyGlobalVersion, localVersion4, testConfig.useVersionInKey),
+ toCompleteKey(key5, keyGlobalVersion, localVersion5, testConfig.useVersionInKey),
+ toCompleteKey(key6, keyGlobalVersion, localVersion6, testConfig.useVersionInKey),
+ toCompleteKey(key7, keyGlobalVersion, localVersion7, testConfig.useVersionInKey),
+ toCompleteKey(key8, keyGlobalVersion, localVersion8, testConfig.useVersionInKey),
+ toCompleteKey(key9, keyGlobalVersion, localVersion9, testConfig.useVersionInKey));
+ // transaction 3 - verify
+ verifyRecords(testConfig, loadRecordFunction, completedKeys,
+ sizes2, sizes3, sizes5, version3, version4, version9);
+ }
+
+ private void verifyRecords(final SplitHelperTestConfig testConfig, final @Nonnull LoadRecordFunction loadRecordFunction, final List<Tuple> completeKeys, final FDBStoredSizes sizes2, final FDBStoredSizes sizes3, final FDBStoredSizes sizes5, final FDBRecordVersion version3, final FDBRecordVersion version4, final FDBRecordVersion version9) {
+ try (FDBRecordContext context = openContext()) {
+ // No record
+ loadRecordFunction.load(context, completeKeys.get(0), null, null, null);
+ // One unsplit record
+ loadRecordFunction.load(context, completeKeys.get(1), sizes2, HUMPTY_DUMPTY, null);
+ if ((!testConfig.omitUnsplitSuffix) && (!testConfig.useVersionInKey)) {
+ // One record with version
+ loadRecordFunction.load(context, completeKeys.get(2), sizes3, HUMPTY_DUMPTY, version3);
+ // One version but missing record
+ final FDBRecordVersion v4 = version4;
+ assertThrows(SplitHelper.FoundSplitWithoutStartException.class,
+ () -> loadRecordFunction.load(context, completeKeys.get(3), null, null, v4));
+ }
+ if (testConfig.splitLongRecords) {
+ // One split record
+ // One split record but then delete the last split point (no way to distinguish this from just inserting one fewer split)
+ loadRecordFunction.load(context, completeKeys.get(4), sizes5, MEDIUM_STRING, null);
+ // One split record then delete the first split point
+ if (testConfig.loadViaGets) {
+ loadRecordFunction.load(context, completeKeys.get(5), null, null, null);
+ } else {
+ assertThrows(SplitHelper.FoundSplitWithoutStartException.class,
+ () -> loadRecordFunction.load(context, completeKeys.get(5), null, null, null));
+ }
+ // One split record then delete the middle split point
+ RecordCoreException err7 = assertThrows(RecordCoreException.class,
+ () -> loadRecordFunction.load(context, completeKeys.get(6), null, null, null));
+ assertThat(err7.getMessage(), containsString("Split record segments out of order"));
+ // One split record then add an extra key in the middle
+ RecordCoreException err8 = assertThrows(RecordCoreException.class,
+ () -> loadRecordFunction.load(context, completeKeys.get(7), null, null, null));
+ assertThat(err8.getMessage(), anyOf(
+ containsString("Expected only a single key extension"),
+ containsString("Split record segments out of order")
+ ));
+ // One split record with version then delete the first split point
+ if (!testConfig.useVersionInKey) {
+ final FDBRecordVersion v9 = version9;
+ assertThrows(SplitHelper.FoundSplitWithoutStartException.class,
+ () -> loadRecordFunction.load(context, completeKeys.get(8), null, null, v9));
+ }
+ }
+ }
+ }
+
+ @Nullable
+ private FDBRawRecord loadWithSplit(@Nonnull FDBRecordContext context, @Nonnull Tuple key, @Nonnull SplitHelperTestConfig testConfig,
+ @Nullable FDBStoredSizes expectedSizes, @Nullable byte[] expectedContents, @Nullable FDBRecordVersion expectedVersion, boolean useVersionInKey) {
+ final ReadTransaction tr = context.ensureActive();
+ SplitHelper.SizeInfo sizeInfo = new SplitHelper.SizeInfo();
+ FDBRawRecord rawRecord;
+ try {
+ rawRecord = SplitHelper.loadWithSplit(tr, context, subspace, key, testConfig.splitLongRecords, testConfig.omitUnsplitSuffix, sizeInfo).get();
+ } catch (InterruptedException | ExecutionException e) {
+ throw FDBExceptions.wrapException(e);
+ }
+
+ if (expectedSizes == null || expectedContents == null) {
+ assertNull(rawRecord);
+ assertEmpty(sizeInfo);
+ } else {
+ assertNotNull(rawRecord);
+ assertArrayEquals(expectedContents, rawRecord.getRawRecord());
+ int valueSize = expectedContents.length;
+ if (expectedVersion != null) {
+ valueSize += 1 + FDBRecordVersion.VERSION_LENGTH;
+ }
+ assertEquals(valueSize, rawRecord.getValueSize());
+ if (!testConfig.splitLongRecords) {
+ assertThat(rawRecord.isSplit(), is(false));
+ }
+ if (testConfig.omitUnsplitSuffix) {
+ assertThat(rawRecord.isVersionedInline(), is(false));
+ }
+ final int versionKeyCount = expectedVersion != null ? 1 : 0;
+ boolean isSplit = rawRecord.getKeyCount() - versionKeyCount != 1;
+ assertEquals(isSplit, rawRecord.isSplit());
+ assertEquals(key, rawRecord.getPrimaryKey());
+ if (expectedVersion != null) {
+ assertThat(rawRecord.isVersionedInline(), is(true));
+ assertEquals(expectedVersion, rawRecord.getVersion());
+ } else {
+ assertThat(rawRecord.isVersionedInline(), is(false));
+ assertNull(rawRecord.getVersion());
+ }
+
+ // Verify that the expected sizes are the same as the ones retrieved
+ assertEquals(expectedSizes.getKeyCount(), rawRecord.getKeyCount());
+ assertEquals(expectedSizes.getValueSize(), rawRecord.getValueSize());
+ assertEquals(expectedSizes.isSplit(), rawRecord.isSplit());
+ assertEquals(expectedSizes.isVersionedInline(), rawRecord.isVersionedInline());
+
+ // Verify using sizeInfo and using the raw record get the same size information
+ assertEquals(rawRecord.getKeyCount(), sizeInfo.getKeyCount());
+ assertEquals(rawRecord.getValueSize(), sizeInfo.getValueSize());
+ assertEquals(rawRecord.isSplit(), sizeInfo.isSplit());
+ assertEquals(rawRecord.isVersionedInline(), sizeInfo.isVersionedInline());
+ int keySizeAdjustment = useVersionInKey ? -4 * expectedSizes.getKeyCount() : 0; // 4 bytes diff between complete and incomplete versionStamp
+ assertEquals(expectedSizes.getKeySize() + keySizeAdjustment, rawRecord.getKeySize());
+ assertEquals(rawRecord.getKeySize(), sizeInfo.getKeySize());
+ }
+
+ return rawRecord;
+ }
+
+ private void assertEmpty(final SplitHelper.SizeInfo sizeInfo) {
+ assertEquals(0, sizeInfo.getKeyCount());
+ assertEquals(0, sizeInfo.getKeySize());
+ assertEquals(0, sizeInfo.getValueSize());
+ assertFalse(sizeInfo.isSplit());
+ assertFalse(sizeInfo.isVersionedInline());
+ }
+
+ @MethodSource("testConfigsNoDryRun")
+ @ParameterizedTest(name = "loadWithSplit[{0}]")
+ public void loadWithSplit(SplitHelperTestConfig testConfig) {
+ this.testConfig = testConfig;
+ loadSingleRecords(testConfig,
+ (context, key, expectedSizes, expectedContents, version) ->
+ loadWithSplit(context, key, testConfig, expectedSizes, expectedContents, version, testConfig.useVersionInKey));
+
+ if (testConfig.splitLongRecords) {
+ final Tuple key = Tuple.from(1307L);
+ final int localVersion;
+ final byte[] globalVersion;
+ // Unsplit record followed by some unsplit stuff
+ // This particular error is caught by the single key unsplitter but not the multi-key one
+ try (FDBRecordContext context = openContext()) {
+ localVersion = context.claimLocalVersion();
+ writeDummyRecord(context, key, null, 1, false, testConfig.useVersionInKey, localVersion);
+ writeDummyRecord(context, key, null, MEDIUM_COPIES, false, testConfig.useVersionInKey, localVersion);
+
+ commit(context);
+ globalVersion = context.getVersionStamp();
+ }
+ Tuple completeKey = toCompleteKey(key, globalVersion, localVersion, testConfig.useVersionInKey);
+ try (FDBRecordContext context = openContext()) {
+ RecordCoreException err = assertThrows(RecordCoreException.class,
+ () -> loadWithSplit(context, completeKey, testConfig, null, null, null, testConfig.useVersionInKey));
+ assertThat(err.getMessage(), containsString("Unsplit value followed by split"));
+ }
+ }
+ }
+
+ private FDBRawRecord scanSingleRecord(@Nonnull FDBRecordContext context, boolean reverse,
+ @Nonnull Tuple key, @Nullable FDBStoredSizes expectedSizes,
+ @Nullable byte[] expectedContents, @Nullable FDBRecordVersion version,
+ boolean useVersionInKey) {
+ final ScanProperties scanProperties = reverse ? ScanProperties.REVERSE_SCAN : ScanProperties.FORWARD_SCAN;
+ KeyValueCursor kvCursor = KeyValueCursor.Builder.withSubspace(subspace)
+ .setContext(context)
+ .setRange(TupleRange.allOf(key))
+ .setScanProperties(scanProperties)
+ .build();
+ SplitHelper.KeyValueUnsplitter kvUnsplitter = new SplitHelper.KeyValueUnsplitter(context, subspace, kvCursor, false, null, scanProperties);
+
+ RecordCursorResult<FDBRawRecord> result = kvUnsplitter.getNext();
+ if (expectedSizes == null || expectedContents == null) {
+ assertThat(result.hasNext(), is(false));
+ assertEquals(RecordCursor.NoNextReason.SOURCE_EXHAUSTED, result.getNoNextReason());
+ return null;
+ } else {
+ assertThat(result.hasNext(), is(true));
+ final FDBRawRecord rawRecord = result.get();
+ result = kvUnsplitter.getNext();
+ assertThat(result.hasNext(), is(false));
+ assertEquals(RecordCursor.NoNextReason.SOURCE_EXHAUSTED, result.getNoNextReason());
+
+ assertNotNull(rawRecord);
+ assertEquals(key, rawRecord.getPrimaryKey());
+ assertArrayEquals(expectedContents, rawRecord.getRawRecord());
+ assertEquals(expectedSizes.getKeyCount(), rawRecord.getKeyCount());
+ assertEquals(expectedSizes.getValueSize(), rawRecord.getValueSize());
+ boolean isSplit = rawRecord.getKeyCount() - (rawRecord.isVersionedInline() ? 1 : 0) != 1;
+ assertEquals(isSplit, expectedSizes.isSplit());
+ assertEquals(version != null, expectedSizes.isVersionedInline());
+ int keySizeAdjustment = useVersionInKey ? -4 * expectedSizes.getKeyCount() : 0; // 4 bytes diff between complete and incomplete versionStamp
+ assertEquals(expectedSizes.getKeySize() + keySizeAdjustment, rawRecord.getKeySize());
+ return rawRecord;
+ }
+ }
+
+ @ParameterizedTest(name = "scan[reverse = {0}, useVersionInKey = {1}]")
+ @BooleanSource({"reverse", "useVersionInKey"})
+ public void scanSingleRecords(boolean reverse, boolean useVersionInKey) {
+ loadSingleRecords(new SplitHelperTestConfig(true, false, FDBRecordStoreProperties.UNROLL_SINGLE_RECORD_DELETES.getDefaultValue(), false, false, useVersionInKey),
+ (context, key, expectedSizes, expectedContents, version) ->
+ scanSingleRecord(context, reverse, key, expectedSizes, expectedContents, version, useVersionInKey));
+ }
+
+ private List<FDBRawRecord> writeDummyRecords(boolean useVersionInKey) {
+ final byte[] valueVersion = "_cushions_".getBytes(StandardCharsets.US_ASCII);
+ // Generate primary keys using a generalization of the Fibonacci formula: https://oeis.org/A247698
+ long currKey = 2308L;
+ long nextKey = 4261L;
+
+ final int numRecords = 50;
+ final Tuple[] keys = new Tuple[numRecords];
+ final byte[][] rawBytesArr = new byte[numRecords][];
+ final FDBRecordVersion[] versions = new FDBRecordVersion[numRecords];
+ final int[] localVersions = new int[numRecords];
+ final FDBStoredSizes[] sizes = new FDBStoredSizes[numRecords];
+
+ final byte[] globalVersionStamp;
+ try (FDBRecordContext context = openContext()) {
+ for (int i = 0; i < numRecords; i++) {
+ keys[i] = Tuple.from(currKey);
+ rawBytesArr[i] = (i % 4 < 2) ? SHORT_STRING : MEDIUM_STRING;
+ versions[i] = (!useVersionInKey && (i % 2 == 0)) ? FDBRecordVersion.complete(valueVersion, context.claimLocalVersion()) : null;
+ localVersions[i] = useVersionInKey ? context.claimLocalVersion() : 0;
+ sizes[i] = writeDummyRecord(context, keys[i], versions[i], (i % 4 < 2) ? 1 : MEDIUM_COPIES, false, useVersionInKey, localVersions[i]);
+
+ long temp = currKey + nextKey;
+ currKey = nextKey;
+ nextKey = temp;
+ }
+ commit(context);
+ globalVersionStamp = context.getVersionStamp();
+ }
+
+ // Produce the raw records
+ final List<FDBRawRecord> rawRecords = new ArrayList<>();
+ for (int i = 0; i < numRecords; i++) {
+ Tuple expectedKey = toCompleteKey(keys[i], globalVersionStamp, localVersions[i], useVersionInKey);
+ rawRecords.add(new FDBRawRecord(expectedKey, rawBytesArr[i], versions[i], sizes[i]));
+ }
+
+ return rawRecords;
+ }
+
+ @ParameterizedTest(name = "scanMultipleRecords[reverse = {0}, useVersionInKey = {1}]")
+ @BooleanSource({"reverse", "useVersionInKey"})
+ void scanMultipleRecords(boolean reverse, boolean useVersionInKey) {
+ final ScanProperties scanProperties = reverse ? ScanProperties.REVERSE_SCAN : ScanProperties.FORWARD_SCAN;
+ List<FDBRawRecord> rawRecords = writeDummyRecords(useVersionInKey);
+
+ try (FDBRecordContext context = openContext()) {
+ KeyValueCursor kvCursor = KeyValueCursor.Builder.withSubspace(subspace)
+ .setContext(context)
+ .setRange(TupleRange.ALL)
+ .setScanProperties(scanProperties)
+ .build();
+ List<FDBRawRecord> readRecords = new SplitHelper.KeyValueUnsplitter(context, subspace, kvCursor, false, null, scanProperties)
+ .asList().join();
+ if (reverse) {
+ readRecords = Lists.reverse(readRecords);
+ }
+ assertEquals(rawRecords.size(), readRecords.size());
+ for (int i = 0; i < rawRecords.size(); i++) {
+ if (useVersionInKey) {
+ assertEqualsNoKeySize(rawRecords.get(i), readRecords.get(i));
+ } else {
+ assertEquals(rawRecords.get(i), readRecords.get(i));
+ }
+ }
+
+ commit(context);
+ }
+ }
+
+ private void assertEqualsNoKeySize(final FDBRawRecord expected, final FDBRawRecord actual) {
+ assertEquals(expected.getPrimaryKey(), actual.getPrimaryKey());
+ assertArrayEquals(expected.getRawRecord(), actual.getRawRecord());
+ assertEquals(expected.getVersion(), actual.getVersion());
+ assertEquals(expected.getKeyCount(), actual.getKeyCount());
+ assertEquals(expected.getValueSize(), actual.getValueSize());
+ assertEquals(expected.isSplit(), actual.isSplit());
+ assertEquals(expected.isVersionedInline(), actual.isVersionedInline());
+ }
+
+ @Nonnull
+ public static Stream<?> limitsReverseVersionArgs() {
+ List<Integer> limits = List.of(1, 2, 7, Integer.MAX_VALUE);
+ return ParameterizedTestUtils.cartesianProduct(
+ limits.stream(),
+ limits.stream(),
+ ParameterizedTestUtils.booleans("reverse"),
+ ParameterizedTestUtils.booleans("useVersionInKey"));
+ }
+
+ @MethodSource("limitsReverseVersionArgs")
+ @ParameterizedTest
+ void scanContinuations(final int returnLimit, final int readLimit, final boolean reverse, boolean useVersionInKey) {
+ List<FDBRawRecord> rawRecords = writeDummyRecords(useVersionInKey);
+ if (reverse) {
+ rawRecords = Lists.reverse(rawRecords);
+ }
+ final Iterator<FDBRawRecord> expectedRecordIterator = rawRecords.iterator();
+
+ try (FDBRecordContext context = openContext()) {
+ byte[] continuation = null;
+
+ do {
+ final ExecuteProperties executeProperties = ExecuteProperties.newBuilder()
+ .setReturnedRowLimit(returnLimit)
+ .setScannedRecordsLimit(readLimit)
+ .build();
+ ScanProperties scanProperties = new ScanProperties(executeProperties, reverse);
+ KeyValueCursor kvCursor = KeyValueCursor.Builder.withSubspace(subspace)
+ .setContext(context)
+ .setRange(TupleRange.ALL)
+ .setScanProperties(scanProperties.with(ExecuteProperties::clearRowAndTimeLimits).with(ExecuteProperties::clearState))
+ .setContinuation(continuation)
+ .build();
+ RecordCursorIterator<FDBRawRecord> recordCursor = new SplitHelper.KeyValueUnsplitter(context, subspace, kvCursor, false, null, scanProperties.with(ExecuteProperties::clearReturnedRowLimit))
+ .limitRowsTo(returnLimit)
+ .asIterator();
+
+ int retrieved = 0;
+ int rowsScanned = 0;
+ while (recordCursor.hasNext()) {
+ assertThat(retrieved, lessThan(returnLimit));
+ assertThat(rowsScanned, lessThanOrEqualTo(readLimit));
+
+ FDBRawRecord nextRecord = recordCursor.next();
+ assertNotNull(nextRecord);
+ assertThat(expectedRecordIterator.hasNext(), is(true));
+ FDBRawRecord expectedRecord = expectedRecordIterator.next();
+ if (useVersionInKey) {
+ assertEqualsNoKeySize(expectedRecord, nextRecord);
+ } else {
+ assertEquals(expectedRecord, nextRecord);
+ }
+
+ rowsScanned += nextRecord.getKeyCount();
+ retrieved += 1;
+ }
+
+ if (retrieved > 0) {
+ continuation = recordCursor.getContinuation();
+ if (retrieved >= returnLimit) {
+ assertEquals(RecordCursor.NoNextReason.RETURN_LIMIT_REACHED, recordCursor.getNoNextReason());
+ assertNotNull(continuation);
+ } else if (rowsScanned > readLimit) {
+ assertEquals(RecordCursor.NoNextReason.SCAN_LIMIT_REACHED, recordCursor.getNoNextReason());
+ assertNotNull(continuation);
+ } else if (rowsScanned < readLimit) {
+ assertEquals(RecordCursor.NoNextReason.SOURCE_EXHAUSTED, recordCursor.getNoNextReason());
+ } else {
+ // If we read exactly as many records as is allowed by the read record limit, then
+ // this probably means that we hit SCAN_LIMIT_REACHED, but it's also possible to
+ // hit SOURCE_EXHAUSTED if we hit the record read limit at exactly the same time
+ // as we needed to do another speculative read to determine if a split record
+ // continues or not.
+ assertEquals(readLimit, rowsScanned);
+ assertThat(recordCursor.getNoNextReason(), is(oneOf(RecordCursor.NoNextReason.SCAN_LIMIT_REACHED, RecordCursor.NoNextReason.SOURCE_EXHAUSTED)));
+ if (!recordCursor.getNoNextReason().isSourceExhausted()) {
+ assertNotNull(recordCursor.getContinuation());
+ }
+ }
+ } else {
+ assertNull(recordCursor.getContinuation());
+ continuation = null;
+ }
+ } while (continuation != null);
+
+ commit(context);
+ }
+ }
+
+ /**
+ * When two saveWithSplit calls use the same incomplete versionstamp (same localVersion, same key) within
+ * one transaction, we may get a failure or data corruption. The localVersionCache (map by key) may contain
+ * previous values from an identical key (same versionstamp/localversion/PK) but different splits and may not collide
+ * directly
+ * with the previous values. This test shows the case where there is a collision since the split numbers are the
+ * same.
+ */
+ @Test
+ void saveWithSplitVersionInKeyOverwriteInTransaction() {
+ try (FDBRecordContext context = openContext()) {
+ final int localVersion = context.claimLocalVersion();
+ // First write: VERY_LONG_STRING requires multiple splits
+ final Tuple key = Tuple.from(Versionstamp.incomplete(localVersion)).add(1066);
+ SplitHelper.saveWithSplit(context, subspace, key, VERY_LONG_STRING, null,
+ true, false,
+ false, null, null);
+
+ // Second write: LONG_STRING — same localVersion, same key, shorter value (fewer splits)
+ final RecordCoreInternalException ex = assertThrows(RecordCoreInternalException.class, () ->
+ SplitHelper.saveWithSplit(context, subspace, key, LONG_STRING, null,
+ true, false,
+ false, null, null));
+ assertTrue(ex.getMessage().contains("Key with version overwritten"));
+ }
+ }
+
+ /**
+ * When two saveWithSplit calls use the same incomplete versionstamp (same localVersion, same key) within
+ * one transaction, we may get a failure or data corruption. The localVersionCache (map by key) may contain
+ * previous values from an identical key (same versionstamp/localversion/PK) but different splits and may not
+ * collide
+ * directly with the previous values. This test shows the data corruption when two record overlap.
+ */
+ @Test
+ void saveWithSplitVersionInKeyOverlapSplits() {
+ final byte[] globalVersion;
+ final int localVersion;
+ final Tuple key = Tuple.from(1066);
+ try (FDBRecordContext context = openContext()) {
+ localVersion = context.claimLocalVersion();
+ // First write: VERY_LONG_STRING requires multiple splits
+ final Tuple incompleteKey = Tuple.from(Versionstamp.incomplete(localVersion)).add(1066);
+ SplitHelper.saveWithSplit(context, subspace, incompleteKey, VERY_LONG_STRING, null,
+ true, false,
+ false, null, null);
+
+ // Second write: SHORT_STRING — same localVersion, same key, shorter value (one split)
+ SplitHelper.saveWithSplit(context, subspace, incompleteKey, SHORT_STRING, null,
+ true, false,
+ false, null, null);
+ commit(context);
+ globalVersion = context.getVersionStamp();
+ }
+ // Record is corrupt, as the '0' and '1..n' suffixes did not conflict, and none was in the RYW cache upon writing.
+ try (FDBRecordContext context = openContext()) {
+ final Tuple completeKey = toCompleteKey(key, globalVersion, localVersion, true);
+ Exception ex = assertThrows(CompletionException.class, () ->
+ SplitHelper.loadWithSplit(context.ensureActive(), context, subspace, completeKey, true, false, null).join());
+ assertTrue(ex.getMessage().contains("Unsplit value followed by split"));
+ }
+ }
+}
diff --git a/fdb-record-layer-core/src/test/java/com/apple/foundationdb/record/provider/foundationdb/SplitHelperTest.java b/fdb-record-layer-core/src/test/java/com/apple/foundationdb/record/provider/foundationdb/SplitHelperTest.java
index 7d15cd5a04..0b7b964ded 100644
--- a/fdb-record-layer-core/src/test/java/com/apple/foundationdb/record/provider/foundationdb/SplitHelperTest.java
+++ b/fdb-record-layer-core/src/test/java/com/apple/foundationdb/record/provider/foundationdb/SplitHelperTest.java
@@ -56,7 +56,6 @@
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
-import java.util.Objects;
import java.util.concurrent.ExecutionException;
import java.util.stream.LongStream;
import java.util.stream.Stream;
@@ -125,79 +124,8 @@ public void setSubspace() {
}
}
- static class SplitHelperTestConfig {
- private final boolean splitLongRecords;
- private final boolean omitUnsplitSuffix;
- private final boolean unrollRecordDeletes;
- private final boolean loadViaGets;
- private final boolean isDryRun;
-
- public SplitHelperTestConfig(boolean splitLongRecords, boolean omitUnsplitSuffix, boolean unrollRecordDeletes, boolean loadViaGets, boolean isDryRun) {
- this.splitLongRecords = splitLongRecords;
- this.omitUnsplitSuffix = omitUnsplitSuffix;
- this.unrollRecordDeletes = unrollRecordDeletes;
- this.loadViaGets = loadViaGets;
- this.isDryRun = isDryRun;
- }
-
- @Nonnull
- public RecordLayerPropertyStorage.Builder setProps(@Nonnull RecordLayerPropertyStorage.Builder props) {
- return props
- .addProp(FDBRecordStoreProperties.UNROLL_SINGLE_RECORD_DELETES, unrollRecordDeletes)
- .addProp(FDBRecordStoreProperties.LOAD_RECORDS_VIA_GETS, loadViaGets);
- }
-
- public boolean hasSplitPoints() {
- return splitLongRecords || !omitUnsplitSuffix;
- }
-
- @Override
- public String toString() {
- return "SplitHelperTestConfig{" +
- "splitLongRecords=" + splitLongRecords +
- ", omitUnsplitSuffix=" + omitUnsplitSuffix +
- ", unrollRecordDeletes=" + unrollRecordDeletes +
- ", loadViaGets=" + loadViaGets +
- ", isDryRun=" + isDryRun +
- '}';
- }
-
- @Override
- public boolean equals(final Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
- final SplitHelperTestConfig that = (SplitHelperTestConfig)o;
- return splitLongRecords == that.splitLongRecords && omitUnsplitSuffix == that.omitUnsplitSuffix && unrollRecordDeletes == that.unrollRecordDeletes && loadViaGets == that.loadViaGets && isDryRun == that.isDryRun;
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(splitLongRecords, omitUnsplitSuffix, unrollRecordDeletes, loadViaGets);
- }
-
- public static Stream allValidConfigs() {
- // Note that splitLongRecords="true" && omitUnsplitSuffix="true" is not valid
- return Stream.of(false, true).flatMap(splitLongRecords ->
- (splitLongRecords ? Stream.of(false) : Stream.of(false, true)).flatMap(omitUnsplitSuffix ->
- Stream.of(false, true).flatMap(unrollRecordDeletes ->
- Stream.of(false, true).flatMap(loadViaGets ->
- Stream.of(false, true).map(isDryRun ->
- new SplitHelperTestConfig(splitLongRecords, omitUnsplitSuffix, unrollRecordDeletes, loadViaGets, isDryRun))))));
- }
-
- public static SplitHelperTestConfig getDefault() {
- return new SplitHelperTestConfig(true, false,
- FDBRecordStoreProperties.UNROLL_SINGLE_RECORD_DELETES.getDefaultValue(),
- FDBRecordStoreProperties.LOAD_RECORDS_VIA_GETS.getDefaultValue(), false);
- }
- }
-
- public static Stream testConfigs() {
- return SplitHelperTestConfig.allValidConfigs().map(Arguments::of);
+ public static Stream<Arguments> testConfigsNoVersionInKey() {
+ return SplitHelperTestConfig.getConfigsNoVersionInKey().map(Arguments::of);
}
@Nonnull
@@ -407,7 +335,7 @@ private SplitHelper.SizeInfo saveWithSplit(@Nonnull FDBRecordContext context, @N
return saveWithSplit(context, key, serialized, null, testConfig);
}
- @MethodSource("testConfigs")
+ @MethodSource("testConfigsNoVersionInKey")
@ParameterizedTest(name = "saveWithSplit[{0}]")
public void saveWithSplit(@Nonnull SplitHelperTestConfig testConfig) {
this.testConfig = testConfig;
@@ -428,7 +356,7 @@ public void saveWithSplit(@Nonnull SplitHelperTestConfig testConfig) {
}
}
- @MethodSource("testConfigs")
+ @MethodSource("testConfigsNoVersionInKey")
@ParameterizedTest(name = "saveWithSplitAndIncompleteVersions[{0}]")
public void saveWithSplitAndIncompleteVersions(SplitHelperTestConfig testConfig) {
this.testConfig = testConfig;
@@ -471,7 +399,7 @@ public void saveWithSplitAndIncompleteVersions(SplitHelperTestConfig testConfig)
}
}
- @MethodSource("testConfigs")
+ @MethodSource("testConfigsNoVersionInKey")
@ParameterizedTest(name = "saveWithSplitAndCompleteVersion[{0}]")
public void saveWithSplitAndCompleteVersions(SplitHelperTestConfig testConfig) {
this.testConfig = testConfig;
@@ -557,7 +485,7 @@ private void deleteSplit(@Nonnull FDBRecordContext context, @Nonnull Tuple key,
assertEquals(0, count);
}
- @MethodSource("testConfigs")
+ @MethodSource("testConfigsNoVersionInKey")
@ParameterizedTest(name = "deleteWithSplit[{0}]")
public void deleteWithSplit(SplitHelperTestConfig testConfig) {
this.testConfig = testConfig;
@@ -591,7 +519,7 @@ static Stream deleteWithSplitAndVersion() {
}
@ParameterizedTest(name = "deleteWithSplitAndVersion[{0}]")
- @MethodSource("testConfigs")
+ @MethodSource("testConfigsNoVersionInKey")
public void deleteWithSplitAndVersion(SplitHelperTestConfig testConfig) {
this.testConfig = testConfig;
Assumptions.assumeFalse(testConfig.omitUnsplitSuffix);
@@ -763,7 +691,7 @@ private FDBRawRecord loadWithSplit(@Nonnull FDBRecordContext context, @Nonnull T
return loadWithSplit(context, key, testConfig, expectedSizes, expectedContents, null);
}
- @MethodSource("testConfigs")
+ @MethodSource("testConfigsNoVersionInKey")
@ParameterizedTest(name = "loadWithSplit[{0}]")
public void loadWithSplit(SplitHelperTestConfig testConfig) {
this.testConfig = testConfig;
@@ -823,7 +751,7 @@ private FDBRawRecord scanSingleRecord(@Nonnull FDBRecordContext context, boolean
@ParameterizedTest(name = "scan[reverse = {0}]")
@BooleanSource
public void scanSingleRecords(boolean reverse) {
- loadSingleRecords(new SplitHelperTestConfig(true, false, FDBRecordStoreProperties.UNROLL_SINGLE_RECORD_DELETES.getDefaultValue(), false, false),
+ loadSingleRecords(new SplitHelperTestConfig(true, false, FDBRecordStoreProperties.UNROLL_SINGLE_RECORD_DELETES.getDefaultValue(), false, false, false),
(context, key, expectedSizes, expectedContents, version) -> scanSingleRecord(context, reverse, key, expectedSizes, expectedContents, version));
}
@@ -955,4 +883,35 @@ public void scanContinuations(final int returnLimit, final int readLimit, final
commit(context);
}
}
+
+ public static Stream<Arguments> previousKeyEquivalence() {
+ return Stream.of(
+ Arguments.of(Tuple.from(1066L), SplitHelper.START_SPLIT_RECORD),
+ Arguments.of(Tuple.from(1066L), SplitHelper.START_SPLIT_RECORD + 1),
+ Arguments.of(Tuple.from(1066L), SplitHelper.START_SPLIT_RECORD + 2),
+ Arguments.of(Tuple.from(1066L), SplitHelper.UNSPLIT_RECORD),
+ Arguments.of(Tuple.from(1066L), SplitHelper.RECORD_VERSION),
+ Arguments.of(Tuple.from(1066L, "extra"), SplitHelper.START_SPLIT_RECORD),
+ Arguments.of(Tuple.from(), SplitHelper.START_SPLIT_RECORD),
+ Arguments.of(Tuple.from(), SplitHelper.UNSPLIT_RECORD)
+ );
+ }
+
+ /**
+ * Verify that the refactored split call site in {@link SplitHelper} — which now passes
+ * {@code subspace} + {@code key.add(index)} to {@code packSplitKey} instead of the old
+ * {@code subspace.subspace(key)} + {@code Tuple.from(index)} — produces identical byte keys.
+ */
+ @MethodSource("previousKeyEquivalence")
+ @ParameterizedTest(name = "previousKeyEquivalence[key={0}, index={1}]")
+ void previousKeyEquivalence(Tuple key, long index) {
+ Subspace subspace = new Subspace(Tuple.from("test"));
+
+ // Old call: subspace.subspace(key).pack(index)
+ byte[] oldKey = subspace.subspace(key).pack(index);
+ // New call: subspace.pack(key.add(index))
+ byte[] newKey = subspace.pack(key.add(index));
+
+ assertArrayEquals(oldKey, newKey);
+ }
}
diff --git a/fdb-record-layer-core/src/test/java/com/apple/foundationdb/record/provider/foundationdb/SplitHelperTestConfig.java b/fdb-record-layer-core/src/test/java/com/apple/foundationdb/record/provider/foundationdb/SplitHelperTestConfig.java
new file mode 100644
index 0000000000..f4bd996153
--- /dev/null
+++ b/fdb-record-layer-core/src/test/java/com/apple/foundationdb/record/provider/foundationdb/SplitHelperTestConfig.java
@@ -0,0 +1,114 @@
+/*
+ * SplitHelperTestConfig.java
+ *
+ * This source file is part of the FoundationDB open source project
+ *
+ * Copyright 2015-2026 Apple Inc. and the FoundationDB project authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.apple.foundationdb.record.provider.foundationdb;
+
+import com.apple.foundationdb.record.FDBRecordStoreProperties;
+import com.apple.foundationdb.record.provider.foundationdb.properties.RecordLayerPropertyStorage;
+
+import javax.annotation.Nonnull;
+import java.util.Objects;
+import java.util.stream.Stream;
+
+public class SplitHelperTestConfig {
+ public final boolean splitLongRecords;
+ public final boolean omitUnsplitSuffix;
+ public final boolean unrollRecordDeletes;
+ public final boolean loadViaGets;
+ public final boolean isDryRun;
+ public final boolean useVersionInKey;
+
+ public SplitHelperTestConfig(boolean splitLongRecords, boolean omitUnsplitSuffix, boolean unrollRecordDeletes,
+ boolean loadViaGets, boolean isDryRun, boolean useVersionInKey) {
+ this.splitLongRecords = splitLongRecords;
+ this.omitUnsplitSuffix = omitUnsplitSuffix;
+ this.unrollRecordDeletes = unrollRecordDeletes;
+ this.loadViaGets = loadViaGets;
+ this.isDryRun = isDryRun;
+ this.useVersionInKey = useVersionInKey;
+ }
+
+ @Nonnull
+ public RecordLayerPropertyStorage.Builder setProps(@Nonnull RecordLayerPropertyStorage.Builder props) {
+ return props
+ .addProp(FDBRecordStoreProperties.UNROLL_SINGLE_RECORD_DELETES, unrollRecordDeletes)
+ .addProp(FDBRecordStoreProperties.LOAD_RECORDS_VIA_GETS, loadViaGets);
+ }
+
+ public boolean hasSplitPoints() {
+ return splitLongRecords || !omitUnsplitSuffix;
+ }
+
+ @Override
+ public String toString() {
+ return "SplitHelperTestConfig{" +
+ "splitLongRecords=" + splitLongRecords +
+ ", omitUnsplitSuffix=" + omitUnsplitSuffix +
+ ", unrollRecordDeletes=" + unrollRecordDeletes +
+ ", loadViaGets=" + loadViaGets +
+ ", isDryRun=" + isDryRun +
+ ", useVersionInKey=" + useVersionInKey +
+ '}';
+ }
+
+ @Override
+ public boolean equals(final Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ final SplitHelperTestConfig that = (SplitHelperTestConfig)o;
+ return splitLongRecords == that.splitLongRecords && omitUnsplitSuffix == that.omitUnsplitSuffix &&
+ unrollRecordDeletes == that.unrollRecordDeletes && loadViaGets == that.loadViaGets &&
+ isDryRun == that.isDryRun && useVersionInKey == that.useVersionInKey;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(splitLongRecords, omitUnsplitSuffix, unrollRecordDeletes, loadViaGets, isDryRun, useVersionInKey);
+ }
+
+ public static Stream<SplitHelperTestConfig> allValidConfigs() {
+ // Note that splitLongRecords="true" && omitUnsplitSuffix="true" is not valid
+ return Stream.of(false, true).flatMap(useVersionInKey ->
+ Stream.of(false, true).flatMap(splitLongRecords ->
+ (splitLongRecords ? Stream.of(false) : Stream.of(false, true)).flatMap(omitUnsplitSuffix ->
+ Stream.of(false, true).flatMap(unrollRecordDeletes ->
+ Stream.of(false, true).flatMap(loadViaGets ->
+ Stream.of(false, true).map(isDryRun ->
+ new SplitHelperTestConfig(splitLongRecords, omitUnsplitSuffix, unrollRecordDeletes, loadViaGets, isDryRun, useVersionInKey)))))));
+ }
+
+ public static Stream<SplitHelperTestConfig> getConfigsNoDryRun() {
+ return allValidConfigs().filter(config -> !config.isDryRun);
+ }
+
+ public static Stream<SplitHelperTestConfig> getConfigsNoVersionInKey() {
+ return allValidConfigs().filter(config -> !config.useVersionInKey);
+ }
+
+ public static SplitHelperTestConfig getDefault() {
+ return new SplitHelperTestConfig(true, false,
+ FDBRecordStoreProperties.UNROLL_SINGLE_RECORD_DELETES.getDefaultValue(),
+ FDBRecordStoreProperties.LOAD_RECORDS_VIA_GETS.getDefaultValue(), false, false);
+ }
+}
diff --git a/fdb-record-layer-lucene/src/main/java/com/apple/foundationdb/record/lucene/directory/PendingWriteQueue.java b/fdb-record-layer-lucene/src/main/java/com/apple/foundationdb/record/lucene/directory/PendingWriteQueue.java
index 038ce98f66..6bdb550b0e 100644
--- a/fdb-record-layer-lucene/src/main/java/com/apple/foundationdb/record/lucene/directory/PendingWriteQueue.java
+++ b/fdb-record-layer-lucene/src/main/java/com/apple/foundationdb/record/lucene/directory/PendingWriteQueue.java
@@ -38,9 +38,11 @@
import com.apple.foundationdb.record.lucene.LuceneLogMessageKeys;
import com.apple.foundationdb.record.lucene.LucenePendingWriteQueueProto;
import com.apple.foundationdb.record.metadata.Index;
+import com.apple.foundationdb.record.provider.foundationdb.FDBRawRecord;
import com.apple.foundationdb.record.provider.foundationdb.FDBRecordContext;
import com.apple.foundationdb.record.provider.foundationdb.FDBRecordVersion;
import com.apple.foundationdb.record.provider.foundationdb.KeyValueCursor;
+import com.apple.foundationdb.record.provider.foundationdb.SplitHelper;
import com.apple.foundationdb.subspace.Subspace;
import com.apple.foundationdb.tuple.Tuple;
import com.apple.foundationdb.tuple.Versionstamp;
@@ -179,13 +181,21 @@ public RecordCursor getQueueCursor(
@Nonnull FDBRecordContext context,
@Nonnull ScanProperties scanProperties,
@Nullable byte[] continuation) {
-
- final KeyValueCursor cursor = KeyValueCursor.Builder.newBuilder(queueSubspace)
+ KeyValueCursor inner = KeyValueCursor.Builder.newBuilder(queueSubspace)
.setContext(context)
- .setScanProperties(scanProperties)
+ .setScanProperties(scanProperties
+ .with(ExecuteProperties::clearRowAndTimeLimits)
+ .with(ExecuteProperties::clearSkipAndLimit)
+ .with(ExecuteProperties::clearState))
.setContinuation(continuation)
.build();
- return cursor.map(kv -> PendingWritesQueueHelper.toQueueEntry(queueSubspace, serializer, kv));
+ RecordCursor unsplitter = new SplitHelper.KeyValueUnsplitter(
+ context, queueSubspace, inner,
+ false, null, scanProperties)
+ .limitRowsTo(scanProperties.getExecuteProperties().getReturnedRowLimit());
+
+ return unsplitter.map(rawRecord ->
+ PendingWritesQueueHelper.toQueueEntry(serializer, rawRecord.getPrimaryKey(), rawRecord.getRawRecord()));
}
/**
@@ -204,7 +214,9 @@ public void clearEntry(@Nonnull FDBRecordContext context, @Nonnull QueueEntry en
throw new RecordCoreArgumentException("Queue item should have complete version stamp");
}
- context.ensureActive().clear(queueSubspace.pack(entry.versionstamp));
+ // The only element of the key is the completed version stamp
+ final Tuple key = Tuple.from(entry.getVersionstamp());
+ SplitHelper.deleteSplit(context, queueSubspace, key, true, false, false, null);
// Atomically decrement the queue size counter
mutateQueueSizeCounter(context, -1);
@@ -328,20 +340,11 @@ private void enqueueOperationInternal(
// Build key with incomplete versionStamp with a new local version
FDBRecordVersion recordVersion = FDBRecordVersion.incomplete(context.claimLocalVersion());
+ // key contains only an incomplete version
Tuple keyTuple = Tuple.from(recordVersion.toVersionstamp());
- byte[] queueKey = queueSubspace.packWithVersionstamp(keyTuple);
byte[] value = serializer.encode(builder.build().toByteArray());
-
- // Use addVersionMutation to let FDB assign the versionStamp
- final byte[] current = context.addVersionMutation(
- MutationType.SET_VERSIONSTAMPED_KEY,
- queueKey,
- value);
-
- if (current != null) {
- // This should never happen
- throw new RecordCoreInternalException("Pending queue item overwritten");
- }
+ // save with splits
+ SplitHelper.saveWithSplit(context, queueSubspace, keyTuple, value, null, true, false, false, null, null);
// Atomically increment the queue size counter
mutateQueueSizeCounter(context, 1);
diff --git a/fdb-record-layer-lucene/src/main/java/com/apple/foundationdb/record/lucene/directory/PendingWritesQueueHelper.java b/fdb-record-layer-lucene/src/main/java/com/apple/foundationdb/record/lucene/directory/PendingWritesQueueHelper.java
index 00c3cfbfb2..e36b1a328d 100644
--- a/fdb-record-layer-lucene/src/main/java/com/apple/foundationdb/record/lucene/directory/PendingWritesQueueHelper.java
+++ b/fdb-record-layer-lucene/src/main/java/com/apple/foundationdb/record/lucene/directory/PendingWritesQueueHelper.java
@@ -20,14 +20,12 @@
package com.apple.foundationdb.record.lucene.directory;
-import com.apple.foundationdb.KeyValue;
import com.apple.foundationdb.annotation.API;
import com.apple.foundationdb.record.RecordCoreArgumentException;
-import com.apple.foundationdb.record.RecordCoreInternalException;
+import com.apple.foundationdb.record.RecordCoreStorageException;
import com.apple.foundationdb.record.lucene.LuceneDocumentFromRecord;
import com.apple.foundationdb.record.lucene.LuceneIndexExpressions;
import com.apple.foundationdb.record.lucene.LucenePendingWriteQueueProto;
-import com.apple.foundationdb.subspace.Subspace;
import com.apple.foundationdb.tuple.Tuple;
import com.apple.foundationdb.tuple.Versionstamp;
import com.google.protobuf.InvalidProtocolBufferException;
@@ -40,15 +38,18 @@
@API(API.Status.INTERNAL)
public final class PendingWritesQueueHelper {
- public static PendingWriteQueue.QueueEntry toQueueEntry(final Subspace queueSubspace, LuceneSerializer serializer, final KeyValue kv) {
+ /**
+ * Convert a raw record back to a queue entry.
+ */
+ public static PendingWriteQueue.QueueEntry toQueueEntry(LuceneSerializer serializer, Tuple keyTuple, byte[] valueBytes) {
try {
- Tuple keyTuple = queueSubspace.unpack(kv.getKey());
final Versionstamp versionstamp = keyTuple.getVersionstamp(0);
- final byte[] value = serializer.decode(kv.getValue());
+ final byte[] value = serializer.decode(valueBytes);
LucenePendingWriteQueueProto.PendingWriteItem item = LucenePendingWriteQueueProto.PendingWriteItem.parseFrom(value);
return new PendingWriteQueue.QueueEntry(versionstamp, item);
} catch (InvalidProtocolBufferException e) {
- throw new RecordCoreInternalException("Failed to parse queue item", e);
+ throw new RecordCoreStorageException("Failed to parse queue item", e)
+ .addLogInfo("key", keyTuple);
}
}
diff --git a/fdb-record-layer-lucene/src/test/java/com/apple/foundationdb/record/lucene/directory/PendingWriteQueueTest.java b/fdb-record-layer-lucene/src/test/java/com/apple/foundationdb/record/lucene/directory/PendingWriteQueueTest.java
index bf05675df3..cb79ca15d9 100644
--- a/fdb-record-layer-lucene/src/test/java/com/apple/foundationdb/record/lucene/directory/PendingWriteQueueTest.java
+++ b/fdb-record-layer-lucene/src/test/java/com/apple/foundationdb/record/lucene/directory/PendingWriteQueueTest.java
@@ -32,11 +32,14 @@
import com.apple.foundationdb.record.lucene.LuceneEvents;
import com.apple.foundationdb.record.lucene.LuceneIndexExpressions;
import com.apple.foundationdb.record.lucene.LucenePendingWriteQueueProto;
-import com.apple.foundationdb.record.provider.foundationdb.FDBExceptions;
import com.apple.foundationdb.record.provider.foundationdb.FDBRecordContext;
import com.apple.foundationdb.record.provider.foundationdb.FDBRecordStoreTestBase;
import com.apple.foundationdb.subspace.Subspace;
import com.apple.foundationdb.tuple.Tuple;
+import com.apple.foundationdb.tuple.Versionstamp;
+import com.apple.test.ParameterizedTestUtils;
+import com.apple.test.RandomSeedSource;
+import com.apple.test.RandomizedTestUtils;
import com.apple.test.Tags;
import com.google.common.collect.Streams;
import org.assertj.core.api.Assertions;
@@ -45,7 +48,9 @@
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.EnumSource;
+import org.junit.jupiter.params.provider.MethodSource;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
@@ -54,6 +59,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Random;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -68,6 +74,8 @@
*/
@Tag(Tags.RequiresFDB)
class PendingWriteQueueTest extends FDBRecordStoreTestBase {
+ private static final String CHARACTERS = "abcdefghijklmnopqrstuvwxyz0123456789";
+
LuceneSerializer serializer;
@BeforeEach
@@ -240,16 +248,16 @@ void testIterateWithContinuations() {
commit(context);
}
- // There are 8 documents, reading with limit=3 will create 2 continuations
+ // There are 10 documents, reading with limit=4 will create 2 continuations
List allResults = new ArrayList<>();
RecordCursorContinuation continuation;
ScanProperties scanProperties = new ScanProperties(ExecuteProperties.newBuilder()
- .setReturnedRowLimit(3)
+ .setReturnedRowLimit(4)
.setIsolationLevel(IsolationLevel.SERIALIZABLE)
.build());
- // First iteration - 3 elements
+ // First iteration - 4 elements
try (FDBRecordContext context = openContext()) {
final RecordCursor cursor = queue.getQueueCursor(context, scanProperties, null);
final RecordCursorResult lastResult = cursor.forEachResult(result -> {
@@ -257,9 +265,9 @@ void testIterateWithContinuations() {
}).join();
continuation = lastResult.getContinuation();
}
- assertEquals(3, allResults.size());
+ assertEquals(4, allResults.size());
- // Second iteration - 3 elements
+ // Second iteration - 4 elements
try (FDBRecordContext context = openContext()) {
final RecordCursor cursor = queue.getQueueCursor(context, scanProperties, continuation.toBytes());
final RecordCursorResult lastResult = cursor.forEachResult(result -> {
@@ -267,9 +275,9 @@ void testIterateWithContinuations() {
}).join();
continuation = lastResult.getContinuation();
}
- assertEquals(6, allResults.size());
+ assertEquals(8, allResults.size());
- // Third iteration - 2 elements
+ // Third iteration - 2 elements
try (FDBRecordContext context = openContext()) {
final RecordCursor cursor = queue.getQueueCursor(context, scanProperties, continuation.toBytes());
final RecordCursorResult lastResult = cursor.forEachResult(result -> {
@@ -277,7 +285,7 @@ void testIterateWithContinuations() {
}).join();
continuation = lastResult.getContinuation();
}
- assertEquals(8, allResults.size());
+ assertEquals(10, allResults.size());
// Ensure all documents show up in the results
List allDocs = Streams.concat(docs.stream(), moreDocs.stream()).collect(Collectors.toList());
@@ -355,22 +363,29 @@ void testFailToDeserialize() {
}
}
- @Test
- void testQueueItemTooLarge() throws Exception {
- StringBuilder builder = new StringBuilder();
- for (int i = 0 ; i < 100_000 ; i++) {
- builder.append("Hello ");
- }
- String hugeString = builder.toString();
- TestDocument docWithHugeString = new TestDocument(primaryKey("Huge"),
- List.of(createField("f2", hugeString.toString(), LuceneIndexExpressions.DocumentFieldType.STRING, false, false)));
+ static Stream<Arguments> seedAndUseCompression() {
+ return ParameterizedTestUtils.cartesianProduct(
+ RandomizedTestUtils.randomSeeds(476373L, 7768884L),
+ ParameterizedTestUtils.booleans("useCompression"));
+ }
- LuceneSerializer passThroughSerializer = new PassThroughLuceneSerializer();
+ @ParameterizedTest
+ @MethodSource("seedAndUseCompression")
+ void testLargeQueueItem(long seed, boolean useCompression) throws Exception {
+ // Test that we can store large queue items with and without compression
+ TestDocument docWithHugeString = createHugeDocument(new Random(seed));
+
+ LuceneSerializer serializerToUse;
+ if (useCompression) {
+ serializerToUse = serializer;
+ } else {
+ serializerToUse = new PassThroughLuceneSerializer();
+ }
PendingWriteQueue queue;
try (FDBRecordContext context = openContext()) {
- queue = getQueue(context);
- // save a single doc using the good queue (should succeed since the serializer compresses the data)
+ queue = getQueue(context, serializerToUse);
+ // save a single doc using the appropriate serializer (should succeed since we split the records even for uncompressed)
queue.enqueueInsert(context, docWithHugeString.getPrimaryKey(), docWithHugeString.getFields());
commit(context);
}
@@ -379,18 +394,72 @@ void testQueueItemTooLarge() throws Exception {
RecordCursor queueCursor = queue.getQueueCursor(context, ScanProperties.FORWARD_SCAN, null);
List list = queueCursor.asList().get();
assertEquals(1, list.size());
- assertEquals(hugeString, list.get(0).getDocumentFields().get(0).getStringValue());
+ assertEquals(docWithHugeString.getFields().get(0).getValue(), list.get(0).getDocumentFields().get(0).getStringValue());
+ }
+ }
+
+ @ParameterizedTest
+ @RandomSeedSource({663377L, 7758893L})
+ void testLargeQueueItemDelete(long seed) {
+ // A split entry (>100KB) must be fully removed when clearEntry is called
+ TestDocument docWithHugeString = createHugeDocument(new Random(seed));
+ TestDocument normalDoc = new TestDocument(primaryKey("Normal"),
+ List.of(createField("f", "small", LuceneIndexExpressions.DocumentFieldType.STRING, false, false)));
+
+ PendingWriteQueue queue;
+ try (FDBRecordContext context = openContext()) {
+ queue = getQueue(context, new PassThroughLuceneSerializer());
+ queue.enqueueInsert(context, docWithHugeString.getPrimaryKey(), docWithHugeString.getFields());
+ queue.enqueueInsert(context, normalDoc.getPrimaryKey(), normalDoc.getFields());
+ commit(context);
}
+ List entries;
+ try (FDBRecordContext context = openContext()) {
+ entries = queue.getQueueCursor(context, ScanProperties.FORWARD_SCAN, null).asList().join();
+ }
+ assertEquals(2, entries.size());
+
+ // Delete the large entry
try (FDBRecordContext context = openContext()) {
- PendingWriteQueue failingQueue = getQueue(context, passThroughSerializer);
- // save a single doc using the bad serializer (should fail as the entry will be too large)
- failingQueue.enqueueInsert(context, docWithHugeString.getPrimaryKey(), docWithHugeString.getFields());
- Assertions.assertThatThrownBy(() -> commit(context))
- .isInstanceOf(FDBExceptions.FDBStoreValueSizeException.class);
+ queue.clearEntry(context, entries.get(0));
+ commit(context);
+ }
+
+ // Only the normal doc should remain
+ try (FDBRecordContext context = openContext()) {
+ List remaining = queue.getQueueCursor(context, ScanProperties.FORWARD_SCAN, null).asList().join();
+ assertEquals(1, remaining.size());
+ assertEquals("small", remaining.get(0).getDocumentFields().get(0).getStringValue());
}
}
+ @Test
+ void testClearEntryWithIncompleteVersionstamp() {
+ try (FDBRecordContext context = openContext()) {
+ PendingWriteQueue queue = getQueue(context);
+ // Manufacture an entry with an incomplete versionstamp
+ Versionstamp incomplete = Versionstamp.incomplete(0);
+ PendingWriteQueue.QueueEntry entryWithIncompleteStamp = new PendingWriteQueue.QueueEntry(
+ incomplete,
+ LucenePendingWriteQueueProto.PendingWriteItem.getDefaultInstance());
+ Assertions.assertThatThrownBy(() -> queue.clearEntry(context, entryWithIncompleteStamp))
+ .isInstanceOf(RecordCoreArgumentException.class)
+ .hasMessageContaining("complete");
+ }
+ }
+
+ @Nonnull
+ private TestDocument createHugeDocument(Random random) {
+ StringBuilder builder = new StringBuilder();
+ for (int i = 0 ; i < 500_000 ; i++) {
+ builder.append(CHARACTERS.charAt(random.nextInt(CHARACTERS.length())));
+ }
+ TestDocument docWithHugeString = new TestDocument(primaryKey("Huge"),
+ List.of(createField("f", builder.toString(), LuceneIndexExpressions.DocumentFieldType.STRING, false, false)));
+ return docWithHugeString;
+ }
+
private PendingWriteQueue getQueue(FDBRecordContext context) {
return getQueue(context, serializer);
}
@@ -507,7 +576,10 @@ private List createTestDocuments() {
createField("long field", 6L, LuceneIndexExpressions.DocumentFieldType.LONG, true, false),
createField("double field", 3.14D, LuceneIndexExpressions.DocumentFieldType.DOUBLE, true, true)));
- return List.of(docWithNoFields, docWithOneFields, docWithMultipleFields, docWithAllFieldTypes);
+ Random random = new Random();
+ TestDocument hugeDoc = createHugeDocument(random);
+
+ return List.of(docWithNoFields, docWithOneFields, docWithMultipleFields, hugeDoc, docWithAllFieldTypes);
}
@Nonnull