Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -165,7 +166,8 @@ public void newRegionMustNotBeAllocatedOnRemovingDataNodeTest() throws Exception
// kill -9 the target DataNode so that it becomes Unknown (this is the exact condition under
// which the failure detector overrides the Removing status back to Unknown).
final List<DataNodeWrapper> removeDataNodeWrappers =
List.of(EnvFactory.getEnv().dataNodeIdToWrapper(removeDataNodeId).get());
Collections.singletonList(
EnvFactory.getEnv().dataNodeIdToWrapper(removeDataNodeId).get());
stopDataNodes(removeDataNodeWrappers);
LOGGER.info("DataNode {} is stopped.", removeDataNodeId);
} catch (InconsistentDataException e) {
Expand Down Expand Up @@ -201,7 +203,8 @@ public void newRegionMustNotBeAllocatedOnRemovingDataNodeTest() throws Exception
// this point belongs to the allocation we are about to force.
final Set<Integer> preExistingRegionIds = new HashSet<>(getAllRegionMap(statement).keySet());

final String removeDataNodeSQL = generateRemoveString(Set.of(removeDataNodeId));
final String removeDataNodeSQL =
generateRemoveString(Collections.singleton(removeDataNodeId));
LOGGER.info("Submitting: {}", removeDataNodeSQL);
statement.execute(removeDataNodeSQL);
LOGGER.info("Remove DataNode {} submitted.", removeDataNodeId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@

public class PipeTransferTabletBatchReq extends TPipeTransferReq {

private final transient List<PipeTransferTabletBinaryReq> binaryReqs = new ArrayList<>();
private final transient List<PipeTransferTabletInsertNodeReq> insertNodeReqs = new ArrayList<>();
private final transient List<PipeTransferTabletRawReq> tabletReqs = new ArrayList<>();

Expand All @@ -60,6 +61,26 @@ public Pair<InsertRowsStatement, InsertMultiTabletsStatement> constructStatement
final List<InsertRowStatement> insertRowStatementList = new ArrayList<>();
final List<InsertTabletStatement> insertTabletStatementList = new ArrayList<>();

for (final PipeTransferTabletBinaryReq binaryReq : binaryReqs) {
final InsertBaseStatement statement = binaryReq.constructStatement();
if (statement.isEmpty()) {
continue;
}
if (statement instanceof InsertRowStatement) {
insertRowStatementList.add((InsertRowStatement) statement);
} else if (statement instanceof InsertTabletStatement) {
insertTabletStatementList.add((InsertTabletStatement) statement);
} else if (statement instanceof InsertRowsStatement) {
insertRowStatementList.addAll(
((InsertRowsStatement) statement).getInsertRowStatementList());
} else {
throw new UnsupportedOperationException(
String.format(
"Unknown InsertBaseStatement %s constructed from PipeTransferTabletBinaryReq.",
statement));
}
}

for (final PipeTransferTabletInsertNodeReq insertNodeReq : insertNodeReqs) {
final InsertBaseStatement statement = insertNodeReq.constructStatement();
if (statement.isEmpty()) {
Expand Down Expand Up @@ -132,19 +153,52 @@ public static PipeTransferTabletBatchReq fromTPipeTransferReq(
final PipeTransferTabletBatchReq batchReq = new PipeTransferTabletBatchReq();
final TabletStringInternPool tabletStringInternPool = new TabletStringInternPool();

// Binary size, for rolling upgrade
ReadWriteIOUtils.readInt(transferReq.body);
int size = ReadWriteIOUtils.readInt(transferReq.body);
// Legacy 1.3.x batch bodies may carry WAL binary requests before insert nodes and tablets.
int size = readNonNegativeSize(transferReq.body, "binary request count");
for (int i = 0; i < size; ++i) {
batchReq.insertNodeReqs.add(
PipeTransferTabletInsertNodeReq.toTPipeTransferRawReq(
(InsertNode) PlanFragment.deserializeHelper(transferReq.body, null)));
final int length = readNonNegativeSize(transferReq.body, "binary request body length");
if (length > transferReq.body.remaining()) {
throw new IllegalArgumentException(
String.format(
"Invalid binary request body length %s, remaining body length %s.",
length, transferReq.body.remaining()));
}
final byte[] body = new byte[length];
transferReq.body.get(body);
batchReq.binaryReqs.add(
PipeTransferTabletBinaryReq.toTPipeTransferReq(ByteBuffer.wrap(body)));
}

size = ReadWriteIOUtils.readInt(transferReq.body);
size = readNonNegativeSize(transferReq.body, "insert node count");
for (int i = 0; i < size; ++i) {
batchReq.tabletReqs.add(
PipeTransferTabletRawReq.toTPipeTransferRawReq(transferReq.body, tabletStringInternPool));
final int startPosition = transferReq.body.position();
try {
batchReq.insertNodeReqs.add(
PipeTransferTabletInsertNodeReq.toTPipeTransferRawReq(
(InsertNode) PlanFragment.deserializeHelper(transferReq.body, null)));
} catch (final RuntimeException e) {
throw new IllegalArgumentException(
String.format(
"Failed to deserialize insert node %s/%s in tablet batch at body position %s with remaining body length %s.",
i + 1, size, startPosition, transferReq.body.remaining()),
e);
}
}

size = readNonNegativeSize(transferReq.body, "raw tablet count");
for (int i = 0; i < size; ++i) {
final int startPosition = transferReq.body.position();
try {
batchReq.tabletReqs.add(
PipeTransferTabletRawReq.toTPipeTransferRawReq(
transferReq.body, tabletStringInternPool));
} catch (final RuntimeException e) {
throw new IllegalArgumentException(
String.format(
"Failed to deserialize raw tablet %s/%s in tablet batch at body position %s with remaining body length %s.",
i + 1, size, startPosition, transferReq.body.remaining()),
e);
}
}

batchReq.version = transferReq.version;
Expand All @@ -153,8 +207,29 @@ public static PipeTransferTabletBatchReq fromTPipeTransferReq(
return batchReq;
}

private static int readNonNegativeSize(final ByteBuffer buffer, final String fieldName) {
if (buffer.remaining() < Integer.BYTES) {
throw new IllegalArgumentException(
String.format(
"Insufficient bytes to read %s in tablet batch, remaining body length %s.",
fieldName, buffer.remaining()));
}

final int size = ReadWriteIOUtils.readInt(buffer);
if (size < 0) {
throw new IllegalArgumentException(
String.format("Invalid negative %s %s in tablet batch.", fieldName, size));
}
return size;
}

/////////////////////////////// TestOnly ///////////////////////////////

@TestOnly
public List<PipeTransferTabletBinaryReq> getBinaryReqs() {
return binaryReqs;
}

@TestOnly
public List<PipeTransferTabletInsertNodeReq> getInsertNodeReqs() {
return insertNodeReqs;
Expand All @@ -176,7 +251,8 @@ public boolean equals(final Object obj) {
return false;
}
final PipeTransferTabletBatchReq that = (PipeTransferTabletBatchReq) obj;
return insertNodeReqs.equals(that.insertNodeReqs)
return binaryReqs.equals(that.binaryReqs)
&& insertNodeReqs.equals(that.insertNodeReqs)
&& tabletReqs.equals(that.tabletReqs)
&& version == that.version
&& type == that.type
Expand All @@ -185,6 +261,6 @@ public boolean equals(final Object obj) {

@Override
public int hashCode() {
return Objects.hash(insertNodeReqs, tabletReqs, version, type, body);
return Objects.hash(binaryReqs, insertNodeReqs, tabletReqs, version, type, body);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -188,8 +188,17 @@ private void deserializeTPipeTransferRawReq(
buffer.position(startPosition);
}

tablet = PipeTabletUtils.internTablet(Tablet.deserialize(buffer), tabletStringInternPool);
isAligned = ReadWriteIOUtils.readBool(buffer);
try {
tablet = PipeTabletUtils.internTablet(Tablet.deserialize(buffer), tabletStringInternPool);
isAligned = ReadWriteIOUtils.readBool(buffer);
} catch (final RuntimeException e) {
buffer.position(startPosition);
throw new IllegalArgumentException(
String.format(
"Failed to deserialize raw tablet request at body position %s with remaining body length %s.",
startPosition, buffer.remaining()),
e);
}
}

private static void ensureStatementDeserializedFromCurrentTabletFormat(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.utils.Binary;
import org.apache.tsfile.utils.BitMap;
import org.apache.tsfile.utils.BytesUtils;
import org.apache.tsfile.utils.Pair;
import org.apache.tsfile.utils.ReadWriteIOUtils;
import org.apache.tsfile.write.UnSupportedDataTypeException;
Expand Down Expand Up @@ -116,12 +115,19 @@ private static InsertTabletStatement deserializeStatementFromTabletFormat(
intern(ReadWriteIOUtils.readString(byteBuffer), tabletStringInternPool);

final int rowSize = ReadWriteIOUtils.readInt(byteBuffer);
if (rowSize < 0) {
throw new IllegalArgumentException(
String.format("Invalid row size %s in tablet format deserialization.", rowSize));
}

// deserialize schemas
final int schemaSize =
BytesUtils.byteToBool(ReadWriteIOUtils.readByte(byteBuffer))
? ReadWriteIOUtils.readInt(byteBuffer)
: 0;
readBooleanByte(byteBuffer, "schema existence") ? ReadWriteIOUtils.readInt(byteBuffer) : 0;
if (schemaSize < 0) {
throw new IllegalArgumentException(
String.format("Invalid schema size %s in tablet format deserialization.", schemaSize));
}
ensureRemaining(byteBuffer, schemaSize, "measurement schema existence flags");
final String[] measurement = new String[schemaSize];
final TsTableColumnCategory[] columnCategories = new TsTableColumnCategory[schemaSize];
final TSDataType[] dataTypes = new TSDataType[schemaSize];
Expand All @@ -148,15 +154,26 @@ private static InsertTabletStatement deserializeStatementFromTabletFormat(

// Deserialize and calculate memory in the same loop
for (int i = 0; i < schemaSize; i++) {
final boolean hasSchema = BytesUtils.byteToBool(ReadWriteIOUtils.readByte(byteBuffer));
final boolean hasSchema = readBooleanByte(byteBuffer, "measurement schema existence");
if (hasSchema) {
final Pair<String, TSDataType> pair = readMeasurement(byteBuffer, tabletStringInternPool);
measurement[i] = pair.getLeft();
dataTypes[i] = pair.getRight();
if (readColumnCategory) {
if (!byteBuffer.hasRemaining()) {
throw new IllegalArgumentException(
"Missing column category in current tablet format deserialization.");
}
final byte columnCategory = byteBuffer.get();
if (columnCategory < 0 || columnCategory >= ColumnCategory.values().length) {
throw new IllegalArgumentException(
String.format(
"Invalid column category %s in current tablet format deserialization.",
columnCategory));
}
columnCategories[i] =
TsTableColumnCategory.fromTsFileColumnCategory(
ColumnCategory.values()[byteBuffer.get()]);
ColumnCategory.values()[columnCategory]);
}

// Calculate memory for each measurement string
Expand All @@ -178,14 +195,22 @@ private static InsertTabletStatement deserializeStatementFromTabletFormat(
memorySize += measurementMemorySize;
memorySize += dataTypesMemorySize;

final boolean isTimesNotNull = readBooleanByte(byteBuffer, "timestamp column existence");
if (rowSize > 0 && !isTimesNotNull) {
throw new IllegalArgumentException(
"Missing timestamps in tablet format deserialization with non-empty rows.");
}
if (isTimesNotNull) {
ensureRemaining(byteBuffer, (long) Long.BYTES * rowSize, "timestamps");
}

// deserialize times and calculate memory during deserialization
final long[] times = new long[rowSize];
// Calculate memory: array header + long size * rowSize
final long timesMemorySize =
org.apache.tsfile.utils.RamUsageEstimator.alignObjectSize(
NUM_BYTES_ARRAY_HEADER + (long) Long.BYTES * rowSize);

final boolean isTimesNotNull = BytesUtils.byteToBool(ReadWriteIOUtils.readByte(byteBuffer));
if (isTimesNotNull) {
for (int i = 0; i < rowSize; i++) {
times[i] = ReadWriteIOUtils.readLong(byteBuffer);
Expand All @@ -199,7 +224,7 @@ private static InsertTabletStatement deserializeStatementFromTabletFormat(
final BitMap[] bitMaps;
final long bitMapsMemorySize;

final boolean isBitMapsNotNull = BytesUtils.byteToBool(ReadWriteIOUtils.readByte(byteBuffer));
final boolean isBitMapsNotNull = readBooleanByte(byteBuffer, "bitmap column existence");
if (isBitMapsNotNull) {
// Use the method that returns both BitMap array and memory size
final Pair<BitMap[], Long> bitMapsAndMemory =
Expand All @@ -218,7 +243,11 @@ private static InsertTabletStatement deserializeStatementFromTabletFormat(
final Object[] values;
final long valuesMemorySize;

final boolean isValuesNotNull = BytesUtils.byteToBool(ReadWriteIOUtils.readByte(byteBuffer));
final boolean isValuesNotNull = readBooleanByte(byteBuffer, "value column existence");
if (rowSize > 0 && schemaSize > 0 && !isValuesNotNull) {
throw new IllegalArgumentException(
"Missing values in tablet format deserialization with non-empty rows.");
}
if (isValuesNotNull) {
// Use the method that returns both values array and memory size
final Pair<Object[], Long> valuesAndMemory =
Expand All @@ -236,7 +265,7 @@ private static InsertTabletStatement deserializeStatementFromTabletFormat(
// Add values memory to total
memorySize += valuesMemorySize;

final boolean isAligned = ReadWriteIOUtils.readBoolean(byteBuffer);
final boolean isAligned = readBooleanByte(byteBuffer, "alignment");

statement.setMeasurements(measurement);
statement.setTimes(times);
Expand Down Expand Up @@ -321,6 +350,30 @@ private static void skipString(final ByteBuffer buffer) {
}
}

private static boolean readBooleanByte(final ByteBuffer buffer, final String fieldName) {
if (!buffer.hasRemaining()) {
throw new IllegalArgumentException(
String.format("Missing %s flag in tablet format deserialization.", fieldName));
}

final byte value = ReadWriteIOUtils.readByte(buffer);
if (value != 0 && value != 1) {
throw new IllegalArgumentException(
String.format("Invalid %s flag %s in tablet format deserialization.", fieldName, value));
}
return value == 1;
}

private static void ensureRemaining(
final ByteBuffer buffer, final long expectedSize, final String fieldName) {
if (expectedSize > buffer.remaining()) {
throw new IllegalArgumentException(
String.format(
"Insufficient bytes for %s in tablet format deserialization, expected %s, remaining %s.",
fieldName, expectedSize, buffer.remaining()));
}
}

/**
* Read measurement name and data type from buffer, skipping other measurement schema fields
* (encoding, compression, and tags/attributes) that are not needed for InsertTabletStatement.
Expand Down Expand Up @@ -364,9 +417,13 @@ private static Pair<BitMap[], Long> readBitMapsFromBufferWithMemory(
boolean hasMarkedBitMap = false;

for (int i = 0; i < columns; i++) {
final boolean hasBitMap = BytesUtils.byteToBool(ReadWriteIOUtils.readByte(byteBuffer));
final boolean hasBitMap = readBooleanByte(byteBuffer, "bitmap existence");
if (hasBitMap) {
final int size = ReadWriteIOUtils.readInt(byteBuffer);
if (size < 0) {
throw new IllegalArgumentException(
String.format("Invalid bitmap size %s in tablet format deserialization.", size));
}
final Binary valueBinary = ReadWriteIOUtils.readBinary(byteBuffer);
final byte[] byteArray = valueBinary.getValues();
final BitMap bitMap = new BitMap(size, byteArray);
Expand Down Expand Up @@ -416,9 +473,8 @@ private static Pair<Object[], Long> readValuesFromBufferWithMemory(
NUM_BYTES_ARRAY_HEADER + NUM_BYTES_OBJECT_REF * columns);

for (int i = 0; i < columns; i++) {
final boolean isValueColumnsNotNull =
BytesUtils.byteToBool(ReadWriteIOUtils.readByte(byteBuffer));
if (isValueColumnsNotNull && types[i] == null) {
final boolean isValueColumnsNotNull = readBooleanByte(byteBuffer, "value column existence");
if (types[i] == null) {
continue;
}

Expand All @@ -427,7 +483,7 @@ private static Pair<Object[], Long> readValuesFromBufferWithMemory(
final boolean[] boolValues = new boolean[rowSize];
if (isValueColumnsNotNull) {
for (int index = 0; index < rowSize; index++) {
boolValues[index] = BytesUtils.byteToBool(ReadWriteIOUtils.readByte(byteBuffer));
boolValues[index] = readBooleanByte(byteBuffer, "boolean value");
}
}
values[i] = boolValues;
Expand Down Expand Up @@ -503,8 +559,7 @@ private static Pair<Object[], Long> readValuesFromBufferWithMemory(

if (isValueColumnsNotNull) {
for (int index = 0; index < rowSize; index++) {
final boolean isNotNull =
BytesUtils.byteToBool(ReadWriteIOUtils.readByte(byteBuffer));
final boolean isNotNull = readBooleanByte(byteBuffer, "binary value existence");
if (isNotNull) {
binaryValues[index] = ReadWriteIOUtils.readBinary(byteBuffer);
// Calculate memory for each Binary object during deserialization
Expand Down
Loading
Loading