Lucene: Split large Lucene pending writes queue entries #3970
Changes from all commits
```diff
@@ -33,6 +33,7 @@
 import com.apple.foundationdb.record.FDBRecordStoreProperties;
 import com.apple.foundationdb.record.RecordCoreArgumentException;
 import com.apple.foundationdb.record.RecordCoreException;
+import com.apple.foundationdb.record.RecordCoreInternalException;
 import com.apple.foundationdb.record.RecordCoreStorageException;
 import com.apple.foundationdb.record.RecordCursor;
 import com.apple.foundationdb.record.RecordCursorContinuation;
```
In `fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/SplitHelper.java`:

```diff
@@ -121,7 +122,7 @@
      * @param sizeInfo optional size information to populate
      */
     @SuppressWarnings("PMD.CloseResource")
     public static void saveWithSplit(@Nonnull final FDBRecordContext context, @Nonnull final Subspace subspace,
                                      @Nonnull final Tuple key, @Nonnull final byte[] serialized, @Nullable final FDBRecordVersion version,
                                      final boolean splitLongRecords, final boolean omitUnsplitSuffix,
                                      final boolean clearBasedOnPreviousSizeInfo, @Nullable final FDBStoredSizes previousSizeInfo,
```

Check warning on line 125 in fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/SplitHelper.java
```diff
@@ -132,17 +133,26 @@
                     .addLogInfo(LogMessageKeys.SUBSPACE, ByteArrayUtil2.loggable(subspace.pack()))
                     .addLogInfo(LogMessageKeys.VERSION, version);
         }
-        final Transaction tr = context.ensureActive();
+        boolean hasVersionInKey = key.hasIncompleteVersionstamp();
+        if ((version != null) && hasVersionInKey) {
+            // Cannot have versionStamps in BOTH key and value
+            throw new RecordCoreArgumentException("Cannot save versionStamp in both key and value");
+        }
         if (serialized.length > SplitHelper.SPLIT_RECORD_SIZE) {
             if (!splitLongRecords) {
                 throw new RecordCoreException("Record is too long to be stored in a single value; consider split_long_records")
                         .addLogInfo(LogMessageKeys.KEY_TUPLE, key)
                         .addLogInfo(LogMessageKeys.SUBSPACE, ByteArrayUtil2.loggable(subspace.pack()))
                         .addLogInfo(LogMessageKeys.VALUE_SIZE, serialized.length);
             }
-            writeSplitRecord(context, subspace, key, serialized, clearBasedOnPreviousSizeInfo, previousSizeInfo, sizeInfo);
+            writeSplitRecord(context, subspace, key, serialized, hasVersionInKey, clearBasedOnPreviousSizeInfo, previousSizeInfo, sizeInfo);
         } else {
-            if (splitLongRecords || previousSizeInfo == null || previousSizeInfo.isVersionedInline()) {
+            // An incomplete version in the key means that we shouldn't delete previous k/v pairs using these keys since,
+            // in the DB, from previous transactions, they would have had their versions completed already (and so wouldn't match)
+            if (!hasVersionInKey && (splitLongRecords || previousSizeInfo == null || previousSizeInfo.isVersionedInline())) {
+                // Note that the clearPreviousSplitRecords also removes version splits from the context cache
+                // This is not currently supported for the case where we have versions in the keys since we can't trace the old values down
+                // Will be improved on in a separate PR
                 clearPreviousSplitRecord(context, subspace, key, clearBasedOnPreviousSizeInfo, previousSizeInfo);
             }
             final Tuple recordKey;
```

Review thread on this change:

Collaborator: If we can't know the previous value, how can we have a …

Contributor (author): When …

Contributor (author): With the changed logic, if the key has an incomplete version, then we can't clear older records based on size info, since the size info either has a complete version (the previous transaction was committed) or it has incomplete versions (in which case the keys are not in the RYW cache).

Collaborator: Maybe I don't understand this, but it seems like we should be able to do this. The … As it happens, I think this is fine for the use case of the pending writes queue, as a different local version is claimed with each insert. Theoretically, you could imagine leveraging this code to support versionstamps in primary keys (which we currently don't, but this gets us pretty close to being able to support), in which case this may be more important.

Contributor (author): The issue faced was that the cache contains specific keys with split suffixes which can span multiple cache entries. In the "incomplete version in key" case we would need to replace them all when a new record gets written with the same local version. One way to introduce that would be to group all the splits by the local version and replace the entire group every time, but that seems a little excessive since there is no use case for this currently.

Collaborator: I think we do group all of the keys by their local version. The mutations currently go into a …

Contributor (author): Yeah, added a comment about supporting this in a future PR.
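The branch structure of the reworked `saveWithSplit` can be sketched in isolation. This is a hedged illustration only: the class, the tiny size limit, and the string return values are all invented for the sketch (the real code consults `previousSizeInfo` as well before clearing), but the decision order mirrors the diff above.

```java
// Illustrative sketch of the save-path decision logic; not the Record Layer API.
public class SaveDecisionSketch {
    // Stand-in for SplitHelper.SPLIT_RECORD_SIZE (the real limit is far larger).
    static final int SPLIT_RECORD_SIZE = 4;

    // Returns which path would run: "split", "unsplit+clear", or "unsplit".
    static String decide(int length, boolean hasVersionInKey, boolean versionProvided,
                         boolean splitLongRecords) {
        // A version may live in the key (incomplete versionstamp) or be passed
        // separately, but never both.
        if (versionProvided && hasVersionInKey) {
            throw new IllegalArgumentException("Cannot save versionStamp in both key and value");
        }
        if (length > SPLIT_RECORD_SIZE) {
            if (!splitLongRecords) {
                throw new IllegalStateException("Record is too long to be stored in a single value");
            }
            return "split";
        }
        // With an incomplete version in the key, earlier pairs were committed with
        // completed versions, so their keys no longer match and are not cleared here.
        return hasVersionInKey ? "unsplit" : "unsplit+clear";
    }
}
```

A small payload with a version in the key skips the clear; a large payload takes the split path only when `splitLongRecords` is set.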
```diff
@@ -151,8 +161,7 @@
             } else {
                 recordKey = key;
             }
-            final byte[] keyBytes = subspace.pack(recordKey);
-            tr.set(keyBytes, serialized);
+            final byte[] keyBytes = writeSplitValue(context, subspace, recordKey, serialized);
             if (sizeInfo != null) {
                 sizeInfo.set(keyBytes, serialized);
                 sizeInfo.setSplit(false);
```
```diff
@@ -162,23 +171,25 @@
         }
     }

     @SuppressWarnings("PMD.CloseResource")
     private static void writeSplitRecord(@Nonnull final FDBRecordContext context, @Nonnull final Subspace subspace,
                                          @Nonnull final Tuple key, @Nonnull final byte[] serialized,
+                                         boolean hasVersionInKey,
                                          final boolean clearBasedOnPreviousSizeInfo, @Nullable final FDBStoredSizes previousSizeInfo,
                                          @Nullable SizeInfo sizeInfo) {
-        final Transaction tr = context.ensureActive();
-        final Subspace keySplitSubspace = subspace.subspace(key);
-        clearPreviousSplitRecord(context, subspace, key, clearBasedOnPreviousSizeInfo, previousSizeInfo);
+        if (!hasVersionInKey) {
+            // Note that the clearPreviousSplitRecords also removes version splits from the context cache
+            // This is not currently supported for the case where we have versions in the keys since we can't trace the old values down
+            clearPreviousSplitRecord(context, subspace, key, clearBasedOnPreviousSizeInfo, previousSizeInfo);
+        }
         long index = SplitHelper.START_SPLIT_RECORD;
         int offset = 0;
         while (offset < serialized.length) {
             int nextOffset = offset + SplitHelper.SPLIT_RECORD_SIZE;
             if (nextOffset > serialized.length) {
                 nextOffset = serialized.length;
             }
-            final byte[] keyBytes = keySplitSubspace.pack(index);
             final byte[] valueBytes = Arrays.copyOfRange(serialized, offset, nextOffset);
-            tr.set(keyBytes, valueBytes);
+            final byte[] keyBytes = writeSplitValue(context, subspace, key.add(index), valueBytes);
             if (sizeInfo != null) {
                 if (offset == 0) {
                     sizeInfo.set(keyBytes, valueBytes);
```

Check warning on line 174 in fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/SplitHelper.java
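The split loop above can be reduced to a standalone chunking helper. The class name and the tiny `SPLIT_RECORD_SIZE` below are invented for readability; only the loop shape comes from the diff.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch of writeSplitRecord's loop: a payload larger than
// SPLIT_RECORD_SIZE is cut into consecutive chunks, each written under key.add(index).
public class SplitSketch {
    // Stand-in for the real SPLIT_RECORD_SIZE, which is much larger.
    static final int SPLIT_RECORD_SIZE = 4;

    // Returns the chunks the loop would write, in order.
    static List<byte[]> chunks(byte[] serialized) {
        List<byte[]> out = new ArrayList<>();
        int offset = 0;
        while (offset < serialized.length) {
            // Clamp the final chunk to the end of the payload.
            int nextOffset = Math.min(offset + SPLIT_RECORD_SIZE, serialized.length);
            out.add(Arrays.copyOfRange(serialized, offset, nextOffset));
            offset = nextOffset;
        }
        return out;
    }
}
```

A 10-byte payload with a split size of 4 yields chunks of 4, 4, and 2 bytes, matching how the real loop clamps `nextOffset` to `serialized.length` on the last iteration.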
```diff
@@ -192,6 +203,28 @@
         }
     }

+    private static byte[] writeSplitValue(FDBRecordContext context, Subspace subspace, Tuple recordKey, byte[] serialized) {
+        byte[] keyBytes;
+        if (recordKey.hasIncompleteVersionstamp()) {
+            keyBytes = subspace.packWithVersionstamp(recordKey);
+            byte[] current = context.addVersionMutation(
+                    MutationType.SET_VERSIONSTAMPED_KEY,
+                    keyBytes,
+                    serialized);
+            if (current != null) {
+                // This should never happen. It means that the same key (and suffix) and local version were used for a subsequent
+                // write. It is most likely not an intended flow and this check will protect against that.
+                // It is an incomplete check since the same record can have different suffixes to the primary key that would not collide.
+                // Namely, if one is split and one is not split they will not overlap. Anything else and they would.
+                throw new RecordCoreInternalException("Key with version overwritten");
+            }
+        } else {
+            keyBytes = subspace.pack(recordKey);
+            context.ensureActive().set(keyBytes, serialized);
+        }
+        return keyBytes;
+    }
+
     @SuppressWarnings("PMD.CloseResource")
     private static void writeVersion(@Nonnull final FDBRecordContext context, @Nonnull final Subspace subspace, @Nonnull final Tuple key,
                                      @Nullable final FDBRecordVersion version, @Nullable final SizeInfo sizeInfo) {
```

Review thread on the collision-check comment:

Collaborator: Suggested change …

Contributor (author): Yup, done.

Collaborator: Suggested change …
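The collision guard in `writeSplitValue` can be illustrated with a plain map standing in for the context's version-mutation cache. The class below is hypothetical (not the `FDBRecordContext` API); it only shows why a non-null return from the registration call signals that the same versionstamped key and local version were written twice in one transaction.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the context's version mutation bookkeeping.
public class VersionMutationSketch {
    // Keyed by the packed key bytes; Map.put returns the previous value, mirroring
    // how a repeated registration of the same key surfaces the earlier write.
    private final Map<String, byte[]> versionMutations = new HashMap<>();

    // Mimics registering a SET_VERSIONSTAMPED_KEY mutation; returns the prior value, if any.
    byte[] addVersionMutation(byte[] keyBytes, byte[] value) {
        return versionMutations.put(Arrays.toString(keyBytes), value);
    }

    void writeSplitValue(byte[] keyBytes, byte[] serialized) {
        byte[] current = addVersionMutation(keyBytes, serialized);
        if (current != null) {
            // Same key (and suffix) and local version reused within one transaction.
            throw new IllegalStateException("Key with version overwritten");
        }
    }
}
```

The check is intentionally incomplete, as the PR comment notes: split and unsplit writes of the same record use different suffixes and would not collide in this map.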
```diff
@@ -201,11 +234,11 @@
             }
             return;
         }
-        final Transaction tr = context.ensureActive();
+        // At this point we know the key does not have a version
         final byte[] keyBytes = subspace.pack(key.add(RECORD_VERSION));
         final byte[] valueBytes = packVersion(version);
         if (version.isComplete()) {
-            tr.set(keyBytes, valueBytes);
+            context.ensureActive().set(keyBytes, valueBytes);
         } else {
             context.addVersionMutation(MutationType.SET_VERSIONSTAMPED_VALUE, keyBytes, valueBytes);
             context.addToLocalVersionCache(keyBytes, version.getLocalVersion());
```
Collaborator: I think this is still a little unclear. It might be that we're conceiving of the problem in different ways. My point has been that you can (and should) clear out the keys from the version mutation cache if you update the same key (with an incomplete version) multiple times in the same transaction. This comment is pointing out the (also true) statement that it is incorrect for the user to provide a `Tuple` with an incomplete version associated with a previous transaction. I might rephrase it as something like: … It probably could be more succinct. I kind of feel like the more important point is just that we need to search through a data structure that we currently don't look through (the `versionMutationCache`) to do the right thing here, but I do kind of see the point that any incomplete keys committed during previous transactions are now accessed via different complete keys.
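The grouping the reviewer describes, with all split keys for a record bucketed under their local version so a rewrite can replace the whole group at once, might be sketched as below. Every name here is illustrative; the real context keeps its version mutations in a different structure.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Illustrative sketch of grouping version mutations by local version, so that
// rewriting a record under the same local version replaces all of its splits.
public class LocalVersionGroups {
    // localVersion -> (packed key -> value); TreeMap keeps split keys ordered.
    private final Map<Integer, TreeMap<String, byte[]>> groups = new HashMap<>();

    void put(int localVersion, String key, byte[] value) {
        groups.computeIfAbsent(localVersion, v -> new TreeMap<>()).put(key, value);
    }

    // Drops every old split for this local version and installs the new set.
    void replace(int localVersion, Map<String, byte[]> newSplits) {
        groups.put(localVersion, new TreeMap<>(newSplits));
    }

    int splitCount(int localVersion) {
        TreeMap<String, byte[]> group = groups.get(localVersion);
        return group == null ? 0 : group.size();
    }
}
```

As the thread notes, this is unnecessary for the pending writes queue (each insert claims a fresh local version) but would matter if versionstamps in primary keys were ever supported.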