From 52b51d329a9262c01e731ef345d8303edb0a9dd4 Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Fri, 3 Jul 2026 11:23:06 +0800 Subject: [PATCH 1/2] Pipe: preserve TsFile parse OOM cause (cherry picked from commit f8e393e6f0c0f7c940942eadc2b10171b69d78f6) --- .../autocreate/IoTDBPipeTsFileParseOOMIT.java | 115 ++++++++++++++++++ .../tsfile/PipeTsFileInsertionEvent.java | 4 +- .../PipeProcessorSubtaskExecutorTest.java | 99 +++++++++++++++ .../throwing/ThrowingExceptionProcessor.java | 28 +++++ 4 files changed, 244 insertions(+), 2 deletions(-) create mode 100644 integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeTsFileParseOOMIT.java diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeTsFileParseOOMIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeTsFileParseOOMIT.java new file mode 100644 index 0000000000000..ebbee3eb02363 --- /dev/null +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeTsFileParseOOMIT.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.pipe.it.autocreate; + +import org.apache.iotdb.common.rpc.thrift.TSStatus; +import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient; +import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq; +import org.apache.iotdb.confignode.rpc.thrift.TShowPipeInfo; +import org.apache.iotdb.confignode.rpc.thrift.TShowPipeReq; +import org.apache.iotdb.db.it.utils.TestUtils; +import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper; +import org.apache.iotdb.it.framework.IoTDBTestRunner; +import org.apache.iotdb.itbase.category.MultiClusterIT2AutoCreateSchema; +import org.apache.iotdb.rpc.TSStatusCode; + +import org.junit.Assert; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +@RunWith(IoTDBTestRunner.class) +@Category({MultiClusterIT2AutoCreateSchema.class}) +public class IoTDBPipeTsFileParseOOMIT extends AbstractPipeDualAutoIT { + + @Override + protected void setupConfig() { + super.setupConfig(); + senderEnv.getConfig().getCommonConfig().setPipeMemoryManagementEnabled(true); + senderEnv + .getConfig() + .getCommonConfig() + .setPipeDataStructureTabletMemoryBlockAllocationRejectThreshold(0); + senderEnv + .getConfig() + .getCommonConfig() + .setPipeDataStructureTsFileMemoryBlockAllocationRejectThreshold(0); + } + + @Test + public void testPipeKeepsRunningWhenTsFileProcessingTemporarilyOutOfMemory() throws Exception { + final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0); + + final String receiverIp = receiverDataNode.getIp(); + final int receiverPort = receiverDataNode.getPort(); + + TestUtils.executeNonQueries( + senderEnv, + Arrays.asList("insert into root.temporary_oom.d0(time,s1) values (0,1)", "flush"), + null); + + try (final SyncConfigNodeIServiceClient client = + (SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) { + final Map extractorAttributes = new HashMap<>(); + final Map processorAttributes = new HashMap<>(); + final Map connectorAttributes = new HashMap<>(); + + extractorAttributes.put("extractor.pattern", "root.temporary_oom"); + extractorAttributes.put("extractor.realtime.enable", "false"); + + processorAttributes.put("processor", "throwing-exception-processor"); + processorAttributes.put("stages", "process-tsfile-insertion-event-with-temporary-oom"); + + connectorAttributes.put("connector", "iotdb-thrift-connector"); + connectorAttributes.put("connector.batch.enable", "false"); + connectorAttributes.put("connector.ip", receiverIp); + connectorAttributes.put("connector.port", Integer.toString(receiverPort)); + + final TSStatus status = + client.createPipe( + new TCreatePipeReq("temporary_oom_pipe", connectorAttributes) + .setExtractorAttributes(extractorAttributes) + .setProcessorAttributes(processorAttributes)); + + Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode()); + + assertPipeRunningFor(client, "temporary_oom_pipe", 35_000L); + } + } + + private void assertPipeRunningFor( + final SyncConfigNodeIServiceClient client, final String pipeName, final long durationMs) + throws Exception { + final long startTime = System.currentTimeMillis(); + while (System.currentTimeMillis() - startTime < durationMs) { + final List showPipeResult = client.showPipe(new TShowPipeReq()).pipeInfoList; + final TShowPipeInfo pipeInfo = + showPipeResult.stream().filter(info -> info.id.equals(pipeName)).findFirst().orElse(null); + Assert.assertNotNull(pipeInfo); + Assert.assertEquals("RUNNING", pipeInfo.state); + Thread.sleep(1000); + } + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java index c9cd2d44af04b..ae07e9d9712b3 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java @@ -551,7 +551,7 @@ public Iterable toTabletInsertionEvents(final long timeout } else { PipeLogger.log(LOGGER::warn, e, errorMsg); } - throw new PipeException(errorMsg); + throw new PipeException(errorMsg, e); } } @@ -630,7 +630,7 @@ private TsFileInsertionDataContainer initDataContainer() { final String errorMsg = String.format("Read TsFile %s error.", tsFile.getPath()); LOGGER.warn(errorMsg, e); - throw new PipeException(errorMsg); + throw new PipeException(errorMsg, e); } } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/agent/task/PipeProcessorSubtaskExecutorTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/agent/task/PipeProcessorSubtaskExecutorTest.java index 4d3cf4c49d834..a403e83329ef1 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/agent/task/PipeProcessorSubtaskExecutorTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/agent/task/PipeProcessorSubtaskExecutorTest.java @@ -19,16 +19,30 @@ package org.apache.iotdb.db.pipe.agent.task; +import org.apache.iotdb.commons.exception.pipe.PipeRuntimeOutOfMemoryCriticalException; import org.apache.iotdb.commons.pipe.agent.task.connection.EventSupplier; import org.apache.iotdb.db.pipe.agent.task.connection.PipeEventCollector; import org.apache.iotdb.db.pipe.agent.task.execution.PipeProcessorSubtaskExecutor; import org.apache.iotdb.db.pipe.agent.task.subtask.processor.PipeProcessorSubtask; +import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent; +import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager; +import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlock; +import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryManager; +import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; import org.apache.iotdb.pipe.api.PipeProcessor; +import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent; +import org.apache.iotdb.pipe.api.exception.PipeException; +import org.junit.Assert; import org.junit.Before; +import org.junit.Test; import org.mockito.Mockito; +import java.io.File; + +import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; public class PipeProcessorSubtaskExecutorTest extends PipeSubtaskExecutorTest { @@ -47,4 +61,89 @@ public void setUp() throws Exception { mock(PipeProcessor.class), mock(PipeEventCollector.class))); } + + @Test + public void testTsFileInsertionEventPreservesOutOfMemoryCause() { + final PipeMemoryManager memoryManager = PipeDataNodeResourceManager.memory(); + PipeMemoryBlock memoryBlock = null; + + try { + memoryBlock = + memoryManager.forceAllocateForTabletWithRetry( + PipeMemoryManager.getTotalNonFloatingMemorySizeInBytes()); + Assert.assertFalse(memoryManager.isEnough4TabletParsing()); + + final File tsFile = + new File("target/testTsFileInsertionEventPreservesOutOfMemoryCause.tsfile"); + final TsFileResource resource = mock(TsFileResource.class); + when(resource.isClosed()).thenReturn(true); + when(resource.isEmpty()).thenReturn(false); + when(resource.isGeneratedByPipe()).thenReturn(false); + when(resource.isGeneratedByPipeConsensus()).thenReturn(false); + when(resource.getTsFilePath()).thenReturn(tsFile.getPath()); + + final PipeTsFileInsertionEvent event = + new PipeTsFileInsertionEvent( + resource, tsFile, false, false, false, "testPipe", 0, null, null, 0, 1); + + final PipeException exception = + Assert.assertThrows(PipeException.class, () -> event.toTabletInsertionEvents(1)); + Assert.assertTrue(exception.getCause() instanceof PipeRuntimeOutOfMemoryCriticalException); + } finally { + memoryManager.release(memoryBlock); + } + } + + @Test + public void testProcessorSubtaskTreatsOutOfMemoryCauseAsTemporaryFailure() throws Exception { + final EventSupplier eventSupplier = mock(EventSupplier.class); + final PipeProcessor pipeProcessor = mock(PipeProcessor.class); + final PipeEventCollector pipeEventCollector = mock(PipeEventCollector.class); + final TsFileInsertionEvent tsFileInsertionEvent = mock(TsFileInsertionEvent.class); + when(eventSupplier.supply()).thenReturn(tsFileInsertionEvent); + doThrow( + new PipeException( + "Parse TsFile error", + new PipeRuntimeOutOfMemoryCriticalException( + "TimeoutException: Waited 22.016 seconds for memory to parse TsFile"))) + .when(pipeProcessor) + .process(tsFileInsertionEvent, pipeEventCollector); + + final TestablePipeProcessorSubtask pipeProcessorSubtask = + new TestablePipeProcessorSubtask( + "PipeProcessorSubtaskExecutorTest", + "TestPipe", + System.currentTimeMillis(), + 0, + eventSupplier, + pipeProcessor, + pipeEventCollector); + + Assert.assertFalse(pipeProcessorSubtask.executeOnceForTest()); + } + + private static class TestablePipeProcessorSubtask extends PipeProcessorSubtask { + + private TestablePipeProcessorSubtask( + final String taskID, + final String pipeName, + final long creationTime, + final int regionId, + final EventSupplier inputEventSupplier, + final PipeProcessor pipeProcessor, + final PipeEventCollector outputEventCollector) { + super( + taskID, + pipeName, + creationTime, + regionId, + inputEventSupplier, + pipeProcessor, + outputEventCollector); + } + + private boolean executeOnceForTest() throws Exception { + return executeOnce(); + } + } } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/builtin/processor/throwing/ThrowingExceptionProcessor.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/builtin/processor/throwing/ThrowingExceptionProcessor.java index ca4cd6f7dded8..50bbb4b3acee4 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/builtin/processor/throwing/ThrowingExceptionProcessor.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/builtin/processor/throwing/ThrowingExceptionProcessor.java @@ -28,6 +28,7 @@ import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent; import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent; +import java.lang.reflect.InvocationTargetException; import java.util.Arrays; import java.util.Set; import java.util.stream.Collectors; @@ -37,6 +38,7 @@ public class ThrowingExceptionProcessor implements PipeProcessor { private boolean throwInCustomize = false; private boolean throwInProcessTabletInsertionEvent = false; private boolean throwInProcessTsFileInsertionEvent = false; + private boolean throwInProcessTsFileInsertionEventWithTemporaryOutOfMemory = false; private boolean throwInProcessEvent = false; private boolean throwInClose = false; @@ -55,6 +57,8 @@ public void validate(PipeParameterValidator validator) throws Exception { throwInCustomize = throwingStages.contains("customize"); throwInProcessTabletInsertionEvent = throwingStages.contains("process-tablet-insertion-event"); throwInProcessTsFileInsertionEvent = throwingStages.contains("process-tsfile-insertion-event"); + throwInProcessTsFileInsertionEventWithTemporaryOutOfMemory = + throwingStages.contains("process-tsfile-insertion-event-with-temporary-oom"); throwInProcessEvent = throwingStages.contains("process-event"); throwInClose = throwingStages.contains("close"); } @@ -78,11 +82,35 @@ public void process(TabletInsertionEvent tabletInsertionEvent, EventCollector ev @Override public void process(TsFileInsertionEvent tsFileInsertionEvent, EventCollector eventCollector) throws Exception { + if (throwInProcessTsFileInsertionEventWithTemporaryOutOfMemory) { + parseTsFileWithShortTimeout(tsFileInsertionEvent); + } if (throwInProcessTsFileInsertionEvent) { throw new Exception("Throwing exception in process(TsFileInsertionEvent, EventCollector)"); } } + private void parseTsFileWithShortTimeout(final TsFileInsertionEvent tsFileInsertionEvent) + throws Exception { + try { + tsFileInsertionEvent + .getClass() + .getMethod("toTabletInsertionEvents", long.class) + .invoke(tsFileInsertionEvent, 1L); + } catch (final NoSuchMethodException e) { + tsFileInsertionEvent.toTabletInsertionEvents(); + } catch (final InvocationTargetException e) { + final Throwable cause = e.getCause(); + if (cause instanceof Exception) { + throw (Exception) cause; + } + if (cause instanceof Error) { + throw (Error) cause; + } + throw new RuntimeException(cause); + } + } + @Override public void process(Event event, EventCollector eventCollector) throws Exception { if (throwInProcessEvent) { From d837718fcca66e7ee9027b72461d71eb9edf49ff Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Fri, 3 Jul 2026 15:17:09 +0800 Subject: [PATCH 2/2] Expose pipe memory reject thresholds in IT config --- .../env/cluster/config/MppCommonConfig.java | 18 +++++++++++++++++ .../cluster/config/MppSharedCommonConfig.java | 20 +++++++++++++++++++ .../env/remote/config/RemoteCommonConfig.java | 12 +++++++++++ .../apache/iotdb/itbase/env/CommonConfig.java | 6 ++++++ 4 files changed, 56 insertions(+) diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java index 8109bf02c7ea4..78e47cfd69ff7 100644 --- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java +++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java @@ -441,6 +441,24 @@ public CommonConfig setPipeMemoryManagementEnabled(boolean pipeMemoryManagementE return this; } + @Override + public CommonConfig setPipeDataStructureTabletMemoryBlockAllocationRejectThreshold( + double pipeDataStructureTabletMemoryBlockAllocationRejectThreshold) { + setProperty( + "pipe_data_structure_tablet_memory_block_allocation_reject_threshold", + String.valueOf(pipeDataStructureTabletMemoryBlockAllocationRejectThreshold)); + return this; + } + + @Override + public CommonConfig setPipeDataStructureTsFileMemoryBlockAllocationRejectThreshold( + double pipeDataStructureTsFileMemoryBlockAllocationRejectThreshold) { + setProperty( + "pipe_data_structure_ts_file_memory_block_allocation_reject_threshold", + String.valueOf(pipeDataStructureTsFileMemoryBlockAllocationRejectThreshold)); + return this; + } + @Override public CommonConfig setIsPipeEnableMemoryCheck(boolean isPipeEnableMemoryCheck) { setProperty("pipe_enable_memory_checked", String.valueOf(isPipeEnableMemoryCheck)); diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java index 17eaf87114405..ee78782e23a7a 100644 --- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java +++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java @@ -451,6 +451,26 @@ public CommonConfig setPipeMemoryManagementEnabled(boolean pipeMemoryManagementE return this; } + @Override + public CommonConfig setPipeDataStructureTabletMemoryBlockAllocationRejectThreshold( + double pipeDataStructureTabletMemoryBlockAllocationRejectThreshold) { + dnConfig.setPipeDataStructureTabletMemoryBlockAllocationRejectThreshold( + pipeDataStructureTabletMemoryBlockAllocationRejectThreshold); + cnConfig.setPipeDataStructureTabletMemoryBlockAllocationRejectThreshold( + pipeDataStructureTabletMemoryBlockAllocationRejectThreshold); + return this; + } + + @Override + public CommonConfig setPipeDataStructureTsFileMemoryBlockAllocationRejectThreshold( + double pipeDataStructureTsFileMemoryBlockAllocationRejectThreshold) { + dnConfig.setPipeDataStructureTsFileMemoryBlockAllocationRejectThreshold( + pipeDataStructureTsFileMemoryBlockAllocationRejectThreshold); + cnConfig.setPipeDataStructureTsFileMemoryBlockAllocationRejectThreshold( + pipeDataStructureTsFileMemoryBlockAllocationRejectThreshold); + return this; + } + @Override public CommonConfig setIsPipeEnableMemoryCheck(boolean isPipeEnableMemoryCheck) { dnConfig.setIsPipeEnableMemoryCheck(isPipeEnableMemoryCheck); diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java index 8fb1b0860bb83..f0d27c2a60a14 100644 --- a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java +++ b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java @@ -318,6 +318,18 @@ public CommonConfig setPipeMemoryManagementEnabled(boolean pipeMemoryManagementE return this; } + @Override + public CommonConfig setPipeDataStructureTabletMemoryBlockAllocationRejectThreshold( + double pipeDataStructureTabletMemoryBlockAllocationRejectThreshold) { + return this; + } + + @Override + public CommonConfig setPipeDataStructureTsFileMemoryBlockAllocationRejectThreshold( + double pipeDataStructureTsFileMemoryBlockAllocationRejectThreshold) { + return this; + } + @Override public CommonConfig setIsPipeEnableMemoryCheck(boolean isPipeEnableMemoryCheck) { return this; diff --git a/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java b/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java index cbdf6847ebb14..4f82c7ca7fdc7 100644 --- a/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java +++ b/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java @@ -142,6 +142,12 @@ CommonConfig setEnableAutoLeaderBalanceForIoTConsensus( CommonConfig setPipeMemoryManagementEnabled(boolean pipeMemoryManagementEnabled); + CommonConfig setPipeDataStructureTabletMemoryBlockAllocationRejectThreshold( + double pipeDataStructureTabletMemoryBlockAllocationRejectThreshold); + + CommonConfig setPipeDataStructureTsFileMemoryBlockAllocationRejectThreshold( + double pipeDataStructureTsFileMemoryBlockAllocationRejectThreshold); + CommonConfig setIsPipeEnableMemoryCheck(boolean isPipeEnableMemoryCheck); CommonConfig setPipeAirGapReceiverEnabled(boolean isPipeAirGapReceiverEnabled);