From 35aeee3f40c81bffd004ecb1acbb709a37f2026c Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Thu, 25 Jun 2026 12:41:55 +0800 Subject: [PATCH 1/5] Fix tag index memory accounting --- .../schemaregion/tag/TagManager.java | 123 +++++++------ .../schemaregion/tag/TagManagerTest.java | 169 ++++++++++++++++++ 2 files changed, 236 insertions(+), 56 deletions(-) create mode 100644 iotdb-core/datanode/src/test/java/org/apache/iotdb/db/schemaengine/schemaregion/tag/TagManagerTest.java diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/tag/TagManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/tag/TagManager.java index 6be09486b4310..42f5956fcdacf 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/tag/TagManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/tag/TagManager.java @@ -52,7 +52,6 @@ import java.util.Collections; import java.util.Comparator; import java.util.HashMap; -import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -166,34 +165,33 @@ public void addIndex(String tagKey, String tagValue, IMeasurementMNode measur return; } - int tagIndexOldSize = tagIndex.size(); - Map>> tagValueMap = - tagIndex.computeIfAbsent(tagKey, k -> new ConcurrentHashMap<>()); - int tagIndexNewSize = tagIndex.size(); - - int tagValueMapOldSize = tagValueMap.size(); - Set> measurementsSet = - tagValueMap.computeIfAbsent(tagValue, v -> Collections.synchronizedSet(new HashSet<>())); - int tagValueMapNewSize = tagValueMap.size(); + tagIndex.compute( + tagKey, + (key, tagValueMap) -> { + long memorySize = 0; + if (tagValueMap == null) { + tagValueMap = new ConcurrentHashMap<>(); + // the last 4 is the memory occupied by the size of tagvaluemap + memorySize += RamUsageEstimator.sizeOf(tagKey) + 4; + } - int measurementsSetOldSize = measurementsSet.size(); - measurementsSet.add(measurementMNode); - int measurementsSetNewSize = measurementsSet.size(); + Set> measurementsSet = tagValueMap.get(tagValue); + if (measurementsSet == null) { + measurementsSet = ConcurrentHashMap.newKeySet(); + tagValueMap.put(tagValue, measurementsSet); + // the last 4 is the memory occupied by the size of measurementsSet + memorySize += RamUsageEstimator.sizeOf(tagValue) + 4; + } - long memorySize = 0; - if (tagIndexNewSize - tagIndexOldSize == 1) { - // the last 4 is the memory occupied by the size of tagvaluemap - memorySize += RamUsageEstimator.sizeOf(tagKey) + 4; - } - if (tagValueMapNewSize - tagValueMapOldSize == 1) { - // the last 4 is the memory occupied by the size of measurementsSet - memorySize += RamUsageEstimator.sizeOf(tagValue) + 4; - } - if (measurementsSetNewSize - measurementsSetOldSize == 1) { - // 8 is the memory occupied by the length of the IMeasurementMNode - memorySize += RamUsageEstimator.NUM_BYTES_OBJECT_REF + 4; - } - requestMemory(memorySize); + if (measurementsSet.add(measurementMNode)) { + // 8 is the memory occupied by the length of the IMeasurementMNode + memorySize += RamUsageEstimator.NUM_BYTES_OBJECT_REF + 4; + } + if (memorySize > 0) { + requestMemory(memorySize); + } + return tagValueMap; + }); } public void addIndex(Map tagsMap, IMeasurementMNode measurementMNode) { @@ -208,32 +206,47 @@ public void removeIndex(String tagKey, String tagValue, IMeasurementMNode mea if (tagKey == null || tagValue == null || measurementMNode == null) { return; } - // init memory size - long memorySize = 0; - if (tagIndex.get(tagKey).get(tagValue).remove(measurementMNode)) { - memorySize += RamUsageEstimator.NUM_BYTES_OBJECT_REF + 4; - } - if (tagIndex.get(tagKey).get(tagValue).isEmpty()) { - if (tagIndex.get(tagKey).remove(tagValue) != null) { - // the last 4 is the memory occupied by the size of IMeasurementMNodeSet - memorySize += RamUsageEstimator.sizeOf(tagValue) + 4; - } - } - if (tagIndex.get(tagKey).isEmpty()) { - if (tagIndex.remove(tagKey) != null) { - // the last 4 is the memory occupied by the size of tagValueMap - memorySize += RamUsageEstimator.sizeOf(tagKey) + 4; - } - } - releaseMemory(memorySize); + tagIndex.computeIfPresent( + tagKey, + (key, tagValueMap) -> { + long memorySize = 0; + Set> measurementsSet = tagValueMap.get(tagValue); + if (measurementsSet == null) { + return tagValueMap; + } + + if (measurementsSet.remove(measurementMNode)) { + memorySize += RamUsageEstimator.NUM_BYTES_OBJECT_REF + 4; + } + if (measurementsSet.isEmpty()) { + if (tagValueMap.remove(tagValue, measurementsSet)) { + // the last 4 is the memory occupied by the size of IMeasurementMNodeSet + memorySize += RamUsageEstimator.sizeOf(tagValue) + 4; + } + } + if (tagValueMap.isEmpty()) { + // the last 4 is the memory occupied by the size of tagValueMap + memorySize += RamUsageEstimator.sizeOf(tagKey) + 4; + if (memorySize > 0) { + releaseMemory(memorySize); + } + return null; + } + if (memorySize > 0) { + releaseMemory(memorySize); + } + return tagValueMap; + }); + } + + private boolean containsIndex(String tagKey, String tagValue) { + Map>> tagValueMap = tagIndex.get(tagKey); + return tagValueMap != null && tagValueMap.containsKey(tagValue); } private List> getMatchedTimeseriesInIndex(TagFilter tagFilter) { - if (!tagIndex.containsKey(tagFilter.getKey())) { - return Collections.emptyList(); - } Map>> value2Node = tagIndex.get(tagFilter.getKey()); - if (value2Node.isEmpty()) { + if (value2Node == null || value2Node.isEmpty()) { return Collections.emptyList(); } @@ -364,8 +377,7 @@ public void removeFromTagInvertedIndex(IMeasurementMNode node) throws IOExcep Map tagMap = tagLogFile.readTag(node.getOffset()); if (tagMap != null) { for (Map.Entry entry : tagMap.entrySet()) { - if (tagIndex.containsKey(entry.getKey()) - && tagIndex.get(entry.getKey()).containsKey(entry.getValue())) { + if (containsIndex(entry.getKey(), entry.getValue())) { if (logger.isDebugEnabled()) { logger.debug( String.format( @@ -418,7 +430,7 @@ public void updateTagsAndAttributes( // we should remove before key-value from inverted index map if (beforeValue != null && !beforeValue.equals(value)) { - if (tagIndex.containsKey(key) && tagIndex.get(key).containsKey(beforeValue)) { + if (containsIndex(key, beforeValue)) { if (logger.isDebugEnabled()) { logger.debug( String.format( @@ -551,8 +563,7 @@ public void dropTagsOrAttributes( if (!deleteTag.isEmpty()) { for (Map.Entry entry : deleteTag.entrySet()) { - if (tagIndex.containsKey((entry.getKey())) - && tagIndex.get(entry.getKey()).containsKey(entry.getValue())) { + if (containsIndex(entry.getKey(), entry.getValue())) { if (logger.isDebugEnabled()) { logger.debug( String.format( @@ -622,7 +633,7 @@ public void setTagsOrAttributesValue( String beforeValue = entry.getValue(); String currentValue = newTagValue.get(key); // change the tag inverted index map - if (tagIndex.containsKey(key) && tagIndex.get(key).containsKey(beforeValue)) { + if (containsIndex(key, beforeValue)) { if (logger.isDebugEnabled()) { logger.debug( @@ -680,7 +691,7 @@ public void renameTagOrAttributeKey( // persist the change to disk tagLogFile.write(pair.left, pair.right, leafMNode.getOffset()); // change the tag inverted index map - if (tagIndex.containsKey(oldKey) && tagIndex.get(oldKey).containsKey(value)) { + if (containsIndex(oldKey, value)) { if (logger.isDebugEnabled()) { logger.debug( diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/schemaengine/schemaregion/tag/TagManagerTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/schemaengine/schemaregion/tag/TagManagerTest.java new file mode 100644 index 0000000000000..a63a00ae3b4bb --- /dev/null +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/schemaengine/schemaregion/tag/TagManagerTest.java @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.db.schemaengine.schemaregion.tag; + +import org.apache.iotdb.commons.schema.node.role.IMeasurementMNode; +import org.apache.iotdb.commons.schema.node.utils.IMNodeFactory; +import org.apache.iotdb.db.schemaengine.rescon.MemSchemaEngineStatistics; +import org.apache.iotdb.db.schemaengine.rescon.MemSchemaRegionStatistics; +import org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.mem.mnode.IMemMNode; +import org.apache.iotdb.db.schemaengine.schemaregion.mtree.loader.MNodeFactoryLoader; + +import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.external.commons.io.FileUtils; +import org.apache.tsfile.file.metadata.enums.CompressionType; +import org.apache.tsfile.file.metadata.enums.TSEncoding; +import org.apache.tsfile.utils.RamUsageEstimator; +import org.apache.tsfile.write.schema.MeasurementSchema; +import org.junit.After; +import org.junit.Assert; +import org.junit.Test; + +import java.io.File; +import java.nio.file.Files; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +public class TagManagerTest { + + private File tempDir; + private MemSchemaRegionStatistics regionStatistics; + private TagManager tagManager; + + @After + public void tearDown() throws Exception { + if (tagManager != null) { + tagManager.clear(); + } + if (regionStatistics != null) { + regionStatistics.clear(); + } + if (tempDir != null) { + FileUtils.deleteDirectory(tempDir); + } + } + + @Test + public void removeIndexIgnoresMissingEntriesAndReleasesOnlyExistingMemory() throws Exception { + initTagManager(); + final IMeasurementMNode node = newMeasurementMNode("s0"); + + tagManager.removeIndex("missingKey", "missingValue", node); + Assert.assertEquals(0, regionStatistics.getRegionMemoryUsage()); + + tagManager.addIndex("key", "value", node); + final long expectedMemory = indexMemory("key", "value", 1); + Assert.assertEquals(expectedMemory, regionStatistics.getRegionMemoryUsage()); + + tagManager.removeIndex("key", "missingValue", node); + Assert.assertEquals(expectedMemory, regionStatistics.getRegionMemoryUsage()); + + tagManager.removeIndex("key", "value", newMeasurementMNode("other")); + Assert.assertEquals(expectedMemory, regionStatistics.getRegionMemoryUsage()); + + tagManager.removeIndex("key", "value", node); + Assert.assertEquals(0, regionStatistics.getRegionMemoryUsage()); + + tagManager.removeIndex("key", "value", node); + Assert.assertEquals(0, regionStatistics.getRegionMemoryUsage()); + } + + @Test + public void concurrentAddIndexRequestsMemoryForActualInsertionsOnly() throws Exception { + initTagManager(); + final String tagKey = "key"; + final String tagValue = "value"; + final int measurementCount = 128; + final List> nodes = new ArrayList<>(); + for (int i = 0; i < measurementCount; i++) { + nodes.add(newMeasurementMNode("s" + i)); + } + + final int workerCount = 16; + final ExecutorService executorService = Executors.newFixedThreadPool(workerCount); + final CountDownLatch readyLatch = new CountDownLatch(workerCount); + final CountDownLatch startLatch = new CountDownLatch(1); + final List> futures = new ArrayList<>(); + for (final IMeasurementMNode node : nodes) { + futures.add( + executorService.submit( + () -> { + readyLatch.countDown(); + startLatch.await(); + tagManager.addIndex(tagKey, tagValue, node); + return null; + })); + } + + try { + Assert.assertTrue(readyLatch.await(10, TimeUnit.SECONDS)); + startLatch.countDown(); + for (final Future future : futures) { + future.get(10, TimeUnit.SECONDS); + } + } finally { + executorService.shutdownNow(); + } + Assert.assertTrue(executorService.awaitTermination(10, TimeUnit.SECONDS)); + + final long expectedMemory = indexMemory(tagKey, tagValue, measurementCount); + Assert.assertEquals(expectedMemory, regionStatistics.getRegionMemoryUsage()); + + for (final IMeasurementMNode node : nodes) { + tagManager.addIndex(tagKey, tagValue, node); + } + Assert.assertEquals(expectedMemory, regionStatistics.getRegionMemoryUsage()); + + for (final IMeasurementMNode node : nodes) { + tagManager.removeIndex(tagKey, tagValue, node); + } + Assert.assertEquals(0, regionStatistics.getRegionMemoryUsage()); + } + + private void initTagManager() throws Exception { + tempDir = Files.createTempDirectory("tag-manager").toFile(); + regionStatistics = new MemSchemaRegionStatistics(0, new MemSchemaEngineStatistics()); + tagManager = new TagManager(tempDir.getAbsolutePath(), regionStatistics); + } + + private static IMeasurementMNode newMeasurementMNode(final String measurement) { + final IMNodeFactory nodeFactory = + MNodeFactoryLoader.getInstance().getMemMNodeIMNodeFactory(); + return nodeFactory.createMeasurementMNode( + null, + measurement, + new MeasurementSchema( + measurement, TSDataType.INT64, TSEncoding.PLAIN, CompressionType.SNAPPY), + null); + } + + private static long indexMemory( + final String tagKey, final String tagValue, final int measurementCount) { + return RamUsageEstimator.sizeOf(tagKey) + + 4 + + RamUsageEstimator.sizeOf(tagValue) + + 4 + + (RamUsageEstimator.NUM_BYTES_OBJECT_REF + 4) * measurementCount; + } +} From 848b26314e7c8808eef107dae03751cb876f7ee8 Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Thu, 25 Jun 2026 14:44:30 +0800 Subject: [PATCH 2/5] Add concurrent tag index memory test --- .../schemaregion/tag/TagManagerTest.java | 47 +++++++++++++++++++ 1 file changed, 47 insertions(+) diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/schemaengine/schemaregion/tag/TagManagerTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/schemaengine/schemaregion/tag/TagManagerTest.java index a63a00ae3b4bb..c96495c635c15 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/schemaengine/schemaregion/tag/TagManagerTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/schemaengine/schemaregion/tag/TagManagerTest.java @@ -141,6 +141,53 @@ public void concurrentAddIndexRequestsMemoryForActualInsertionsOnly() throws Exc Assert.assertEquals(0, regionStatistics.getRegionMemoryUsage()); } + @Test + public void concurrentAddAndRemoveIndexEventuallyReleasesAllMemory() throws Exception { + initTagManager(); + final String tagKey = "key"; + final String tagValue = "value"; + final IMeasurementMNode node = newMeasurementMNode("s0"); + + final int workerCount = 16; + final int roundCount = 1000; + final ExecutorService executorService = Executors.newFixedThreadPool(workerCount); + final CountDownLatch readyLatch = new CountDownLatch(workerCount); + final CountDownLatch startLatch = new CountDownLatch(1); + final List> futures = new ArrayList<>(); + for (int i = 0; i < workerCount; i++) { + futures.add( + executorService.submit( + () -> { + readyLatch.countDown(); + startLatch.await(); + for (int round = 0; round < roundCount; round++) { + tagManager.addIndex(tagKey, tagValue, node); + tagManager.removeIndex(tagKey, tagValue, node); + } + return null; + })); + } + + try { + Assert.assertTrue(readyLatch.await(10, TimeUnit.SECONDS)); + startLatch.countDown(); + for (final Future future : futures) { + future.get(10, TimeUnit.SECONDS); + } + } finally { + executorService.shutdownNow(); + } + Assert.assertTrue(executorService.awaitTermination(10, TimeUnit.SECONDS)); + + Assert.assertEquals(0, regionStatistics.getRegionMemoryUsage()); + + tagManager.addIndex(tagKey, tagValue, node); + Assert.assertEquals(indexMemory(tagKey, tagValue, 1), regionStatistics.getRegionMemoryUsage()); + + tagManager.removeIndex(tagKey, tagValue, node); + Assert.assertEquals(0, regionStatistics.getRegionMemoryUsage()); + } + private void initTagManager() throws Exception { tempDir = Files.createTempDirectory("tag-manager").toFile(); regionStatistics = new MemSchemaRegionStatistics(0, new MemSchemaEngineStatistics()); From ee56b901e8cfd82f56a0faa1f21c911ca43b90b5 Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Thu, 25 Jun 2026 16:12:54 +0800 Subject: [PATCH 3/5] Clarify tag index size accounting --- .../schemaregion/tag/TagManager.java | 18 +++++++----------- 1 file changed, 7 insertions(+), 11 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/tag/TagManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/tag/TagManager.java index 42f5956fcdacf..572609b4b2e12 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/tag/TagManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/tag/TagManager.java @@ -70,6 +70,7 @@ public class TagManager { private static final String PREVIOUS_CONDITION = "before deleting it, tag key is %s, tag value is %s, tlog offset is %d, contains key %b"; + private static final long COLLECTION_SIZE_FIELD_BYTES = Integer.BYTES; private static final Logger logger = LoggerFactory.getLogger(TagManager.class); private static final CommonConfig COMMON_CONFIG = CommonDescriptor.getInstance().getConfig(); @@ -171,21 +172,18 @@ public void addIndex(String tagKey, String tagValue, IMeasurementMNode measur long memorySize = 0; if (tagValueMap == null) { tagValueMap = new ConcurrentHashMap<>(); - // the last 4 is the memory occupied by the size of tagvaluemap - memorySize += RamUsageEstimator.sizeOf(tagKey) + 4; + memorySize += RamUsageEstimator.sizeOf(tagKey) + COLLECTION_SIZE_FIELD_BYTES; } Set> measurementsSet = tagValueMap.get(tagValue); if (measurementsSet == null) { measurementsSet = ConcurrentHashMap.newKeySet(); tagValueMap.put(tagValue, measurementsSet); - // the last 4 is the memory occupied by the size of measurementsSet - memorySize += RamUsageEstimator.sizeOf(tagValue) + 4; + memorySize += RamUsageEstimator.sizeOf(tagValue) + COLLECTION_SIZE_FIELD_BYTES; } if (measurementsSet.add(measurementMNode)) { - // 8 is the memory occupied by the length of the IMeasurementMNode - memorySize += RamUsageEstimator.NUM_BYTES_OBJECT_REF + 4; + memorySize += RamUsageEstimator.NUM_BYTES_OBJECT_REF + COLLECTION_SIZE_FIELD_BYTES; } if (memorySize > 0) { requestMemory(memorySize); @@ -216,17 +214,15 @@ public void removeIndex(String tagKey, String tagValue, IMeasurementMNode mea } if (measurementsSet.remove(measurementMNode)) { - memorySize += RamUsageEstimator.NUM_BYTES_OBJECT_REF + 4; + memorySize += RamUsageEstimator.NUM_BYTES_OBJECT_REF + COLLECTION_SIZE_FIELD_BYTES; } if (measurementsSet.isEmpty()) { if (tagValueMap.remove(tagValue, measurementsSet)) { - // the last 4 is the memory occupied by the size of IMeasurementMNodeSet - memorySize += RamUsageEstimator.sizeOf(tagValue) + 4; + memorySize += RamUsageEstimator.sizeOf(tagValue) + COLLECTION_SIZE_FIELD_BYTES; } } if (tagValueMap.isEmpty()) { - // the last 4 is the memory occupied by the size of tagValueMap - memorySize += RamUsageEstimator.sizeOf(tagKey) + 4; + memorySize += RamUsageEstimator.sizeOf(tagKey) + COLLECTION_SIZE_FIELD_BYTES; if (memorySize > 0) { releaseMemory(memorySize); } From 28a97160245bf6350264f44eaf08d9f0d10b426d Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Thu, 25 Jun 2026 16:30:22 +0800 Subject: [PATCH 4/5] Clarify tag index overhead estimate --- .../schemaregion/tag/TagManager.java | 20 ++++++++++++------- 1 file changed, 13 insertions(+), 7 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/tag/TagManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/tag/TagManager.java index 572609b4b2e12..d8a92e152468e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/tag/TagManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/tag/TagManager.java @@ -70,7 +70,10 @@ public class TagManager { private static final String PREVIOUS_CONDITION = "before deleting it, tag key is %s, tag value is %s, tlog offset is %d, contains key %b"; - private static final long COLLECTION_SIZE_FIELD_BYTES = Integer.BYTES; + // The tag index memory model adds one int-sized estimated overhead for each indexed key, value, + // and measurement reference. This is an accounting estimate rather than a specific + // ConcurrentHashMap or Set field. + private static final long INDEX_ENTRY_OVERHEAD_ESTIMATE_BYTES = Integer.BYTES; private static final Logger logger = LoggerFactory.getLogger(TagManager.class); private static final CommonConfig COMMON_CONFIG = CommonDescriptor.getInstance().getConfig(); @@ -172,18 +175,19 @@ public void addIndex(String tagKey, String tagValue, IMeasurementMNode measur long memorySize = 0; if (tagValueMap == null) { tagValueMap = new ConcurrentHashMap<>(); - memorySize += RamUsageEstimator.sizeOf(tagKey) + COLLECTION_SIZE_FIELD_BYTES; + memorySize += RamUsageEstimator.sizeOf(tagKey) + INDEX_ENTRY_OVERHEAD_ESTIMATE_BYTES; } Set> measurementsSet = tagValueMap.get(tagValue); if (measurementsSet == null) { measurementsSet = ConcurrentHashMap.newKeySet(); tagValueMap.put(tagValue, measurementsSet); - memorySize += RamUsageEstimator.sizeOf(tagValue) + COLLECTION_SIZE_FIELD_BYTES; + memorySize += RamUsageEstimator.sizeOf(tagValue) + INDEX_ENTRY_OVERHEAD_ESTIMATE_BYTES; } if (measurementsSet.add(measurementMNode)) { - memorySize += RamUsageEstimator.NUM_BYTES_OBJECT_REF + COLLECTION_SIZE_FIELD_BYTES; + memorySize += + RamUsageEstimator.NUM_BYTES_OBJECT_REF + INDEX_ENTRY_OVERHEAD_ESTIMATE_BYTES; } if (memorySize > 0) { requestMemory(memorySize); @@ -214,15 +218,17 @@ public void removeIndex(String tagKey, String tagValue, IMeasurementMNode mea } if (measurementsSet.remove(measurementMNode)) { - memorySize += RamUsageEstimator.NUM_BYTES_OBJECT_REF + COLLECTION_SIZE_FIELD_BYTES; + memorySize += + RamUsageEstimator.NUM_BYTES_OBJECT_REF + INDEX_ENTRY_OVERHEAD_ESTIMATE_BYTES; } if (measurementsSet.isEmpty()) { if (tagValueMap.remove(tagValue, measurementsSet)) { - memorySize += RamUsageEstimator.sizeOf(tagValue) + COLLECTION_SIZE_FIELD_BYTES; + memorySize += + RamUsageEstimator.sizeOf(tagValue) + INDEX_ENTRY_OVERHEAD_ESTIMATE_BYTES; } } if (tagValueMap.isEmpty()) { - memorySize += RamUsageEstimator.sizeOf(tagKey) + COLLECTION_SIZE_FIELD_BYTES; + memorySize += RamUsageEstimator.sizeOf(tagKey) + INDEX_ENTRY_OVERHEAD_ESTIMATE_BYTES; if (memorySize > 0) { releaseMemory(memorySize); } From 746f34df1258c2338039ae74eb0310ef9d74b933 Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Fri, 26 Jun 2026 11:09:30 +0800 Subject: [PATCH 5/5] Fix pipe source time range IT flakiness --- .../treemodel/auto/basic/IoTDBPipeSourceIT.java | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeSourceIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeSourceIT.java index d93cb5e42d461..0536d513d3301 100644 --- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeSourceIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeSourceIT.java @@ -50,6 +50,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.function.Consumer; import static org.junit.Assert.fail; @@ -771,6 +772,11 @@ public void testExtractorTimeRangeMatch() throws Exception { final String receiverIp = receiverDataNode.getIp(); final int receiverPort = receiverDataNode.getPort(); + final Consumer handleFailure = + o -> { + TestUtils.executeNonQueryWithRetry(senderEnv, "flush"); + TestUtils.executeNonQueryWithRetry(receiverEnv, "flush"); + }; try (final SyncConfigNodeIServiceClient client = (SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) { @@ -810,7 +816,8 @@ public void testExtractorTimeRangeMatch() throws Exception { receiverEnv, "select count(*) from root.db.**", "count(root.db.d1.at1),", - Collections.singleton("3,")); + Collections.singleton("3,"), + handleFailure); // Insert realtime data that overlapped with time range TestUtils.executeNonQueries( @@ -825,7 +832,8 @@ public void testExtractorTimeRangeMatch() throws Exception { receiverEnv, "select count(*) from root.db.**", "count(root.db.d1.at1),count(root.db.d3.at1),", - Collections.singleton("3,3,")); + Collections.singleton("3,3,"), + handleFailure); // Insert realtime data that does not overlap with time range TestUtils.executeNonQueries( @@ -840,7 +848,9 @@ public void testExtractorTimeRangeMatch() throws Exception { receiverEnv, "select count(*) from root.db.**", "count(root.db.d1.at1),count(root.db.d3.at1),", - Collections.singleton("3,3,")); + Collections.singleton("3,3,"), + 600, + handleFailure); } }