Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import static java.lang.Math.toIntExact;
import static java.nio.file.StandardOpenOption.CREATE_NEW;
import static java.nio.file.StandardOpenOption.WRITE;
import static org.apache.ignite.internal.raft.storage.segstore.SegmentFileManager.GROUP_DESTROY_LOG_INDEX;
import static org.apache.ignite.internal.util.IgniteUtils.atomicMoveFile;
import static org.apache.ignite.internal.util.IgniteUtils.fsyncFile;

Expand Down Expand Up @@ -137,7 +138,6 @@ class IndexFileManager {
/**
* Index file metadata grouped by Raft Group ID.
*/
// FIXME: This map is never cleaned up, see https://issues.apache.org/jira/browse/IGNITE-27926.
private final Map<Long, GroupIndexMeta> groupIndexMetas = new ConcurrentHashMap<>();

IndexFileManager(Path baseDir) throws IOException {
Expand Down Expand Up @@ -430,6 +430,10 @@ private static FileHeaderWithIndexMetas serializeHeaderAndFillMetadata(
return null;
}

if (firstIndexKept == GROUP_DESTROY_LOG_INDEX) {
return null;
}

if (firstIndexKept == -1 || firstIndexKept <= firstLogIndexInclusive) {
// No prefix truncation required, simply create a new meta.
return new IndexFileMeta(firstLogIndexInclusive, lastLogIndexExclusive, payloadOffset, fileProperties);
Expand All @@ -444,13 +448,19 @@ private static FileHeaderWithIndexMetas serializeHeaderAndFillMetadata(
}

private void putIndexFileMeta(IndexMetaSpec metaSpec) {
IndexFileMeta indexFileMeta = metaSpec.indexFileMeta();

// Using boxed value to avoid unnecessary autoboxing later.
Long groupId = metaSpec.groupId();

long firstIndexKept = metaSpec.firstIndexKept();

if (firstIndexKept == GROUP_DESTROY_LOG_INDEX) {
groupIndexMetas.remove(groupId);

return;
}
Comment thread
sashapolo marked this conversation as resolved.

IndexFileMeta indexFileMeta = metaSpec.indexFileMeta();

GroupIndexMeta existingGroupIndexMeta = groupIndexMetas.get(groupId);

if (existingGroupIndexMeta == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,13 @@ class SegmentFileManager implements ManuallyCloseable {
*/
static final byte[] SWITCH_SEGMENT_RECORD = new byte[8]; // 8 zero bytes.

/**
* Special "destroy group" sentinel value for the log reset index.
*
* <p>Must not overlap with a valid stored log index.
*/
static final long GROUP_DESTROY_LOG_INDEX = Long.MIN_VALUE;

private final String storageName;

private final Path segmentFilesDir;
Expand Down Expand Up @@ -422,6 +429,14 @@ void reset(long groupId, long nextLogIndex) throws IOException {
}
}

/**
 * Destroys all log data for the given group. Writes a tombstone using {@link #GROUP_DESTROY_LOG_INDEX} so that the GC can discard
 * the group's entries on the next compaction pass.
 *
 * <p>Implemented as a {@link #reset} carrying the sentinel index instead of a real next-log-index; the group's index metadata is
 * removed when the checkpoint thread later processes the tombstone.
 *
 * @param groupId Raft group ID whose log data must be destroyed.
 * @throws IOException If an I/O error occurs while writing the tombstone record.
 */
void destroyGroup(long groupId) throws IOException {
    reset(groupId, GROUP_DESTROY_LOG_INDEX);
}

private WriteBufferWithMemtable reserveBytesWithRollover(int size) throws IOException {
while (true) {
SegmentFileWithMemtable segmentFileWithMemtable = currentSegmentFile();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,11 @@ public LogStorage createLogStorage(String raftNodeStorageId, RaftOptions raftOpt

@Override
public void destroyLogStorage(String raftNodeStorageId) {
    try {
        // Only writes a destroy tombstone; the actual data is reclaimed later, during compaction.
        long groupId = convertNodeId(raftNodeStorageId);

        fileManager.destroyGroup(groupId);
    } catch (IOException e) {
        throw new LogStorageException(String.format("Failed to destroy log storage [storageId=%s]", raftNodeStorageId), e);
    }
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.ignite.internal.raft.storage.segstore;

import static org.apache.ignite.internal.raft.storage.segstore.SegmentFileManager.GROUP_DESTROY_LOG_INDEX;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
Expand Down Expand Up @@ -644,4 +645,54 @@ void testGetSegmentFilePointerReturnsNullForEmptyMetaRange() throws IOException

assertThat(indexFileManager.getSegmentFilePointer(1, 2), is(nullValue()));
}

@Test
void testDestroyGroupSoleTombstoneClearsIndexMetaOnCheckpoint() throws IOException {
    var initialMemtable = new SingleThreadMemTable();

    initialMemtable.appendSegmentFileOffset(0, 1, 10);
    initialMemtable.appendSegmentFileOffset(1, 1, 20);

    indexFileManager.saveNewIndexMemtable(initialMemtable);

    assertThat(indexFileManager.firstLogIndexInclusive(0), is(1L));
    assertThat(indexFileManager.firstLogIndexInclusive(1), is(1L));

    // "Sole tombstone" case: the next segment holds no entries for group 0, only the destroy tombstone.
    var tombstoneMemtable = new SingleThreadMemTable();

    tombstoneMemtable.reset(0, GROUP_DESTROY_LOG_INDEX);

    indexFileManager.saveNewIndexMemtable(tombstoneMemtable);

    // All index information about group 0 must be gone after the checkpoint.
    assertThat(indexFileManager.getSegmentFilePointer(0, 1), is(nullValue()));
    assertThat(indexFileManager.firstLogIndexInclusive(0), is(-1L));
    assertThat(indexFileManager.lastLogIndexExclusive(0), is(-1L));

    // Group 1 must not be affected by the destruction of group 0.
    assertThat(indexFileManager.getSegmentFilePointer(1, 1), is(new SegmentFilePointer(new FileProperties(0), 20)));
    assertThat(indexFileManager.firstLogIndexInclusive(1), is(1L));
}

@Test
void testDestroyGroupWithPriorDataClearsIndexMetaOnCheckpoint() throws IOException {
    var initialMemtable = new SingleThreadMemTable();

    initialMemtable.appendSegmentFileOffset(0, 1, 10);
    initialMemtable.appendSegmentFileOffset(1, 1, 20);

    indexFileManager.saveNewIndexMemtable(initialMemtable);

    assertThat(indexFileManager.firstLogIndexInclusive(0), is(1L));

    // In the next segment, group 0 first appends more entries and is then destroyed within that same segment.
    var tombstoneMemtable = new SingleThreadMemTable();

    tombstoneMemtable.appendSegmentFileOffset(0, 2, 100);
    tombstoneMemtable.appendSegmentFileOffset(0, 3, 200);
    tombstoneMemtable.reset(0, GROUP_DESTROY_LOG_INDEX);

    indexFileManager.saveNewIndexMemtable(tombstoneMemtable);

    // All index information about group 0 must be gone, including the entries written in the same segment as the tombstone.
    assertThat(indexFileManager.getSegmentFilePointer(0, 1), is(nullValue()));
    assertThat(indexFileManager.getSegmentFilePointer(0, 2), is(nullValue()));
    assertThat(indexFileManager.firstLogIndexInclusive(0), is(-1L));
    assertThat(indexFileManager.lastLogIndexExclusive(0), is(-1L));

    // Group 1 must not be affected by the destruction of group 0.
    assertThat(indexFileManager.getSegmentFilePointer(1, 1), is(new SegmentFilePointer(new FileProperties(0), 20)));
    assertThat(indexFileManager.firstLogIndexInclusive(1), is(1L));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.lessThan;
import static org.hamcrest.Matchers.not;
import static org.junit.jupiter.api.Assertions.fail;

import java.io.IOException;
import java.io.UncheckedIOException;
Expand Down Expand Up @@ -646,6 +647,100 @@ void testCompactionOfFileAdjacentToStaleEntryInDequeCausesCorruption() throws Ex
});
}

/**
* Verifies that after {@link SegmentFileManager#destroyGroup} is called, the GC can remove segment files belonging
* to the destroyed group.
*/
@Test
void testSegmentFilesRemovedByGcAfterGroupDestroy() throws Exception {
List<byte[]> batches = createRandomData(FILE_SIZE / 4, 10);

for (int i = 0; i < batches.size(); i++) {
appendBytes(GROUP_ID_1, batches.get(i), i);
}

await().until(this::indexFiles, hasSize(equalTo(segmentFiles().size() - 1)));

List<Path> oldSegmentFiles = segmentFiles().subList(0, segmentFiles().size() - 1);
List<Path> oldIndexFiles = indexFiles();

fileManager.destroyGroup(GROUP_ID_1);

// The destroy tombstone is in the current segment file's memtable. Trigger a rollover using a different group so the
// checkpoint thread processes the tombstone and removes GROUP_ID_1 from the in-memory index.
triggerAndAwaitCheckpoint(GROUP_ID_2, 0);

Comment thread
sashapolo marked this conversation as resolved.
for (Path segmentFile : oldSegmentFiles) {
runCompaction(segmentFile);
}

for (Path segmentFile : oldSegmentFiles) {
assertThat(segmentFile, not(exists()));
}

for (Path indexFile : oldIndexFiles) {
assertThat(indexFile, not(exists()));
}

for (int i = 0; i < batches.size(); i++) {
int index = i;

fileManager.getEntry(GROUP_ID_1, index, bs -> {
fail("Entry for index " + index + " must be missing");

return null;
});
}
}

/**
 * Verifies that when a group is destroyed while another group shares the same segment files, GC compacts each shared file by dropping
 * only the destroyed group's entries while preserving the surviving group's entries.
 */
@Test
void testSegmentFilesCompactedByGcAfterGroupDestroyWithSurvivingGroup() throws Exception {
    List<byte[]> batches = createRandomData(FILE_SIZE / 4, 10);

    for (int batchIndex = 0; batchIndex < batches.size(); batchIndex++) {
        appendBytes(GROUP_ID_1, batches.get(batchIndex), batchIndex);
        appendBytes(GROUP_ID_2, batches.get(batchIndex), batchIndex);
    }

    await().until(this::indexFiles, hasSize(equalTo(segmentFiles().size() - 1)));

    List<Path> staleSegmentFiles = segmentFiles().subList(0, segmentFiles().size() - 1);

    // Destroy one group while the other remains active.
    fileManager.destroyGroup(GROUP_ID_1);

    // Roll the segment over with GROUP_ID_2 so that the checkpoint thread processes the tombstone and removes GROUP_ID_1 from
    // the in-memory index, which enables the GC to compact the shared segment files.
    triggerAndAwaitCheckpoint(GROUP_ID_2, batches.size() - 1);

    for (Path segmentFile : staleSegmentFiles) {
        FileProperties originalProperties = SegmentFile.fileProperties(segmentFile);

        runCompaction(segmentFile);

        // The original file must have been replaced by a compacted next-generation file (GROUP_ID_2 entries survive).
        assertThat(segmentFile, not(exists()));

        var compactedProperties = new FileProperties(originalProperties.ordinal(), originalProperties.generation() + 1);

        assertThat(fileManager.segmentFilesDir().resolve(SegmentFile.fileName(compactedProperties)), exists());
    }

    // Every entry of the surviving group must still be readable.
    for (int batchIndex = 0; batchIndex < batches.size(); batchIndex++) {
        int idx = batchIndex;

        fileManager.getEntry(GROUP_ID_2, batchIndex, bs -> {
            assertThat(bs, is(batches.get(idx)));

            return null;
        });
    }
}

@Test
void testLogSizeBytesInitializedCorrectlyOnStartup() throws Exception {
// Fill multiple segment files to create both segment and index files.
Expand Down Expand Up @@ -717,13 +812,17 @@ public int size(LogEntry logEntry) {
}

/**
 * Convenience overload that triggers a checkpoint by appending entries for {@code GROUP_ID_1}.
 *
 * @param lastGroupIndex Last log index already written for the group; new entries are appended after it.
 * @throws IOException If appending the rollover-triggering entries fails.
 */
private void triggerAndAwaitCheckpoint(long lastGroupIndex) throws IOException {
    triggerAndAwaitCheckpoint(GROUP_ID_1, lastGroupIndex);
}

private void triggerAndAwaitCheckpoint(long groupId, long lastGroupIndex) throws IOException {
List<Path> segmentFilesBeforeCheckpoint = segmentFiles();

// Insert some entries to trigger a rollover (and a checkpoint).
List<byte[]> batches = createRandomData(FILE_SIZE / 4, 5);

for (int i = 0; i < batches.size(); i++) {
appendBytes(GROUP_ID_1, batches.get(i), lastGroupIndex + i + 1);
appendBytes(groupId, batches.get(i), lastGroupIndex + i + 1);
}

List<Path> segmentFilesAfterCheckpoint = segmentFiles();
Expand Down