diff --git a/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/FDBRecordStore.java b/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/FDBRecordStore.java index f55ec680ca..0d80af3ab5 100644 --- a/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/FDBRecordStore.java +++ b/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/FDBRecordStore.java @@ -73,7 +73,6 @@ import com.apple.foundationdb.record.metadata.Index; import com.apple.foundationdb.record.metadata.IndexAggregateFunction; import com.apple.foundationdb.record.metadata.IndexRecordFunction; -import com.apple.foundationdb.record.metadata.IndexTypes; import com.apple.foundationdb.record.metadata.JoinedRecordType; import com.apple.foundationdb.record.metadata.Key; import com.apple.foundationdb.record.metadata.MetaDataException; @@ -86,8 +85,6 @@ import com.apple.foundationdb.record.metadata.expressions.KeyExpression; import com.apple.foundationdb.record.provider.common.DynamicMessageRecordSerializer; import com.apple.foundationdb.record.provider.common.RecordSerializer; -import com.apple.foundationdb.record.provider.foundationdb.indexing.IndexingHeartbeat; -import com.apple.foundationdb.record.provider.foundationdb.indexing.IndexingRangeSet; import com.apple.foundationdb.record.provider.foundationdb.keyspace.KeySpacePath; import com.apple.foundationdb.record.provider.foundationdb.storestate.FDBRecordStoreStateCache; import com.apple.foundationdb.record.provider.foundationdb.storestate.FDBRecordStoreStateCacheEntry; @@ -114,7 +111,6 @@ import com.apple.foundationdb.tuple.TupleHelpers; import com.apple.foundationdb.util.LoggableException; import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Suppliers; import com.google.common.collect.ImmutableMap; import com.google.protobuf.ByteString; import com.google.protobuf.Descriptors; @@ 
-130,10 +126,8 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; -import java.util.HashMap; import java.util.Iterator; import java.util.LinkedHashSet; -import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Objects; @@ -141,12 +135,10 @@ import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; import java.util.function.IntFunction; import java.util.function.Predicate; -import java.util.function.Supplier; import java.util.function.UnaryOperator; import java.util.stream.Collectors; @@ -269,7 +261,7 @@ public class FDBRecordStore extends FDBStoreBase implements FDBRecordStoreBase indexStateReadConflicts = ConcurrentHashMap.newKeySet(8); + final Set indexStateReadConflicts = ConcurrentHashMap.newKeySet(8); + + @Nonnull + final IndexStateManager indexStateManager; @Nonnull private final PlanSerializationRegistry planSerializationRegistry; @@ -348,6 +343,7 @@ protected FDBRecordStore(@Nonnull FDBRecordContext context, this.omitUnsplitRecordSuffix = !formatVersion.isAtLeast(FormatVersion.SAVE_UNSPLIT_WITH_SUFFIX); this.preloadCache = new FDBPreloadRecordCache(PRELOAD_CACHE_SIZE); this.planSerializationRegistry = planSerializationRegistry; + this.indexStateManager = new IndexStateManager(this); } @Override @@ -399,7 +395,7 @@ public int getUserVersion() { return userVersion; } - private boolean useOldVersionFormat() { + boolean useOldVersionFormat() { return useOldVersionFormat(getFormatVersion(), omitUnsplitRecordSuffix); } @@ -685,7 +681,7 @@ private void saveVersionWithOldFormat(@Nonnull Tuple primaryKey, @Nonnull FDBRec } @Nonnull - private Tuple recordVersionKey(@Nonnull Tuple primaryKey) { + Tuple recordVersionKey(@Nonnull Tuple primaryKey) { if (useOldVersionFormat()) { return 
Tuple.from(RECORD_VERSION_KEY).addAll(primaryKey); } else { @@ -693,6 +689,12 @@ private Tuple recordVersionKey(@Nonnull Tuple primaryKey) { } } + @API(API.Status.INTERNAL) + @VisibleForTesting + public Subspace getLegacyVersionSubspace() { + return getSubspace().subspace(Tuple.from(RECORD_VERSION_KEY)); + } + @Nonnull @SuppressWarnings("PMD.CompareObjectsWithEquals") private CompletableFuture updateSecondaryIndexes(@Nullable final FDBStoredRecord oldRecord, @@ -2770,40 +2772,18 @@ private static void validateStoreLockState(@Nonnull RecordMetaDataProto.DataStor } } - /** - * Schedule a pre-commit hook to look for replaced indexes. This method delays executing the replaced-indexes - * check logic until right before the transaction commits, and it will only schedule the check at most once. - * This is to prevent multiple instances of that logic running at the same time. Because - * {@link #removeReplacedIndexes()} needs to both read and (potentially) update the index state information, running - * that method concurrently with itself or with other index state updates can result in errors. So, unless it - * can be guaranteed via other means that that method is the only method removing replacement indexes, this - * method should be preferred over calling the method directly. - * - * @param changed whether the index state information has changed - * @return whether the commit check was scheduled - * @see #removeReplacedIndexes() - * @see com.apple.foundationdb.record.metadata.IndexOptions#REPLACED_BY_OPTION_PREFIX - */ - private boolean addRemoveReplacedIndexesCommitCheckIfChanged(boolean changed) { - if (changed) { - final String commitCheckName = "removeReplacedIndexes_" + ByteArrayUtil2.toHexString(getSubspace().pack()); - getRecordContext().getOrCreateCommitCheck(commitCheckName, name -> this::removeReplacedIndexes); - } - return changed; - } - /** * Remove any replaced indexes if the index state information has changed. 
This executes right away (if the * index states have, in fact, changed). Note that it is unsafe to call {@link #removeReplacedIndexes()} twice * concurrently on the same record store, so this method should only be called if it can be guaranteed to be * the only thing calling {@link #removeReplacedIndexes()} at that time. If this cannot be guaranteed, then - * calling {@link #addRemoveReplacedIndexesCommitCheckIfChanged(boolean)} will ensure that the check is eventually + * calling {@link IndexStateManager#addRemoveReplacedIndexesCommitCheckIfChanged(boolean)} will ensure that the check is eventually * run once and therefore protects against concurrent accesses. * * @param changed whether index state information has changed * @return a future that will return whether {@link #removeReplacedIndexes()} was called * @see #removeReplacedIndexes() - * @see #addRemoveReplacedIndexesCommitCheckIfChanged(boolean) + * @see IndexStateManager#addRemoveReplacedIndexesCommitCheckIfChanged(boolean) * @see com.apple.foundationdb.record.metadata.IndexOptions#REPLACED_BY_OPTION_PREFIX */ @Nonnull @@ -2865,19 +2845,19 @@ private CompletableFuture removeReplacedIndexes() { } private void beginRecordStoreStateRead() { - recordStoreStateRef.get().beginRead(); + indexStateManager.beginRecordStoreStateRead(); } private void endRecordStoreStateRead() { - recordStoreStateRef.get().endRead(); + indexStateManager.endRecordStoreStateRead(); } private void beginRecordStoreStateWrite() { - recordStoreStateRef.get().beginWrite(); + indexStateManager.beginRecordStoreStateWrite(); } private void endRecordStoreStateWrite() { - recordStoreStateRef.get().endWrite(); + indexStateManager.endRecordStoreStateWrite(); } @Nonnull @@ -2975,14 +2955,7 @@ private static CompletableFuture readStoreFirstKey(@Nonnull FDBRecordC @Nonnull @SuppressWarnings("PMD.CloseResource") public CompletableFuture rebuildAllIndexes() { - // Note that index states are *not* cleared, as rebuilding the indexes resets each state - 
context.clear(getSubspace().range(Tuple.from(INDEX_KEY))); - context.clear(getSubspace().range(Tuple.from(INDEX_SECONDARY_SPACE_KEY))); - context.clear(getSubspace().range(Tuple.from(INDEX_RANGE_SPACE_KEY))); - context.clear(getSubspace().range(Tuple.from(INDEX_UNIQUENESS_VIOLATIONS_KEY))); - List> work = new LinkedList<>(); - addRebuildRecordCountsJob(work); - return rebuildIndexes(getRecordMetaData().getIndexesToBuildSince(-1), Collections.emptyMap(), work, RebuildIndexReason.REBUILD_ALL, null); + return indexStateManager.rebuildAllIndexes(); } /** @@ -2998,22 +2971,12 @@ public CompletableFuture rebuildAllIndexes() { */ @Nonnull public Map> getIndexesToBuild() { - if (recordStoreStateRef.get() == null) { - throw uninitializedStoreException("cannot get indexes to build on uninitialized store"); - } - final Map> indexesToBuild = getRecordMetaData().getIndexesToBuildSince(-1); - beginRecordStoreStateRead(); - try { - indexesToBuild.keySet().removeIf(this::isIndexReadable); - return indexesToBuild; - } finally { - endRecordStoreStateRead(); - } + return indexStateManager.getIndexesToBuild(); } @Nonnull public CompletableFuture clearAndMarkIndexWriteOnly(@Nonnull String indexName) { - return clearAndMarkIndexWriteOnly(metaDataProvider.getRecordMetaData().getIndex(indexName)); + return indexStateManager.clearAndMarkIndexWriteOnly(indexName); } /** @@ -3023,8 +2986,7 @@ public CompletableFuture clearAndMarkIndexWriteOnly(@Nonnull String indexN */ @Nonnull public CompletableFuture clearAndMarkIndexWriteOnly(@Nonnull Index index) { - return markIndexWriteOnly(index) - .thenRun(() -> clearIndexData(index)); + return indexStateManager.clearAndMarkIndexWriteOnly(index); } /** @@ -3152,7 +3114,7 @@ public boolean setStateCacheability(boolean cacheable) { return context.asyncToSync(FDBStoreTimer.Waits.WAIT_SET_STATE_CACHEABILITY, setStateCacheabilityAsync(cacheable)); } - private boolean isStateCacheableInternal() { + boolean isStateCacheableInternal() { if 
(recordStoreStateRef.get() == null) { throw uninitializedStoreException("cannot check record store state cacheability on uninitialized store"); } @@ -3472,94 +3434,6 @@ public CompletableFuture updateIncarnation(@Nonnull IntFunction u }); } - // Actually (1) writes the index state to the database and (2) updates the cached state with the new state - @SuppressWarnings("PMD.CloseResource") - private void updateIndexState(@Nonnull String indexName, byte[] indexKey, @Nonnull IndexState indexState) { - if (LOGGER.isInfoEnabled()) { - LOGGER.info(KeyValueLogMessage.of("index state change", - LogMessageKeys.INDEX_NAME, indexName, - LogMessageKeys.TARGET_INDEX_STATE, indexState.name(), - subspaceProvider.logKey(), subspaceProvider.toString(context) - )); - } - if (recordStoreStateRef.get() == null) { - throw uninitializedStoreException("cannot update index state on an uninitialized store"); - } - // This is generally called by someone who should already have a write lock, but adding them here - // defensively shouldn't cause problems. - beginRecordStoreStateWrite(); - try { - context.setDirtyStoreState(true); - if (isStateCacheableInternal()) { - // The cache contains index state information, so updates to this information must also - // update the meta-data version stamp or instances might cache state index states. - context.setMetaDataVersionStamp(); - } - Transaction tr = context.ensureActive(); - if (IndexState.READABLE.equals(indexState)) { - tr.clear(indexKey); - } else { - tr.set(indexKey, Tuple.from(indexState.code()).pack()); - } - recordStoreStateRef.updateAndGet(state -> { - // See beginRecordStoreStateRead() on why setting state is done in updateAndGet(). 
- state.setState(indexName, indexState); - return state; - }); - } finally { - endRecordStoreStateWrite(); - } - } - - @Nonnull - @SuppressWarnings("PMD.CloseResource") - private CompletableFuture markIndexNotReadable(@Nonnull String indexName, @Nonnull IndexState indexState) { - if (recordStoreStateRef.get() == null) { - return preloadRecordStoreStateAsync().thenCompose(vignore -> markIndexNotReadable(indexName, indexState)); - } - - addIndexStateReadConflict(indexName); - - beginRecordStoreStateWrite(); - boolean haveFuture = false; - try { - // A read is done before the write in order to avoid having unnecessary - // updates cause spurious not_committed errors. - byte[] indexKey = indexStateSubspace().pack(indexName); - Transaction tr = context.ensureActive(); - CompletableFuture future = tr.get(indexKey).thenCompose(previous -> { - if (previous == null) { - IndexingRangeSet indexRangeSet = IndexingRangeSet.forIndexBuild(this, getRecordMetaData().getIndex(indexName)); - return indexRangeSet.isEmptyAsync().thenCompose(empty -> { - if (empty) { - // For readable indexes, we have an optimization where the range set is cleared out - // after the index is build to avoid carrying extra meta-data about the index range - // set. 
However, when we mark an index as write-only, we want to preserve the record - // that the index was completely built (if the range set was empty, i.e., cleared) - return indexRangeSet.insertRangeAsync(null, null); - } else { - return AsyncUtil.READY_FALSE; - } - }).thenApply(ignore -> { - updateIndexState(indexName, indexKey, indexState); - return true; - }); - } else if (!Tuple.fromBytes(previous).get(0).equals(indexState.code())) { - updateIndexState(indexName, indexKey, indexState); - return AsyncUtil.READY_TRUE; - } else { - return AsyncUtil.READY_FALSE; - } - }).whenComplete((b, t) -> endRecordStoreStateWrite()); - haveFuture = true; - return future; - } finally { - if (!haveFuture) { - endRecordStoreStateWrite(); - } - } - } - /** * Adds the index of the given name to the list of write-only indexes stored within the store. * This will update the list stored within database. @@ -3574,7 +3448,7 @@ private CompletableFuture markIndexNotReadable(@Nonnull String indexNam */ @Nonnull public CompletableFuture markIndexWriteOnly(@Nonnull String indexName) { - return markIndexNotReadable(indexName, IndexState.WRITE_ONLY); + return indexStateManager.markIndexWriteOnly(indexName); } /** @@ -3588,7 +3462,7 @@ public CompletableFuture markIndexWriteOnly(@Nonnull String indexName) */ @Nonnull public CompletableFuture markIndexWriteOnly(@Nonnull Index index) { - return markIndexWriteOnly(index.getName()); + return indexStateManager.markIndexWriteOnly(index); } /** @@ -3621,12 +3495,7 @@ public CompletableFuture markIndexDisabled(@Nonnull String indexName) { */ @Nonnull public CompletableFuture markIndexDisabled(@Nonnull Index index) { - return markIndexNotReadable(index.getName(), IndexState.DISABLED).thenApply(changed -> { - if (changed) { - clearIndexData(index); - } - return changed; - }); + return indexStateManager.markIndexDisabled(index); } /** @@ -3640,11 +3509,7 @@ public CompletableFuture markIndexDisabled(@Nonnull Index index) { @Nonnull 
@SuppressWarnings("PMD.CloseResource") public CompletableFuture> firstUnbuiltRange(@Nonnull Index index) { - if (!getRecordMetaData().hasIndex(index.getName())) { - throw new MetaDataException("Index " + index.getName() + " does not exist in meta-data."); - } - IndexingRangeSet rangeSet = IndexingRangeSet.forIndexBuild(this, index); - return rangeSet.firstMissingRangeAsync().thenApply(Optional::ofNullable); + return indexStateManager.firstUnbuiltRange(index); } /** @@ -3690,7 +3555,7 @@ public Range getUnbuiltRange() { */ @Nonnull public CompletableFuture markIndexReadableOrUniquePending(@Nonnull Index index) { - return markIndexReadable(index, true); + return indexStateManager.markIndexReadableOrUniquePending(index); } /** @@ -3706,37 +3571,7 @@ public CompletableFuture markIndexReadableOrUniquePending(@Nonnull Inde @Nonnull @SuppressWarnings("PMD.CloseResource") public CompletableFuture markIndexReadable(@Nonnull Index index) { - return markIndexReadable(index, false); - } - - @Nonnull - private CompletableFuture markIndexReadable(@Nonnull Index index, boolean allowUniquePending) { - if (recordStoreStateRef.get() == null) { - return preloadRecordStoreStateAsync().thenCompose(vignore -> markIndexReadable(index, allowUniquePending)); - } - - addIndexStateReadConflict(index.getName()); - - beginRecordStoreStateWrite(); - boolean haveFuture = false; - try { - @SuppressWarnings("PMD.CloseResource") - Transaction tr = ensureContextActive(); - byte[] indexKey = indexStateSubspace().pack(index.getName()); - CompletableFuture future = tr.get(indexKey).thenCompose(previous -> { - if (previous != null) { - return checkAndUpdateBuiltIndexState(index, indexKey, allowUniquePending); - } else { - return AsyncUtil.READY_FALSE; - } - }).whenComplete((b, t) -> endRecordStoreStateWrite()).thenApply(this::addRemoveReplacedIndexesCommitCheckIfChanged); - haveFuture = true; - return future; - } finally { - if (!haveFuture) { - endRecordStoreStateWrite(); - } - } + return 
indexStateManager.markIndexReadable(index); } /** @@ -3759,69 +3594,7 @@ public CompletableFuture markIndexReadable(@Nonnull String indexName) { return markIndexReadable(getRecordMetaData().getIndex(indexName)); } - private CompletableFuture checkAndUpdateBuiltIndexState(Index index, byte[] indexKey, boolean allowUniquePending) { - // An extension function to reduce markIndexReadable's complexity - CompletableFuture> builtFuture = firstUnbuiltRange(index); - CompletableFuture> uniquenessFuture; - if (index.isUnique()) { - // we wait for all the commit checks and then scan, because if the index is WriteOnly, the commit check - // won't throw an error, it will just record a violation. - // If the index is ReadableUniquePending it will throw an error here, but this would only happen if you - // added the violation in this transaction, and you normally aren't allowed to add violations to - // indexes that ReadableUniquePending - uniquenessFuture = whenAllIndexUniquenessCommitChecks(index) - .thenCompose(vignore -> scanUniquenessViolations(index, 1).first()); - } else { - uniquenessFuture = CompletableFuture.completedFuture(Optional.empty()); - } - return CompletableFuture.allOf(builtFuture, uniquenessFuture).thenApply(vignore -> { - Optional firstUnbuilt = context.join(builtFuture); - Optional uniquenessViolation = context.join(uniquenessFuture); - - if (firstUnbuilt.isPresent()) { - throw new IndexNotBuiltException("Attempted to make unbuilt index readable" , firstUnbuilt.get(), - LogMessageKeys.INDEX_NAME, index.getName(), - "unbuiltRangeBegin", ByteArrayUtil2.loggable(firstUnbuilt.get().begin), - "unbuiltRangeEnd", ByteArrayUtil2.loggable(firstUnbuilt.get().end), - subspaceProvider.logKey(), subspaceProvider.toString(context), - LogMessageKeys.SUBSPACE_KEY, index.getSubspaceKey()); - } else if (uniquenessViolation.isPresent()) { - if (allowUniquePending) { - if (isIndexReadableUniquePending(index)) { - return false; // Unchanged - } - 
updateIndexState(index.getName(), indexKey, IndexState.READABLE_UNIQUE_PENDING); - // Keeping the index build data until the uniqueness violations are resolved. The build data will be - // cleared only at READABLE state. - return true; - } - RecordIndexUniquenessViolation wrapped = new RecordIndexUniquenessViolation("Uniqueness violation when making index readable", - uniquenessViolation.get()); - wrapped.addLogInfo( - LogMessageKeys.INDEX_NAME, index.getName(), - subspaceProvider.logKey(), subspaceProvider.toString(context)); - throw wrapped; - } else { - updateIndexState(index.getName(), indexKey, IndexState.READABLE); - clearReadableIndexBuildData(index); - return true; - } - }); - } - - private void logExceptionAsWarn(KeyValueLogMessage message, Throwable exception) { - if (LOGGER.isWarnEnabled()) { - for (Throwable ex = exception; - ex != null; - ex = ex.getCause()) { - if (ex instanceof LoggableException) { - message.addKeysAndValues(((LoggableException)ex).getLogInfo()); - } - } - message.addKeyAndValue(subspaceProvider.logKey(), subspaceProvider.toString(context)); - LOGGER.warn(message.toString(), exception); - } - } + // Delegated to IndexStateManager /** * Marks the index with the given name as readable without checking to see if it is @@ -3835,32 +3608,7 @@ private void logExceptionAsWarn(KeyValueLogMessage message, Throwable exception) @Nonnull @SuppressWarnings("PMD.CloseResource") public CompletableFuture uncheckedMarkIndexReadable(@Nonnull String indexName) { - if (recordStoreStateRef.get() == null) { - return preloadRecordStoreStateAsync().thenCompose(vignore -> uncheckedMarkIndexReadable(indexName)); - } - - addIndexStateReadConflict(indexName); - - beginRecordStoreStateWrite(); - boolean haveFuture = false; - try { - Transaction tr = ensureContextActive(); - byte[] indexKey = indexStateSubspace().pack(indexName); - CompletableFuture future = tr.get(indexKey).thenApply(previous -> { - if (previous != null) { - updateIndexState(indexName, indexKey, 
IndexState.READABLE); - return true; - } else { - return false; - } - }).whenComplete((b, t) -> endRecordStoreStateWrite()).thenApply(this::addRemoveReplacedIndexesCommitCheckIfChanged); - haveFuture = true; - return future; - } finally { - if (!haveFuture) { - endRecordStoreStateWrite(); - } - } + return indexStateManager.uncheckedMarkIndexReadable(indexName); } /** @@ -3982,7 +3730,7 @@ private CompletableFuture> loadIndexStatesAsync(@Nonnull * Add a read conflict key for all records. */ @SuppressWarnings("PMD.CloseResource") - private void addRecordsReadConflict() { + void addRecordsReadConflict() { if (recordsReadConflict) { return; } @@ -3997,7 +3745,7 @@ private void addRecordsReadConflict() { * @param indexName the index to conflict on, if it's state changes */ @SuppressWarnings("PMD.CloseResource") - private void addIndexStateReadConflict(@Nonnull String indexName) { + void addIndexStateReadConflict(@Nonnull String indexName) { if (!getRecordMetaData().hasIndex(indexName)) { throw new MetaDataException("Index " + indexName + " does not exist in meta-data."); } @@ -4015,7 +3763,7 @@ private void addIndexStateReadConflict(@Nonnull String indexName) { * Add a read conflict key for the whole record store state. 
*/ @SuppressWarnings("PMD.CloseResource") - private void addStoreStateReadConflict() { + void addStoreStateReadConflict() { if (storeStateReadConflict) { return; } @@ -4038,7 +3786,7 @@ private void addStoreStateReadConflict() { */ @Nonnull public IndexState getIndexState(@Nonnull Index index) { - return getIndexState(index.getName()); + return indexStateManager.getIndexState(index); } /** @@ -4052,8 +3800,7 @@ public IndexState getIndexState(@Nonnull Index index) { */ @Nonnull public IndexState getIndexState(@Nonnull String indexName) { - addIndexStateReadConflict(indexName); - return getRecordStoreState().getState(indexName); + return indexStateManager.getIndexState(indexName); } /** @@ -4068,7 +3815,7 @@ public IndexState getIndexState(@Nonnull String indexName) { * @throws IllegalArgumentException if no index in the metadata has the same name as this index */ public boolean isIndexReadable(@Nonnull Index index) { - return isIndexReadable(index.getName()); + return indexStateManager.isIndexReadable(index); } /** @@ -4081,7 +3828,7 @@ public boolean isIndexReadable(@Nonnull Index index) { * @throws IllegalArgumentException if no index in the metadata has the given name */ public boolean isIndexReadable(@Nonnull String indexName) { - return getIndexState(indexName).equals(IndexState.READABLE); + return indexStateManager.isIndexReadable(indexName); } /** @@ -4099,7 +3846,7 @@ public boolean isIndexReadable(@Nonnull String indexName) { * @throws IllegalArgumentException if no index in the metadata has the same name as this index */ public boolean isIndexReadableUniquePending(@Nonnull Index index) { - return isIndexReadableUniquePending(index.getName()); + return indexStateManager.isIndexReadableUniquePending(index); } /** @@ -4115,7 +3862,7 @@ public boolean isIndexReadableUniquePending(@Nonnull Index index) { * @throws IllegalArgumentException if no index in the metadata has the given name */ public boolean isIndexReadableUniquePending(@Nonnull String indexName) { 
- return getIndexState(indexName).equals(IndexState.READABLE_UNIQUE_PENDING); + return indexStateManager.isIndexReadableUniquePending(indexName); } /** @@ -4129,7 +3876,7 @@ public boolean isIndexReadableUniquePending(@Nonnull String indexName) { * @throws IllegalArgumentException if no index in the metadata has the given name */ public boolean isIndexScannable(@Nonnull Index index) { - return isIndexScannable(index.getName()); + return indexStateManager.isIndexScannable(index); } /** @@ -4143,7 +3890,7 @@ public boolean isIndexScannable(@Nonnull Index index) { * @throws IllegalArgumentException if no index in the metadata has the given name */ public boolean isIndexScannable(@Nonnull String indexName) { - return getIndexState(indexName).isScannable(); + return indexStateManager.isIndexScannable(indexName); } /** @@ -4158,7 +3905,7 @@ public boolean isIndexScannable(@Nonnull String indexName) { * @throws IllegalArgumentException if no index in the metadata has the same name as this index */ public boolean isIndexWriteOnly(@Nonnull Index index) { - return isIndexWriteOnly(index.getName()); + return indexStateManager.isIndexWriteOnly(index); } /** @@ -4171,7 +3918,7 @@ public boolean isIndexWriteOnly(@Nonnull Index index) { * @throws IllegalArgumentException if no index in the metadata has the given name */ public boolean isIndexWriteOnly(@Nonnull String indexName) { - return getIndexState(indexName).equals(IndexState.WRITE_ONLY); + return indexStateManager.isIndexWriteOnly(indexName); } /** @@ -4186,7 +3933,7 @@ public boolean isIndexWriteOnly(@Nonnull String indexName) { * @throws IllegalArgumentException if no index in the metadata has the same name as this index */ public boolean isIndexDisabled(@Nonnull Index index) { - return isIndexDisabled(index.getName()); + return indexStateManager.isIndexDisabled(index); } /** @@ -4199,7 +3946,7 @@ public boolean isIndexDisabled(@Nonnull Index index) { * @throws IllegalArgumentException if no index in the metadata has the 
given name */ public boolean isIndexDisabled(@Nonnull String indexName) { - return getIndexState(indexName).equals(IndexState.DISABLED); + return indexStateManager.isIndexDisabled(indexName); } /** @@ -4352,27 +4099,7 @@ public List getEnabledUniversalIndexes() { */ @Nonnull public Map getAllIndexStates() { - final RecordStoreState localRecordStoreState = getRecordStoreState(); - localRecordStoreState.beginRead(); - try { - addStoreStateReadConflict(); - return getRecordMetaData().getAllIndexes().stream() - .collect(Collectors.toMap(Function.identity(), localRecordStoreState::getState)); - } finally { - localRecordStoreState.endRead(); - } - } - - private CompletableFuture> rebuildIndexesGetDesiredIndexStates( - @Nonnull List> preWork, - @Nonnull Map> newStates) { - final ConcurrentHashMap desiredIndexStates = new ConcurrentHashMap<>(); - // Combine pre-existing work and newStates resolution into a single list of futures - final List> allWork = new ArrayList<>(preWork); - for (Map.Entry> entry : newStates.entrySet()) { - allWork.add(entry.getValue().thenAccept(state -> desiredIndexStates.put(entry.getKey(), state))); - } - return AsyncUtil.whenAll(allWork).thenApply(ignore -> desiredIndexStates); + return indexStateManager.getAllIndexStates(); } @Nonnull @@ -4381,10 +4108,7 @@ protected CompletableFuture rebuildIndexes(@Nonnull Map> work, @Nonnull RebuildIndexReason reason, @Nullable Integer oldMetaDataVersion) { - // Finish any pre-existing work items and resolve desired index states (which may query index states) before - // rebuilding indexes (which writes index states) - return rebuildIndexesGetDesiredIndexStates(work, newStates).thenCompose(desiredIndexStates -> - rebuildIndexes(indexes, desiredIndexStates, reason, oldMetaDataVersion)); + return indexStateManager.rebuildIndexes(indexes, newStates, work, reason, oldMetaDataVersion); } @Nonnull @@ -4392,43 +4116,7 @@ protected CompletableFuture rebuildIndexes(@Nonnull Map desiredIndexStates, @Nonnull 
RebuildIndexReason reason, @Nullable Integer oldMetaDataVersion) { - List> work = new ArrayList<>(); - Iterator>> indexIter = indexes.entrySet().iterator(); - return AsyncUtil.whileTrue(() -> { - Iterator> workIter = work.iterator(); - while (workIter.hasNext()) { - CompletableFuture workItem = workIter.next(); - if (workItem.isDone()) { - context.asyncToSync(FDBStoreTimer.Waits.WAIT_ERROR_CHECK, workItem); // Just for error handling. - workIter.remove(); - } - } - while (work.size() < MAX_PARALLEL_INDEX_REBUILD) { - if (indexIter.hasNext()) { - Map.Entry> indexItem = indexIter.next(); - Index index = indexItem.getKey(); - List recordTypes = indexItem.getValue(); - IndexState indexState = desiredIndexStates.getOrDefault(index, IndexState.READABLE); - final StringBuilder errMessageBuilder = new StringBuilder("unable to "); - final CompletableFuture rebuildOrMarkIndexSafely = MoreAsyncUtil.handleOnException( - () -> rebuildOrMarkIndex(index, indexState, recordTypes, reason, oldMetaDataVersion, errMessageBuilder), - exception -> { - // If there is any issue, simply mark the index as disabled without blocking checkVersion - logExceptionAsWarn(KeyValueLogMessage.build(errMessageBuilder.toString(), - LogMessageKeys.INDEX_NAME, index.getName() - ), exception); - return markIndexDisabled(index).thenApply(b -> null); - }); - work.add(rebuildOrMarkIndexSafely); - } else { - break; - } - } - if (work.isEmpty()) { - return AsyncUtil.READY_FALSE; - } - return AsyncUtil.whenAny(work).thenApply(v -> true); - }, getExecutor()); + return indexStateManager.rebuildIndexes(indexes, desiredIndexStates, reason, oldMetaDataVersion); } /** @@ -4449,60 +4137,11 @@ public enum RebuildIndexReason { } } - private boolean areAllRecordTypesSince(@Nullable Collection recordTypes, @Nullable Integer oldMetaDataVersion) { - return oldMetaDataVersion != null && (oldMetaDataVersion == -1 || (recordTypes != null && recordTypes.stream().allMatch(recordType -> { - Integer sinceVersion = 
recordType.getSinceVersion(); - return sinceVersion != null && sinceVersion > oldMetaDataVersion; - }))); - } - protected CompletableFuture rebuildOrMarkIndex(@Nonnull Index index, @Nonnull IndexState indexState, @Nullable List recordTypes, @Nonnull RebuildIndexReason reason, @Nullable Integer oldMetaDataVersion, @Nonnull StringBuilder errMessageBuilder) { - // Skip index rebuild if the index is on new record types. This may fail because of reusing an index name whose - // state hasn't been cleared. - if (indexState != IndexState.DISABLED && areAllRecordTypesSince(recordTypes, oldMetaDataVersion)) { - errMessageBuilder.append("rebuild index with no records"); - return rebuildIndexWithNoRecord(index, reason); - } - - switch (indexState) { - case WRITE_ONLY: - errMessageBuilder.append("clear and mark index write only"); - return clearAndMarkIndexWriteOnly(index).thenApply(b -> null); - case DISABLED: - errMessageBuilder.append("mark index disabled"); - return markIndexDisabled(index).thenApply(b -> null); - case READABLE: - default: - errMessageBuilder.append("rebuild index"); - return rebuildIndex(index, reason); - } - } - - @Nonnull - private CompletableFuture rebuildIndexWithNoRecord(@Nonnull final Index index, @Nonnull RebuildIndexReason reason) { - final boolean newStore = reason == RebuildIndexReason.NEW_STORE; - if (newStore ? 
LOGGER.isDebugEnabled() : LOGGER.isInfoEnabled()) { - final KeyValueLogMessage msg = KeyValueLogMessage.build("rebuilding index with no record", - LogMessageKeys.INDEX_NAME, index.getName(), - LogMessageKeys.INDEX_VERSION, index.getLastModifiedVersion(), - LogMessageKeys.REASON, reason.name(), - subspaceProvider.logKey(), subspaceProvider.toString(context), - LogMessageKeys.SUBSPACE_KEY, index.getSubspaceKey()); - if (newStore) { - if (LOGGER.isDebugEnabled()) { - LOGGER.debug(msg.toString()); - } - } else { - if (LOGGER.isInfoEnabled()) { - LOGGER.info(msg.toString()); - } - } - } - - return markIndexReadable(index).thenApply(b -> null); + return indexStateManager.rebuildOrMarkIndex(index, indexState, recordTypes, reason, oldMetaDataVersion, errMessageBuilder); } /** @@ -4524,7 +4163,7 @@ private CompletableFuture rebuildIndexWithNoRecord(@Nonnull final Index in */ @Nonnull public CompletableFuture rebuildIndex(@Nonnull Index index) { - return rebuildIndex(index, RebuildIndexReason.EXPLICIT); + return indexStateManager.rebuildIndex(index); } @API(API.Status.INTERNAL) @@ -4532,496 +4171,14 @@ public CompletableFuture rebuildIndex(@Nonnull Index index) { @VisibleForTesting @SuppressWarnings({"squid:S2095", "PMD.CloseResource"}) // Resource usage for indexBuilder is too complicated for rules. public CompletableFuture rebuildIndex(@Nonnull final Index index, @Nonnull RebuildIndexReason reason) { - final boolean newStore = reason == RebuildIndexReason.NEW_STORE; - if (newStore ? 
LOGGER.isDebugEnabled() : LOGGER.isInfoEnabled()) { - final KeyValueLogMessage msg = KeyValueLogMessage.build("rebuilding index", - LogMessageKeys.INDEX_NAME, index.getName(), - LogMessageKeys.INDEX_VERSION, index.getLastModifiedVersion(), - LogMessageKeys.REASON, reason.name(), - subspaceProvider.logKey(), subspaceProvider.toString(context), - LogMessageKeys.SUBSPACE_KEY, index.getSubspaceKey()); - if (newStore) { - if (LOGGER.isDebugEnabled()) { - LOGGER.debug(msg.toString()); - } - } else { - if (LOGGER.isInfoEnabled()) { - LOGGER.info(msg.toString()); - } - } - } - - long startTime = System.nanoTime(); - OnlineIndexer indexBuilder = OnlineIndexer.newBuilder().setRecordStore(this).setIndex(index).build(); - CompletableFuture future = indexBuilder.rebuildIndexAsync(this) - .thenCompose(vignore -> markIndexReadable(index)) - .handle((b, t) -> { - if (t != null) { - logExceptionAsWarn(KeyValueLogMessage.build("rebuilding index failed", - LogMessageKeys.INDEX_NAME, index.getName(), - LogMessageKeys.INDEX_VERSION, index.getLastModifiedVersion(), - LogMessageKeys.REASON, reason.name(), - LogMessageKeys.SUBSPACE_KEY, index.getSubspaceKey()), t); - } - // Only call method that builds in the current transaction, so never any pending work, - // so it would work to close before returning future, which would look better to SonarQube. - // But this is better if close ever does more. - indexBuilder.close(); - return null; - }); - - return context.instrument(FDBStoreTimer.Events.REBUILD_INDEX, - context.instrument(reason.event, future, startTime), - startTime); + return indexStateManager.rebuildIndex(index, reason); } - @SuppressWarnings("PMD.GuardLogStatement") // Already is, but around several call. 
+ @SuppressWarnings("PMD.GuardLogStatement") private CompletableFuture checkPossiblyRebuild(@Nullable UserVersionChecker userVersionChecker, @Nonnull RecordMetaDataProto.DataStoreInfo.Builder info, @Nonnull boolean[] dirty) { - final int oldFormatVersion = info.getFormatVersion(); - final int newFormatVersion = Math.max(oldFormatVersion, formatVersion.getValueForSerialization()); - final boolean formatVersionChanged = oldFormatVersion != newFormatVersion; - formatVersion = FormatVersion.getFormatVersion(newFormatVersion); - - final boolean newStore = oldFormatVersion == 0; - final int oldMetaDataVersion = newStore ? -1 : info.getMetaDataversion(); - final RecordMetaData metaData = getRecordMetaData(); - final int newMetaDataVersion = metaData.getVersion(); - if (oldMetaDataVersion > newMetaDataVersion) { - CompletableFuture ret = new CompletableFuture<>(); - ret.completeExceptionally(new RecordStoreStaleMetaDataVersionException("Local meta-data has stale version", - LogMessageKeys.LOCAL_VERSION, newMetaDataVersion, - LogMessageKeys.STORED_VERSION, oldMetaDataVersion, - subspaceProvider.logKey(), subspaceProvider.toString(context))); - return ret; - } - final boolean metaDataVersionChanged = oldMetaDataVersion != newMetaDataVersion; - - if (!formatVersionChanged && !metaDataVersionChanged) { - return AsyncUtil.DONE; - } - - if (LOGGER.isInfoEnabled()) { - if (newStore) { - LOGGER.info(KeyValueLogMessage.of("new record store", - LogMessageKeys.FORMAT_VERSION, newFormatVersion, - LogMessageKeys.META_DATA_VERSION, newMetaDataVersion, - subspaceProvider.logKey(), subspaceProvider.toString(context))); - } else { - if (formatVersionChanged) { - LOGGER.info(KeyValueLogMessage.of("format version changed", - LogMessageKeys.OLD_VERSION, oldFormatVersion, - LogMessageKeys.NEW_VERSION, newFormatVersion, - subspaceProvider.logKey(), subspaceProvider.toString(context))); - } - if (metaDataVersionChanged) { - LOGGER.info(KeyValueLogMessage.of("meta-data version changed", - 
LogMessageKeys.OLD_VERSION, oldMetaDataVersion, - LogMessageKeys.NEW_VERSION, newMetaDataVersion, - subspaceProvider.logKey(), subspaceProvider.toString(context))); - } - } - } - - dirty[0] = true; - return checkRebuild(userVersionChecker, info, metaData); - } - - @SuppressWarnings("PMD.CloseResource") - private CompletableFuture checkRebuild(@Nullable UserVersionChecker userVersionChecker, - @Nonnull RecordMetaDataProto.DataStoreInfo.Builder info, - @Nonnull RecordMetaData metaData) { - final List> work = new LinkedList<>(); - - final int oldFormatVersion = info.getFormatVersion(); - if (oldFormatVersion != formatVersion.getValueForSerialization()) { - info.setFormatVersion(formatVersion.getValueForSerialization()); - // We must check whether we have to save unsplit records without a suffix before - // attempting to read data, i.e., before we update any indexes. - if ((oldFormatVersion >= MIN_FORMAT_VERSION - && oldFormatVersion < SAVE_UNSPLIT_WITH_SUFFIX_FORMAT_VERSION - && formatVersion.isAtLeast(FormatVersion.SAVE_UNSPLIT_WITH_SUFFIX) - && !metaData.isSplitLongRecords())) { - if (LOGGER.isInfoEnabled()) { - LOGGER.info(KeyValueLogMessage.of("unsplit records stored at old format", - LogMessageKeys.OLD_VERSION, oldFormatVersion, - LogMessageKeys.NEW_VERSION, formatVersion, - subspaceProvider.logKey(), subspaceProvider.toString(context))); - } - info.setOmitUnsplitRecordSuffix(true); - omitUnsplitRecordSuffix = true; - } - if (oldFormatVersion >= MIN_FORMAT_VERSION && oldFormatVersion < SAVE_VERSION_WITH_RECORD_FORMAT_VERSION - && metaData.isStoreRecordVersions() && !useOldVersionFormat()) { - if (LOGGER.isInfoEnabled()) { - LOGGER.info(KeyValueLogMessage.of("migrating record versions to new format", - LogMessageKeys.OLD_VERSION, oldFormatVersion, - LogMessageKeys.NEW_VERSION, formatVersion, - subspaceProvider.logKey(), subspaceProvider.toString(context))); - } - addConvertRecordVersions(work); - } - } - - final boolean newStore = oldFormatVersion == 0; - final 
int oldMetaDataVersion = newStore ? -1 : info.getMetaDataversion(); - final int newMetaDataVersion = metaData.getVersion(); - final boolean metaDataVersionChanged = oldMetaDataVersion != newMetaDataVersion; - if (metaDataVersionChanged) { - // Clear the version table if we are no longer storing record versions. - // This can be skipped if the store is new (in which case there is no data), or if the old - // store did not use the old version format to store record versions - if (!metaData.isStoreRecordVersions() && !newStore - && useOldVersionFormat(oldFormatVersion, omitUnsplitRecordSuffix)) { - final Transaction tr = ensureContextActive(); - tr.clear(getSubspace().subspace(Tuple.from(RECORD_VERSION_KEY)).range()); - } - info.setMetaDataversion(newMetaDataVersion); - } - - final boolean rebuildRecordCounts = checkPossiblyRebuildRecordCounts(metaData, info, work, oldFormatVersion); - - // Done if we just needed to update format version (which might trigger record count rebuild). - if (!metaDataVersionChanged) { - return work.isEmpty() ? AsyncUtil.DONE : AsyncUtil.whenReady(work.get(0)); - } - - // Remove former indexes. 
- for (FormerIndex formerIndex : metaData.getFormerIndexesSince(oldMetaDataVersion)) { - removeFormerIndex(formerIndex); - } - - return checkRebuildIndexes(userVersionChecker, info, oldFormatVersion, metaData, oldMetaDataVersion, rebuildRecordCounts, work); - } - - private CompletableFuture checkRebuildIndexes(@Nullable UserVersionChecker userVersionChecker, @Nonnull RecordMetaDataProto.DataStoreInfo.Builder info, - int oldFormatVersion, @Nonnull RecordMetaData metaData, int oldMetaDataVersion, - boolean rebuildRecordCounts, List> work) { - final boolean newStore = oldFormatVersion == 0; - final Map> indexes = metaData.getIndexesToBuildSince(oldMetaDataVersion); - handleNoLongerUniqueIndex(metaData, work, indexes); - if (!indexes.isEmpty()) { - // If all the new indexes are only for a record type whose primary key has a type prefix, then we can scan less. - RecordType singleRecordTypeWithPrefixKey = singleRecordTypeWithPrefixKey(indexes); - final AtomicLong recordCountRef = new AtomicLong(-1); - final Supplier> lazyRecordCount = getAndRememberFutureLong(recordCountRef, - () -> getRecordCountForRebuildIndexes(newStore, rebuildRecordCounts, indexes, singleRecordTypeWithPrefixKey)); - AtomicLong recordsSizeRef = new AtomicLong(-1); - final Supplier> lazyRecordsSize = getAndRememberFutureLong(recordsSizeRef, - () -> getRecordSizeForRebuildIndexes(singleRecordTypeWithPrefixKey)); - if (singleRecordTypeWithPrefixKey == null - && formatVersion.isAtLeast(FormatVersion.SAVE_UNSPLIT_WITH_SUFFIX) - && omitUnsplitRecordSuffix) { - // Check to see if the unsplit format can be upgraded on an empty store. - // Only works if singleRecordTypeWithPrefixKey is null as otherwise, the recordCount will not contain - // all records - work.add(lazyRecordCount.get().thenAccept(recordCount -> { - if (recordCount == 0) { - if (newStore ? 
LOGGER.isDebugEnabled() : LOGGER.isInfoEnabled()) { - KeyValueLogMessage msg = KeyValueLogMessage.build("upgrading unsplit format on empty store", - LogMessageKeys.NEW_FORMAT_VERSION, formatVersion, - subspaceProvider.logKey(), subspaceProvider.toString(context)); - if (newStore) { - if (LOGGER.isDebugEnabled()) { - LOGGER.debug(msg.toString()); - } - } else { - if (LOGGER.isInfoEnabled()) { - LOGGER.info(msg.toString()); - } - } - } - omitUnsplitRecordSuffix = !formatVersion.isAtLeast(FormatVersion.SAVE_UNSPLIT_WITH_SUFFIX); - info.clearOmitUnsplitRecordSuffix(); - addRecordsReadConflict(); // We used snapshot to determine emptiness, and are now acting on it. - } - })); - } - - Map> newStates = getStatesForRebuildIndexes(userVersionChecker, indexes, lazyRecordCount, lazyRecordsSize, newStore, oldMetaDataVersion, oldFormatVersion); - return rebuildIndexes(indexes, newStates, work, newStore ? RebuildIndexReason.NEW_STORE : RebuildIndexReason.FEW_RECORDS, oldMetaDataVersion).thenRun(() -> { - // Log after checking all index states - maybeLogIndexesNeedingRebuilding(newStates, recordCountRef, recordsSizeRef, rebuildRecordCounts, newStore); - context.increment(FDBStoreTimer.Counts.INDEXES_NEED_REBUILDING, newStates.entrySet().size()); - }); - } else { - return work.isEmpty() ? AsyncUtil.DONE : AsyncUtil.whenAll(work); - } - } - - /** - * It is legal to remove a uniqueness constraint from an index without changing the {@code lastModifiedVersion}, - * if this is done this method will clear any violations, and transition {@link IndexState#READABLE_UNIQUE_PENDING} - * to {@link IndexState#READABLE}. 
- * @param metaData the current metadata - * @param work work to be done as part of {@code checkVersion} - * @param indexesToBuildSince the list of indexes whose {@code lastModifiedVersion} has changed since we last did - * {@code checkVersion} - */ - private void handleNoLongerUniqueIndex(@Nonnull final RecordMetaData metaData, - @Nonnull final List> work, - @Nonnull final Map> indexesToBuildSince) { - for (Index index : metaData.getAllIndexes()) { - if (!indexesToBuildSince.containsKey(index) && - !index.isUnique()) { - final IndexState indexState = getIndexState(index); - if (indexState == IndexState.READABLE_UNIQUE_PENDING || indexState == IndexState.WRITE_ONLY) { - final CompletableFuture uniquenessFuture = AsyncUtil.getAll(getRecordContext().removeCommitChecks( - commitCheck -> { - if (commitCheck instanceof IndexUniquenessCommitCheck) { - return ((IndexUniquenessCommitCheck)commitCheck).getIndexSubspace().equals(indexSubspace(index)); - } else { - return false; - } - }, - // swallow any uniqueness violations, which may happen if the index was READABLE_UNIQUE_PENDING - err -> err instanceof RecordIndexUniquenessViolation)) - // Regardless, we want to clear any existing uniqueness violations on disk - .thenCompose(vignore -> getIndexMaintainer(index).clearUniquenessViolations()); - if (indexState == IndexState.READABLE_UNIQUE_PENDING) { - work.add(uniquenessFuture - .thenCompose(vignore -> markIndexReadable(index, false)) - .thenApply(vignore2 -> null)); - } else { - work.add(uniquenessFuture); - } - } - } - } - } - - private static Supplier> getAndRememberFutureLong(@Nonnull AtomicLong ref, @Nonnull Supplier> lazyFuture) { - return Suppliers.memoize(() -> lazyFuture.get().whenComplete((val, err) -> { - if (err == null) { - ref.set(val); - } - })); - } - - /** - * Get count of records to pass to a {@link UserVersionChecker} to decide whether to build right away. 
If all of the - * new indexes are over a single type and that type has a record key prefix, then this count will only be over the - * record type being indexed. If not, it will be the count of all records of all types, as in that case, the indexer - * will need to scan the entire store to build each index. If determining the record count would be too costly (such - * as if there is not an appropriate {@linkplain IndexTypes#COUNT count} index defined), this function may return - * {@link Long#MAX_VALUE} to indicate that an unknown and unbounded number of records would have to be scanned - * to build the index. - * - * @param newStore {@code true} if this is a brand new store - * @param rebuildRecordCounts {@code true} if there is a record count key that needs to be rebuilt - * @param indexes indexes that need to be built - * @param singleRecordTypeWithPrefixKey either a single record type prefixed by the record type key or {@code null} - * @return a future that completes to the record count for the version checker - */ - @Nonnull - @SuppressWarnings({"PMD.EmptyCatchBlock", "PMD.CloseResource"}) - protected CompletableFuture getRecordCountForRebuildIndexes(boolean newStore, boolean rebuildRecordCounts, - @Nonnull Map> indexes, - @Nullable RecordType singleRecordTypeWithPrefixKey) { - // Do this with the new indexes filtered out to avoid using one of them when evaluating the snapshot record count. - // At this point we won't have written that any new indexes are disabled - final IndexQueryabilityFilter indexQueryabilityFilter = index -> !indexes.containsKey(index); - if (singleRecordTypeWithPrefixKey != null) { - // Get a count for just those records, either from a COUNT index on just that type or from a universal COUNT index grouped by record type. - try { - return getSnapshotRecordCountForRecordType(singleRecordTypeWithPrefixKey.getName(), indexQueryabilityFilter); - } catch (RecordCoreException ex) { - // No such index; have to use total record count. 
- } - } - if (!rebuildRecordCounts) { - try { - // Note the call below can time out if the count index group cardinality is too high. Users can avoid it by - // setting a custom UserVersionChecker that looks at the size estimate rather than the count, but we should - // consider checking the size by default or otherwise making the ergonomics around hitting that limitation - // better in the future - // See: FDBRecordStoreBase.checkPossiblyRebuild() could take a long time if the record count index is split into many groups (https://github.com/FoundationDB/fdb-record-layer/issues/7) - return getSnapshotRecordCount(EmptyKeyExpression.EMPTY, Key.Evaluated.EMPTY, indexQueryabilityFilter); - } catch (RecordCoreException ex) { - // Probably this was from the lack of appropriate index on count; treat like rebuildRecordCounts = true. - } - } - // Do a scan (limited to a single record) to see if the store is empty. - final ExecuteProperties executeProperties = ExecuteProperties.newBuilder() - .setReturnedRowLimit(1) - .setIsolationLevel(IsolationLevel.SNAPSHOT) - .build(); - final ScanProperties scanProperties = new ScanProperties(executeProperties); - final RecordCursor> records; - if (singleRecordTypeWithPrefixKey == null) { - records = scanRecords(null, scanProperties); - } else { - records = scanRecords(TupleRange.allOf(singleRecordTypeWithPrefixKey.getRecordTypeKeyTuple()), null, scanProperties); - } - return records.onNext().thenApply(result -> { - if (result.hasNext()) { - if (LOGGER.isInfoEnabled()) { - LOGGER.info(KeyValueLogMessage.of("version check scan found non-empty store", - subspaceProvider.logKey(), subspaceProvider.toString(context))); - } - return Long.MAX_VALUE; - } else { - if (newStore ? 
LOGGER.isDebugEnabled() : LOGGER.isInfoEnabled()) { - KeyValueLogMessage msg = KeyValueLogMessage.build("version check scan found empty store", - subspaceProvider.logKey(), subspaceProvider.toString(context)); - if (newStore) { - if (LOGGER.isDebugEnabled()) { - LOGGER.debug(msg.toString()); - } - } else { - if (LOGGER.isInfoEnabled()) { - LOGGER.info(msg.toString()); - } - } - } - return 0L; - } - }); - } - - @Nonnull - private CompletableFuture getRecordSizeForRebuildIndexes(@Nullable RecordType singleRecordTypeWithPrefixKey) { - if (singleRecordTypeWithPrefixKey == null) { - return estimateRecordsSizeAsync(); - } else { - return estimateRecordsSizeAsync(TupleRange.allOf(singleRecordTypeWithPrefixKey.getRecordTypeKeyTuple())); - } - } - - @Nullable - @SuppressWarnings("PMD.CompareObjectsWithEquals") - protected RecordType singleRecordTypeWithPrefixKey(@Nonnull Map> indexes) { - RecordType recordType = null; - for (List entry : indexes.values()) { - Collection types = entry != null ? entry : getRecordMetaData().getRecordTypes().values(); - if (types.size() != 1) { - return null; - } - RecordType type1 = entry != null ? entry.get(0) : types.iterator().next(); - if (recordType == null) { - if (!type1.primaryKeyHasRecordTypePrefix()) { - return null; - } - recordType = type1; - } else if (type1 != recordType) { - return null; - } - } - return recordType; - } - - @SuppressWarnings("PMD.CloseResource") - private void addConvertRecordVersions(@Nonnull List> work) { - if (useOldVersionFormat()) { - throw recordCoreException("attempted to convert record versions when still using older format"); - } - final Subspace legacyVersionSubspace = getLegacyVersionSubspace(); - - // Read all of the keys in the old record version location. For each - // record, copy its version to the new location within the primary record - // subspace. Then once they are all copied, delete the old subspace. 
- KeyValueCursor kvCursor = KeyValueCursor.Builder.withSubspace(legacyVersionSubspace) - .setContext(getRecordContext()) - .setScanProperties(ScanProperties.FORWARD_SCAN) - .build(); - CompletableFuture workFuture = kvCursor.forEach(kv -> { - final Tuple primaryKey = legacyVersionSubspace.unpack(kv.getKey()); - final FDBRecordVersion version = FDBRecordVersion.fromBytes(kv.getValue(), false); - final byte[] newKeyBytes = getSubspace().pack(recordVersionKey(primaryKey)); - final byte[] newValueBytes = SplitHelper.packVersion(version); - ensureContextActive().set(newKeyBytes, newValueBytes); - }).thenAccept(ignore -> ensureContextActive().clear(legacyVersionSubspace.range())); - work.add(workFuture); - } - - @API(API.Status.INTERNAL) - @VisibleForTesting - public Subspace getLegacyVersionSubspace() { - return getSubspace().subspace(Tuple.from(RECORD_VERSION_KEY)); - } - - @Nonnull - protected Map> getStatesForRebuildIndexes(@Nullable UserVersionChecker userVersionChecker, - @Nonnull Map> indexes, - @Nonnull Supplier> lazyRecordCount, - @Nonnull Supplier> lazyRecordsSize, - boolean newStore, - int oldMetaDataVersion, - int oldFormatVersion) { - Map> newStates = new HashMap<>(); - for (Map.Entry> entry : indexes.entrySet()) { - Index index = entry.getKey(); - List recordTypes = entry.getValue(); - boolean indexOnNewRecordTypes = areAllRecordTypesSince(recordTypes, oldMetaDataVersion); - CompletableFuture stateFuture = userVersionChecker == null ? 
- lazyRecordCount.get().thenApply(recordCount -> FDBRecordStore.disabledIfTooManyRecordsForRebuild(recordCount, indexOnNewRecordTypes)) : - userVersionChecker.needRebuildIndex(index, lazyRecordCount, lazyRecordsSize, indexOnNewRecordTypes); - if (IndexTypes.VERSION.equals(index.getType()) - && !newStore - && oldFormatVersion < SAVE_VERSION_WITH_RECORD_FORMAT_VERSION - && !useOldVersionFormat()) { - stateFuture = stateFuture.thenApply(state -> { - if (IndexState.READABLE.equals(state)) { - // Do not rebuild any version indexes while the format conversion is going on. - // Otherwise, the process moving the versions might race against the index - // build and some versions won't be indexed correctly. - return IndexState.DISABLED; - } - return state; - }); - } - newStates.put(index, stateFuture); - } - return newStates; - } - - private void maybeLogIndexesNeedingRebuilding(@Nonnull Map> newStates, - @Nonnull AtomicLong recordCountRef, - @Nonnull AtomicLong recordsSizeRef, - boolean rebuildRecordCounts, - boolean newStore) { - if (LOGGER.isDebugEnabled()) { - KeyValueLogMessage msg = KeyValueLogMessage.build("indexes need rebuilding", - subspaceProvider.logKey(), subspaceProvider.toString(context)); - - // Log the statistics that the user version checker used to determine whether the index could be rebuilt - // online. For both the record count and the records size estimate, a non-negative value implies that - // the checker resolved the value - long recordCount = recordCountRef.get(); - if (recordCount >= 0L) { - msg.addKeyAndValue(LogMessageKeys.RECORD_COUNT, recordCount == Long.MAX_VALUE ? 
"unknown" : Long.toString(recordCount)); - } - long recordsSize = recordsSizeRef.get(); - if (recordsSize >= 0L) { - msg.addKeyAndValue(LogMessageKeys.RECORDS_SIZE_ESTIMATE, Long.toString(recordsSize)); - } - - if (rebuildRecordCounts) { - msg.addKeyAndValue(LogMessageKeys.REBUILD_RECORD_COUNTS, "true"); - } - Map> stateNames = new HashMap<>(); - for (Map.Entry> stateEntry : newStates.entrySet()) { - final String stateName; - if (MoreAsyncUtil.isCompletedNormally(stateEntry.getValue())) { - stateName = stateEntry.getValue().join().getLogName(); - } else { - stateName = "UNKNOWN"; - } - stateNames.compute(stateName, (key, names) -> { - if (names == null) { - names = new ArrayList<>(); - } - names.add(stateEntry.getKey().getName()); - return names; - }); - } - msg.addKeysAndValues(stateNames); - if (newStore) { - msg.addKeyAndValue(LogMessageKeys.NEW_STORE, "true"); - } - LOGGER.debug(msg.toString()); - } + return indexStateManager.checkPossiblyRebuild(userVersionChecker, info, dirty); } // Clear the data associated with a given index. 
This is only safe to do if one is @@ -5029,60 +4186,18 @@ private void maybeLogIndexesNeedingRebuilding(@Nonnull Map indexStates = getAllIndexStates(); // also adds state to read conflicts - for (Map.Entry entry : indexStates.entrySet()) { - if (entry.getValue().equals(IndexState.READABLE)) { - clearReadableIndexBuildData(entry.getKey()); - } - } + indexStateManager.vacuumReadableIndexesBuildData(); } @SuppressWarnings("PMD.CloseResource") @@ -5090,70 +4205,19 @@ protected boolean checkPossiblyRebuildRecordCounts(@Nonnull RecordMetaData metaD @Nonnull RecordMetaDataProto.DataStoreInfo.Builder info, @Nonnull List> work, int oldFormatVersion) { - boolean existingStore = oldFormatVersion > 0; - KeyExpression countKeyExpression = metaData.getRecordCountKey(); - - boolean rebuildRecordCounts = - (existingStore && oldFormatVersion < RECORD_COUNT_ADDED_FORMAT_VERSION) - || (countKeyExpression != null && formatVersion.isAtLeast(FormatVersion.RECORD_COUNT_KEY_ADDED) && - (!info.hasRecordCountKey() || !KeyExpression.fromProto(info.getRecordCountKey()).equals(countKeyExpression))) - || (countKeyExpression == null && info.hasRecordCountKey()); - - if (rebuildRecordCounts) { - // We want to clear all record counts. - // This code will leave data behind if the previous RecordCountKey was not grouped - // https://github.com/FoundationDB/fdb-record-layer/issues/3335 - if (existingStore) { - context.clear(getSubspace().range(Tuple.from(RECORD_COUNT_KEY))); - } - - // Set the new record count key if we have one. - if (formatVersion.isAtLeast(FormatVersion.RECORD_COUNT_KEY_ADDED)) { - if (countKeyExpression != null) { - info.setRecordCountKey(countKeyExpression.toKeyExpression()); - } else { - info.clearRecordCountKey(); - } - } - - // Add the record rebuild job. 
- if (existingStore) { - addRebuildRecordCountsJob(work); - } - } - return rebuildRecordCounts; + return indexStateManager.checkPossiblyRebuildRecordCounts(metaData, info, work, oldFormatVersion); } @SuppressWarnings("PMD.CloseResource") public void addRebuildRecordCountsJob(List> work) { - final KeyExpression recordCountKey = getRecordMetaData().getRecordCountKey(); - if (recordCountKey == null || - getRecordStoreState().getStoreHeader().getRecordCountState() == RecordMetaDataProto.DataStoreInfo.RecordCountState.DISABLED) { - return; - } - if (LOGGER.isDebugEnabled()) { - LOGGER.debug(KeyValueLogMessage.of("recounting all records", - subspaceProvider.logKey(), subspaceProvider.toString(context))); - } - final Map counts = new HashMap<>(); - final RecordCursor> records = scanRecords(null, ScanProperties.FORWARD_SCAN); - CompletableFuture future = records.forEach(rec -> { - Key.Evaluated subkey = recordCountKey.evaluateSingleton(rec); - counts.compute(subkey, (k, v) -> (v == null) ? 1 : v + 1); - }).thenApply(vignore -> { - final Transaction tr = ensureContextActive(); - final byte[] bytes = new byte[8]; - final ByteBuffer buf = ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN); - for (Map.Entry entry : counts.entrySet()) { - buf.putLong(entry.getValue()); - tr.set(getSubspace().pack(Tuple.from(RECORD_COUNT_KEY).addAll(entry.getKey().toTupleAppropriateList())), - bytes); - buf.clear(); - } - return null; - }); - future = context.instrument(FDBStoreTimer.Events.RECOUNT_RECORDS, future); - work.add(future); + indexStateManager.addRebuildRecordCountsJob(work); + } + + @Nonnull + protected CompletableFuture getRecordCountForRebuildIndexes(boolean newStore, boolean rebuildRecordCounts, + @Nonnull Map> indexes, + @Nullable RecordType singleRecordTypeWithPrefixKey) { + return indexStateManager.getRecordCountForRebuildIndexesInternal(newStore, rebuildRecordCounts, indexes, singleRecordTypeWithPrefixKey); } /** @@ -5361,7 +4425,7 @@ private RecordCoreException 
recordCoreException(@Nonnull String msg, Object... k } @Nonnull - private UninitializedRecordStoreException uninitializedStoreException(@Nonnull String msg) { + UninitializedRecordStoreException uninitializedStoreException(@Nonnull String msg) { return new UninitializedRecordStoreException(msg, subspaceProvider.logKey(), subspaceProvider.toString(context)); } diff --git a/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/IndexStateManager.java b/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/IndexStateManager.java new file mode 100644 index 0000000000..243c527532 --- /dev/null +++ b/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/IndexStateManager.java @@ -0,0 +1,1239 @@ +/* + * IndexStateManager.java + * + * This source file is part of the FoundationDB open source project + * + * Copyright 2015-2024 Apple Inc. and the FoundationDB project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.apple.foundationdb.record.provider.foundationdb; + +import com.apple.foundationdb.Range; +import com.apple.foundationdb.Transaction; +import com.apple.foundationdb.annotation.API; +import com.apple.foundationdb.async.AsyncUtil; +import com.apple.foundationdb.async.MoreAsyncUtil; +import com.apple.foundationdb.record.ExecuteProperties; +import com.apple.foundationdb.record.IndexState; +import com.apple.foundationdb.record.IsolationLevel; +import com.apple.foundationdb.record.RecordCoreException; +import com.apple.foundationdb.record.RecordCursor; +import com.apple.foundationdb.record.RecordIndexUniquenessViolation; +import com.apple.foundationdb.record.RecordMetaData; +import com.apple.foundationdb.record.RecordMetaDataProto; +import com.apple.foundationdb.record.RecordStoreState; +import com.apple.foundationdb.record.ScanProperties; +import com.apple.foundationdb.record.TupleRange; +import com.apple.foundationdb.record.logging.KeyValueLogMessage; +import com.apple.foundationdb.record.logging.LogMessageKeys; +import com.apple.foundationdb.record.metadata.FormerIndex; +import com.apple.foundationdb.record.metadata.Index; +import com.apple.foundationdb.record.metadata.IndexTypes; +import com.apple.foundationdb.record.metadata.Key; +import com.apple.foundationdb.record.metadata.MetaDataException; +import com.apple.foundationdb.record.metadata.RecordType; +import com.apple.foundationdb.record.metadata.expressions.EmptyKeyExpression; +import com.apple.foundationdb.record.metadata.expressions.KeyExpression; +import com.apple.foundationdb.record.provider.foundationdb.indexing.IndexingHeartbeat; +import com.apple.foundationdb.record.provider.foundationdb.indexing.IndexingRangeSet; +import com.apple.foundationdb.record.query.IndexQueryabilityFilter; +import com.apple.foundationdb.subspace.Subspace; +import com.apple.foundationdb.tuple.ByteArrayUtil2; +import com.apple.foundationdb.tuple.Tuple; +import com.apple.foundationdb.util.LoggableException; +import 
com.google.common.base.Suppliers; +import com.google.protobuf.Message; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Function; +import java.util.function.Supplier; +import java.util.stream.Collectors; + +/** + * Manages index state queries, transitions, and rebuild orchestration for an {@link FDBRecordStore}. + * + *

+ * This is a package-private helper extracted from {@code FDBRecordStore} to reduce its size and + * improve cohesion. All index lifecycle operations — querying state, marking indexes as readable/ + * write-only/disabled, rebuilding indexes during version checks — are handled here. + *

+ */
+@API(API.Status.INTERNAL)
+class IndexStateManager {
+    private static final Logger LOGGER = LoggerFactory.getLogger(IndexStateManager.class);
+
+    @Nonnull
+    private final FDBRecordStore store;
+
+    IndexStateManager(@Nonnull FDBRecordStore store) {
+        this.store = store;
+    }
+
+    // region State Read/Write Control
+
+    void beginRecordStoreStateRead() {
+        store.recordStoreStateRef.get().beginRead();
+    }
+
+    void endRecordStoreStateRead() {
+        store.recordStoreStateRef.get().endRead();
+    }
+
+    void beginRecordStoreStateWrite() {
+        store.recordStoreStateRef.get().beginWrite();
+    }
+
+    void endRecordStoreStateWrite() {
+        store.recordStoreStateRef.get().endWrite();
+    }
+
+    // endregion
+
+    // region Index State Query
+
+    @Nonnull
+    IndexState getIndexState(@Nonnull Index index) {
+        return getIndexState(index.getName());
+    }
+
+    @Nonnull
+    IndexState getIndexState(@Nonnull String indexName) {
+        store.addIndexStateReadConflict(indexName);
+        return store.getRecordStoreState().getState(indexName);
+    }
+
+    boolean isIndexReadable(@Nonnull Index index) {
+        return isIndexReadable(index.getName());
+    }
+
+    boolean isIndexReadable(@Nonnull String indexName) {
+        return getIndexState(indexName).equals(IndexState.READABLE);
+    }
+
+    boolean isIndexReadableUniquePending(@Nonnull Index index) {
+        return isIndexReadableUniquePending(index.getName());
+    }
+
+    boolean isIndexReadableUniquePending(@Nonnull String indexName) {
+        return getIndexState(indexName).equals(IndexState.READABLE_UNIQUE_PENDING);
+    }
+
+    boolean isIndexScannable(@Nonnull Index index) {
+        return isIndexScannable(index.getName());
+    }
+
+    boolean isIndexScannable(@Nonnull String indexName) {
+        return getIndexState(indexName).isScannable();
+    }
+
+    boolean isIndexWriteOnly(@Nonnull Index index) {
+        return isIndexWriteOnly(index.getName());
+    }
+
+    boolean isIndexWriteOnly(@Nonnull String indexName) {
+        return getIndexState(indexName).equals(IndexState.WRITE_ONLY);
+    }
+
+    boolean isIndexDisabled(@Nonnull Index index) {
+        return isIndexDisabled(index.getName());
+    }
+
+    boolean isIndexDisabled(@Nonnull String indexName) {
+        return getIndexState(indexName).equals(IndexState.DISABLED);
+    }
+
+    @Nonnull
+    Map<Index, IndexState> getAllIndexStates() {
+        final RecordStoreState localRecordStoreState = store.getRecordStoreState();
+        localRecordStoreState.beginRead();
+        try {
+            store.addStoreStateReadConflict();
+            return store.getRecordMetaData().getAllIndexes().stream()
+                    .collect(Collectors.toMap(Function.identity(), localRecordStoreState::getState));
+        } finally {
+            localRecordStoreState.endRead();
+        }
+    }
+
+    // endregion
+
+    // region Index State Mutation
+
+    @SuppressWarnings("PMD.CloseResource")
+    void updateIndexState(@Nonnull String indexName, byte[] indexKey, @Nonnull IndexState indexState) {
+        if (LOGGER.isInfoEnabled()) {
+            LOGGER.info(KeyValueLogMessage.of("index state change",
+                    LogMessageKeys.INDEX_NAME, indexName,
+                    LogMessageKeys.TARGET_INDEX_STATE, indexState.name(),
+                    store.subspaceProvider.logKey(), store.subspaceProvider.toString(store.context)
+            ));
+        }
+        if (store.recordStoreStateRef.get() == null) {
+            throw store.uninitializedStoreException("cannot update index state on an uninitialized store");
+        }
+        beginRecordStoreStateWrite();
+        try {
+            store.context.setDirtyStoreState(true);
+            if (store.isStateCacheableInternal()) {
+                store.context.setMetaDataVersionStamp();
+            }
+            Transaction tr = store.context.ensureActive();
+            if (IndexState.READABLE.equals(indexState)) {
+                tr.clear(indexKey);
+            } else {
+                tr.set(indexKey, Tuple.from(indexState.code()).pack());
+            }
+            store.recordStoreStateRef.updateAndGet(state -> {
+                state.setState(indexName, indexState);
+                return state;
+            });
+        } finally {
+            endRecordStoreStateWrite();
+        }
+    }
+
+    @Nonnull
+    @SuppressWarnings("PMD.CloseResource")
+    CompletableFuture<Boolean> markIndexNotReadable(@Nonnull String indexName, @Nonnull IndexState indexState) {
+        if (store.recordStoreStateRef.get() == null) {
+            return store.preloadRecordStoreStateAsync().thenCompose(vignore -> markIndexNotReadable(indexName, indexState));
+        }
+
+        store.addIndexStateReadConflict(indexName);
+
+        beginRecordStoreStateWrite();
+        boolean haveFuture = false;
+        try {
+            byte[] indexKey = store.indexStateSubspace().pack(indexName);
+            Transaction tr = store.context.ensureActive();
+            CompletableFuture<Boolean> future = tr.get(indexKey).thenCompose(previous -> {
+                if (previous == null) {
+                    IndexingRangeSet indexRangeSet = IndexingRangeSet.forIndexBuild(store, store.getRecordMetaData().getIndex(indexName));
+                    return indexRangeSet.isEmptyAsync().thenCompose(empty -> {
+                        if (empty) {
+                            return indexRangeSet.insertRangeAsync(null, null);
+                        } else {
+                            return AsyncUtil.READY_FALSE;
+                        }
+                    }).thenApply(ignore -> {
+                        updateIndexState(indexName, indexKey, indexState);
+                        return true;
+                    });
+                } else if (!Tuple.fromBytes(previous).get(0).equals(indexState.code())) {
+                    updateIndexState(indexName, indexKey, indexState);
+                    return AsyncUtil.READY_TRUE;
+                } else {
+                    return AsyncUtil.READY_FALSE;
+                }
+            }).whenComplete((b, t) -> endRecordStoreStateWrite());
+            haveFuture = true;
+            return future;
+        } finally {
+            if (!haveFuture) {
+                endRecordStoreStateWrite();
+            }
+        }
+    }
+
+    @Nonnull
+    CompletableFuture<Boolean> markIndexWriteOnly(@Nonnull String indexName) {
+        return markIndexNotReadable(indexName, IndexState.WRITE_ONLY);
+    }
+
+    @Nonnull
+    CompletableFuture<Boolean> markIndexWriteOnly(@Nonnull Index index) {
+        return markIndexWriteOnly(index.getName());
+    }
+
+    @Nonnull
+    CompletableFuture<Boolean> markIndexDisabled(@Nonnull String indexName) {
+        return markIndexDisabled(store.getRecordMetaData().getIndex(indexName));
+    }
+
+    @Nonnull
+    CompletableFuture<Boolean> markIndexDisabled(@Nonnull Index index) {
+        return markIndexNotReadable(index.getName(), IndexState.DISABLED).thenApply(changed -> {
+            if (changed) {
+                clearIndexData(index);
+            }
+            return changed;
+        });
+    }
+
+    @Nonnull
+    CompletableFuture<Boolean> markIndexReadableOrUniquePending(@Nonnull Index index) {
+        return markIndexReadable(index, true);
+    }
+
+    @Nonnull
+    @SuppressWarnings("PMD.CloseResource")
+    CompletableFuture<Boolean> markIndexReadable(@Nonnull Index index) {
+        return markIndexReadable(index, false);
+    }
+
+    @Nonnull
+    private CompletableFuture<Boolean> markIndexReadable(@Nonnull Index index, boolean allowUniquePending) {
+        if (store.recordStoreStateRef.get() == null) {
+            return store.preloadRecordStoreStateAsync().thenCompose(vignore -> markIndexReadable(index, allowUniquePending));
+        }
+
+        store.addIndexStateReadConflict(index.getName());
+
+        beginRecordStoreStateWrite();
+        boolean haveFuture = false;
+        try {
+            @SuppressWarnings("PMD.CloseResource")
+            Transaction tr = store.ensureContextActive();
+            byte[] indexKey = store.indexStateSubspace().pack(index.getName());
+            CompletableFuture<Boolean> future = tr.get(indexKey).thenCompose(previous -> {
+                if (previous != null) {
+                    return checkAndUpdateBuiltIndexState(index, indexKey, allowUniquePending);
+                } else {
+                    return AsyncUtil.READY_FALSE;
+                }
+            }).whenComplete((b, t) -> endRecordStoreStateWrite()).thenApply(this::addRemoveReplacedIndexesCommitCheckIfChanged);
+            haveFuture = true;
+            return future;
+        } finally {
+            if (!haveFuture) {
+                endRecordStoreStateWrite();
+            }
+        }
+    }
+
+    @Nonnull
+    CompletableFuture<Boolean> markIndexReadable(@Nonnull String indexName) {
+        return markIndexReadable(store.getRecordMetaData().getIndex(indexName));
+    }
+
+    private CompletableFuture<Boolean> checkAndUpdateBuiltIndexState(Index index, byte[] indexKey, boolean allowUniquePending) {
+        CompletableFuture<Optional<Range>> builtFuture = firstUnbuiltRange(index);
+        CompletableFuture<Optional<RecordIndexUniquenessViolation>> uniquenessFuture;
+        if (index.isUnique()) {
+            uniquenessFuture = store.whenAllIndexUniquenessCommitChecks(index)
+                    .thenCompose(vignore -> store.scanUniquenessViolations(index, 1).first());
+        } else {
+            uniquenessFuture = CompletableFuture.completedFuture(Optional.empty());
+        }
+        return CompletableFuture.allOf(builtFuture, uniquenessFuture).thenApply(vignore -> {
+            Optional<Range> firstUnbuilt = store.context.join(builtFuture);
+            Optional<RecordIndexUniquenessViolation> uniquenessViolation = store.context.join(uniquenessFuture);
+
+            if (firstUnbuilt.isPresent()) {
+                throw new FDBRecordStore.IndexNotBuiltException("Attempted to make unbuilt index readable", firstUnbuilt.get(),
+                        LogMessageKeys.INDEX_NAME, index.getName(),
+                        "unbuiltRangeBegin", ByteArrayUtil2.loggable(firstUnbuilt.get().begin),
+                        "unbuiltRangeEnd", ByteArrayUtil2.loggable(firstUnbuilt.get().end),
+                        store.subspaceProvider.logKey(), store.subspaceProvider.toString(store.context),
+                        LogMessageKeys.SUBSPACE_KEY, index.getSubspaceKey());
+            } else if (uniquenessViolation.isPresent()) {
+                if (allowUniquePending) {
+                    if (isIndexReadableUniquePending(index)) {
+                        return false; // Unchanged
+                    }
+                    updateIndexState(index.getName(), indexKey, IndexState.READABLE_UNIQUE_PENDING);
+                    return true;
+                }
+                RecordIndexUniquenessViolation wrapped = new RecordIndexUniquenessViolation("Uniqueness violation when making index readable",
+                        uniquenessViolation.get());
+                wrapped.addLogInfo(
+                        LogMessageKeys.INDEX_NAME, index.getName(),
+                        store.subspaceProvider.logKey(), store.subspaceProvider.toString(store.context));
+                throw wrapped;
+            } else {
+                updateIndexState(index.getName(), indexKey, IndexState.READABLE);
+                clearReadableIndexBuildData(index);
+                return true;
+            }
+        });
+    }
+
+    @Nonnull
+    @SuppressWarnings("PMD.CloseResource")
+    CompletableFuture<Boolean> uncheckedMarkIndexReadable(@Nonnull String indexName) {
+        if (store.recordStoreStateRef.get() == null) {
+            return store.preloadRecordStoreStateAsync().thenCompose(vignore -> uncheckedMarkIndexReadable(indexName));
+        }
+
+        store.addIndexStateReadConflict(indexName);
+
+        beginRecordStoreStateWrite();
+        boolean haveFuture = false;
+        try {
+            Transaction tr = store.ensureContextActive();
+            byte[] indexKey = store.indexStateSubspace().pack(indexName);
+            CompletableFuture<Boolean> future = tr.get(indexKey).thenApply(previous -> {
+                if (previous != null) {
+                    updateIndexState(indexName, indexKey, IndexState.READABLE);
+                    return true;
+                } else {
+                    return false;
+                }
+            }).whenComplete((b, t) -> endRecordStoreStateWrite()).thenApply(this::addRemoveReplacedIndexesCommitCheckIfChanged);
+            haveFuture = true;
+            return future;
+        } finally {
+            if (!haveFuture) {
+                endRecordStoreStateWrite();
+            }
+        }
+    }
+
+    @Nonnull
+    CompletableFuture<Void> clearAndMarkIndexWriteOnly(@Nonnull String indexName) {
+        return clearAndMarkIndexWriteOnly(store.getRecordMetaData().getIndex(indexName));
+    }
+
+    @Nonnull
+    CompletableFuture<Void> clearAndMarkIndexWriteOnly(@Nonnull Index index) {
+        return markIndexWriteOnly(index)
+                .thenRun(() -> clearIndexData(index));
+    }
+
+    @Nonnull
+    @SuppressWarnings("PMD.CloseResource")
+    CompletableFuture<Optional<Range>> firstUnbuiltRange(@Nonnull Index index) {
+        if (!store.getRecordMetaData().hasIndex(index.getName())) {
+            throw new MetaDataException("Index " + index.getName() + " does not exist in meta-data.");
+        }
+        IndexingRangeSet rangeSet = IndexingRangeSet.forIndexBuild(store, index);
+        return rangeSet.firstMissingRangeAsync().thenApply(Optional::ofNullable);
+    }
+
+    // endregion
+
+    // region Replaced Index Management
+
+    boolean addRemoveReplacedIndexesCommitCheckIfChanged(boolean changed) {
+        if (changed) {
+            final String commitCheckName = "removeReplacedIndexes_" + ByteArrayUtil2.toHexString(store.getSubspace().pack());
+            store.getRecordContext().getOrCreateCommitCheck(commitCheckName, name -> this::removeReplacedIndexes);
+        }
+        return changed;
+    }
+
+    @Nonnull
+    CompletableFuture<Boolean> removeReplacedIndexesIfChanged(boolean changed) {
+        if (changed) {
+            return removeReplacedIndexes().thenApply(vignore -> true);
+        } else {
+            return AsyncUtil.READY_FALSE;
+        }
+    }
+
+    @Nonnull
+    CompletableFuture<Void> removeReplacedIndexes() {
+        if (store.recordStoreStateRef.get() == null) {
+            return store.preloadRecordStoreStateAsync().thenCompose(vignore -> removeReplacedIndexes());
+        }
+
+        beginRecordStoreStateRead();
+        final RecordMetaData metaData = store.getRecordMetaData();
+        final List<Index> indexesToRemove = new ArrayList<>();
+        try {
+            for (Index index : metaData.getAllIndexes()) {
+                final List<String> replacedByNames = index.getReplacedByIndexNames();
+                if (!replacedByNames.isEmpty()) {
+                    if (replacedByNames.stream()
+                            .allMatch(replacedByName -> metaData.hasIndex(replacedByName) && isIndexReadable(replacedByName))) {
+                        indexesToRemove.add(index);
+                    }
+                }
+            }
+        } finally {
+            endRecordStoreStateRead();
+        }
+
+        if (indexesToRemove.isEmpty()) {
+            return AsyncUtil.DONE;
+        }
+
+        beginRecordStoreStateWrite();
+        boolean haveFuture = false;
+        try {
+            final List<CompletableFuture<Boolean>> indexRemoveFutures = new ArrayList<>(indexesToRemove.size());
+            for (Index index : indexesToRemove) {
+                indexRemoveFutures.add(markIndexDisabled(index));
+            }
+            CompletableFuture<Void> future = AsyncUtil.whenAll(indexRemoveFutures)
+                    .whenComplete((vignore, errIgnore) -> endRecordStoreStateWrite());
+            haveFuture = true;
+            return future;
+        } finally {
+            if (!haveFuture) {
+                endRecordStoreStateWrite();
+            }
+        }
+    }
+
+    // endregion
+
+    // region Index Data Management
+
+    @SuppressWarnings("PMD.CloseResource")
+    void clearIndexData(@Nonnull Index index) {
+        store.context.clear(Range.startsWith(store.indexSubspace(index).pack()));
+        store.context.clear(store.indexSecondarySubspace(index).range());
+        IndexingRangeSet.forIndexBuild(store, index).clear();
+        store.context.clear(store.indexUniquenessViolationsSubspace(index).range());
+        IndexingSubspaces.eraseAllIndexingDataButTheLock(store.context, store, index);
+    }
+
+    private void clearReadableIndexBuildData(Index index) {
+        IndexingRangeSet.forIndexBuild(store, index).clear();
+        IndexingHeartbeat.clearAllHeartbeats(store, index);
+    }
+
+    @SuppressWarnings("PMD.CloseResource")
+    void removeFormerIndex(FormerIndex formerIndex) {
+        if (LOGGER.isDebugEnabled()) {
+            KeyValueLogMessage msg = KeyValueLogMessage.build("removing index",
+                    store.subspaceProvider.logKey(), store.subspaceProvider.toString(store.context),
+                    LogMessageKeys.SUBSPACE_KEY, formerIndex.getSubspaceKey());
+            if (formerIndex.getFormerName() != null) {
+                msg.addKeyAndValue(LogMessageKeys.INDEX_NAME, formerIndex.getFormerName());
+            }
+            LOGGER.debug(msg.toString());
+        }
+        final long startTime = System.nanoTime();
+        store.context.clear(store.getSubspace().range(Tuple.from(FDBRecordStore.INDEX_KEY, formerIndex.getSubspaceTupleKey())));
+        store.context.clear(store.getSubspace().range(Tuple.from(FDBRecordStore.INDEX_SECONDARY_SPACE_KEY, formerIndex.getSubspaceTupleKey())));
+        store.context.clear(store.getSubspace().range(Tuple.from(FDBRecordStore.INDEX_RANGE_SPACE_KEY, formerIndex.getSubspaceTupleKey())));
+        final String formerIndexName = formerIndex.getFormerName();
+        if (formerIndexName != null) {
+            updateIndexState(formerIndexName, store.getSubspace().pack(Tuple.from(FDBRecordStore.INDEX_STATE_SPACE_KEY, formerIndexName)), IndexState.READABLE);
+        }
+        store.context.clear(store.getSubspace().range(Tuple.from(FDBRecordStore.INDEX_UNIQUENESS_VIOLATIONS_KEY, formerIndex.getSubspaceTupleKey())));
+        if (store.getTimer() != null) {
+            store.getTimer().recordSinceNanoTime(FDBStoreTimer.Events.REMOVE_FORMER_INDEX, startTime);
+        }
+    }
+
+    @SuppressWarnings("PMD.CloseResource")
+    void vacuumReadableIndexesBuildData() {
+        Map<Index, IndexState> indexStates = getAllIndexStates();
+        for (Map.Entry<Index, IndexState> entry : indexStates.entrySet()) {
+            if (entry.getValue().equals(IndexState.READABLE)) {
+                clearReadableIndexBuildData(entry.getKey());
+            }
+        }
+    }
+
+    @Nonnull
+    Map<Index, List<RecordType>> getIndexesToBuild() {
+        if (store.recordStoreStateRef.get() == null) {
+            throw store.uninitializedStoreException("cannot get indexes to build on uninitialized store");
+        }
+        final Map<Index, List<RecordType>> indexesToBuild = store.getRecordMetaData().getIndexesToBuildSince(-1);
+        beginRecordStoreStateRead();
+        try {
+            indexesToBuild.keySet().removeIf(this::isIndexReadable);
+            return indexesToBuild;
+        } finally {
+            endRecordStoreStateRead();
+        }
+    }
+
+    // endregion
+
+    // region Rebuild Index Methods
+
+    @Nonnull
+    @SuppressWarnings("PMD.CloseResource")
+    CompletableFuture<Void> rebuildAllIndexes() {
+        store.context.clear(store.getSubspace().range(Tuple.from(FDBRecordStore.INDEX_KEY)));
+        store.context.clear(store.getSubspace().range(Tuple.from(FDBRecordStore.INDEX_SECONDARY_SPACE_KEY)));
+        store.context.clear(store.getSubspace().range(Tuple.from(FDBRecordStore.INDEX_RANGE_SPACE_KEY)));
+        store.context.clear(store.getSubspace().range(Tuple.from(FDBRecordStore.INDEX_UNIQUENESS_VIOLATIONS_KEY)));
+        List<CompletableFuture<Void>> work = new LinkedList<>();
+        addRebuildRecordCountsJob(work);
+        return rebuildIndexes(store.getRecordMetaData().getIndexesToBuildSince(-1), Collections.emptyMap(), work,
+                FDBRecordStore.RebuildIndexReason.REBUILD_ALL, null);
+    }
+
+    private CompletableFuture<Map<Index, IndexState>> rebuildIndexesGetDesiredIndexStates(
+            @Nonnull List<CompletableFuture<Void>> preWork,
+            @Nonnull Map<Index, CompletableFuture<IndexState>> newStates) {
+        final ConcurrentHashMap<Index, IndexState> desiredIndexStates = new ConcurrentHashMap<>();
+        final List<CompletableFuture<Void>> allWork = new ArrayList<>(preWork);
+        for (Map.Entry<Index, CompletableFuture<IndexState>> entry : newStates.entrySet()) {
+            allWork.add(entry.getValue().thenAccept(state -> desiredIndexStates.put(entry.getKey(), state)));
+        }
+        return AsyncUtil.whenAll(allWork).thenApply(ignore -> desiredIndexStates);
+    }
+
+    @Nonnull
+    CompletableFuture<Void> rebuildIndexes(@Nonnull Map<Index, List<RecordType>> indexes,
+                                           @Nonnull Map<Index, CompletableFuture<IndexState>> newStates,
+                                           @Nonnull List<CompletableFuture<Void>> work,
+                                           @Nonnull FDBRecordStore.RebuildIndexReason reason,
+                                           @Nullable Integer oldMetaDataVersion) {
+        return rebuildIndexesGetDesiredIndexStates(work, newStates).thenCompose(desiredIndexStates ->
+                rebuildIndexes(indexes, desiredIndexStates, reason, oldMetaDataVersion));
+    }
+
+    @Nonnull
+    CompletableFuture<Void> rebuildIndexes(@Nonnull Map<Index, List<RecordType>> indexes,
+                                           @Nonnull Map<Index, IndexState> desiredIndexStates,
+                                           @Nonnull FDBRecordStore.RebuildIndexReason reason,
+                                           @Nullable Integer oldMetaDataVersion) {
+        List<CompletableFuture<Void>> work = new ArrayList<>();
+        Iterator<Map.Entry<Index, List<RecordType>>> indexIter = indexes.entrySet().iterator();
+        return AsyncUtil.whileTrue(() -> {
+            Iterator<CompletableFuture<Void>> workIter = work.iterator();
+            while (workIter.hasNext()) {
+                CompletableFuture<Void> workItem = workIter.next();
+                if (workItem.isDone()) {
+                    store.context.asyncToSync(FDBStoreTimer.Waits.WAIT_ERROR_CHECK, workItem);
+                    workIter.remove();
+                }
+            }
+            while (work.size() < FDBRecordStore.MAX_PARALLEL_INDEX_REBUILD) {
+                if (indexIter.hasNext()) {
+                    Map.Entry<Index, List<RecordType>> indexItem = indexIter.next();
+                    Index index = indexItem.getKey();
+                    List<RecordType> recordTypes = indexItem.getValue();
+                    IndexState indexState = desiredIndexStates.getOrDefault(index, IndexState.READABLE);
+                    final StringBuilder errMessageBuilder = new StringBuilder("unable to ");
+                    final CompletableFuture<Void> rebuildOrMarkIndexSafely = MoreAsyncUtil.handleOnException(
+                            () -> rebuildOrMarkIndex(index, indexState, recordTypes, reason, oldMetaDataVersion, errMessageBuilder),
+                            exception -> {
+                                logExceptionAsWarn(KeyValueLogMessage.build(errMessageBuilder.toString(),
+                                        LogMessageKeys.INDEX_NAME, index.getName()
+                                ), exception);
+                                return markIndexDisabled(index).thenApply(b -> null);
+                            });
+                    work.add(rebuildOrMarkIndexSafely);
+                } else {
+                    break;
+                }
+            }
+            if (work.isEmpty()) {
+                return AsyncUtil.READY_FALSE;
+            }
+            return AsyncUtil.whenAny(work).thenApply(v -> true);
+        }, store.getExecutor());
+    }
+
+    boolean areAllRecordTypesSince(@Nullable Collection<RecordType> recordTypes, @Nullable Integer oldMetaDataVersion) {
+        return oldMetaDataVersion != null && (oldMetaDataVersion == -1 || (recordTypes != null && recordTypes.stream().allMatch(recordType -> {
+            Integer sinceVersion = recordType.getSinceVersion();
+            return sinceVersion != null && sinceVersion > oldMetaDataVersion;
+        })));
+    }
+
+    CompletableFuture<Void> rebuildOrMarkIndex(@Nonnull Index index, @Nonnull IndexState indexState,
+                                               @Nullable List<RecordType> recordTypes,
+                                               @Nonnull FDBRecordStore.RebuildIndexReason reason,
+                                               @Nullable Integer oldMetaDataVersion,
+                                               @Nonnull StringBuilder errMessageBuilder) {
+        if (indexState != IndexState.DISABLED && areAllRecordTypesSince(recordTypes, oldMetaDataVersion)) {
+            errMessageBuilder.append("rebuild index with no records");
+            return rebuildIndexWithNoRecord(index, reason);
+        }
+
+        switch (indexState) {
+            case WRITE_ONLY:
+                errMessageBuilder.append("clear and mark index write only");
+                return clearAndMarkIndexWriteOnly(index).thenApply(b -> null);
+            case DISABLED:
+                errMessageBuilder.append("mark index disabled");
+                return markIndexDisabled(index).thenApply(b -> null);
+            case READABLE:
+            default:
+                errMessageBuilder.append("rebuild index");
+                return rebuildIndex(index, reason);
+        }
+    }
+
+    @Nonnull
+    private CompletableFuture<Void> rebuildIndexWithNoRecord(@Nonnull final Index index,
+                                                             @Nonnull FDBRecordStore.RebuildIndexReason reason) {
+        final boolean newStore = reason == FDBRecordStore.RebuildIndexReason.NEW_STORE;
+        if (newStore ? LOGGER.isDebugEnabled() : LOGGER.isInfoEnabled()) {
+            final KeyValueLogMessage msg = KeyValueLogMessage.build("rebuilding index with no record",
+                    LogMessageKeys.INDEX_NAME, index.getName(),
+                    LogMessageKeys.INDEX_VERSION, index.getLastModifiedVersion(),
+                    LogMessageKeys.REASON, reason.name(),
+                    store.subspaceProvider.logKey(), store.subspaceProvider.toString(store.context),
+                    LogMessageKeys.SUBSPACE_KEY, index.getSubspaceKey());
+            if (newStore) {
+                if (LOGGER.isDebugEnabled()) {
+                    LOGGER.debug(msg.toString());
+                }
+            } else {
+                if (LOGGER.isInfoEnabled()) {
+                    LOGGER.info(msg.toString());
+                }
+            }
+        }
+
+        return markIndexReadable(index).thenApply(b -> null);
+    }
+
+    @Nonnull
+    CompletableFuture<Void> rebuildIndex(@Nonnull Index index) {
+        return rebuildIndex(index, FDBRecordStore.RebuildIndexReason.EXPLICIT);
+    }
+
+    @Nonnull
+    @SuppressWarnings({"squid:S2095", "PMD.CloseResource"})
+    CompletableFuture<Void> rebuildIndex(@Nonnull final Index index, @Nonnull FDBRecordStore.RebuildIndexReason reason) {
+        final boolean newStore = reason == FDBRecordStore.RebuildIndexReason.NEW_STORE;
+        if (newStore ? LOGGER.isDebugEnabled() : LOGGER.isInfoEnabled()) {
+            final KeyValueLogMessage msg = KeyValueLogMessage.build("rebuilding index",
+                    LogMessageKeys.INDEX_NAME, index.getName(),
+                    LogMessageKeys.INDEX_VERSION, index.getLastModifiedVersion(),
+                    LogMessageKeys.REASON, reason.name(),
+                    store.subspaceProvider.logKey(), store.subspaceProvider.toString(store.context),
+                    LogMessageKeys.SUBSPACE_KEY, index.getSubspaceKey());
+            if (newStore) {
+                if (LOGGER.isDebugEnabled()) {
+                    LOGGER.debug(msg.toString());
+                }
+            } else {
+                if (LOGGER.isInfoEnabled()) {
+                    LOGGER.info(msg.toString());
+                }
+            }
+        }
+
+        long startTime = System.nanoTime();
+        OnlineIndexer indexBuilder = OnlineIndexer.newBuilder().setRecordStore(store).setIndex(index).build();
+        CompletableFuture<Void> future = indexBuilder.rebuildIndexAsync(store)
+                .thenCompose(vignore -> markIndexReadable(index))
+                .handle((b, t) -> {
+                    if (t != null) {
+                        logExceptionAsWarn(KeyValueLogMessage.build("rebuilding index failed",
+                                LogMessageKeys.INDEX_NAME, index.getName(),
+                                LogMessageKeys.INDEX_VERSION, index.getLastModifiedVersion(),
+                                LogMessageKeys.REASON, reason.name(),
+                                LogMessageKeys.SUBSPACE_KEY, index.getSubspaceKey()), t);
+                    }
+                    indexBuilder.close();
+                    return null;
+                });
+
+        return store.context.instrument(FDBStoreTimer.Events.REBUILD_INDEX,
+                store.context.instrument(reason.event, future, startTime),
+                startTime);
+    }
+
+    // endregion
+
+    // region Rebuild Orchestration (checkVersion path)
+
+    @SuppressWarnings("PMD.GuardLogStatement")
+    CompletableFuture<Void> checkPossiblyRebuild(@Nullable FDBRecordStoreBase.UserVersionChecker userVersionChecker,
+                                                 @Nonnull RecordMetaDataProto.DataStoreInfo.Builder info,
+                                                 @Nonnull boolean[] dirty) {
+        final int oldFormatVersion = info.getFormatVersion();
+        final int newFormatVersion = Math.max(oldFormatVersion, store.formatVersion.getValueForSerialization());
+        final boolean formatVersionChanged = oldFormatVersion != newFormatVersion;
+        store.formatVersion = FormatVersion.getFormatVersion(newFormatVersion);
+
+        final boolean newStore = oldFormatVersion == 0;
+        final int oldMetaDataVersion = newStore ? -1 : info.getMetaDataversion();
+        final RecordMetaData metaData = store.getRecordMetaData();
+        final int newMetaDataVersion = metaData.getVersion();
+        if (oldMetaDataVersion > newMetaDataVersion) {
+            CompletableFuture<Void> ret = new CompletableFuture<>();
+            ret.completeExceptionally(new RecordStoreStaleMetaDataVersionException("Local meta-data has stale version",
+                    LogMessageKeys.LOCAL_VERSION, newMetaDataVersion,
+                    LogMessageKeys.STORED_VERSION, oldMetaDataVersion,
+                    store.subspaceProvider.logKey(), store.subspaceProvider.toString(store.context)));
+            return ret;
+        }
+        final boolean metaDataVersionChanged = oldMetaDataVersion != newMetaDataVersion;
+
+        if (!formatVersionChanged && !metaDataVersionChanged) {
+            return AsyncUtil.DONE;
+        }
+
+        if (LOGGER.isInfoEnabled()) {
+            if (newStore) {
+                LOGGER.info(KeyValueLogMessage.of("new record store",
+                        LogMessageKeys.FORMAT_VERSION, newFormatVersion,
+                        LogMessageKeys.META_DATA_VERSION, newMetaDataVersion,
+                        store.subspaceProvider.logKey(), store.subspaceProvider.toString(store.context)));
+            } else {
+                if (formatVersionChanged) {
+                    LOGGER.info(KeyValueLogMessage.of("format version changed",
+                            LogMessageKeys.OLD_VERSION, oldFormatVersion,
+                            LogMessageKeys.NEW_VERSION, newFormatVersion,
+                            store.subspaceProvider.logKey(), store.subspaceProvider.toString(store.context)));
+                }
+                if (metaDataVersionChanged) {
+                    LOGGER.info(KeyValueLogMessage.of("meta-data version changed",
+                            LogMessageKeys.OLD_VERSION, oldMetaDataVersion,
+                            LogMessageKeys.NEW_VERSION, newMetaDataVersion,
+                            store.subspaceProvider.logKey(), store.subspaceProvider.toString(store.context)));
+                }
+            }
+        }
+
+        dirty[0] = true;
+        return checkRebuild(userVersionChecker, info, metaData);
+    }
+
+    @SuppressWarnings("PMD.CloseResource")
+    private CompletableFuture<Void> checkRebuild(@Nullable FDBRecordStoreBase.UserVersionChecker userVersionChecker,
+                                                 @Nonnull RecordMetaDataProto.DataStoreInfo.Builder info,
+                                                 @Nonnull RecordMetaData metaData) {
+        final List<CompletableFuture<Void>> work = new LinkedList<>();
+
+        final int oldFormatVersion = info.getFormatVersion();
+        if (oldFormatVersion != store.formatVersion.getValueForSerialization()) {
+            info.setFormatVersion(store.formatVersion.getValueForSerialization());
+            if ((oldFormatVersion >= FormatVersion.getMinimumVersion().getValueForSerialization()
+                    && oldFormatVersion < FormatVersion.SAVE_UNSPLIT_WITH_SUFFIX.getValueForSerialization()
+                    && store.formatVersion.isAtLeast(FormatVersion.SAVE_UNSPLIT_WITH_SUFFIX)
+                    && !metaData.isSplitLongRecords())) {
+                if (LOGGER.isInfoEnabled()) {
+                    LOGGER.info(KeyValueLogMessage.of("unsplit records stored at old format",
+                            LogMessageKeys.OLD_VERSION, oldFormatVersion,
+                            LogMessageKeys.NEW_VERSION, store.formatVersion,
+                            store.subspaceProvider.logKey(), store.subspaceProvider.toString(store.context)));
+                }
+                info.setOmitUnsplitRecordSuffix(true);
+                store.omitUnsplitRecordSuffix = true;
+            }
+            if (oldFormatVersion >= FormatVersion.getMinimumVersion().getValueForSerialization()
+                    && oldFormatVersion < FormatVersion.SAVE_VERSION_WITH_RECORD.getValueForSerialization()
+                    && metaData.isStoreRecordVersions() && !store.useOldVersionFormat()) {
+                if (LOGGER.isInfoEnabled()) {
+                    LOGGER.info(KeyValueLogMessage.of("migrating record versions to new format",
+                            LogMessageKeys.OLD_VERSION, oldFormatVersion,
+                            LogMessageKeys.NEW_VERSION, store.formatVersion,
+                            store.subspaceProvider.logKey(), store.subspaceProvider.toString(store.context)));
+                }
+                addConvertRecordVersions(work);
+            }
+        }
+
+        final boolean newStore = oldFormatVersion == 0;
+        final int oldMetaDataVersion = newStore ? -1 : info.getMetaDataversion();
+        final int newMetaDataVersion = metaData.getVersion();
+        final boolean metaDataVersionChanged = oldMetaDataVersion != newMetaDataVersion;
+        if (metaDataVersionChanged) {
+            if (!metaData.isStoreRecordVersions() && !newStore
+                    && store.useOldVersionFormat()) {
+                final Transaction tr = store.ensureContextActive();
+                tr.clear(store.getSubspace().subspace(Tuple.from(FDBRecordStore.RECORD_VERSION_KEY)).range());
+            }
+            info.setMetaDataversion(newMetaDataVersion);
+        }
+
+        final boolean rebuildRecordCounts = checkPossiblyRebuildRecordCounts(metaData, info, work, oldFormatVersion);
+
+        if (!metaDataVersionChanged) {
+            return work.isEmpty() ? AsyncUtil.DONE : AsyncUtil.whenReady(work.get(0));
+        }
+
+        for (FormerIndex formerIndex : metaData.getFormerIndexesSince(oldMetaDataVersion)) {
+            removeFormerIndex(formerIndex);
+        }
+
+        return checkRebuildIndexes(userVersionChecker, info, oldFormatVersion, metaData, oldMetaDataVersion, rebuildRecordCounts, work);
+    }
+
+    private CompletableFuture<Void> checkRebuildIndexes(@Nullable FDBRecordStoreBase.UserVersionChecker userVersionChecker,
+                                                        @Nonnull RecordMetaDataProto.DataStoreInfo.Builder info,
+                                                        int oldFormatVersion, @Nonnull RecordMetaData metaData,
+                                                        int oldMetaDataVersion, boolean rebuildRecordCounts,
+                                                        List<CompletableFuture<Void>> work) {
+        final boolean newStore = oldFormatVersion == 0;
+        final Map<Index, List<RecordType>> indexes = metaData.getIndexesToBuildSince(oldMetaDataVersion);
+        handleNoLongerUniqueIndex(metaData, work, indexes);
+        if (!indexes.isEmpty()) {
+            RecordType singleRecordTypeWithPrefixKey = singleRecordTypeWithPrefixKey(indexes);
+            final AtomicLong recordCountRef = new AtomicLong(-1);
+            final Supplier<CompletableFuture<Long>> lazyRecordCount = getAndRememberFutureLong(recordCountRef,
+                    () -> store.getRecordCountForRebuildIndexes(newStore, rebuildRecordCounts, indexes, singleRecordTypeWithPrefixKey));
+            AtomicLong recordsSizeRef = new AtomicLong(-1);
+            final Supplier<CompletableFuture<Long>> lazyRecordsSize = getAndRememberFutureLong(recordsSizeRef,
+                    () -> getRecordSizeForRebuildIndexes(singleRecordTypeWithPrefixKey));
+            if (singleRecordTypeWithPrefixKey == null
+                    && store.formatVersion.isAtLeast(FormatVersion.SAVE_UNSPLIT_WITH_SUFFIX)
+                    && store.omitUnsplitRecordSuffix) {
+                work.add(lazyRecordCount.get().thenAccept(recordCount -> {
+                    if (recordCount == 0) {
+                        if (newStore ? LOGGER.isDebugEnabled() : LOGGER.isInfoEnabled()) {
+                            KeyValueLogMessage msg = KeyValueLogMessage.build("upgrading unsplit format on empty store",
+                                    LogMessageKeys.NEW_FORMAT_VERSION, store.formatVersion,
+                                    store.subspaceProvider.logKey(), store.subspaceProvider.toString(store.context));
+                            if (newStore) {
+                                if (LOGGER.isDebugEnabled()) {
+                                    LOGGER.debug(msg.toString());
+                                }
+                            } else {
+                                if (LOGGER.isInfoEnabled()) {
+                                    LOGGER.info(msg.toString());
+                                }
+                            }
+                        }
+                        store.omitUnsplitRecordSuffix = !store.formatVersion.isAtLeast(FormatVersion.SAVE_UNSPLIT_WITH_SUFFIX);
+                        info.clearOmitUnsplitRecordSuffix();
+                        store.addRecordsReadConflict();
+                    }
+                }));
+            }
+
+            Map<Index, CompletableFuture<IndexState>> newStates = getStatesForRebuildIndexes(
+                    userVersionChecker, indexes, lazyRecordCount, lazyRecordsSize, newStore, oldMetaDataVersion, oldFormatVersion);
+            return rebuildIndexes(indexes, newStates, work,
+                    newStore ? FDBRecordStore.RebuildIndexReason.NEW_STORE : FDBRecordStore.RebuildIndexReason.FEW_RECORDS,
+                    oldMetaDataVersion).thenRun(() -> {
+                maybeLogIndexesNeedingRebuilding(newStates, recordCountRef, recordsSizeRef, rebuildRecordCounts, newStore);
+                store.context.increment(FDBStoreTimer.Counts.INDEXES_NEED_REBUILDING, newStates.entrySet().size());
+            });
+        } else {
+            return work.isEmpty() ? AsyncUtil.DONE : AsyncUtil.whenAll(work);
+        }
+    }
+
+    private void handleNoLongerUniqueIndex(@Nonnull final RecordMetaData metaData,
+                                           @Nonnull final List<CompletableFuture<Void>> work,
+                                           @Nonnull final Map<Index, List<RecordType>> indexesToBuildSince) {
+        for (Index index : metaData.getAllIndexes()) {
+            if (!indexesToBuildSince.containsKey(index) &&
+                    !index.isUnique()) {
+                final IndexState indexState = getIndexState(index);
+                if (indexState == IndexState.READABLE_UNIQUE_PENDING || indexState == IndexState.WRITE_ONLY) {
+                    final CompletableFuture<Void> uniquenessFuture = AsyncUtil.getAll(store.getRecordContext().removeCommitChecks(
+                            commitCheck -> {
+                                if (commitCheck instanceof IndexUniquenessCommitCheck) {
+                                    return ((IndexUniquenessCommitCheck)commitCheck).getIndexSubspace().equals(store.indexSubspace(index));
+                                } else {
+                                    return false;
+                                }
+                            },
+                            err -> err instanceof RecordIndexUniquenessViolation))
+                            .thenCompose(vignore -> store.getIndexMaintainer(index).clearUniquenessViolations());
+                    if (indexState == IndexState.READABLE_UNIQUE_PENDING) {
+                        work.add(uniquenessFuture
+                                .thenCompose(vignore -> markIndexReadable(index, false))
+                                .thenApply(vignore2 -> null));
+                    } else {
+                        work.add(uniquenessFuture);
+                    }
+                }
+            }
+        }
+    }
+
+    static Supplier<CompletableFuture<Long>> getAndRememberFutureLong(@Nonnull AtomicLong ref,
+                                                                      @Nonnull Supplier<CompletableFuture<Long>> lazyFuture) {
+        return Suppliers.memoize(() -> lazyFuture.get().whenComplete((val, err) -> {
+            if (err == null) {
+                ref.set(val);
+            }
+        }));
+    }
+
+    @Nonnull
+    @SuppressWarnings({"PMD.EmptyCatchBlock", "PMD.CloseResource"})
+    CompletableFuture<Long> getRecordCountForRebuildIndexesInternal(boolean newStore, boolean rebuildRecordCounts,
+                                                                    @Nonnull Map<Index, List<RecordType>> indexes,
+                                                                    @Nullable RecordType singleRecordTypeWithPrefixKey) {
+        final IndexQueryabilityFilter indexQueryabilityFilter = index -> !indexes.containsKey(index);
+        if (singleRecordTypeWithPrefixKey != null) {
+            try {
+                return store.getSnapshotRecordCountForRecordType(singleRecordTypeWithPrefixKey.getName(), indexQueryabilityFilter);
+            } catch (RecordCoreException ex) {
+                // No such index; have to use total record count.
+            }
+        }
+        if (!rebuildRecordCounts) {
+            try {
+                return store.getSnapshotRecordCount(EmptyKeyExpression.EMPTY, Key.Evaluated.EMPTY, indexQueryabilityFilter);
+            } catch (RecordCoreException ex) {
+                // Probably this was from the lack of appropriate index on count; treat like rebuildRecordCounts = true.
+            }
+        }
+        final ExecuteProperties executeProperties = ExecuteProperties.newBuilder()
+                .setReturnedRowLimit(1)
+                .setIsolationLevel(IsolationLevel.SNAPSHOT)
+                .build();
+        final ScanProperties scanProperties = new ScanProperties(executeProperties);
+        final RecordCursor<FDBStoredRecord<Message>> records;
+        if (singleRecordTypeWithPrefixKey == null) {
+            records = store.scanRecords(null, scanProperties);
+        } else {
+            records = store.scanRecords(TupleRange.allOf(singleRecordTypeWithPrefixKey.getRecordTypeKeyTuple()), null, scanProperties);
+        }
+        return records.onNext().thenApply(result -> {
+            if (result.hasNext()) {
+                if (LOGGER.isInfoEnabled()) {
+                    LOGGER.info(KeyValueLogMessage.of("version check scan found non-empty store",
+                            store.subspaceProvider.logKey(), store.subspaceProvider.toString(store.context)));
+                }
+                return Long.MAX_VALUE;
+            } else {
+                if (newStore ? LOGGER.isDebugEnabled() : LOGGER.isInfoEnabled()) {
+                    KeyValueLogMessage msg = KeyValueLogMessage.build("version check scan found empty store",
+                            store.subspaceProvider.logKey(), store.subspaceProvider.toString(store.context));
+                    if (newStore) {
+                        if (LOGGER.isDebugEnabled()) {
+                            LOGGER.debug(msg.toString());
+                        }
+                    } else {
+                        if (LOGGER.isInfoEnabled()) {
+                            LOGGER.info(msg.toString());
+                        }
+                    }
+                }
+                return 0L;
+            }
+        });
+    }
+
+    @Nonnull
+    private CompletableFuture<Long> getRecordSizeForRebuildIndexes(@Nullable RecordType singleRecordTypeWithPrefixKey) {
+        if (singleRecordTypeWithPrefixKey == null) {
+            return store.estimateRecordsSizeAsync();
+        } else {
+            return store.estimateRecordsSizeAsync(TupleRange.allOf(singleRecordTypeWithPrefixKey.getRecordTypeKeyTuple()));
+        }
+    }
+
+    @Nullable
+    @SuppressWarnings("PMD.CompareObjectsWithEquals")
+    RecordType singleRecordTypeWithPrefixKey(@Nonnull Map<Index, List<RecordType>> indexes) {
+        RecordType recordType = null;
+        for (List<RecordType> entry : indexes.values()) {
+            Collection<RecordType> types = entry != null ? entry : store.getRecordMetaData().getRecordTypes().values();
+            if (types.size() != 1) {
+                return null;
+            }
+            RecordType type1 = entry != null ? entry.get(0) : types.iterator().next();
+            if (recordType == null) {
+                if (!type1.primaryKeyHasRecordTypePrefix()) {
+                    return null;
+                }
+                recordType = type1;
+            } else if (type1 != recordType) {
+                return null;
+            }
+        }
+        return recordType;
+    }
+
+    @Nonnull
+    Map<Index, CompletableFuture<IndexState>> getStatesForRebuildIndexes(
+            @Nullable FDBRecordStoreBase.UserVersionChecker userVersionChecker,
+            @Nonnull Map<Index, List<RecordType>> indexes,
+            @Nonnull Supplier<CompletableFuture<Long>> lazyRecordCount,
+            @Nonnull Supplier<CompletableFuture<Long>> lazyRecordsSize,
+            boolean newStore,
+            int oldMetaDataVersion,
+            int oldFormatVersion) {
+        Map<Index, CompletableFuture<IndexState>> newStates = new HashMap<>();
+        for (Map.Entry<Index, List<RecordType>> entry : indexes.entrySet()) {
+            Index index = entry.getKey();
+            List<RecordType> recordTypes = entry.getValue();
+            boolean indexOnNewRecordTypes = areAllRecordTypesSince(recordTypes, oldMetaDataVersion);
+            CompletableFuture<IndexState> stateFuture = userVersionChecker == null ?
+                    lazyRecordCount.get().thenApply(recordCount -> FDBRecordStore.disabledIfTooManyRecordsForRebuild(recordCount, indexOnNewRecordTypes)) :
+                    userVersionChecker.needRebuildIndex(index, lazyRecordCount, lazyRecordsSize, indexOnNewRecordTypes);
+            if (IndexTypes.VERSION.equals(index.getType())
+                    && !newStore
+                    && oldFormatVersion < FormatVersion.SAVE_VERSION_WITH_RECORD.getValueForSerialization()
+                    && !store.useOldVersionFormat()) {
+                stateFuture = stateFuture.thenApply(state -> {
+                    if (IndexState.READABLE.equals(state)) {
+                        return IndexState.DISABLED;
+                    }
+                    return state;
+                });
+            }
+            newStates.put(index, stateFuture);
+        }
+        return newStates;
+    }
+
+    private void maybeLogIndexesNeedingRebuilding(@Nonnull Map<Index, CompletableFuture<IndexState>> newStates,
+                                                  @Nonnull AtomicLong recordCountRef,
+                                                  @Nonnull AtomicLong recordsSizeRef,
+                                                  boolean rebuildRecordCounts,
+                                                  boolean newStore) {
+        if (LOGGER.isDebugEnabled()) {
+            KeyValueLogMessage msg = KeyValueLogMessage.build("indexes need rebuilding",
+                    store.subspaceProvider.logKey(), store.subspaceProvider.toString(store.context));
+
+            long recordCount = recordCountRef.get();
+            if (recordCount >= 0L) {
+
msg.addKeyAndValue(LogMessageKeys.RECORD_COUNT, recordCount == Long.MAX_VALUE ? "unknown" : Long.toString(recordCount)); + } + long recordsSize = recordsSizeRef.get(); + if (recordsSize >= 0L) { + msg.addKeyAndValue(LogMessageKeys.RECORDS_SIZE_ESTIMATE, Long.toString(recordsSize)); + } + + if (rebuildRecordCounts) { + msg.addKeyAndValue(LogMessageKeys.REBUILD_RECORD_COUNTS, "true"); + } + Map> stateNames = new HashMap<>(); + for (Map.Entry> stateEntry : newStates.entrySet()) { + final String stateName; + if (MoreAsyncUtil.isCompletedNormally(stateEntry.getValue())) { + stateName = stateEntry.getValue().join().getLogName(); + } else { + stateName = "UNKNOWN"; + } + stateNames.compute(stateName, (key, names) -> { + if (names == null) { + names = new ArrayList<>(); + } + names.add(stateEntry.getKey().getName()); + return names; + }); + } + msg.addKeysAndValues(stateNames); + if (newStore) { + msg.addKeyAndValue(LogMessageKeys.NEW_STORE, "true"); + } + LOGGER.debug(msg.toString()); + } + } + + // endregion + + // region Record Count Rebuild + + @SuppressWarnings("PMD.CloseResource") + boolean checkPossiblyRebuildRecordCounts(@Nonnull RecordMetaData metaData, + @Nonnull RecordMetaDataProto.DataStoreInfo.Builder info, + @Nonnull List> work, + int oldFormatVersion) { + boolean existingStore = oldFormatVersion > 0; + KeyExpression countKeyExpression = metaData.getRecordCountKey(); + + boolean rebuildRecordCounts = + (existingStore && oldFormatVersion < FormatVersion.RECORD_COUNT_ADDED.getValueForSerialization()) + || (countKeyExpression != null && store.formatVersion.isAtLeast(FormatVersion.RECORD_COUNT_KEY_ADDED) && + (!info.hasRecordCountKey() || !KeyExpression.fromProto(info.getRecordCountKey()).equals(countKeyExpression))) + || (countKeyExpression == null && info.hasRecordCountKey()); + + if (rebuildRecordCounts) { + if (existingStore) { + store.context.clear(store.getSubspace().range(Tuple.from(FDBRecordStore.RECORD_COUNT_KEY))); + } + + if 
(store.formatVersion.isAtLeast(FormatVersion.RECORD_COUNT_KEY_ADDED)) { + if (countKeyExpression != null) { + info.setRecordCountKey(countKeyExpression.toKeyExpression()); + } else { + info.clearRecordCountKey(); + } + } + + if (existingStore) { + addRebuildRecordCountsJob(work); + } + } + return rebuildRecordCounts; + } + + @SuppressWarnings("PMD.CloseResource") + void addRebuildRecordCountsJob(List> work) { + final KeyExpression recordCountKey = store.getRecordMetaData().getRecordCountKey(); + if (recordCountKey == null || + store.getRecordStoreState().getStoreHeader().getRecordCountState() == RecordMetaDataProto.DataStoreInfo.RecordCountState.DISABLED) { + return; + } + if (LOGGER.isDebugEnabled()) { + LOGGER.debug(KeyValueLogMessage.of("recounting all records", + store.subspaceProvider.logKey(), store.subspaceProvider.toString(store.context))); + } + final Map counts = new HashMap<>(); + final RecordCursor> records = store.scanRecords(null, ScanProperties.FORWARD_SCAN); + CompletableFuture future = records.forEach(rec -> { + Key.Evaluated subkey = recordCountKey.evaluateSingleton(rec); + counts.compute(subkey, (k, v) -> (v == null) ? 
1 : v + 1); + }).thenApply(vignore -> { + final Transaction tr = store.ensureContextActive(); + final byte[] bytes = new byte[8]; + final ByteBuffer buf = ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN); + for (Map.Entry entry : counts.entrySet()) { + buf.putLong(entry.getValue()); + tr.set(store.getSubspace().pack(Tuple.from(FDBRecordStore.RECORD_COUNT_KEY).addAll(entry.getKey().toTupleAppropriateList())), + bytes); + buf.clear(); + } + return null; + }); + future = store.context.instrument(FDBStoreTimer.Events.RECOUNT_RECORDS, future); + work.add(future); + } + + @SuppressWarnings("PMD.CloseResource") + void addConvertRecordVersions(@Nonnull List> work) { + if (store.useOldVersionFormat()) { + throw new RecordCoreException("attempted to convert record versions when still using older format"); + } + final Subspace legacyVersionSubspace = store.getLegacyVersionSubspace(); + + KeyValueCursor kvCursor = KeyValueCursor.Builder.withSubspace(legacyVersionSubspace) + .setContext(store.getRecordContext()) + .setScanProperties(ScanProperties.FORWARD_SCAN) + .build(); + CompletableFuture workFuture = kvCursor.forEach(kv -> { + final Tuple primaryKey = legacyVersionSubspace.unpack(kv.getKey()); + final FDBRecordVersion version = FDBRecordVersion.fromBytes(kv.getValue(), false); + final byte[] newKeyBytes = store.getSubspace().pack(store.recordVersionKey(primaryKey)); + final byte[] newValueBytes = SplitHelper.packVersion(version); + store.ensureContextActive().set(newKeyBytes, newValueBytes); + }).thenAccept(ignore -> store.ensureContextActive().clear(legacyVersionSubspace.range())); + work.add(workFuture); + } + + // endregion + + // region Helpers + + void logExceptionAsWarn(KeyValueLogMessage message, Throwable exception) { + if (LOGGER.isWarnEnabled()) { + for (Throwable ex = exception; + ex != null; + ex = ex.getCause()) { + if (ex instanceof LoggableException) { + message.addKeysAndValues(((LoggableException)ex).getLogInfo()); + } + } + 
message.addKeyAndValue(store.subspaceProvider.logKey(), store.subspaceProvider.toString(store.context)); + LOGGER.warn(message.toString(), exception); + } + } + + // endregion +}