diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertNode.java index 0855dbeb63a04..9c8e369188357 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertNode.java @@ -478,15 +478,19 @@ public List getChildren() { public String[] getRawMeasurements() { String[] measurements = getMeasurements(); MeasurementSchema[] measurementSchemas = getMeasurementSchemas(); - String[] rawMeasurements = new String[measurements.length]; + String[] rawMeasurements = measurements; for (int i = 0; i < measurements.length; i++) { if (measurementSchemas != null && i < measurementSchemas.length && measurementSchemas[i] != null) { // get raw measurement rather than alias - rawMeasurements[i] = measurementSchemas[i].getMeasurementName(); - } else { - rawMeasurements[i] = measurements[i]; + String rawMeasurement = measurementSchemas[i].getMeasurementName(); + if (!Objects.equals(rawMeasurement, measurements[i])) { + if (rawMeasurements == measurements) { + rawMeasurements = Arrays.copyOf(measurements, measurements.length); + } + rawMeasurements[i] = rawMeasurement; + } } } return rawMeasurements; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowNode.java index 73463864c0660..04768c57b502f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowNode.java @@ -1026,16 +1026,15 @@ private boolean canComposeTimeValuePair(final int columnIndex) { } public void updateLastCache(String databaseName) { - String[] rawMeasurements = getRawMeasurements(); - TimeValuePair[] timeValuePairs = new TimeValuePair[rawMeasurements.length]; - for (int i = 0; i < rawMeasurements.length; i++) { + TimeValuePair[] timeValuePairs = new TimeValuePair[measurements.length]; + for (int i = 0; i < measurements.length; i++) { timeValuePairs[i] = composeTimeValuePair(i); } TreeDeviceSchemaCacheManager.getInstance() .updateLastCacheIfExists( databaseName, getDeviceID(), - rawMeasurements, + measurements, timeValuePairs, isAligned, measurementSchemas); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java index becd347cfee8b..976223bf2cf10 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java @@ -1570,16 +1570,15 @@ public void updateLastCache(final String databaseName) { } public void updateLastCache(final String databaseName, final TSStatus[] results) { - final String[] rawMeasurements = getRawMeasurements(); - final TimeValuePair[] timeValuePairs = new TimeValuePair[rawMeasurements.length]; - for (int i = 0; i < rawMeasurements.length; i++) { + final TimeValuePair[] timeValuePairs = new TimeValuePair[measurements.length]; + for (int i = 0; i < measurements.length; i++) { timeValuePairs[i] = composeLastTimeValuePair(i, results, 0, rowCount); } TreeDeviceSchemaCacheManager.getInstance() .updateLastCacheIfExists( databaseName, getDeviceID(), - rawMeasurements, + measurements, timeValuePairs, isAligned, measurementSchemas); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertRowNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertRowNode.java index d6ae162c6d04f..b11c0f6784e3d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertRowNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertRowNode.java @@ -281,13 +281,13 @@ protected PartialPath readTargetPath(DataInputStream stream) @Override public void updateLastCache(String databaseName) { - String[] rawMeasurements = getRawMeasurements(); - TimeValuePair[] timeValuePairs = new TimeValuePair[rawMeasurements.length]; - for (int i = 0; i < rawMeasurements.length; i++) { + TimeValuePair[] timeValuePairs = new TimeValuePair[measurements.length]; + for (int i = 0; i < measurements.length; i++) { timeValuePairs[i] = composeTimeValuePair(i); } TableDeviceSchemaCache.getInstance() - .updateLastCacheIfExists(databaseName, getDeviceID(), rawMeasurements, timeValuePairs); + .updateLastCacheIfExists( + databaseName, getDeviceID(), measurements, measurementSchemas, timeValuePairs); } @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertTabletNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertTabletNode.java index b842cfc5db718..d41b078cb9b83 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertTabletNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertTabletNode.java @@ -444,20 +444,19 @@ public void updateLastCache(final String databaseName) { } public void updateLastCache(final String databaseName, final TSStatus[] results) { - final String[] rawMeasurements = getRawMeasurements(); - final List> deviceEndOffsetPairs = splitByDevice(0, rowCount); int startOffset = 0; for (final Pair deviceEndOffsetPair : deviceEndOffsetPairs) { final IDeviceID deviceID = deviceEndOffsetPair.getLeft(); final int endOffset = deviceEndOffsetPair.getRight(); - final TimeValuePair[] timeValuePairs = new TimeValuePair[rawMeasurements.length]; - for (int i = 0; i < rawMeasurements.length; i++) { + final TimeValuePair[] timeValuePairs = new TimeValuePair[measurements.length]; + for (int i = 0; i < measurements.length; i++) { timeValuePairs[i] = composeLastTimeValuePair(i, results, startOffset, endOffset); } TableDeviceSchemaCache.getInstance() - .updateLastCacheIfExists(databaseName, deviceID, rawMeasurements, timeValuePairs); + .updateLastCacheIfExists( + databaseName, deviceID, measurements, measurementSchemas, timeValuePairs); startOffset = endOffset; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/cache/TableDeviceCacheEntry.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/cache/TableDeviceCacheEntry.java index 4f151d15eaeb6..5ea51a5809f7c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/cache/TableDeviceCacheEntry.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/cache/TableDeviceCacheEntry.java @@ -30,6 +30,7 @@ import org.apache.tsfile.write.schema.IMeasurementSchema; import javax.annotation.Nonnull; +import javax.annotation.Nullable; import javax.annotation.concurrent.ThreadSafe; import java.util.Collections; @@ -186,9 +187,19 @@ int initOrInvalidateLastCache( int tryUpdateLastCache( final String[] measurements, final TimeValuePair[] timeValuePairs, boolean invalidateNull) { + return tryUpdateLastCache(measurements, null, timeValuePairs, invalidateNull); + } + + int tryUpdateLastCache( + final String[] measurements, + final @Nullable IMeasurementSchema[] measurementSchemas, + final TimeValuePair[] timeValuePairs, + boolean invalidateNull) { final TableDeviceLastCache cache = lastCache.get(); final int result = - Objects.nonNull(cache) ? cache.tryUpdate(measurements, timeValuePairs, invalidateNull) : 0; + Objects.nonNull(cache) + ? cache.tryUpdate(measurements, measurementSchemas, timeValuePairs, invalidateNull) + : 0; return Objects.nonNull(lastCache.get()) ? result : 0; } @@ -196,6 +207,13 @@ int tryUpdateLastCache(final String[] measurements, final TimeValuePair[] timeVa return tryUpdateLastCache(measurements, timeValuePairs, false); } + int tryUpdateLastCache( + final String[] measurements, + final @Nullable IMeasurementSchema[] measurementSchemas, + final TimeValuePair[] timeValuePairs) { + return tryUpdateLastCache(measurements, measurementSchemas, timeValuePairs, false); + } + int invalidateLastCache(final String measurement) { final TableDeviceLastCache cache = lastCache.get(); final int result = Objects.nonNull(cache) ? cache.invalidate(measurement) : 0; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/cache/TableDeviceLastCache.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/cache/TableDeviceLastCache.java index 271051b72b831..325484f6f6819 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/cache/TableDeviceLastCache.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/cache/TableDeviceLastCache.java @@ -25,6 +25,7 @@ import org.apache.tsfile.utils.Pair; import org.apache.tsfile.utils.RamUsageEstimator; import org.apache.tsfile.utils.TsPrimitiveType; +import org.apache.tsfile.write.schema.IMeasurementSchema; import javax.annotation.Nonnull; import javax.annotation.Nullable; @@ -134,25 +135,37 @@ int tryUpdate( final @Nonnull String[] measurements, final @Nonnull TimeValuePair[] timeValuePairs, final boolean invalidateNull) { + return tryUpdate(measurements, null, timeValuePairs, invalidateNull); + } + + int tryUpdate( + final @Nonnull String[] measurements, + final @Nullable IMeasurementSchema[] measurementSchemas, + final @Nonnull TimeValuePair[] timeValuePairs, + final boolean invalidateNull) { final AtomicInteger diff = new AtomicInteger(0); long lastTime = Long.MIN_VALUE; for (int i = 0; i < measurements.length; ++i) { + final String measurement = getRawMeasurement(measurements, measurementSchemas, i); + if (Objects.isNull(measurement)) { + continue; + } if (Objects.isNull(timeValuePairs[i])) { if (invalidateNull) { - diff.addAndGet(removeKnownNullTime(measurements[i])); + diff.addAndGet(removeKnownNullTime(measurement)); diff.addAndGet( - -((int) RamUsageEstimator.sizeOf(measurements[i]) - + getTvPairEntrySize(measurement2CachedLastMap.remove(measurements[i])))); + -((int) RamUsageEstimator.sizeOf(measurement) + + getTvPairEntrySize(measurement2CachedLastMap.remove(measurement)))); } continue; } - if (isKnownNullAtAlignedTime(measurements[i], timeValuePairs[i])) { + if (isKnownNullAtAlignedTime(measurement, timeValuePairs[i])) { if (lastTime < timeValuePairs[i].getTimestamp()) { lastTime = timeValuePairs[i].getTimestamp(); } - diff.addAndGet(tryUpdateKnownNullTime(measurements[i], timeValuePairs[i].getTimestamp())); + diff.addAndGet(tryUpdateKnownNullTime(measurement, timeValuePairs[i].getTimestamp())); continue; } @@ -161,13 +174,13 @@ int tryUpdate( lastTime = timeValuePairs[i].getTimestamp(); } measurement2CachedLastMap.computeIfPresent( - measurements[i], - (measurement, tvPair) -> { + measurement, + (measurementName, tvPair) -> { if (tvPair.getTimestamp() <= timeValuePairs[finalI].getTimestamp()) { diff.addAndGet( getDiffSize(tvPair, timeValuePairs[finalI]) + clearKnownNullTimeIfCovered( - measurement, timeValuePairs[finalI].getTimestamp())); + measurementName, timeValuePairs[finalI].getTimestamp())); return timeValuePairs[finalI]; } return tvPair; @@ -183,6 +196,21 @@ int tryUpdate( return diff.get(); } + @Nullable + private static String getRawMeasurement( + final @Nonnull String[] measurements, + final @Nullable IMeasurementSchema[] measurementSchemas, + final int index) { + if (Objects.isNull(measurements[index])) { + return null; + } + return Objects.nonNull(measurementSchemas) + && index < measurementSchemas.length + && Objects.nonNull(measurementSchemas[index]) + ? measurementSchemas[index].getMeasurementName() + : measurements[index]; + } + @GuardedBy("DataRegionInsertLock#writeLock") int invalidate(final String measurement) { final AtomicInteger diff = new AtomicInteger(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/cache/TableDeviceSchemaCache.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/cache/TableDeviceSchemaCache.java index 8e40298e115bd..a328ffa221deb 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/cache/TableDeviceSchemaCache.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/cache/TableDeviceSchemaCache.java @@ -273,11 +273,33 @@ public void updateLastCacheIfExists( final String[] measurements, final TimeValuePair[] timeValuePairs, boolean invalidateNull) { + updateLastCacheIfExists(database, deviceId, measurements, null, timeValuePairs, invalidateNull); + } + + public void updateLastCacheIfExists( + final String database, + final IDeviceID deviceId, + final String[] measurements, + final @Nullable IMeasurementSchema[] measurementSchemas, + final TimeValuePair[] timeValuePairs) { + updateLastCacheIfExists( + database, deviceId, measurements, measurementSchemas, timeValuePairs, false); + } + + public void updateLastCacheIfExists( + final String database, + final IDeviceID deviceId, + final String[] measurements, + final @Nullable IMeasurementSchema[] measurementSchemas, + final TimeValuePair[] timeValuePairs, + boolean invalidateNull) { dualKeyCache.update( new TableId(database, deviceId.getTableName()), deviceId, null, - entry -> entry.tryUpdateLastCache(measurements, timeValuePairs, invalidateNull), + entry -> + entry.tryUpdateLastCache( + measurements, measurementSchemas, timeValuePairs, invalidateNull), false); } @@ -447,7 +469,7 @@ void updateLastCache( : entry -> entry.setMeasurementSchema( database2Use, isAligned, measurements, measurementSchemas) - + entry.tryUpdateLastCache(measurements, timeValuePairs), + + entry.tryUpdateLastCache(measurements, measurementSchemas, timeValuePairs), Objects.isNull(timeValuePairs)); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/cache/TreeDeviceNormalSchema.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/cache/TreeDeviceNormalSchema.java index a030946034a08..b9337974a6722 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/cache/TreeDeviceNormalSchema.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/cache/TreeDeviceNormalSchema.java @@ -66,13 +66,18 @@ public int update(final String[] measurements, final IMeasurementSchema[] schema final int length = measurements.length; for (int i = 0; i < length; ++i) { + final String inputMeasurement = measurements[i]; + if (Objects.isNull(inputMeasurement)) { + continue; + } + final IMeasurementSchema schema = i < schemas.length ? schemas[i] : null; + final String measurement = + Objects.nonNull(schema) ? schema.getMeasurementName() : inputMeasurement; // Skip this to avoid instance creation/gc for writing performance - if (measurements[i] == null - || schemas[i] == null - || measurementMap.containsKey(measurements[i])) { + if (schema == null || measurementMap.containsKey(measurement)) { continue; } - diff += putEntry(measurements[i], schemas[i], null); + diff += putEntry(measurement, schema, null); } return diff; } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/cache/TreeDeviceSchemaCacheManagerTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/cache/TreeDeviceSchemaCacheManagerTest.java index cd5ca74661e06..f9188aea9afba 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/cache/TreeDeviceSchemaCacheManagerTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/cache/TreeDeviceSchemaCacheManagerTest.java @@ -22,9 +22,11 @@ import org.apache.iotdb.commons.exception.IllegalPathException; import org.apache.iotdb.commons.path.MeasurementPath; import org.apache.iotdb.commons.path.PartialPath; +import org.apache.iotdb.commons.queryengine.plan.planner.plan.node.PlanNodeId; import org.apache.iotdb.commons.schema.template.Template; import org.apache.iotdb.db.queryengine.common.schematree.ClusterSchemaTree; import org.apache.iotdb.db.queryengine.common.schematree.ISchemaTree; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode; import org.apache.iotdb.db.queryengine.plan.relational.metadata.fetcher.cache.SchemaCacheEntry; import org.apache.iotdb.db.queryengine.plan.relational.metadata.fetcher.cache.TableDeviceSchemaCache; import org.apache.iotdb.db.queryengine.plan.relational.metadata.fetcher.cache.TreeDeviceSchemaCacheManager; @@ -272,6 +274,43 @@ public void testUpdateLastCache() throws IllegalPathException { Assert.assertEquals(0, TableDeviceSchemaCache.getInstance().getMemoryUsage()); } + @Test + public void testUpdateLastCacheWithAliasDoesNotCopyMeasurements() throws IllegalPathException { + final String database = "root.db"; + final PartialPath device = new PartialPath("root.db.d_alias"); + final MeasurementSchema s1 = new MeasurementSchema("s1", TSDataType.INT32); + final MeasurementPath s1Path = new MeasurementPath(device.concatNode("s1"), s1); + + treeDeviceSchemaCacheManager.declareLastCache(database, s1Path); + + final InsertRowNode insertRowNode = + new InsertRowNode( + new PlanNodeId("testUpdateLastCacheWithAliasDoesNotCopyMeasurements"), + device, + false, + new String[] {"alias"}, + new TSDataType[] {TSDataType.INT32}, + new MeasurementSchema[] {s1}, + 1L, + new Object[] {1}, + false) { + @Override + public String[] getRawMeasurements() { + throw new AssertionError("Last cache update should not copy raw measurements"); + } + }; + + insertRowNode.updateLastCache(database); + + Assert.assertEquals( + new TimeValuePair(1L, new TsPrimitiveType.TsInt(1)), + treeDeviceSchemaCacheManager.getLastCache( + new MeasurementPath(device.getIDeviceID(), "s1"))); + Assert.assertNull( + treeDeviceSchemaCacheManager.getLastCache( + new MeasurementPath(device.getIDeviceID(), "alias"))); + } + @Test public void testInvalidateLastCacheByWildcardDevicePath() throws IllegalPathException { final MeasurementSchema s0 = new MeasurementSchema("s0", TSDataType.INT32); diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertNodeIsMeasurementFailedTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertNodeIsMeasurementFailedTest.java index 082156e134efc..6acc174a2b30f 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertNodeIsMeasurementFailedTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertNodeIsMeasurementFailedTest.java @@ -35,7 +35,9 @@ import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNotSame; import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertSame; import static org.junit.Assert.assertTrue; /** @@ -120,6 +122,26 @@ public void testInsertRowNode_nullMeasurements_nullSafe() throws IllegalPathExce assertTrue(node.allMeasurementFailed()); } + @Test + public void testGetRawMeasurementsReusesMeasurementsWhenSchemaNamesMatch() + throws IllegalPathException { + InsertRowNode node = buildInsertRowNode(new String[] {"s0", "s1"}); + + assertSame(node.getMeasurements(), node.getRawMeasurements()); + } + + @Test + public void testGetRawMeasurementsCopiesOnlyWhenSchemaNameDiffers() throws IllegalPathException { + InsertRowNode node = buildInsertRowNode(new String[] {"alias", "s1"}); + node.getMeasurementSchemas()[0] = new MeasurementSchema("s0", TSDataType.INT32); + + String[] rawMeasurements = node.getRawMeasurements(); + + assertNotSame(node.getMeasurements(), rawMeasurements); + assertArrayEquals(new String[] {"s0", "s1"}, rawMeasurements); + assertArrayEquals(new String[] {"alias", "s1"}, node.getMeasurements()); + } + @Test public void testRelationalInsertRowNode_nonFieldColumnsDoNotComposeLastCacheValue() throws IllegalPathException {