From f6f6a8c4a6373d30f7ee8a679e87c9105160748a Mon Sep 17 00:00:00 2001 From: Weihao Li <18110526956@163.com> Date: Mon, 29 Jun 2026 12:34:35 +0800 Subject: [PATCH 1/5] fix some Signed-off-by: Weihao Li <18110526956@163.com> --- .../fragment/FragmentInstanceContext.java | 5 + .../plan/planner/LocalExecutionPlanner.java | 91 ++++++++++++++----- ...NotThreadSafeMemoryReservationManager.java | 43 ++++++++- .../ThreadSafeMemoryReservationManager.java | 5 + ...alExecutionPlannerOperatorsMemoryTest.java | 85 +++++++++++++++++ 5 files changed, 203 insertions(+), 26 deletions(-) create mode 100644 iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlannerOperatorsMemoryTest.java diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java index 81dd89387bf64..4cf29a80dda44 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java @@ -45,6 +45,7 @@ import org.apache.iotdb.db.queryengine.metric.QueryRelatedResourceMetricSet; import org.apache.iotdb.db.queryengine.metric.QueryResourceMetricSet; import org.apache.iotdb.db.queryengine.metric.SeriesScanCostMetricSet; +import org.apache.iotdb.db.queryengine.plan.planner.memory.NotThreadSafeMemoryReservationManager; import org.apache.iotdb.db.queryengine.plan.planner.memory.ThreadSafeMemoryReservationManager; import org.apache.iotdb.db.queryengine.plan.planner.plan.TimePredicate; import org.apache.iotdb.db.queryengine.plan.relational.function.tvf.read_tsfile.ExternalTsFileQueryDataSource; @@ -1365,6 +1366,10 @@ public boolean isHighestPriority() { public void setHighestPriority(boolean highestPriority) { this.highestPriority = highestPriority; + if (memoryReservationManager instanceof NotThreadSafeMemoryReservationManager) { + ((NotThreadSafeMemoryReservationManager) memoryReservationManager) + .setHighestPriority(highestPriority); + } } public boolean isSingleSourcePath() { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlanner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlanner.java index 95f28052c1fd5..470aa4257a74e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlanner.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlanner.java @@ -27,6 +27,7 @@ import org.apache.iotdb.commons.path.IFullPath; import org.apache.iotdb.commons.queryengine.common.SqlDialect; import org.apache.iotdb.commons.queryengine.plan.planner.plan.node.PlanNode; +import org.apache.iotdb.commons.utils.TestOnly; import org.apache.iotdb.db.conf.DataNodeMemoryConfig; import org.apache.iotdb.db.conf.IoTDBConfig; import org.apache.iotdb.db.conf.IoTDBDescriptor; @@ -118,7 +119,7 @@ public List plan( context.invalidateParentPlanNodeIdToMemoryEstimator(); // check whether current free memory is enough to execute current query - long estimatedMemorySize = checkMemory(memoryEstimator, instanceContext.getStateMachine()); + long estimatedMemorySize = checkMemory(memoryEstimator, instanceContext); context.addPipelineDriverFactory(root, context.getDriverContext(), estimatedMemorySize); @@ -157,7 +158,7 @@ public List plan( context.invalidateParentPlanNodeIdToMemoryEstimator(); // check whether current free memory is enough to execute current query - checkMemory(memoryEstimator, instanceContext.getStateMachine()); + checkMemory(memoryEstimator, instanceContext); context.addPipelineDriverFactory(root, context.getDriverContext(), 0); @@ -193,7 +194,7 @@ private Operator generateOperator( } private long checkMemory( - final PipelineMemoryEstimator memoryEstimator, FragmentInstanceStateMachine stateMachine) + final PipelineMemoryEstimator memoryEstimator, FragmentInstanceContext instanceContext) throws MemoryNotEnoughException { // if it is disabled, just return @@ -206,14 +207,75 @@ private long checkMemory( QueryRelatedResourceMetricSet.getInstance().updateEstimatedMemory(estimatedMemorySize); - if (OPERATORS_MEMORY_BLOCK.allocate(estimatedMemorySize)) { + if (instanceContext.isHighestPriority()) { + return 0L; + } + + long reservedBytes = allocateOperatorsMemory(estimatedMemorySize); + if (reservedBytes < 0) { + throw new MemoryNotEnoughException( + String.format( + "There is not enough memory to execute current fragment instance, " + + "current remaining free memory is %dB, " + + "estimated memory usage for current fragment instance is %dB", + OPERATORS_MEMORY_BLOCK.getFreeMemoryInBytes(), estimatedMemorySize)); + } + FragmentInstanceStateMachine stateMachine = instanceContext.getStateMachine(); + if (reservedBytes > 0) { + stateMachine.addStateChangeListener( + newState -> { + if (newState.isDone()) { + try (SetThreadName fragmentInstanceName = + new SetThreadName(stateMachine.getFragmentInstanceId().getFullId())) { + OPERATORS_MEMORY_BLOCK.release(reservedBytes); + if (LOGGER.isDebugEnabled()) { + LOGGER.debug( + "[ReleaseMemory] release: {}, current remaining memory: {}", + reservedBytes, + OPERATORS_MEMORY_BLOCK.getFreeMemoryInBytes()); + } + } + } + }); + } + return reservedBytes; + } + + /** + * Try to reserve bytes from the operators memory block. + * + * @return allocated bytes on success ({@code >= 0}), {@code -1} if allocation failed + */ + private long allocateOperatorsMemory(final long memoryInBytes) { + if (memoryInBytes <= 0) { + return 0L; + } + if (OPERATORS_MEMORY_BLOCK.allocate(memoryInBytes)) { if (LOGGER.isDebugEnabled()) { LOGGER.debug( "[ConsumeMemory] consume: {}, current remaining memory: {}", - estimatedMemorySize, + memoryInBytes, OPERATORS_MEMORY_BLOCK.getFreeMemoryInBytes()); } - } else { + return memoryInBytes; + } + return -1L; + } + + @TestOnly + long allocateOperatorsMemoryForTest(final long memoryInBytes) { + return allocateOperatorsMemory(memoryInBytes); + } + + @TestOnly + long reserveOperatorsMemoryForFragmentForTest( + final long estimatedMemorySize, final boolean isHighestPriority) + throws MemoryNotEnoughException { + if (isHighestPriority) { + return 0L; + } + long reservedBytes = allocateOperatorsMemory(estimatedMemorySize); + if (reservedBytes < 0) { throw new MemoryNotEnoughException( String.format( "There is not enough memory to execute current fragment instance, " @@ -221,22 +283,7 @@ private long checkMemory( + "estimated memory usage for current fragment instance is %dB", OPERATORS_MEMORY_BLOCK.getFreeMemoryInBytes(), estimatedMemorySize)); } - stateMachine.addStateChangeListener( - newState -> { - if (newState.isDone()) { - try (SetThreadName fragmentInstanceName = - new SetThreadName(stateMachine.getFragmentInstanceId().getFullId())) { - OPERATORS_MEMORY_BLOCK.release(estimatedMemorySize); - if (LOGGER.isDebugEnabled()) { - LOGGER.debug( - "[ReleaseMemory] release: {}, current remaining memory: {}", - estimatedMemorySize, - OPERATORS_MEMORY_BLOCK.getFreeMemoryInBytes()); - } - } - } - }); - return estimatedMemorySize; + return reservedBytes; } private QueryDataSourceType getQueryDataSourceType(DataDriverContext dataDriverContext) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/NotThreadSafeMemoryReservationManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/NotThreadSafeMemoryReservationManager.java index d156628532c34..9f49c5c77861a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/NotThreadSafeMemoryReservationManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/NotThreadSafeMemoryReservationManager.java @@ -20,6 +20,7 @@ package org.apache.iotdb.db.queryengine.plan.planner.memory; import org.apache.iotdb.calc.plan.planner.memory.MemoryReservationManager; +import org.apache.iotdb.commons.utils.TestOnly; import org.apache.iotdb.db.queryengine.common.QueryId; import org.apache.iotdb.db.queryengine.plan.planner.LocalExecutionPlanner; @@ -39,6 +40,8 @@ public class NotThreadSafeMemoryReservationManager implements MemoryReservationM private final String contextHolder; + private boolean isHighestPriority; + private long reservedBytesInTotal = 0; private long bytesToBeReserved = 0; @@ -50,8 +53,24 @@ public NotThreadSafeMemoryReservationManager(final QueryId queryId, final String this.contextHolder = contextHolder; } + public void setHighestPriority(boolean isHighestPriority) { + this.isHighestPriority = isHighestPriority; + } + + public boolean isHighestPriority() { + return isHighestPriority; + } + + @TestOnly + public long getReservedBytesInTotalForTest() { + return reservedBytesInTotal; + } + @Override public void reserveMemoryCumulatively(final long size) { + if (isHighestPriority) { + return; + } bytesToBeReserved += size; if (bytesToBeReserved >= MEMORY_BATCH_THRESHOLD) { reserveMemoryImmediately(); @@ -60,6 +79,9 @@ public void reserveMemoryCumulatively(final long size) { @Override public void reserveMemoryImmediately() { + if (isHighestPriority) { + return; + } if (bytesToBeReserved != 0) { LOCAL_EXECUTION_PLANNER.reserveFromFreeMemoryForOperators( bytesToBeReserved, reservedBytesInTotal, queryId.getId(), contextHolder); @@ -70,15 +92,19 @@ public void reserveMemoryImmediately() { @Override public void reserveMemoryImmediately(final long size) { - if (size != 0) { - LOCAL_EXECUTION_PLANNER.reserveFromFreeMemoryForOperators( - size, reservedBytesInTotal, queryId.getId(), contextHolder); - reservedBytesInTotal += size; + if (isHighestPriority || size == 0) { + return; } + LOCAL_EXECUTION_PLANNER.reserveFromFreeMemoryForOperators( + size, reservedBytesInTotal, queryId.getId(), contextHolder); + reservedBytesInTotal += size; } @Override public void releaseMemoryCumulatively(final long size) { + if (isHighestPriority) { + return; + } bytesToBeReleased += size; if (bytesToBeReleased >= MEMORY_BATCH_THRESHOLD) { long bytesToRelease; @@ -96,6 +122,9 @@ public void releaseMemoryCumulatively(final long size) { @Override public void releaseAllReservedMemory() { + if (isHighestPriority) { + return; + } if (reservedBytesInTotal != 0) { LOCAL_EXECUTION_PLANNER.releaseToFreeMemoryForOperators(reservedBytesInTotal); reservedBytesInTotal = 0; @@ -106,6 +135,9 @@ public void releaseAllReservedMemory() { @Override public Pair releaseMemoryVirtually(final long size) { + if (isHighestPriority) { + return new Pair<>(size, 0L); + } if (bytesToBeReserved >= size) { bytesToBeReserved -= size; return new Pair<>(size, 0L); @@ -121,6 +153,9 @@ public Pair releaseMemoryVirtually(final long size) { @Override public void reserveMemoryVirtually( final long bytesToBeReserved, final long bytesAlreadyReserved) { + if (isHighestPriority) { + return; + } reservedBytesInTotal += bytesAlreadyReserved; reserveMemoryCumulatively(bytesToBeReserved); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/ThreadSafeMemoryReservationManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/ThreadSafeMemoryReservationManager.java index 2a544421f3ffd..0a1c6eee4181e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/ThreadSafeMemoryReservationManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/ThreadSafeMemoryReservationManager.java @@ -66,4 +66,9 @@ public synchronized void reserveMemoryVirtually( final long bytesToBeReserved, final long bytesAlreadyReserved) { super.reserveMemoryVirtually(bytesToBeReserved, bytesAlreadyReserved); } + + @Override + public synchronized void setHighestPriority(boolean isHighestPriority) { + super.setHighestPriority(isHighestPriority); + } } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlannerOperatorsMemoryTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlannerOperatorsMemoryTest.java new file mode 100644 index 0000000000000..4f255d642beb7 --- /dev/null +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlannerOperatorsMemoryTest.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.plan.planner; + +import org.apache.iotdb.calc.exception.MemoryNotEnoughException; + +import org.junit.After; +import org.junit.Assert; +import org.junit.Test; + +public class LocalExecutionPlannerOperatorsMemoryTest { + + private static final LocalExecutionPlanner PLANNER = LocalExecutionPlanner.getInstance(); + + private long bytesHeldByTest = 0L; + + @After + public void tearDown() { + if (bytesHeldByTest > 0) { + PLANNER.releaseToFreeMemoryForOperators(bytesHeldByTest); + bytesHeldByTest = 0L; + } + } + + @Test + public void testAllocateOperatorsMemoryFailsWhenInsufficient() { + long free = PLANNER.getFreeMemoryForOperators(); + Assert.assertEquals(-1L, PLANNER.allocateOperatorsMemoryForTest(free + 1024L)); + } + + @Test + public void testAllocateOperatorsMemorySucceedsWhenAvailable() { + long request = Math.min(1024L, PLANNER.getFreeMemoryForOperators()); + long reserved = PLANNER.allocateOperatorsMemoryForTest(request); + Assert.assertEquals(request, reserved); + bytesHeldByTest = reserved; + } + + @Test + public void testHighestPriorityBypassWithoutConsumingOperatorsPool() + throws MemoryNotEnoughException { + long freeBefore = PLANNER.getFreeMemoryForOperators(); + long request = freeBefore + 1024L; + + Assert.assertEquals(0L, PLANNER.reserveOperatorsMemoryForFragmentForTest(request, true)); + Assert.assertEquals(freeBefore, PLANNER.getFreeMemoryForOperators()); + } + + @Test + public void testHighestPriorityBypassWhenPoolInsufficient() throws MemoryNotEnoughException { + long free = PLANNER.getFreeMemoryForOperators(); + long request = free + 1024L; + + Assert.assertEquals(0L, PLANNER.reserveOperatorsMemoryForFragmentForTest(request, true)); + } + + @Test + public void testNormalPriorityThrowsWhenPoolInsufficient() { + long free = PLANNER.getFreeMemoryForOperators(); + long request = free + 1024L; + + try { + PLANNER.reserveOperatorsMemoryForFragmentForTest(request, false); + Assert.fail("Expect MemoryNotEnoughException"); + } catch (MemoryNotEnoughException ignore) { + } + } +} From cfa024ddb6a919fe15cadf947d29ec462376ddca Mon Sep 17 00:00:00 2001 From: Weihao Li <18110526956@163.com> Date: Tue, 30 Jun 2026 00:47:09 +0800 Subject: [PATCH 2/5] perfect Signed-off-by: Weihao Li <18110526956@163.com> --- .../memory/MemoryReservationManager.java | 6 + .../fragment/FragmentInstanceContext.java | 6 +- .../execution/operator/OperatorContext.java | 5 + .../source/relational/CteScanOperator.java | 2 +- .../plan/planner/LocalExecutionPlanner.java | 55 +++----- .../memory/FakedMemoryReservationManager.java | 3 + ...NotThreadSafeMemoryReservationManager.java | 119 ++++++++++-------- .../iotdb/db/utils/cte/MemoryReader.java | 18 ++- .../fragment/QueryModificationLoaderTest.java | 3 + ...alExecutionPlannerOperatorsMemoryTest.java | 113 ++++++++++++++--- .../queryengine/utils/cte/CteDataStore.java | 10 ++ 11 files changed, 226 insertions(+), 114 deletions(-) diff --git a/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/plan/planner/memory/MemoryReservationManager.java b/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/plan/planner/memory/MemoryReservationManager.java index b52534b4ec2ed..f0420330652e8 100644 --- a/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/plan/planner/memory/MemoryReservationManager.java +++ b/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/plan/planner/memory/MemoryReservationManager.java @@ -80,4 +80,10 @@ public interface MemoryReservationManager { * @param bytesAlreadyReserved the amount of memory that has already been reserved */ void reserveMemoryVirtually(final long bytesToBeReserved, final long bytesAlreadyReserved); + + /** + * Mark this manager as highest-priority (e.g. SHOW QUERIES). When operators memory is + * insufficient, allocation will fall back to zero bytes instead of failing. + */ + void setHighestPriority(boolean isHighestPriority); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java index 4cf29a80dda44..98a385d3a389f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java @@ -45,7 +45,6 @@ import org.apache.iotdb.db.queryengine.metric.QueryRelatedResourceMetricSet; import org.apache.iotdb.db.queryengine.metric.QueryResourceMetricSet; import org.apache.iotdb.db.queryengine.metric.SeriesScanCostMetricSet; -import org.apache.iotdb.db.queryengine.plan.planner.memory.NotThreadSafeMemoryReservationManager; import org.apache.iotdb.db.queryengine.plan.planner.memory.ThreadSafeMemoryReservationManager; import org.apache.iotdb.db.queryengine.plan.planner.plan.TimePredicate; import org.apache.iotdb.db.queryengine.plan.relational.function.tvf.read_tsfile.ExternalTsFileQueryDataSource; @@ -1366,9 +1365,8 @@ public boolean isHighestPriority() { public void setHighestPriority(boolean highestPriority) { this.highestPriority = highestPriority; - if (memoryReservationManager instanceof NotThreadSafeMemoryReservationManager) { - ((NotThreadSafeMemoryReservationManager) memoryReservationManager) - .setHighestPriority(highestPriority); + if (memoryReservationManager != null) { + memoryReservationManager.setHighestPriority(highestPriority); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/OperatorContext.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/OperatorContext.java index 584cb4fe74c00..0ff922f889436 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/OperatorContext.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/OperatorContext.java @@ -71,6 +71,11 @@ public SessionInfo getSessionInfo() { return getInstanceContext().getSessionInfo(); } + public boolean isHighestPriority() { + FragmentInstanceContext instanceContext = getInstanceContext(); + return instanceContext != null && instanceContext.isHighestPriority(); + } + @Override public void recordScanAggregationFromRawDataCost(long costTimeInNanos) { if (driverContext != null && driverContext.getFragmentInstanceContext() != null) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/CteScanOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/CteScanOperator.java index c25fe15d93e81..c995ab62bca5e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/CteScanOperator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/CteScanOperator.java @@ -59,7 +59,7 @@ public CteScanOperator( requireNonNull(dataStore, "dataStore is null"); this.operatorContext = operatorContext; this.sourceId = sourceId; - this.dataReader = new MemoryReader(dataStore, queryId); + this.dataReader = new MemoryReader(dataStore, queryId, operatorContext.isHighestPriority()); } @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlanner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlanner.java index 470aa4257a74e..cfdd39c15b3ec 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlanner.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlanner.java @@ -207,11 +207,8 @@ private long checkMemory( QueryRelatedResourceMetricSet.getInstance().updateEstimatedMemory(estimatedMemorySize); - if (instanceContext.isHighestPriority()) { - return 0L; - } - - long reservedBytes = allocateOperatorsMemory(estimatedMemorySize); + long reservedBytes = + allocateOperatorsMemory(estimatedMemorySize, instanceContext.isHighestPriority()); if (reservedBytes < 0) { throw new MemoryNotEnoughException( String.format( @@ -244,9 +241,10 @@ private long checkMemory( /** * Try to reserve bytes from the operators memory block. * - * @return allocated bytes on success ({@code >= 0}), {@code -1} if allocation failed + * @return allocated bytes on success ({@code > 0}), {@code 0} if nothing to allocate or + * highest-priority fallback applies, {@code -1} if allocation failed */ - private long allocateOperatorsMemory(final long memoryInBytes) { + private long allocateOperatorsMemory(final long memoryInBytes, final boolean isHighestPriority) { if (memoryInBytes <= 0) { return 0L; } @@ -259,31 +257,15 @@ private long allocateOperatorsMemory(final long memoryInBytes) { } return memoryInBytes; } + if (isHighestPriority) { + return 0L; + } return -1L; } @TestOnly - long allocateOperatorsMemoryForTest(final long memoryInBytes) { - return allocateOperatorsMemory(memoryInBytes); - } - - @TestOnly - long reserveOperatorsMemoryForFragmentForTest( - final long estimatedMemorySize, final boolean isHighestPriority) - throws MemoryNotEnoughException { - if (isHighestPriority) { - return 0L; - } - long reservedBytes = allocateOperatorsMemory(estimatedMemorySize); - if (reservedBytes < 0) { - throw new MemoryNotEnoughException( - String.format( - "There is not enough memory to execute current fragment instance, " - + "current remaining free memory is %dB, " - + "estimated memory usage for current fragment instance is %dB", - OPERATORS_MEMORY_BLOCK.getFreeMemoryInBytes(), estimatedMemorySize)); - } - return reservedBytes; + long allocateOperatorsMemoryForTest(final long memoryInBytes, final boolean isHighestPriority) { + return allocateOperatorsMemory(memoryInBytes, isHighestPriority); } private QueryDataSourceType getQueryDataSourceType(DataDriverContext dataDriverContext) { @@ -338,23 +320,19 @@ public synchronized long tryAllocateFreeMemory4Load(final long memoryInBytes) { } } - public void reserveFromFreeMemoryForOperators( + public long reserveFromFreeMemoryForOperators( final long memoryInBytes, final long reservedBytes, final String queryId, - final String contextHolder) { + final String contextHolder, + final boolean isHighestPriority) + throws MemoryNotEnoughException { if (memoryInBytes <= 0) { throw new IllegalArgumentException( "Bytes to reserve from free memory for operators should be larger than 0"); } - if (OPERATORS_MEMORY_BLOCK.allocate(memoryInBytes)) { - if (LOGGER.isDebugEnabled()) { - LOGGER.debug( - "[ConsumeMemory] consume: {}, current remaining memory: {}", - memoryInBytes, - OPERATORS_MEMORY_BLOCK.getFreeMemoryInBytes()); - } - } else { + long allocated = allocateOperatorsMemory(memoryInBytes, isHighestPriority); + if (allocated < 0) { throw new MemoryNotEnoughException( String.format( "There is not enough memory for Query %s, the contextHolder is %s," @@ -367,6 +345,7 @@ public void reserveFromFreeMemoryForOperators( reservedBytes, memoryInBytes)); } + return allocated; } public void releaseToFreeMemoryForOperators(final long memoryInBytes) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/FakedMemoryReservationManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/FakedMemoryReservationManager.java index 7cee8034a053d..d1c34e365efbc 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/FakedMemoryReservationManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/FakedMemoryReservationManager.java @@ -48,4 +48,7 @@ public Pair releaseMemoryVirtually(final long size) { @Override public void reserveMemoryVirtually( final long bytesToBeReserved, final long bytesAlreadyReserved) {} + + @Override + public void setHighestPriority(boolean isHighestPriority) {} } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/NotThreadSafeMemoryReservationManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/NotThreadSafeMemoryReservationManager.java index 9f49c5c77861a..df721f030e1c6 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/NotThreadSafeMemoryReservationManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/NotThreadSafeMemoryReservationManager.java @@ -44,6 +44,12 @@ public class NotThreadSafeMemoryReservationManager implements MemoryReservationM private long reservedBytesInTotal = 0; + /** + * Bytes logically reserved but not taken from the operators pool due to highest-priority + * fallback. + */ + private long fallbackBytesInTotal = 0; + private long bytesToBeReserved = 0; private long bytesToBeReleased = 0; @@ -53,24 +59,23 @@ public NotThreadSafeMemoryReservationManager(final QueryId queryId, final String this.contextHolder = contextHolder; } + @Override public void setHighestPriority(boolean isHighestPriority) { this.isHighestPriority = isHighestPriority; } - public boolean isHighestPriority() { - return isHighestPriority; - } - @TestOnly public long getReservedBytesInTotalForTest() { return reservedBytesInTotal; } + @TestOnly + public long getFallbackBytesInTotalForTest() { + return fallbackBytesInTotal; + } + @Override public void reserveMemoryCumulatively(final long size) { - if (isHighestPriority) { - return; - } bytesToBeReserved += size; if (bytesToBeReserved >= MEMORY_BATCH_THRESHOLD) { reserveMemoryImmediately(); @@ -79,83 +84,97 @@ public void reserveMemoryCumulatively(final long size) { @Override public void reserveMemoryImmediately() { - if (isHighestPriority) { - return; - } if (bytesToBeReserved != 0) { - LOCAL_EXECUTION_PLANNER.reserveFromFreeMemoryForOperators( - bytesToBeReserved, reservedBytesInTotal, queryId.getId(), contextHolder); - reservedBytesInTotal += bytesToBeReserved; + long actualReserved = + LOCAL_EXECUTION_PLANNER.reserveFromFreeMemoryForOperators( + bytesToBeReserved, + reservedBytesInTotal, + queryId.getId(), + contextHolder, + isHighestPriority); + if (actualReserved == 0) { + fallbackBytesInTotal += bytesToBeReserved; + } else { + reservedBytesInTotal += actualReserved; + } bytesToBeReserved = 0; } } @Override public void reserveMemoryImmediately(final long size) { - if (isHighestPriority || size == 0) { - return; + if (size != 0) { + long actualReserved = + LOCAL_EXECUTION_PLANNER.reserveFromFreeMemoryForOperators( + size, reservedBytesInTotal, queryId.getId(), contextHolder, isHighestPriority); + if (actualReserved == 0) { + fallbackBytesInTotal += size; + } else { + reservedBytesInTotal += actualReserved; + } } - LOCAL_EXECUTION_PLANNER.reserveFromFreeMemoryForOperators( - size, reservedBytesInTotal, queryId.getId(), contextHolder); - reservedBytesInTotal += size; } @Override public void releaseMemoryCumulatively(final long size) { - if (isHighestPriority) { - return; - } bytesToBeReleased += size; if (bytesToBeReleased >= MEMORY_BATCH_THRESHOLD) { - long bytesToRelease; - if (bytesToBeReleased <= bytesToBeReserved) { - bytesToBeReserved -= bytesToBeReleased; - } else { - bytesToRelease = bytesToBeReleased - bytesToBeReserved; - bytesToBeReserved = 0; - LOCAL_EXECUTION_PLANNER.releaseToFreeMemoryForOperators(bytesToRelease); - reservedBytesInTotal -= bytesToRelease; - } + releaseBytesImmediately(bytesToBeReleased); bytesToBeReleased = 0; } } + private void releaseBytesImmediately(final long size) { + long poolBytes = deductReleaseAccounting(size); + if (poolBytes > 0) { + LOCAL_EXECUTION_PLANNER.releaseToFreeMemoryForOperators(poolBytes); + } + } + + /** Deduct release size from pending reserve, fallback quota, then pool reservation in order. */ + private long deductReleaseAccounting(final long size) { + long remaining = size; + if (remaining <= bytesToBeReserved) { + bytesToBeReserved -= remaining; + return 0L; + } + remaining -= bytesToBeReserved; + bytesToBeReserved = 0; + + if (remaining <= fallbackBytesInTotal) { + fallbackBytesInTotal -= remaining; + return 0L; + } + remaining -= fallbackBytesInTotal; + fallbackBytesInTotal = 0; + + reservedBytesInTotal -= remaining; + return remaining; + } + @Override public void releaseAllReservedMemory() { - if (isHighestPriority) { - return; + if (bytesToBeReleased != 0) { + releaseBytesImmediately(bytesToBeReleased); + bytesToBeReleased = 0; } if (reservedBytesInTotal != 0) { LOCAL_EXECUTION_PLANNER.releaseToFreeMemoryForOperators(reservedBytesInTotal); reservedBytesInTotal = 0; - bytesToBeReserved = 0; - bytesToBeReleased = 0; } + fallbackBytesInTotal = 0; + bytesToBeReserved = 0; } @Override public Pair releaseMemoryVirtually(final long size) { - if (isHighestPriority) { - return new Pair<>(size, 0L); - } - if (bytesToBeReserved >= size) { - bytesToBeReserved -= size; - return new Pair<>(size, 0L); - } else { - long releasedBytesInReserved = bytesToBeReserved; - long releasedBytesInTotal = size - bytesToBeReserved; - bytesToBeReserved = 0; - reservedBytesInTotal -= releasedBytesInTotal; - return new Pair<>(releasedBytesInReserved, releasedBytesInTotal); - } + long poolBytes = deductReleaseAccounting(size); + return new Pair<>(size - poolBytes, poolBytes); } @Override public void reserveMemoryVirtually( final long bytesToBeReserved, final long bytesAlreadyReserved) { - if (isHighestPriority) { - return; - } reservedBytesInTotal += bytesAlreadyReserved; reserveMemoryCumulatively(bytesToBeReserved); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/cte/MemoryReader.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/cte/MemoryReader.java index 7daad8d46e416..8dd7bc3b1573e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/cte/MemoryReader.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/cte/MemoryReader.java @@ -38,12 +38,18 @@ public class MemoryReader implements CteDataReader { private final CteDataStore dataStore; private int tsBlockIndex; - public MemoryReader(CteDataStore dataStore, QueryId queryId) { + public MemoryReader(CteDataStore dataStore, QueryId queryId, boolean isHighestPriority) { this.dataStore = dataStore; this.tsBlockIndex = 0; if (dataStore.incrementAndGetCount() == 1) { - LOCAL_EXECUTION_PLANNER.reserveFromFreeMemoryForOperators( - dataStore.ramBytesUsed(), 0L, queryId.getId(), MemoryReader.class.getName()); + long actualReserved = + LOCAL_EXECUTION_PLANNER.reserveFromFreeMemoryForOperators( + dataStore.ramBytesUsed(), + 0L, + queryId.getId(), + MemoryReader.class.getName(), + isHighestPriority); + dataStore.setOperatorsMemoryReservedBytes(actualReserved); } } @@ -63,7 +69,11 @@ public TsBlock next() throws IoTDBException { @Override public void close() throws IoTDBException { if (dataStore.decrementAndGetCount() == 0) { - LOCAL_EXECUTION_PLANNER.releaseToFreeMemoryForOperators(dataStore.ramBytesUsed()); + long reservedBytes = dataStore.getOperatorsMemoryReservedBytes(); + if (reservedBytes > 0) { + LOCAL_EXECUTION_PLANNER.releaseToFreeMemoryForOperators(reservedBytes); + } + dataStore.setOperatorsMemoryReservedBytes(0L); } } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/fragment/QueryModificationLoaderTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/fragment/QueryModificationLoaderTest.java index 39b9c41447cdf..65cbc0f5b9f2b 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/fragment/QueryModificationLoaderTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/fragment/QueryModificationLoaderTest.java @@ -339,6 +339,9 @@ public void reserveMemoryVirtually(long bytesToBeReserved, long bytesAlreadyRese reservedBytes += bytesToBeReserved + bytesAlreadyReserved; } + @Override + public void setHighestPriority(boolean isHighestPriority) {} + private long getReservedBytes() { return reservedBytes; } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlannerOperatorsMemoryTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlannerOperatorsMemoryTest.java index 4f255d642beb7..6d0cabb044313 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlannerOperatorsMemoryTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlannerOperatorsMemoryTest.java @@ -19,7 +19,8 @@ package org.apache.iotdb.db.queryengine.plan.planner; -import org.apache.iotdb.calc.exception.MemoryNotEnoughException; +import org.apache.iotdb.db.queryengine.common.QueryId; +import org.apache.iotdb.db.queryengine.plan.planner.memory.NotThreadSafeMemoryReservationManager; import org.junit.After; import org.junit.Assert; @@ -42,44 +43,122 @@ public void tearDown() { @Test public void testAllocateOperatorsMemoryFailsWhenInsufficient() { long free = PLANNER.getFreeMemoryForOperators(); - Assert.assertEquals(-1L, PLANNER.allocateOperatorsMemoryForTest(free + 1024L)); + Assert.assertEquals(-1L, PLANNER.allocateOperatorsMemoryForTest(free + 1024L, false)); } @Test public void testAllocateOperatorsMemorySucceedsWhenAvailable() { long request = Math.min(1024L, PLANNER.getFreeMemoryForOperators()); - long reserved = PLANNER.allocateOperatorsMemoryForTest(request); + long reserved = PLANNER.allocateOperatorsMemoryForTest(request, false); Assert.assertEquals(request, reserved); bytesHeldByTest = reserved; } @Test - public void testHighestPriorityBypassWithoutConsumingOperatorsPool() - throws MemoryNotEnoughException { + public void testHighestPriorityAllocatesWhenPoolHasRoom() { + long request = Math.min(1024L, PLANNER.getFreeMemoryForOperators()); + if (request <= 0) { + return; + } + long freeBefore = PLANNER.getFreeMemoryForOperators(); + + long reserved = PLANNER.allocateOperatorsMemoryForTest(request, true); + Assert.assertEquals(request, reserved); + Assert.assertEquals(freeBefore - request, PLANNER.getFreeMemoryForOperators()); + bytesHeldByTest = reserved; + } + + @Test + public void testHighestPriorityFallbackWhenPoolInsufficient() { long freeBefore = PLANNER.getFreeMemoryForOperators(); long request = freeBefore + 1024L; - Assert.assertEquals(0L, PLANNER.reserveOperatorsMemoryForFragmentForTest(request, true)); + Assert.assertEquals(0L, PLANNER.allocateOperatorsMemoryForTest(request, true)); Assert.assertEquals(freeBefore, PLANNER.getFreeMemoryForOperators()); } @Test - public void testHighestPriorityBypassWhenPoolInsufficient() throws MemoryNotEnoughException { - long free = PLANNER.getFreeMemoryForOperators(); - long request = free + 1024L; + public void testMemoryReservationManagerHighestPriorityAllocatesWhenPoolHasRoom() { + long request = Math.min(1024L, PLANNER.getFreeMemoryForOperators()); + if (request <= 0) { + return; + } + + NotThreadSafeMemoryReservationManager manager = + new NotThreadSafeMemoryReservationManager(new QueryId("show_queries"), "test"); + manager.setHighestPriority(true); + long freeBefore = PLANNER.getFreeMemoryForOperators(); - Assert.assertEquals(0L, PLANNER.reserveOperatorsMemoryForFragmentForTest(request, true)); + manager.reserveMemoryImmediately(request); + Assert.assertEquals(request, manager.getReservedBytesInTotalForTest()); + Assert.assertEquals(freeBefore - request, PLANNER.getFreeMemoryForOperators()); + + manager.releaseAllReservedMemory(); + Assert.assertEquals(0L, manager.getReservedBytesInTotalForTest()); + Assert.assertEquals(freeBefore, PLANNER.getFreeMemoryForOperators()); } @Test - public void testNormalPriorityThrowsWhenPoolInsufficient() { - long free = PLANNER.getFreeMemoryForOperators(); - long request = free + 1024L; + public void testMemoryReservationManagerHighestPriorityFallbackWhenPoolInsufficient() { + long freeBefore = PLANNER.getFreeMemoryForOperators(); + long request = freeBefore + 1024L; + + NotThreadSafeMemoryReservationManager manager = + new NotThreadSafeMemoryReservationManager(new QueryId("show_queries"), "test"); + manager.setHighestPriority(true); + + manager.reserveMemoryImmediately(request); + Assert.assertEquals(0L, manager.getReservedBytesInTotalForTest()); + Assert.assertEquals(request, manager.getFallbackBytesInTotalForTest()); + Assert.assertEquals(freeBefore, PLANNER.getFreeMemoryForOperators()); - try { - PLANNER.reserveOperatorsMemoryForFragmentForTest(request, false); - Assert.fail("Expect MemoryNotEnoughException"); - } catch (MemoryNotEnoughException ignore) { + manager.releaseMemoryCumulatively(request); + Assert.assertEquals(0L, manager.getFallbackBytesInTotalForTest()); + Assert.assertEquals(freeBefore, PLANNER.getFreeMemoryForOperators()); + + manager.releaseAllReservedMemory(); + Assert.assertEquals(0L, manager.getFallbackBytesInTotalForTest()); + Assert.assertEquals(freeBefore, PLANNER.getFreeMemoryForOperators()); + } + + @Test + public void testMemoryReservationManagerHighestPriorityFallbackReleaseViaBatchThreshold() { + long freeBefore = PLANNER.getFreeMemoryForOperators(); + long request = freeBefore + MEMORY_BATCH_THRESHOLD; + + NotThreadSafeMemoryReservationManager manager = + new NotThreadSafeMemoryReservationManager(new QueryId("show_queries"), "test"); + manager.setHighestPriority(true); + + manager.reserveMemoryImmediately(request); + Assert.assertEquals(0L, manager.getReservedBytesInTotalForTest()); + Assert.assertEquals(request, manager.getFallbackBytesInTotalForTest()); + + manager.releaseMemoryCumulatively(request); + Assert.assertEquals(0L, manager.getFallbackBytesInTotalForTest()); + Assert.assertEquals(0L, manager.getReservedBytesInTotalForTest()); + Assert.assertEquals(freeBefore, PLANNER.getFreeMemoryForOperators()); + } + + private static final long MEMORY_BATCH_THRESHOLD = 1024L * 1024L; + + @Test + public void testMemoryReservationManagerNormalPriorityReserveAndRelease() { + long request = Math.min(1024L, PLANNER.getFreeMemoryForOperators()); + if (request <= 0) { + return; } + + NotThreadSafeMemoryReservationManager manager = + new NotThreadSafeMemoryReservationManager(new QueryId("normal_query"), "test"); + long freeBefore = PLANNER.getFreeMemoryForOperators(); + + manager.reserveMemoryImmediately(request); + Assert.assertEquals(request, manager.getReservedBytesInTotalForTest()); + Assert.assertEquals(freeBefore - request, PLANNER.getFreeMemoryForOperators()); + + manager.releaseAllReservedMemory(); + Assert.assertEquals(0L, manager.getReservedBytesInTotalForTest()); + Assert.assertEquals(freeBefore, PLANNER.getFreeMemoryForOperators()); } } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/queryengine/utils/cte/CteDataStore.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/queryengine/utils/cte/CteDataStore.java index d2cbcc17fa55b..edeb546655ad6 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/queryengine/utils/cte/CteDataStore.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/queryengine/utils/cte/CteDataStore.java @@ -48,6 +48,8 @@ public class CteDataStore implements Accountable { // reference count by CteScanReader private final AtomicInteger count; + private long operatorsMemoryReservedBytes; + public CteDataStore(TableSchema tableSchema, List columnIndex2TsBlockColumnIndexList) { this.tableSchema = tableSchema; this.columnIndex2TsBlockColumnIndexList = columnIndex2TsBlockColumnIndexList; @@ -106,4 +108,12 @@ public long ramBytesUsed() { public int getCount() { return count.get(); } + + public long getOperatorsMemoryReservedBytes() { + return operatorsMemoryReservedBytes; + } + + public void setOperatorsMemoryReservedBytes(long operatorsMemoryReservedBytes) { + this.operatorsMemoryReservedBytes = operatorsMemoryReservedBytes; + } } From 8b9036968016d807c8ea0c79158208bf5c819333 Mon Sep 17 00:00:00 2001 From: Weihao Li <18110526956@163.com> Date: Tue, 30 Jun 2026 09:49:15 +0800 Subject: [PATCH 3/5] fix Signed-off-by: Weihao Li <18110526956@163.com> --- .../NotThreadSafeMemoryReservationManager.java | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/NotThreadSafeMemoryReservationManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/NotThreadSafeMemoryReservationManager.java index df721f030e1c6..697533672b908 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/NotThreadSafeMemoryReservationManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/NotThreadSafeMemoryReservationManager.java @@ -117,6 +117,9 @@ public void reserveMemoryImmediately(final long size) { @Override public void releaseMemoryCumulatively(final long size) { + if (size <= 0) { + return; + } bytesToBeReleased += size; if (bytesToBeReleased >= MEMORY_BATCH_THRESHOLD) { releaseBytesImmediately(bytesToBeReleased); @@ -148,8 +151,9 @@ private long deductReleaseAccounting(final long size) { remaining -= fallbackBytesInTotal; fallbackBytesInTotal = 0; - reservedBytesInTotal -= remaining; - return remaining; + long poolBytes = Math.min(remaining, reservedBytesInTotal); + reservedBytesInTotal -= poolBytes; + return poolBytes; } @Override @@ -158,10 +162,10 @@ public void releaseAllReservedMemory() { releaseBytesImmediately(bytesToBeReleased); bytesToBeReleased = 0; } - if (reservedBytesInTotal != 0) { + if (reservedBytesInTotal > 0) { LOCAL_EXECUTION_PLANNER.releaseToFreeMemoryForOperators(reservedBytesInTotal); - reservedBytesInTotal = 0; } + reservedBytesInTotal = 0; fallbackBytesInTotal = 0; bytesToBeReserved = 0; } From fe29310fce753225b57529ada229131e97aeb3bf Mon Sep 17 00:00:00 2001 From: Weihao Li <18110526956@163.com> Date: Tue, 30 Jun 2026 10:49:15 +0800 Subject: [PATCH 4/5] fix UT and perfect Signed-off-by: Weihao Li <18110526956@163.com> --- .../NotThreadSafeMemoryReservationManager.java | 17 ++++++++--------- .../execution/operator/CteScanOperatorTest.java | 2 +- 2 files changed, 9 insertions(+), 10 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/NotThreadSafeMemoryReservationManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/NotThreadSafeMemoryReservationManager.java index 697533672b908..71924894c7c96 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/NotThreadSafeMemoryReservationManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/NotThreadSafeMemoryReservationManager.java @@ -28,6 +28,8 @@ import javax.annotation.concurrent.NotThreadSafe; +import static com.google.common.base.Preconditions.checkState; + @NotThreadSafe public class NotThreadSafeMemoryReservationManager implements MemoryReservationManager { // To avoid reserving memory too frequently, we choose to do it in batches. This is the lower @@ -151,23 +153,20 @@ private long deductReleaseAccounting(final long size) { remaining -= fallbackBytesInTotal; fallbackBytesInTotal = 0; - long poolBytes = Math.min(remaining, reservedBytesInTotal); - reservedBytesInTotal -= poolBytes; - return poolBytes; + reservedBytesInTotal -= remaining; + checkState(reservedBytesInTotal >= 0, "Released bytes has been larger than reserved!"); + return remaining; } @Override public void releaseAllReservedMemory() { - if (bytesToBeReleased != 0) { - releaseBytesImmediately(bytesToBeReleased); - bytesToBeReleased = 0; - } - if (reservedBytesInTotal > 0) { + if (reservedBytesInTotal != 0) { LOCAL_EXECUTION_PLANNER.releaseToFreeMemoryForOperators(reservedBytesInTotal); + reservedBytesInTotal = 0; } - reservedBytesInTotal = 0; fallbackBytesInTotal = 0; bytesToBeReserved = 0; + bytesToBeReleased = 0; } @Override diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/CteScanOperatorTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/CteScanOperatorTest.java index 7309e2e06adcb..ab476f61ee9ea 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/CteScanOperatorTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/CteScanOperatorTest.java @@ -177,7 +177,7 @@ public void testMultipleCteScanOperators() throws Exception { new CteScanOperator(operatorContext, planNodeId, cteDataStore, queryId); assertEquals(2, cteDataStore.getCount()); - assertEquals(896, cteDataStore.ramBytesUsed()); + assertEquals(904, cteDataStore.ramBytesUsed()); // Both operators should be able to read data assertTrue(operator1.hasNext()); From 1e10da2ade0181596e723d7c2b239ca41bc44431 Mon Sep 17 00:00:00 2001 From: Weihao Li <18110526956@163.com> Date: Tue, 30 Jun 2026 12:24:02 +0800 Subject: [PATCH 5/5] fix according review Signed-off-by: Weihao Li <18110526956@163.com> --- .../apache/iotdb/db/utils/cte/MemoryReader.java | 6 +++--- .../queryengine/utils/cte/CteDataStore.java | 14 +++++++++----- 2 files changed, 12 insertions(+), 8 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/cte/MemoryReader.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/cte/MemoryReader.java index 8dd7bc3b1573e..b4e8d97352919 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/cte/MemoryReader.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/cte/MemoryReader.java @@ -49,7 +49,7 @@ public MemoryReader(CteDataStore dataStore, QueryId queryId, boolean isHighestPr queryId.getId(), MemoryReader.class.getName(), isHighestPriority); - dataStore.setOperatorsMemoryReservedBytes(actualReserved); + dataStore.setActualReservedBytes(actualReserved); } } @@ -69,11 +69,11 @@ public TsBlock next() throws IoTDBException { @Override public void close() throws IoTDBException { if (dataStore.decrementAndGetCount() == 0) { - long reservedBytes = dataStore.getOperatorsMemoryReservedBytes(); + long reservedBytes = dataStore.getActualReservedBytes(); if (reservedBytes > 0) { LOCAL_EXECUTION_PLANNER.releaseToFreeMemoryForOperators(reservedBytes); } - dataStore.setOperatorsMemoryReservedBytes(0L); + dataStore.setActualReservedBytes(0L); } } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/queryengine/utils/cte/CteDataStore.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/queryengine/utils/cte/CteDataStore.java index edeb546655ad6..81484abfe0b3f 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/queryengine/utils/cte/CteDataStore.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/queryengine/utils/cte/CteDataStore.java @@ -48,7 +48,11 @@ public class CteDataStore implements Accountable { // reference count by CteScanReader private final AtomicInteger count; - private long operatorsMemoryReservedBytes; + /** + * Bytes actually reserved from the operators free-memory pool for this CTE data store. May be + * {@code 0} if reservation fell back (e.g. highest-priority query with insufficient pool). + */ + private long actualReservedBytes; public CteDataStore(TableSchema tableSchema, List columnIndex2TsBlockColumnIndexList) { this.tableSchema = tableSchema; @@ -109,11 +113,11 @@ public int getCount() { return count.get(); } - public long getOperatorsMemoryReservedBytes() { - return operatorsMemoryReservedBytes; + public long getActualReservedBytes() { + return actualReservedBytes; } - public void setOperatorsMemoryReservedBytes(long operatorsMemoryReservedBytes) { - this.operatorsMemoryReservedBytes = operatorsMemoryReservedBytes; + public void setActualReservedBytes(long actualReservedBytes) { + this.actualReservedBytes = actualReservedBytes; } }