Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -478,15 +478,19 @@ public List<PlanNode> getChildren() {
public String[] getRawMeasurements() {
String[] measurements = getMeasurements();
MeasurementSchema[] measurementSchemas = getMeasurementSchemas();
String[] rawMeasurements = new String[measurements.length];
String[] rawMeasurements = measurements;
for (int i = 0; i < measurements.length; i++) {
if (measurementSchemas != null
&& i < measurementSchemas.length
&& measurementSchemas[i] != null) {
// get raw measurement rather than alias
rawMeasurements[i] = measurementSchemas[i].getMeasurementName();
} else {
rawMeasurements[i] = measurements[i];
String rawMeasurement = measurementSchemas[i].getMeasurementName();
if (!Objects.equals(rawMeasurement, measurements[i])) {
if (rawMeasurements == measurements) {
rawMeasurements = Arrays.copyOf(measurements, measurements.length);
}
rawMeasurements[i] = rawMeasurement;
}
}
}
return rawMeasurements;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1026,16 +1026,15 @@ private boolean canComposeTimeValuePair(final int columnIndex) {
}

public void updateLastCache(String databaseName) {
String[] rawMeasurements = getRawMeasurements();
TimeValuePair[] timeValuePairs = new TimeValuePair[rawMeasurements.length];
for (int i = 0; i < rawMeasurements.length; i++) {
TimeValuePair[] timeValuePairs = new TimeValuePair[measurements.length];
for (int i = 0; i < measurements.length; i++) {
timeValuePairs[i] = composeTimeValuePair(i);
}
TreeDeviceSchemaCacheManager.getInstance()
.updateLastCacheIfExists(
databaseName,
getDeviceID(),
rawMeasurements,
measurements,
timeValuePairs,
isAligned,
measurementSchemas);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1570,16 +1570,15 @@ public void updateLastCache(final String databaseName) {
}

public void updateLastCache(final String databaseName, final TSStatus[] results) {
final String[] rawMeasurements = getRawMeasurements();
final TimeValuePair[] timeValuePairs = new TimeValuePair[rawMeasurements.length];
for (int i = 0; i < rawMeasurements.length; i++) {
final TimeValuePair[] timeValuePairs = new TimeValuePair[measurements.length];
for (int i = 0; i < measurements.length; i++) {
timeValuePairs[i] = composeLastTimeValuePair(i, results, 0, rowCount);
}
TreeDeviceSchemaCacheManager.getInstance()
.updateLastCacheIfExists(
databaseName,
getDeviceID(),
rawMeasurements,
measurements,
timeValuePairs,
isAligned,
measurementSchemas);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -281,13 +281,13 @@ protected PartialPath readTargetPath(DataInputStream stream)

@Override
public void updateLastCache(String databaseName) {
String[] rawMeasurements = getRawMeasurements();
TimeValuePair[] timeValuePairs = new TimeValuePair[rawMeasurements.length];
for (int i = 0; i < rawMeasurements.length; i++) {
TimeValuePair[] timeValuePairs = new TimeValuePair[measurements.length];
for (int i = 0; i < measurements.length; i++) {
timeValuePairs[i] = composeTimeValuePair(i);
}
TableDeviceSchemaCache.getInstance()
.updateLastCacheIfExists(databaseName, getDeviceID(), rawMeasurements, timeValuePairs);
.updateLastCacheIfExists(
databaseName, getDeviceID(), measurements, measurementSchemas, timeValuePairs);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -444,20 +444,19 @@ public void updateLastCache(final String databaseName) {
}

public void updateLastCache(final String databaseName, final TSStatus[] results) {
final String[] rawMeasurements = getRawMeasurements();

final List<Pair<IDeviceID, Integer>> deviceEndOffsetPairs = splitByDevice(0, rowCount);
int startOffset = 0;
for (final Pair<IDeviceID, Integer> deviceEndOffsetPair : deviceEndOffsetPairs) {
final IDeviceID deviceID = deviceEndOffsetPair.getLeft();
final int endOffset = deviceEndOffsetPair.getRight();

final TimeValuePair[] timeValuePairs = new TimeValuePair[rawMeasurements.length];
for (int i = 0; i < rawMeasurements.length; i++) {
final TimeValuePair[] timeValuePairs = new TimeValuePair[measurements.length];
for (int i = 0; i < measurements.length; i++) {
timeValuePairs[i] = composeLastTimeValuePair(i, results, startOffset, endOffset);
}
TableDeviceSchemaCache.getInstance()
.updateLastCacheIfExists(databaseName, deviceID, rawMeasurements, timeValuePairs);
.updateLastCacheIfExists(
databaseName, deviceID, measurements, measurementSchemas, timeValuePairs);

startOffset = endOffset;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.tsfile.write.schema.IMeasurementSchema;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;

import java.util.Collections;
Expand Down Expand Up @@ -186,16 +187,33 @@ int initOrInvalidateLastCache(

int tryUpdateLastCache(
final String[] measurements, final TimeValuePair[] timeValuePairs, boolean invalidateNull) {
return tryUpdateLastCache(measurements, null, timeValuePairs, invalidateNull);
}

int tryUpdateLastCache(
final String[] measurements,
final @Nullable IMeasurementSchema[] measurementSchemas,
final TimeValuePair[] timeValuePairs,
boolean invalidateNull) {
final TableDeviceLastCache cache = lastCache.get();
final int result =
Objects.nonNull(cache) ? cache.tryUpdate(measurements, timeValuePairs, invalidateNull) : 0;
Objects.nonNull(cache)
? cache.tryUpdate(measurements, measurementSchemas, timeValuePairs, invalidateNull)
: 0;
return Objects.nonNull(lastCache.get()) ? result : 0;
}

int tryUpdateLastCache(final String[] measurements, final TimeValuePair[] timeValuePairs) {
return tryUpdateLastCache(measurements, timeValuePairs, false);
}

int tryUpdateLastCache(
final String[] measurements,
final @Nullable IMeasurementSchema[] measurementSchemas,
final TimeValuePair[] timeValuePairs) {
return tryUpdateLastCache(measurements, measurementSchemas, timeValuePairs, false);
}

int invalidateLastCache(final String measurement) {
final TableDeviceLastCache cache = lastCache.get();
final int result = Objects.nonNull(cache) ? cache.invalidate(measurement) : 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.tsfile.utils.Pair;
import org.apache.tsfile.utils.RamUsageEstimator;
import org.apache.tsfile.utils.TsPrimitiveType;
import org.apache.tsfile.write.schema.IMeasurementSchema;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;
Expand Down Expand Up @@ -134,25 +135,37 @@ int tryUpdate(
final @Nonnull String[] measurements,
final @Nonnull TimeValuePair[] timeValuePairs,
final boolean invalidateNull) {
return tryUpdate(measurements, null, timeValuePairs, invalidateNull);
}

int tryUpdate(
final @Nonnull String[] measurements,
final @Nullable IMeasurementSchema[] measurementSchemas,
final @Nonnull TimeValuePair[] timeValuePairs,
final boolean invalidateNull) {
final AtomicInteger diff = new AtomicInteger(0);
long lastTime = Long.MIN_VALUE;

for (int i = 0; i < measurements.length; ++i) {
final String measurement = getRawMeasurement(measurements, measurementSchemas, i);
if (Objects.isNull(measurement)) {
continue;
}
if (Objects.isNull(timeValuePairs[i])) {
if (invalidateNull) {
diff.addAndGet(removeKnownNullTime(measurements[i]));
diff.addAndGet(removeKnownNullTime(measurement));
diff.addAndGet(
-((int) RamUsageEstimator.sizeOf(measurements[i])
+ getTvPairEntrySize(measurement2CachedLastMap.remove(measurements[i]))));
-((int) RamUsageEstimator.sizeOf(measurement)
+ getTvPairEntrySize(measurement2CachedLastMap.remove(measurement))));
}
continue;
}

if (isKnownNullAtAlignedTime(measurements[i], timeValuePairs[i])) {
if (isKnownNullAtAlignedTime(measurement, timeValuePairs[i])) {
if (lastTime < timeValuePairs[i].getTimestamp()) {
lastTime = timeValuePairs[i].getTimestamp();
}
diff.addAndGet(tryUpdateKnownNullTime(measurements[i], timeValuePairs[i].getTimestamp()));
diff.addAndGet(tryUpdateKnownNullTime(measurement, timeValuePairs[i].getTimestamp()));
continue;
}

Expand All @@ -161,13 +174,13 @@ int tryUpdate(
lastTime = timeValuePairs[i].getTimestamp();
}
measurement2CachedLastMap.computeIfPresent(
measurements[i],
(measurement, tvPair) -> {
measurement,
(measurementName, tvPair) -> {
if (tvPair.getTimestamp() <= timeValuePairs[finalI].getTimestamp()) {
diff.addAndGet(
getDiffSize(tvPair, timeValuePairs[finalI])
+ clearKnownNullTimeIfCovered(
measurement, timeValuePairs[finalI].getTimestamp()));
measurementName, timeValuePairs[finalI].getTimestamp()));
return timeValuePairs[finalI];
}
return tvPair;
Expand All @@ -183,6 +196,21 @@ int tryUpdate(
return diff.get();
}

@Nullable
private static String getRawMeasurement(
final @Nonnull String[] measurements,
final @Nullable IMeasurementSchema[] measurementSchemas,
final int index) {
if (Objects.isNull(measurements[index])) {
return null;
}
return Objects.nonNull(measurementSchemas)
&& index < measurementSchemas.length
&& Objects.nonNull(measurementSchemas[index])
? measurementSchemas[index].getMeasurementName()
: measurements[index];
}

@GuardedBy("DataRegionInsertLock#writeLock")
int invalidate(final String measurement) {
final AtomicInteger diff = new AtomicInteger();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -273,11 +273,33 @@ public void updateLastCacheIfExists(
final String[] measurements,
final TimeValuePair[] timeValuePairs,
boolean invalidateNull) {
updateLastCacheIfExists(database, deviceId, measurements, null, timeValuePairs, invalidateNull);
}

public void updateLastCacheIfExists(
final String database,
final IDeviceID deviceId,
final String[] measurements,
final @Nullable IMeasurementSchema[] measurementSchemas,
final TimeValuePair[] timeValuePairs) {
updateLastCacheIfExists(
database, deviceId, measurements, measurementSchemas, timeValuePairs, false);
}

public void updateLastCacheIfExists(
final String database,
final IDeviceID deviceId,
final String[] measurements,
final @Nullable IMeasurementSchema[] measurementSchemas,
final TimeValuePair[] timeValuePairs,
boolean invalidateNull) {
dualKeyCache.update(
new TableId(database, deviceId.getTableName()),
deviceId,
null,
entry -> entry.tryUpdateLastCache(measurements, timeValuePairs, invalidateNull),
entry ->
entry.tryUpdateLastCache(
measurements, measurementSchemas, timeValuePairs, invalidateNull),
false);
}

Expand Down Expand Up @@ -447,7 +469,7 @@ void updateLastCache(
: entry ->
entry.setMeasurementSchema(
database2Use, isAligned, measurements, measurementSchemas)
+ entry.tryUpdateLastCache(measurements, timeValuePairs),
+ entry.tryUpdateLastCache(measurements, measurementSchemas, timeValuePairs),
Objects.isNull(timeValuePairs));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,13 +66,18 @@ public int update(final String[] measurements, final IMeasurementSchema[] schema
final int length = measurements.length;

for (int i = 0; i < length; ++i) {
final String inputMeasurement = measurements[i];
if (Objects.isNull(inputMeasurement)) {
continue;
}
final IMeasurementSchema schema = i < schemas.length ? schemas[i] : null;
final String measurement =
Objects.nonNull(schema) ? schema.getMeasurementName() : inputMeasurement;
// Skip this to avoid instance creation/gc for writing performance
if (measurements[i] == null
|| schemas[i] == null
|| measurementMap.containsKey(measurements[i])) {
if (schema == null || measurementMap.containsKey(measurement)) {
continue;
}
diff += putEntry(measurements[i], schemas[i], null);
diff += putEntry(measurement, schema, null);
}
return diff;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,11 @@
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.path.MeasurementPath;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.queryengine.plan.planner.plan.node.PlanNodeId;
import org.apache.iotdb.commons.schema.template.Template;
import org.apache.iotdb.db.queryengine.common.schematree.ClusterSchemaTree;
import org.apache.iotdb.db.queryengine.common.schematree.ISchemaTree;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode;
import org.apache.iotdb.db.queryengine.plan.relational.metadata.fetcher.cache.SchemaCacheEntry;
import org.apache.iotdb.db.queryengine.plan.relational.metadata.fetcher.cache.TableDeviceSchemaCache;
import org.apache.iotdb.db.queryengine.plan.relational.metadata.fetcher.cache.TreeDeviceSchemaCacheManager;
Expand Down Expand Up @@ -272,6 +274,43 @@ public void testUpdateLastCache() throws IllegalPathException {
Assert.assertEquals(0, TableDeviceSchemaCache.getInstance().getMemoryUsage());
}

@Test
public void testUpdateLastCacheWithAliasDoesNotCopyMeasurements() throws IllegalPathException {
final String database = "root.db";
final PartialPath device = new PartialPath("root.db.d_alias");
final MeasurementSchema s1 = new MeasurementSchema("s1", TSDataType.INT32);
final MeasurementPath s1Path = new MeasurementPath(device.concatNode("s1"), s1);

treeDeviceSchemaCacheManager.declareLastCache(database, s1Path);

final InsertRowNode insertRowNode =
new InsertRowNode(
new PlanNodeId("testUpdateLastCacheWithAliasDoesNotCopyMeasurements"),
device,
false,
new String[] {"alias"},
new TSDataType[] {TSDataType.INT32},
new MeasurementSchema[] {s1},
1L,
new Object[] {1},
false) {
@Override
public String[] getRawMeasurements() {
throw new AssertionError("Last cache update should not copy raw measurements");
}
};

insertRowNode.updateLastCache(database);

Assert.assertEquals(
new TimeValuePair(1L, new TsPrimitiveType.TsInt(1)),
treeDeviceSchemaCacheManager.getLastCache(
new MeasurementPath(device.getIDeviceID(), "s1")));
Assert.assertNull(
treeDeviceSchemaCacheManager.getLastCache(
new MeasurementPath(device.getIDeviceID(), "alias")));
}

@Test
public void testInvalidateLastCacheByWildcardDevicePath() throws IllegalPathException {
final MeasurementSchema s0 = new MeasurementSchema("s0", TSDataType.INT32);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,9 @@
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNotSame;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;

/**
Expand Down Expand Up @@ -120,6 +122,26 @@ public void testInsertRowNode_nullMeasurements_nullSafe() throws IllegalPathExce
assertTrue(node.allMeasurementFailed());
}

@Test
public void testGetRawMeasurementsReusesMeasurementsWhenSchemaNamesMatch()
throws IllegalPathException {
InsertRowNode node = buildInsertRowNode(new String[] {"s0", "s1"});

assertSame(node.getMeasurements(), node.getRawMeasurements());
}

@Test
public void testGetRawMeasurementsCopiesOnlyWhenSchemaNameDiffers() throws IllegalPathException {
InsertRowNode node = buildInsertRowNode(new String[] {"alias", "s1"});
node.getMeasurementSchemas()[0] = new MeasurementSchema("s0", TSDataType.INT32);

String[] rawMeasurements = node.getRawMeasurements();

assertNotSame(node.getMeasurements(), rawMeasurements);
assertArrayEquals(new String[] {"s0", "s1"}, rawMeasurements);
assertArrayEquals(new String[] {"alias", "s1"}, node.getMeasurements());
}

@Test
public void testRelationalInsertRowNode_nonFieldColumnsDoNotComposeLastCacheValue()
throws IllegalPathException {
Expand Down
Loading