Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions build.xml
Original file line number Diff line number Diff line change
Expand Up @@ -752,6 +752,7 @@

<dependency groupId="io.micrometer" artifactId="micrometer-core" version="1.5.5"/>
<dependency groupId="org.latencyutils" artifactId="LatencyUtils" version="2.0.3"/>
<dependency groupId="software.amazon.awssdk.crt" artifactId="aws-crt" version="0.45.4" scope="provided"/>
</dependencyManagement>
<developer id="adelapena" name="Andres de la Peña"/>
<developer id="alakshman" name="Avinash Lakshman"/>
Expand Down Expand Up @@ -946,6 +947,7 @@

<dependency groupId="io.micrometer" artifactId="micrometer-core"/>
<dependency groupId="org.latencyutils" artifactId="LatencyUtils"/>
<dependency groupId="software.amazon.awssdk.crt" artifactId="aws-crt" optional="true"/>
</artifact:pom>
</target>

Expand Down
2 changes: 1 addition & 1 deletion src/java/org/apache/cassandra/cache/AutoSavingCache.java
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ public InputStream getInputStream(File dataPath, File crcPath) throws IOExceptio

public OutputStream getOutputStream(File dataPath, File crcPath)
{
return new ChecksummedSequentialWriter(dataPath, crcPath, null, writerOption);
return new ChecksummedSequentialWriter(dataPath, crcPath, writerOption);
}
};

Expand Down
15 changes: 11 additions & 4 deletions src/java/org/apache/cassandra/db/compaction/Verifier.java
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@

public class Verifier implements Closeable
{
private static final List<Component> DIGEST_COMPONENTS = ImmutableList.of(Component.DIGEST, Component.DIGEST_CRC32C, Component.DIGEST_CRC64NVME);

private final @Nullable CompactionRealm realm;
private final SSTableReader sstable;

Expand Down Expand Up @@ -250,12 +252,17 @@ public void verify()
{
validator = null;

if (sstable.descriptor.fileFor(Component.DIGEST).exists())
for (Component component : DIGEST_COMPONENTS)
{
validator = DataIntegrityMetadata.fileDigestValidator(sstable.descriptor);
validator.validate();
if (sstable.descriptor.fileFor(component).exists())
{
validator = DataIntegrityMetadata.fileDigestValidator(sstable.descriptor, component);
validator.validate();
break;
}
}
else

if (validator == null)
{
outputHandler.output("Data digest missing, assuming extended verification of disk values");
extended = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public final class ComponentManifest implements Iterable<Component>
{
private static final List<Component> STREAM_COMPONENTS = ImmutableList.of(Component.DATA, Component.PRIMARY_INDEX, Component.PARTITION_INDEX, Component.ROW_INDEX,
Component.STATS, Component.COMPRESSION_INFO, Component.FILTER, Component.SUMMARY,
Component.DIGEST, Component.CRC);
Component.DIGEST, Component.DIGEST_CRC32C, Component.DIGEST_CRC64NVME, Component.CRC);

private final LinkedHashMap<Component, Long> components;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@
import java.util.Optional;
import java.util.zip.CRC32;
import java.util.zip.CheckedInputStream;
import java.util.zip.Checksum;

import com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -40,6 +42,7 @@
import org.apache.cassandra.io.util.File;
import org.apache.cassandra.schema.CompressionParams;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.ChecksumType;

import static org.apache.cassandra.utils.Throwables.merge;

Expand Down Expand Up @@ -70,6 +73,8 @@ public class CompressedSequentialWriter extends SequentialWriter

private final int maxCompressedLength;

private final ChecksumType checksumType;

/**
* When corruption is found, the file writer is reset to previous data point but we can't reset the CRC checksum.
* So we have to recompute digest value.
Expand All @@ -82,13 +87,15 @@ public class CompressedSequentialWriter extends SequentialWriter
* @param file File to write
* @param offsetsPath File name to write compression metadata
* @param digestFile File to write digest
* @param checksumType The type of checksum in the digest file
* @param option Write option (buffer size and type will be set the same as compression params)
* @param parameters Compression mparameters
* @param sstableMetadataCollector Metadata collector
*/
public CompressedSequentialWriter(File file,
File offsetsPath,
File digestFile,
ChecksumType checksumType,
SequentialWriterOption option,
CompressionParams parameters,
MetadataCollector sstableMetadataCollector)
Expand All @@ -112,7 +119,18 @@ public CompressedSequentialWriter(File file,
metadataWriter = CompressionMetadata.Writer.open(parameters, offsetsPath);

this.sstableMetadataCollector = sstableMetadataCollector;
crcMetadata = new ChecksumWriter(new DataOutputStream(Channels.newOutputStream(channel)));
this.checksumType = checksumType;
crcMetadata = new ChecksumWriter(new DataOutputStream(Channels.newOutputStream(channel)), checksumType);
}

public CompressedSequentialWriter(File file,
File offsetsPath,
File digestFile,
SequentialWriterOption option,
CompressionParams parameters,
MetadataCollector sstableMetadataCollector)
{
this(file, offsetsPath, digestFile, ChecksumType.CRC32, option, parameters, sstableMetadataCollector);
}

@Override
Expand Down Expand Up @@ -460,7 +478,7 @@ private void maybeWriteChecksum()
try (FileChannel fileChannel = StorageProvider.instance.writeTimeReadFileChannelFor(file);
InputStream stream = Channels.newInputStream(fileChannel))
{
CRC32 checksum = new CRC32();
Checksum checksum = checksumType.newInstance();
try (CheckedInputStream checkedInputStream = new CheckedInputStream(stream, checksum))
{
byte[] chunk = new byte[64 * 1024];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.cassandra.io.util.SequentialWriter;
import org.apache.cassandra.io.util.SequentialWriterOption;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.ChecksumType;
import org.apache.cassandra.utils.PageAware;

/**
Expand Down Expand Up @@ -103,7 +104,7 @@ public EncryptedSequentialWriter(File file,
this.encrypted = BufferType.preferredForCompression().allocate(CHUNK_SIZE);

maxBytesInChunk = buffer.capacity();
crcMetadata = new ChecksumWriter(new DataOutputStream(Channels.newOutputStream(channel)));
crcMetadata = new ChecksumWriter(new DataOutputStream(Channels.newOutputStream(channel)), ChecksumType.CRC32);
}

public static int maxBytesInPage(ICompressor encryptor)
Expand Down
8 changes: 8 additions & 0 deletions src/java/org/apache/cassandra/io/sstable/Component.java
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,10 @@ public enum Type
STATS("Statistics.db"),
// holds CRC32 checksum of the data file
DIGEST("Digest.crc32"),
// holds CRC32C checksum of the data file
DIGEST_CRC32C("Digest.crc32c"),
// holds CRC64-NVMe checksum of the data file
DIGEST_CRC64NMVE("Digest.crc64nvme"),
// holds the CRC32 for chunks in an a uncompressed file.
CRC("CRC.db"),
// holds SSTable Index Summary (sampling of Index component)
Expand Down Expand Up @@ -104,6 +108,8 @@ public static Type fromRepresentation(String repr)
public final static Component COMPRESSION_INFO = new Component(Type.COMPRESSION_INFO);
public final static Component STATS = new Component(Type.STATS);
public final static Component DIGEST = new Component(Type.DIGEST);
public final static Component DIGEST_CRC32C = new Component(Type.DIGEST_CRC32C);
public final static Component DIGEST_CRC64NVME = new Component(Type.DIGEST_CRC64NMVE);
public final static Component CRC = new Component(Type.CRC);
public final static Component SUMMARY = new Component(Type.SUMMARY);
public final static Component TOC = new Component(Type.TOC);
Expand Down Expand Up @@ -156,6 +162,8 @@ public static Component parse(String name)
case COMPRESSION_INFO: return Component.COMPRESSION_INFO;
case STATS: return Component.STATS;
case DIGEST: return Component.DIGEST;
case DIGEST_CRC32C: return Component.DIGEST_CRC32C;
case DIGEST_CRC64NMVE: return Component.DIGEST_CRC64NVME;
case CRC: return Component.CRC;
case SUMMARY: return Component.SUMMARY;
case TOC: return Component.TOC;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.apache.cassandra.db.rows.Row;
import org.apache.cassandra.db.rows.Rows;
import org.apache.cassandra.db.rows.Unfiltered;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.guardrails.Guardrails;
import org.apache.cassandra.io.FSWriteError;
import org.apache.cassandra.io.compress.CompressedSequentialWriter;
Expand All @@ -66,13 +67,18 @@
import org.apache.cassandra.schema.CompressionParams;
import org.apache.cassandra.schema.SchemaConstants;
import org.apache.cassandra.schema.TableMetadataRef;
import org.apache.cassandra.utils.ChecksumType;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.IFilter;
import org.apache.cassandra.utils.Throwables;

public abstract class SortedTableWriter extends SSTableWriter
{
protected static final Logger logger = LoggerFactory.getLogger(SortedTableWriter.class);

private static final ChecksumType checksumType = getChecksumType();
private static final Component digestComponent = getDigestComponent();
public static final String SSTABLE_CHECKSUM_TYPE_PROPERTY = "cassandra.sstable_checksum_type";

protected final FileHandle.Builder dbuilder;
protected final SequentialWriter dataFile;
protected DataPosition dataMark;
Expand Down Expand Up @@ -104,6 +110,34 @@ protected SortedTableWriter(Descriptor descriptor,
isInternalKeyspace = SchemaConstants.isInternalKeyspace(metadata.keyspace);
}

private static ChecksumType getChecksumType()
{
String checksumTypeProp = System.getProperty(SSTABLE_CHECKSUM_TYPE_PROPERTY, ChecksumType.CRC32.name());
try
{
return ChecksumType.valueOf(checksumTypeProp.toUpperCase());
}
catch (IllegalArgumentException e)
{
throw new ConfigurationException(String.format("Invalid value for system property 'cassandra.sstable_checksum_type': %s", checksumTypeProp), e);
}
}

private static Component getDigestComponent()
{
switch (checksumType)
{
case CRC32:
return Component.DIGEST;
case CRC32C:
return Component.DIGEST_CRC32C;
case CRC64NVME:
return Component.DIGEST_CRC64NVME;
default:
throw new IllegalStateException("Unexpected checksumType for digest file: " + checksumType);
}
}

protected static SequentialWriter constructDataFileWriter(Descriptor descriptor,
TableMetadataRef metadata,
MetadataCollector metadataCollector,
Expand All @@ -116,7 +150,8 @@ protected static SequentialWriter constructDataFileWriter(Descriptor descriptor,

return new CompressedSequentialWriter(descriptor.fileFor(Component.DATA),
descriptor.fileFor(Component.COMPRESSION_INFO),
descriptor.fileFor(Component.DIGEST),
descriptor.fileFor(digestComponent),
checksumType,
writerOption,
compressionParams,
metadataCollector);
Expand All @@ -125,7 +160,8 @@ protected static SequentialWriter constructDataFileWriter(Descriptor descriptor,
{
return new ChecksummedSequentialWriter(descriptor.fileFor(Component.DATA),
descriptor.fileFor(Component.CRC),
descriptor.fileFor(Component.DIGEST),
descriptor.fileFor(digestComponent),
checksumType,
writerOption);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ public class BigFormat implements SSTableFormat
Component.COMPRESSION_INFO,
Component.STATS,
Component.DIGEST,
Component.DIGEST_CRC32C,
Component.DIGEST_CRC64NVME,
Component.CRC,
Component.SUMMARY,
Component.TOC);
Expand All @@ -76,6 +78,8 @@ public class BigFormat implements SSTableFormat
Component.COMPRESSION_INFO,
Component.FILTER,
Component.DIGEST,
Component.DIGEST_CRC32C,
Component.DIGEST_CRC64NVME,
Component.CRC);

private final static Set<Component> PRIMARY_INDEX_COMPONENTS = ImmutableSet.of(Component.PRIMARY_INDEX);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,9 @@ private static Set<Component> components(TableMetadata metadata, Collection<Comp
Component.STATS,
Component.SUMMARY,
Component.TOC,
Component.DIGEST);
Component.DIGEST,
Component.DIGEST_CRC32C,
Component.DIGEST_CRC64NVME);

if (metadata.params.bloomFilterFpChance < 1.0)
components.add(Component.FILTER);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ public class TrieIndexFormat implements SSTableFormat
Component.COMPRESSION_INFO,
Component.STATS,
Component.DIGEST,
Component.DIGEST_CRC32C,
Component.DIGEST_CRC64NVME,
Component.CRC,
Component.TOC);

Expand All @@ -88,6 +90,8 @@ public class TrieIndexFormat implements SSTableFormat
Component.COMPRESSION_INFO,
Component.FILTER,
Component.DIGEST,
Component.DIGEST_CRC32C,
Component.DIGEST_CRC64NVME,
Component.CRC);

private final static Set<Component> PRIMARY_INDEX_COMPONENTS = ImmutableSet.of(Component.PARTITION_INDEX,
Expand Down
10 changes: 6 additions & 4 deletions src/java/org/apache/cassandra/io/util/ChecksumWriter.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,21 +25,23 @@
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.zip.CRC32;
import java.util.zip.Checksum;
import javax.annotation.Nonnull;

import org.apache.cassandra.io.FSWriteError;
import org.apache.cassandra.io.compress.CompressedSequentialWriter;
import org.apache.cassandra.utils.ChecksumType;

public class ChecksumWriter
{
private final CRC32 incrementalChecksum = new CRC32();
private final Checksum incrementalChecksum = ChecksumType.CRC32.newInstance();
private final DataOutput incrementalOut;
private final CRC32 fullChecksum = new CRC32();
private final Checksum fullChecksum;

public ChecksumWriter(DataOutput incrementalOut)
public ChecksumWriter(DataOutput incrementalOut, ChecksumType fullChecksumType)
{
this.incrementalOut = incrementalOut;
this.fullChecksum = fullChecksumType.newInstance();
}

public void writeChunkSize(int length)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import java.nio.ByteBuffer;
import java.util.Optional;

import org.apache.cassandra.utils.ChecksumType;

public class ChecksummedSequentialWriter extends SequentialWriter
{
private static final SequentialWriterOption CRC_WRITER_OPTION = SequentialWriterOption.newBuilder()
Expand All @@ -30,15 +32,20 @@ public class ChecksummedSequentialWriter extends SequentialWriter
private final ChecksumWriter crcMetadata;
private final Optional<File> digestFile;

public ChecksummedSequentialWriter(File file, File crcPath, File digestFile, SequentialWriterOption option)
public ChecksummedSequentialWriter(File file, File crcPath, File digestFile, ChecksumType checksumType, SequentialWriterOption option)
{
super(file, option);
crcWriter = new SequentialWriter(crcPath, CRC_WRITER_OPTION);
crcMetadata = new ChecksumWriter(crcWriter);
crcMetadata = new ChecksumWriter(crcWriter, checksumType);
crcMetadata.writeChunkSize(buffer.capacity());
this.digestFile = Optional.ofNullable(digestFile);
}

public ChecksummedSequentialWriter(File file, File crcPath, SequentialWriterOption option)
{
this(file, crcPath, null, ChecksumType.CRC32, option);
}

@Override
protected void flushData()
{
Expand Down
Loading