Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -441,6 +441,24 @@ public CommonConfig setPipeMemoryManagementEnabled(boolean pipeMemoryManagementE
return this;
}

@Override
public CommonConfig setPipeDataStructureTabletMemoryBlockAllocationRejectThreshold(
double pipeDataStructureTabletMemoryBlockAllocationRejectThreshold) {
setProperty(
"pipe_data_structure_tablet_memory_block_allocation_reject_threshold",
String.valueOf(pipeDataStructureTabletMemoryBlockAllocationRejectThreshold));
return this;
}

@Override
public CommonConfig setPipeDataStructureTsFileMemoryBlockAllocationRejectThreshold(
double pipeDataStructureTsFileMemoryBlockAllocationRejectThreshold) {
setProperty(
"pipe_data_structure_ts_file_memory_block_allocation_reject_threshold",
String.valueOf(pipeDataStructureTsFileMemoryBlockAllocationRejectThreshold));
return this;
}

@Override
public CommonConfig setIsPipeEnableMemoryCheck(boolean isPipeEnableMemoryCheck) {
setProperty("pipe_enable_memory_checked", String.valueOf(isPipeEnableMemoryCheck));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -451,6 +451,26 @@ public CommonConfig setPipeMemoryManagementEnabled(boolean pipeMemoryManagementE
return this;
}

@Override
public CommonConfig setPipeDataStructureTabletMemoryBlockAllocationRejectThreshold(
double pipeDataStructureTabletMemoryBlockAllocationRejectThreshold) {
dnConfig.setPipeDataStructureTabletMemoryBlockAllocationRejectThreshold(
pipeDataStructureTabletMemoryBlockAllocationRejectThreshold);
cnConfig.setPipeDataStructureTabletMemoryBlockAllocationRejectThreshold(
pipeDataStructureTabletMemoryBlockAllocationRejectThreshold);
return this;
}

@Override
public CommonConfig setPipeDataStructureTsFileMemoryBlockAllocationRejectThreshold(
double pipeDataStructureTsFileMemoryBlockAllocationRejectThreshold) {
dnConfig.setPipeDataStructureTsFileMemoryBlockAllocationRejectThreshold(
pipeDataStructureTsFileMemoryBlockAllocationRejectThreshold);
cnConfig.setPipeDataStructureTsFileMemoryBlockAllocationRejectThreshold(
pipeDataStructureTsFileMemoryBlockAllocationRejectThreshold);
return this;
}

@Override
public CommonConfig setIsPipeEnableMemoryCheck(boolean isPipeEnableMemoryCheck) {
dnConfig.setIsPipeEnableMemoryCheck(isPipeEnableMemoryCheck);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,18 @@ public CommonConfig setPipeMemoryManagementEnabled(boolean pipeMemoryManagementE
return this;
}

@Override
public CommonConfig setPipeDataStructureTabletMemoryBlockAllocationRejectThreshold(
double pipeDataStructureTabletMemoryBlockAllocationRejectThreshold) {
return this;
}

@Override
public CommonConfig setPipeDataStructureTsFileMemoryBlockAllocationRejectThreshold(
double pipeDataStructureTsFileMemoryBlockAllocationRejectThreshold) {
return this;
}

@Override
public CommonConfig setIsPipeEnableMemoryCheck(boolean isPipeEnableMemoryCheck) {
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,12 @@ CommonConfig setEnableAutoLeaderBalanceForIoTConsensus(

CommonConfig setPipeMemoryManagementEnabled(boolean pipeMemoryManagementEnabled);

CommonConfig setPipeDataStructureTabletMemoryBlockAllocationRejectThreshold(
double pipeDataStructureTabletMemoryBlockAllocationRejectThreshold);

CommonConfig setPipeDataStructureTsFileMemoryBlockAllocationRejectThreshold(
double pipeDataStructureTsFileMemoryBlockAllocationRejectThreshold);

CommonConfig setIsPipeEnableMemoryCheck(boolean isPipeEnableMemoryCheck);

CommonConfig setPipeAirGapReceiverEnabled(boolean isPipeAirGapReceiverEnabled);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iotdb.pipe.it.autocreate;

import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
import org.apache.iotdb.confignode.rpc.thrift.TShowPipeInfo;
import org.apache.iotdb.confignode.rpc.thrift.TShowPipeReq;
import org.apache.iotdb.db.it.utils.TestUtils;
import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
import org.apache.iotdb.it.framework.IoTDBTestRunner;
import org.apache.iotdb.itbase.category.MultiClusterIT2AutoCreateSchema;
import org.apache.iotdb.rpc.TSStatusCode;

import org.junit.Assert;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

@RunWith(IoTDBTestRunner.class)
@Category({MultiClusterIT2AutoCreateSchema.class})
public class IoTDBPipeTsFileParseOOMIT extends AbstractPipeDualAutoIT {

@Override
protected void setupConfig() {
super.setupConfig();
senderEnv.getConfig().getCommonConfig().setPipeMemoryManagementEnabled(true);
senderEnv
.getConfig()
.getCommonConfig()
.setPipeDataStructureTabletMemoryBlockAllocationRejectThreshold(0);
senderEnv
.getConfig()
.getCommonConfig()
.setPipeDataStructureTsFileMemoryBlockAllocationRejectThreshold(0);
}

@Test
public void testPipeKeepsRunningWhenTsFileProcessingTemporarilyOutOfMemory() throws Exception {
final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);

final String receiverIp = receiverDataNode.getIp();
final int receiverPort = receiverDataNode.getPort();

TestUtils.executeNonQueries(
senderEnv,
Arrays.asList("insert into root.temporary_oom.d0(time,s1) values (0,1)", "flush"),
null);

try (final SyncConfigNodeIServiceClient client =
(SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) {
final Map<String, String> extractorAttributes = new HashMap<>();
final Map<String, String> processorAttributes = new HashMap<>();
final Map<String, String> connectorAttributes = new HashMap<>();

extractorAttributes.put("extractor.pattern", "root.temporary_oom");
extractorAttributes.put("extractor.realtime.enable", "false");

processorAttributes.put("processor", "throwing-exception-processor");
processorAttributes.put("stages", "process-tsfile-insertion-event-with-temporary-oom");

connectorAttributes.put("connector", "iotdb-thrift-connector");
connectorAttributes.put("connector.batch.enable", "false");
connectorAttributes.put("connector.ip", receiverIp);
connectorAttributes.put("connector.port", Integer.toString(receiverPort));

final TSStatus status =
client.createPipe(
new TCreatePipeReq("temporary_oom_pipe", connectorAttributes)
.setExtractorAttributes(extractorAttributes)
.setProcessorAttributes(processorAttributes));

Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());

assertPipeRunningFor(client, "temporary_oom_pipe", 35_000L);
}
}

private void assertPipeRunningFor(
final SyncConfigNodeIServiceClient client, final String pipeName, final long durationMs)
throws Exception {
final long startTime = System.currentTimeMillis();
while (System.currentTimeMillis() - startTime < durationMs) {
final List<TShowPipeInfo> showPipeResult = client.showPipe(new TShowPipeReq()).pipeInfoList;
final TShowPipeInfo pipeInfo =
showPipeResult.stream().filter(info -> info.id.equals(pipeName)).findFirst().orElse(null);
Assert.assertNotNull(pipeInfo);
Assert.assertEquals("RUNNING", pipeInfo.state);
Thread.sleep(1000);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -551,7 +551,7 @@ public Iterable<TabletInsertionEvent> toTabletInsertionEvents(final long timeout
} else {
PipeLogger.log(LOGGER::warn, e, errorMsg);
}
throw new PipeException(errorMsg);
throw new PipeException(errorMsg, e);
}
}

Expand Down Expand Up @@ -630,7 +630,7 @@ private TsFileInsertionDataContainer initDataContainer() {

final String errorMsg = String.format("Read TsFile %s error.", tsFile.getPath());
LOGGER.warn(errorMsg, e);
throw new PipeException(errorMsg);
throw new PipeException(errorMsg, e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,30 @@

package org.apache.iotdb.db.pipe.agent.task;

import org.apache.iotdb.commons.exception.pipe.PipeRuntimeOutOfMemoryCriticalException;
import org.apache.iotdb.commons.pipe.agent.task.connection.EventSupplier;
import org.apache.iotdb.db.pipe.agent.task.connection.PipeEventCollector;
import org.apache.iotdb.db.pipe.agent.task.execution.PipeProcessorSubtaskExecutor;
import org.apache.iotdb.db.pipe.agent.task.subtask.processor.PipeProcessorSubtask;
import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlock;
import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryManager;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
import org.apache.iotdb.pipe.api.PipeProcessor;
import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
import org.apache.iotdb.pipe.api.exception.PipeException;

import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;

import java.io.File;

import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

public class PipeProcessorSubtaskExecutorTest extends PipeSubtaskExecutorTest {

Expand All @@ -47,4 +61,89 @@ public void setUp() throws Exception {
mock(PipeProcessor.class),
mock(PipeEventCollector.class)));
}

@Test
public void testTsFileInsertionEventPreservesOutOfMemoryCause() {
final PipeMemoryManager memoryManager = PipeDataNodeResourceManager.memory();
PipeMemoryBlock memoryBlock = null;

try {
memoryBlock =
memoryManager.forceAllocateForTabletWithRetry(
PipeMemoryManager.getTotalNonFloatingMemorySizeInBytes());
Assert.assertFalse(memoryManager.isEnough4TabletParsing());

final File tsFile =
new File("target/testTsFileInsertionEventPreservesOutOfMemoryCause.tsfile");
final TsFileResource resource = mock(TsFileResource.class);
when(resource.isClosed()).thenReturn(true);
when(resource.isEmpty()).thenReturn(false);
when(resource.isGeneratedByPipe()).thenReturn(false);
when(resource.isGeneratedByPipeConsensus()).thenReturn(false);
when(resource.getTsFilePath()).thenReturn(tsFile.getPath());

final PipeTsFileInsertionEvent event =
new PipeTsFileInsertionEvent(
resource, tsFile, false, false, false, "testPipe", 0, null, null, 0, 1);

final PipeException exception =
Assert.assertThrows(PipeException.class, () -> event.toTabletInsertionEvents(1));
Assert.assertTrue(exception.getCause() instanceof PipeRuntimeOutOfMemoryCriticalException);
} finally {
memoryManager.release(memoryBlock);
}
}

@Test
public void testProcessorSubtaskTreatsOutOfMemoryCauseAsTemporaryFailure() throws Exception {
final EventSupplier eventSupplier = mock(EventSupplier.class);
final PipeProcessor pipeProcessor = mock(PipeProcessor.class);
final PipeEventCollector pipeEventCollector = mock(PipeEventCollector.class);
final TsFileInsertionEvent tsFileInsertionEvent = mock(TsFileInsertionEvent.class);
when(eventSupplier.supply()).thenReturn(tsFileInsertionEvent);
doThrow(
new PipeException(
"Parse TsFile error",
new PipeRuntimeOutOfMemoryCriticalException(
"TimeoutException: Waited 22.016 seconds for memory to parse TsFile")))
.when(pipeProcessor)
.process(tsFileInsertionEvent, pipeEventCollector);

final TestablePipeProcessorSubtask pipeProcessorSubtask =
new TestablePipeProcessorSubtask(
"PipeProcessorSubtaskExecutorTest",
"TestPipe",
System.currentTimeMillis(),
0,
eventSupplier,
pipeProcessor,
pipeEventCollector);

Assert.assertFalse(pipeProcessorSubtask.executeOnceForTest());
}

private static class TestablePipeProcessorSubtask extends PipeProcessorSubtask {

private TestablePipeProcessorSubtask(
final String taskID,
final String pipeName,
final long creationTime,
final int regionId,
final EventSupplier inputEventSupplier,
final PipeProcessor pipeProcessor,
final PipeEventCollector outputEventCollector) {
super(
taskID,
pipeName,
creationTime,
regionId,
inputEventSupplier,
pipeProcessor,
outputEventCollector);
}

private boolean executeOnceForTest() throws Exception {
return executeOnce();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;

import java.lang.reflect.InvocationTargetException;
import java.util.Arrays;
import java.util.Set;
import java.util.stream.Collectors;
Expand All @@ -37,6 +38,7 @@ public class ThrowingExceptionProcessor implements PipeProcessor {
private boolean throwInCustomize = false;
private boolean throwInProcessTabletInsertionEvent = false;
private boolean throwInProcessTsFileInsertionEvent = false;
private boolean throwInProcessTsFileInsertionEventWithTemporaryOutOfMemory = false;
private boolean throwInProcessEvent = false;
private boolean throwInClose = false;

Expand All @@ -55,6 +57,8 @@ public void validate(PipeParameterValidator validator) throws Exception {
throwInCustomize = throwingStages.contains("customize");
throwInProcessTabletInsertionEvent = throwingStages.contains("process-tablet-insertion-event");
throwInProcessTsFileInsertionEvent = throwingStages.contains("process-tsfile-insertion-event");
throwInProcessTsFileInsertionEventWithTemporaryOutOfMemory =
throwingStages.contains("process-tsfile-insertion-event-with-temporary-oom");
throwInProcessEvent = throwingStages.contains("process-event");
throwInClose = throwingStages.contains("close");
}
Expand All @@ -78,11 +82,35 @@ public void process(TabletInsertionEvent tabletInsertionEvent, EventCollector ev
@Override
public void process(TsFileInsertionEvent tsFileInsertionEvent, EventCollector eventCollector)
throws Exception {
if (throwInProcessTsFileInsertionEventWithTemporaryOutOfMemory) {
parseTsFileWithShortTimeout(tsFileInsertionEvent);
}
if (throwInProcessTsFileInsertionEvent) {
throw new Exception("Throwing exception in process(TsFileInsertionEvent, EventCollector)");
}
}

private void parseTsFileWithShortTimeout(final TsFileInsertionEvent tsFileInsertionEvent)
throws Exception {
try {
tsFileInsertionEvent
.getClass()
.getMethod("toTabletInsertionEvents", long.class)
.invoke(tsFileInsertionEvent, 1L);
} catch (final NoSuchMethodException e) {
tsFileInsertionEvent.toTabletInsertionEvents();
} catch (final InvocationTargetException e) {
final Throwable cause = e.getCause();
if (cause instanceof Exception) {
throw (Exception) cause;
}
if (cause instanceof Error) {
throw (Error) cause;
}
throw new RuntimeException(cause);
}
}

@Override
public void process(Event event, EventCollector eventCollector) throws Exception {
if (throwInProcessEvent) {
Expand Down
Loading