Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -67,26 +67,22 @@
private static final PipeProcessorSubtaskExecutor PROCESSOR_EXECUTOR =
PipeSubtaskExecutorManager.getInstance().getProcessorExecutor();

protected final Map<String, String> systemParameters = new HashMap<>();

public PipeDataNodeTaskBuilder(
final PipeStaticMeta pipeStaticMeta, final int regionId, final PipeTaskMeta pipeTaskMeta) {
this.pipeStaticMeta = pipeStaticMeta;
this.regionId = regionId;
this.pipeTaskMeta = pipeTaskMeta;
generateSystemParameters();
}

public PipeDataNodeTask build() {
// Event flow: source -> processor -> sink

// Analyzes the PipeParameters to identify potential conflicts.
final PipeParameters sourceParameters =
blendUserAndSystemParameters(pipeStaticMeta.getSourceParameters());
blendUserAndSystemParameters(pipeStaticMeta.getSourceParameters(), pipeTaskMeta);
final PipeParameters sinkParameters =
blendUserAndSystemParameters(pipeStaticMeta.getSinkParameters());
checkConflict(sourceParameters, sinkParameters);
injectParameters(sourceParameters, sinkParameters);
blendUserAndSystemParameters(pipeStaticMeta.getSinkParameters(), pipeTaskMeta);
preprocessParameters(sourceParameters, sinkParameters);

// We first build the source and sink, then build the processor.
final PipeTaskSourceStage sourceStage =
Expand Down Expand Up @@ -125,7 +121,7 @@
new PipeTaskProcessorStage(
pipeStaticMeta.getPipeName(),
pipeStaticMeta.getCreationTime(),
blendUserAndSystemParameters(pipeStaticMeta.getProcessorParameters()),
blendUserAndSystemParameters(pipeStaticMeta.getProcessorParameters(), pipeTaskMeta),
regionId,
sourceStage.getEventSupplier(),
sinkStage.getPipeSinkPendingQueue(),
Expand All @@ -143,22 +139,25 @@
pipeStaticMeta.getPipeName(), regionId, sourceStage, processorStage, sinkStage);
}

private void generateSystemParameters() {
public static PipeParameters blendUserAndSystemParameters(
final PipeParameters userParameters, final PipeTaskMeta pipeTaskMeta) {
// Deep copy the user parameters to avoid modification of the original parameters.
// If the original parameters are modified, progress index report will be affected.
final Map<String, String> blendedParameters = new HashMap<>(userParameters.getAttribute());
if (!(pipeTaskMeta.getProgressIndex() instanceof MinimumProgressIndex)
|| pipeTaskMeta.isNewlyAdded()) {
systemParameters.put(SystemConstant.RESTART_OR_NEWLY_ADDED_KEY, Boolean.TRUE.toString());
blendedParameters.put(SystemConstant.RESTART_OR_NEWLY_ADDED_KEY, Boolean.TRUE.toString());
}
return new PipeParameters(blendedParameters);
}

private PipeParameters blendUserAndSystemParameters(final PipeParameters userParameters) {
// Deep copy the user parameters to avoid modification of the original parameters.
// If the original parameters are modified, progress index report will be affected.
final Map<String, String> blendedParameters = new HashMap<>(userParameters.getAttribute());
blendedParameters.putAll(systemParameters);
return new PipeParameters(blendedParameters);
public static void preprocessParameters(
final PipeParameters sourceParameters, final PipeParameters sinkParameters) {
checkConflict(sourceParameters, sinkParameters);
injectParameters(sourceParameters, sinkParameters);
}

private void checkConflict(
private static void checkConflict(

Check failure on line 160 in iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/builder/PipeDataNodeTaskBuilder.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Refactor this method to reduce its Cognitive Complexity from 19 to the 15 allowed.

See more on https://sonarcloud.io/project/issues?id=apache_iotdb&issues=AZ8RTC2aB9efT_Y3g3Ej&open=AZ8RTC2aB9efT_Y3g3Ej&pullRequest=18049
final PipeParameters sourceParameters, final PipeParameters sinkParameters) {
final Pair<Boolean, Boolean> insertionDeletionListeningOptionPair;
final boolean shouldTerminatePipeOnAllHistoricalEventsConsumed;
Expand Down Expand Up @@ -228,7 +227,7 @@
}
}

private void injectParameters(
private static void injectParameters(
final PipeParameters sourceParameters, final PipeParameters sinkParameters) {
final boolean isSourceExternal =
!BuiltinPipePlugin.BUILTIN_SOURCES.contains(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,64 +69,33 @@
final Supplier<? extends PipeSinkSubtaskExecutor> executorSupplier,
final PipeParameters pipeSinkParameters,
final PipeTaskSinkRuntimeEnvironment environment) {
final String connectorName =
PipeSinkConstant.getConnectorOrSinkNameWithDefault(pipeSinkParameters);
final String connectorKey =
connectorName
// Convert the value of `CONNECTOR_KEY` or `SINK_KEY` to lowercase
// for matching in `CONNECTOR_CONSTRUCTORS`
.toLowerCase();
final String connectorKey = getConnectorKey(pipeSinkParameters);
PipeEventCommitManager.getInstance()
.register(
environment.getPipeName(),
environment.getCreationTime(),
environment.getRegionId(),
connectorKey);

final boolean isDataRegionSink =
StorageEngine.getInstance()
.getAllDataRegionIds()
.contains(new DataRegionId(environment.getRegionId()))
|| PipeRuntimeMeta.isSourceExternal(environment.getRegionId());

final int sinkNum;
final boolean isDataRegionSink = isDataRegionSink(environment.getRegionId());
final int sinkNum = calculateSinkSubtaskNum(pipeSinkParameters, isDataRegionSink, connectorKey);
boolean realTimeFirst = false;
boolean serializeByRegion = false;
String attributeSortedString = generateAttributeSortedString(pipeSinkParameters);
final String attributeSortedString =
generateAttributeSortedString(pipeSinkParameters, environment.getRegionId());
final String attributeDisplayString = generateAttributeDisplayString(pipeSinkParameters);
if (isDataRegionSink) {
serializeByRegion = PipeSinkConstant.isSerializeByRegionEnabled(pipeSinkParameters);
sinkNum =
serializeByRegion
? 1
: pipeSinkParameters.getIntOrDefault(
Arrays.asList(
PipeSinkConstant.CONNECTOR_IOTDB_PARALLEL_TASKS_KEY,
PipeSinkConstant.SINK_IOTDB_PARALLEL_TASKS_KEY),
PipeSinkConstant.SINGLE_THREAD_DEFAULT_SINK.contains(connectorKey)
? 1
: PipeSinkConstant.CONNECTOR_IOTDB_PARALLEL_TASKS_DEFAULT_VALUE);
realTimeFirst =
pipeSinkParameters.getBooleanOrDefault(
Arrays.asList(
PipeSinkConstant.CONNECTOR_REALTIME_FIRST_KEY,
PipeSinkConstant.SINK_REALTIME_FIRST_KEY),
PipeSinkConstant.CONNECTOR_REALTIME_FIRST_DEFAULT_VALUE);
attributeSortedString =
serializeByRegion
? "data_region_" + environment.getRegionId() + "_" + attributeSortedString
: "data_" + attributeSortedString;
} else {
// Do not allow parallel tasks for schema region connectors
// to avoid the potential disorder of the schema region data transfer
sinkNum = 1;
attributeSortedString = "schema_" + attributeSortedString;
}
final String attributeDisplayStringWithPrefix =
isDataRegionSink
? serializeByRegion
? PipeSinkConstant.isSerializeByRegionEnabled(pipeSinkParameters)
? "data_region_" + environment.getRegionId() + "_" + attributeDisplayString
: "data_" + attributeDisplayString

Check warning on line 98 in iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskManager.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Extract this nested ternary operation into an independent statement.

See more on https://sonarcloud.io/project/issues?id=apache_iotdb&issues=AZ8RTC2JB9efT_Y3g3Eg&open=AZ8RTC2JB9efT_Y3g3Eg&pullRequest=18049
: "schema_" + attributeDisplayString;
environment.setAttributeSortedString(attributeDisplayStringWithPrefix);

Expand Down Expand Up @@ -285,7 +254,60 @@
.getPendingQueue();
}

public synchronized boolean hasRegisteredSubtasks(
final PipeParameters pipeSinkParameters, final int regionId) {
return attributeSortedString2SubtaskLifeCycleMap.containsKey(
generateAttributeSortedString(pipeSinkParameters, regionId));
}

public static int calculateSinkSubtaskNum(
final PipeParameters pipeSinkParameters, final int regionId) {
final String connectorKey = getConnectorKey(pipeSinkParameters);
return calculateSinkSubtaskNum(pipeSinkParameters, isDataRegionSink(regionId), connectorKey);
}

public static String generateAttributeSortedString(
final PipeParameters pipeSinkParameters, final int regionId) {
final String attributeSortedString = generateAttributeSortedString(pipeSinkParameters);
if (isDataRegionSink(regionId)) {
return PipeSinkConstant.isSerializeByRegionEnabled(pipeSinkParameters)
? "data_region_" + regionId + "_" + attributeSortedString
: "data_" + attributeSortedString;
}
return "schema_" + attributeSortedString;
}

private static String getConnectorKey(final PipeParameters pipeSinkParameters) {
return PipeSinkConstant.getConnectorOrSinkNameWithDefault(pipeSinkParameters).toLowerCase();
}

private static boolean isDataRegionSink(final int regionId) {
return StorageEngine.getInstance().getAllDataRegionIds().contains(new DataRegionId(regionId))
|| PipeRuntimeMeta.isSourceExternal(regionId);
}

private static int calculateSinkSubtaskNum(

Check warning on line 289 in iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskManager.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

All overloaded methods should be placed next to each other. Placing non-overloaded methods in between overloaded methods with the same type is a violation. Previous overloaded method located at line '263'.

See more on https://sonarcloud.io/project/issues?id=apache_iotdb&issues=AZ8RTC2JB9efT_Y3g3Eh&open=AZ8RTC2JB9efT_Y3g3Eh&pullRequest=18049
final PipeParameters pipeSinkParameters,
final boolean isDataRegionSink,
final String connectorKey) {
if (!isDataRegionSink) {
// Do not allow parallel tasks for schema region connectors to avoid the potential disorder of
// the schema region data transfer.
return 1;
}
if (PipeSinkConstant.isSerializeByRegionEnabled(pipeSinkParameters)) {
return 1;
}
return pipeSinkParameters.getIntOrDefault(
Arrays.asList(
PipeSinkConstant.CONNECTOR_IOTDB_PARALLEL_TASKS_KEY,
PipeSinkConstant.SINK_IOTDB_PARALLEL_TASKS_KEY),
PipeSinkConstant.SINGLE_THREAD_DEFAULT_SINK.contains(connectorKey)
? 1
: PipeSinkConstant.CONNECTOR_IOTDB_PARALLEL_TASKS_DEFAULT_VALUE);
}

private static String generateAttributeSortedString(

Check warning on line 310 in iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskManager.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

All overloaded methods should be placed next to each other. Placing non-overloaded methods in between overloaded methods with the same type is a violation. Previous overloaded method located at line '269'.

See more on https://sonarcloud.io/project/issues?id=apache_iotdb&issues=AZ8RTC2JB9efT_Y3g3Ei&open=AZ8RTC2JB9efT_Y3g3Ei&pullRequest=18049
final PipeParameters pipeConnectorParameters) {
final TreeMap<String, String> sortedStringSourceMap =
new TreeMap<>(pipeConnectorParameters.getAttribute());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iotdb.db.pipe.agent.task;

import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeMeta;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeRuntimeMeta;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStaticMeta;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant;
import org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant;
import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
import org.apache.iotdb.pipe.api.exception.PipeException;

import org.junit.Assert;
import org.junit.Test;

import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;

public class PipeDataNodeTaskAgentTest {

@Test
public void testCreateMemoryCheckStillRunsWhenNoPipeTasksNeedToBeCreated() throws Exception {
final boolean originalPipeEnableMemoryCheck =
CommonDescriptor.getInstance().getConfig().isPipeEnableMemoryChecked();
final long originalPipeInsertNodeQueueMemory =
CommonDescriptor.getInstance().getConfig().getPipeInsertNodeQueueMemory();
final double originalPipeTotalFloatingMemoryProportion =
CommonDescriptor.getInstance().getConfig().getPipeTotalFloatingMemoryProportion();

try {
CommonDescriptor.getInstance().getConfig().setIsPipeEnableMemoryChecked(true);
CommonDescriptor.getInstance().getConfig().setPipeInsertNodeQueueMemory(1);
CommonDescriptor.getInstance().getConfig().setPipeTotalFloatingMemoryProportion(0);

Assert.assertThrows(
PipeException.class,
() ->
PipeDataNodeAgent.task()
.calculateMemoryUsage(
new PipeMeta(
new PipeStaticMeta(
"p", 1L, new HashMap<>(), new HashMap<>(), new HashMap<>()),
new PipeRuntimeMeta())));
} finally {
CommonDescriptor.getInstance()
.getConfig()
.setIsPipeEnableMemoryChecked(originalPipeEnableMemoryCheck);
CommonDescriptor.getInstance()
.getConfig()
.setPipeInsertNodeQueueMemory(originalPipeInsertNodeQueueMemory);
CommonDescriptor.getInstance()
.getConfig()
.setPipeTotalFloatingMemoryProportion(originalPipeTotalFloatingMemoryProportion);
}
}

@Test
public void testPlainBatchMemoryIncludesLeaderCacheEndpointShards() throws Exception {
final Map<String, String> sinkAttributes = new HashMap<>();
sinkAttributes.put(
PipeSinkConstant.CONNECTOR_FORMAT_KEY, PipeSinkConstant.CONNECTOR_FORMAT_TABLET_VALUE);
sinkAttributes.put(PipeSinkConstant.CONNECTOR_IOTDB_BATCH_SIZE_KEY, "1024");
sinkAttributes.put(
PipeSinkConstant.CONNECTOR_IOTDB_NODE_URLS_KEY, "127.0.0.1:6667, 127.0.0.2:6667");
sinkAttributes.put(PipeSinkConstant.CONNECTOR_IOTDB_IP_KEY, "127.0.0.3");
sinkAttributes.put(PipeSinkConstant.CONNECTOR_IOTDB_PORT_KEY, "6667");

Assert.assertEquals(
4 * 1024L, invokeCalculateSinkBatchMemory(new PipeParameters(sinkAttributes)));

sinkAttributes.put(
PipeSinkConstant.CONNECTOR_LEADER_CACHE_ENABLE_KEY, Boolean.FALSE.toString());
Assert.assertEquals(1024L, invokeCalculateSinkBatchMemory(new PipeParameters(sinkAttributes)));
}

@Test
public void testTsFileBatchMemoryIgnoresLeaderCacheEndpointShards() throws Exception {
final Map<String, String> sinkAttributes = new HashMap<>();
sinkAttributes.put(
PipeSinkConstant.CONNECTOR_FORMAT_KEY, PipeSinkConstant.CONNECTOR_FORMAT_TS_FILE_VALUE);
sinkAttributes.put(PipeSinkConstant.CONNECTOR_IOTDB_BATCH_SIZE_KEY, "2048");
sinkAttributes.put(
PipeSinkConstant.CONNECTOR_IOTDB_NODE_URLS_KEY, "127.0.0.1:6667,127.0.0.2:6667");

Assert.assertEquals(2048L, invokeCalculateSinkBatchMemory(new PipeParameters(sinkAttributes)));
}

@Test
public void testPlainBatchMemoryReturnsZeroWhenBatchModeIsDisabled() throws Exception {
final Map<String, String> sinkAttributes = new HashMap<>();
sinkAttributes.put(
PipeSinkConstant.CONNECTOR_FORMAT_KEY, PipeSinkConstant.CONNECTOR_FORMAT_TABLET_VALUE);
sinkAttributes.put(
PipeSinkConstant.CONNECTOR_IOTDB_BATCH_MODE_ENABLE_KEY, Boolean.FALSE.toString());
sinkAttributes.put(PipeSinkConstant.CONNECTOR_IOTDB_BATCH_SIZE_KEY, "1024");

Assert.assertEquals(0L, invokeCalculateSinkBatchMemory(new PipeParameters(sinkAttributes)));
}

@Test
public void testSendTsFileReadBufferMemoryUsesSinkReadFileBufferSize() throws Exception {
final Map<String, String> sourceAttributes = new HashMap<>();
sourceAttributes.put(PipeSourceConstant.EXTRACTOR_HISTORY_ENABLE_KEY, Boolean.FALSE.toString());

final Map<String, String> sinkAttributes = new HashMap<>();
sinkAttributes.put(
PipeSinkConstant.CONNECTOR_FORMAT_KEY, PipeSinkConstant.CONNECTOR_FORMAT_TABLET_VALUE);
Assert.assertEquals(
0L,
invokeCalculateSendTsFileReadBufferMemory(
new PipeParameters(sourceAttributes), new PipeParameters(sinkAttributes)));

sinkAttributes.put(
PipeSinkConstant.CONNECTOR_FORMAT_KEY, PipeSinkConstant.CONNECTOR_FORMAT_HYBRID_VALUE);
Assert.assertEquals(
PipeConfig.getInstance().getPipeSinkReadFileBufferSize(),
invokeCalculateSendTsFileReadBufferMemory(
new PipeParameters(sourceAttributes), new PipeParameters(sinkAttributes)));
}

private long invokeCalculateSinkBatchMemory(final PipeParameters sinkParameters)
throws Exception {
final Method method =
PipeDataNodeTaskAgent.class.getDeclaredMethod(
"calculateSinkBatchMemory", PipeParameters.class);
method.setAccessible(true);
return (long) method.invoke(null, sinkParameters);
}

private long invokeCalculateSendTsFileReadBufferMemory(
final PipeParameters sourceParameters, final PipeParameters sinkParameters) throws Exception {
final Method method =
PipeDataNodeTaskAgent.class.getDeclaredMethod(
"calculateSendTsFileReadBufferMemory", PipeParameters.class, PipeParameters.class);
method.setAccessible(true);
return (long) method.invoke(null, sourceParameters, sinkParameters);
}
}
Loading
Loading