From dca8085428c213a394150c3e8530fd8877ce9424 Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Thu, 25 Jun 2026 15:49:05 +0800 Subject: [PATCH 1/3] Fix pipe history LoadTsFile schema retry --- ...IoTDBPipeReceiverAutoCreateDisabledIT.java | 64 ++++++++++++++++++- .../visitor/PipeStatementTSStatusVisitor.java | 1 + .../plan/analyze/load/LoadTsFileAnalyzer.java | 52 +++++++++++++++ .../PipeStatementTsStatusVisitorTest.java | 13 ++++ .../analyze/load/LoadTsFileAnalyzerTest.java | 34 ++++++++++ 5 files changed, 162 insertions(+), 2 deletions(-) diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeReceiverAutoCreateDisabledIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeReceiverAutoCreateDisabledIT.java index 158dd552c201d..58aced70ed392 100644 --- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeReceiverAutoCreateDisabledIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeReceiverAutoCreateDisabledIT.java @@ -36,6 +36,7 @@ import java.sql.ResultSetMetaData; import java.sql.SQLException; import java.sql.Statement; +import java.util.Arrays; import java.util.HashSet; import java.util.Set; @@ -61,14 +62,16 @@ protected void setupConfig() { .getConfig() .getCommonConfig() .setDataReplicationFactor(1) - .setSchemaReplicationFactor(1); + .setSchemaReplicationFactor(1) + .setPipeAutoSplitFullEnabled(true); receiverEnv .getConfig() .getCommonConfig() .setAutoCreateSchemaEnabled(false) .setDatanodeMemoryProportion("3:3:1:1:1:0") .setDataReplicationFactor(1) - .setSchemaReplicationFactor(1); + .setSchemaReplicationFactor(1) + .setPipeAutoSplitFullEnabled(true); } @Test @@ -122,6 +125,63 @@ public void testReceiverAutoCreateSchemaDisabledWithSpecialTimeSeries() throws E } } + @Test + public void testAutoSplitHistoryTsFileWithDeletionWhenReceiverAutoCreateSchemaDisabled() + throws Exception { + TestUtils.executeNonQueries( + senderEnv, + Arrays.asList( + "create database root.sg", + "create timeseries root.sg.non_aligned.s1 with datatype=INT32", + "create timeseries root.sg.non_aligned.s2 with datatype=DOUBLE", + "create aligned timeseries root.sg.aligned(s1 INT32, s2 DOUBLE)", + "create timeseries root.sg.deleted_measurement.s1 with datatype=INT32", + "create timeseries root.sg.deleted_measurement.s2 with datatype=DOUBLE", + "insert into root.sg.non_aligned(time, s1, s2) values(1, 1, 1.0), (2, 2, 2.0), (3, 3, 3.0)", + "insert into root.sg.aligned(time, s1, s2) values(1, 10, 10.0), (2, 20, 20.0), (3, 30, 30.0)", + "insert into root.sg.deleted_measurement(time, s1, s2) values(1, 100, 100.0), (2, 200, 200.0)", + "flush", + "delete from root.sg.non_aligned.s1 where time > 2", + "delete from root.sg.aligned.* where time > 2", + "delete timeseries root.sg.deleted_measurement.s1", + "flush")); + + awaitUntilFlush(senderEnv); + + TestUtils.executeNonQuery( + senderEnv, + String.format( + "create pipe test with source ('inclusion'='all', 'source.history.enable'='true', 'source.realtime.mode'='batch') " + + "with sink ('sink'='iotdb-thrift-sink', 'sink.node-urls'='%s')", + receiverEnv.getDataNodeWrapper(0).getIpAndPortString())); + + TestUtils.assertDataEventuallyOnEnv( + receiverEnv, + "select * from root.sg.non_aligned", + "Time,root.sg.non_aligned.s1,root.sg.non_aligned.s2,", + new HashSet<>(Arrays.asList("1,1,1.0,", "2,2,2.0,", "3,null,3.0,"))); + TestUtils.assertDataEventuallyOnEnv( + receiverEnv, + "select * from root.sg.aligned", + "Time,root.sg.aligned.s1,root.sg.aligned.s2,", + new HashSet<>(Arrays.asList("1,10,10.0,", "2,20,20.0,"))); + TestUtils.assertDataEventuallyOnEnv( + receiverEnv, + "select * from root.sg.deleted_measurement", + "Time,root.sg.deleted_measurement.s2,", + new HashSet<>(Arrays.asList("1,100.0,", "2,200.0,"))); + TestUtils.assertDataEventuallyOnEnv( + receiverEnv, + "count timeseries root.sg.deleted_measurement.*", + "count(timeseries),", + new HashSet<>(Arrays.asList("1,"))); + TestUtils.assertDataEventuallyOnEnv( + receiverEnv, + "show devices root.sg.aligned", + "Device,IsAligned,Template,TTL(ms),", + new HashSet<>(Arrays.asList("root.sg.aligned,true,null,INF,"))); + } + private QueryResult queryForResult(final Statement statement, final String sql) throws SQLException { try (final ResultSet resultSet = statement.executeQuery(sql)) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementTSStatusVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementTSStatusVisitor.java index ff2805b2e7dee..c7da4650f338a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementTSStatusVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementTSStatusVisitor.java @@ -74,6 +74,7 @@ public TSStatus visitNode(final StatementNode node, final TSStatus status) { public TSStatus visitLoadFile( final LoadTsFileStatement loadTsFileStatement, final TSStatus status) { if (status.getCode() == TSStatusCode.SYSTEM_READ_ONLY.getStatusCode() + || status.getCode() == TSStatusCode.LOAD_TEMPORARY_UNAVAILABLE_EXCEPTION.getStatusCode() || status.getCode() == TSStatusCode.LOAD_FILE_ERROR.getStatusCode() && status.getMessage() != null && status.getMessage().contains("memory")) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java index e357b86e0c736..2652735ce45e1 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java @@ -83,6 +83,11 @@ public class LoadTsFileAnalyzer implements AutoCloseable { private static final Logger LOGGER = LoggerFactory.getLogger(LoadTsFileAnalyzer.class); + private static final String MISSING_SCHEMA_MESSAGE = + "does not exist in IoTDB and can not be created"; + private static final String AUTO_CREATE_SCHEMA_HINT_MESSAGE = + "Please check weather auto-create-schema is enabled"; + private static final LoadTsFileCostMetricsSet LOAD_TSFILE_COST_METRICS_SET = LoadTsFileCostMetricsSet.getInstance(); @@ -222,6 +227,10 @@ public IAnalysis analyzeFileByFile(IAnalysis analysis) { executeTabletConversionOnException(analysis, e); return analysis; } catch (Exception e) { + if (isTemporaryUnavailableDueToPipeSchemaNotReady(e)) { + setFailAnalysisForTemporaryUnavailablePipeSchema(analysis, e); + return analysis; + } final String exceptionMessage = String.format( "Auto create or verify schema error when executing statement %s. Detail: %s.", @@ -346,6 +355,10 @@ private boolean doAnalyzeFileByFile(IAnalysis analysis) { "The file %s is not a valid tsfile. Please check the input file.", tsFile.getPath())); } catch (Exception e) { + if (isTemporaryUnavailableDueToPipeSchemaNotReady(e)) { + setFailAnalysisForTemporaryUnavailablePipeSchema(analysis, e); + return false; + } final String exceptionMessage = String.format( "Loading file %s failed. Detail: %s", @@ -681,8 +694,27 @@ private void setFailAnalysisForAuthException(IAnalysis analysis, AuthException e analysis.setFailStatus(RpcUtils.getStatus(e.getCode(), e.getMessage())); } + private void setFailAnalysisForTemporaryUnavailablePipeSchema( + final IAnalysis analysis, final Throwable throwable) { + final String exceptionMessage = + String.format( + "Pipe generated LoadTsFile is waiting for schema metadata to be transferred. Detail: %s", + throwable.getMessage() == null + ? throwable.getClass().getName() + : throwable.getMessage()); + analysis.setFinishQueryAfterAnalyze(true); + analysis.setFailStatus( + RpcUtils.getStatus(TSStatusCode.LOAD_TEMPORARY_UNAVAILABLE_EXCEPTION, exceptionMessage)); + setRealStatement(analysis); + } + private void executeTabletConversionOnException( final IAnalysis analysis, final LoadAnalyzeException e) { + if (isTemporaryUnavailableDueToPipeSchemaNotReady(e)) { + setFailAnalysisForTemporaryUnavailablePipeSchema(analysis, e); + return; + } + if (shouldSkipConversion(e)) { analysis.setFailStatus( new TSStatus(TSStatusCode.LOAD_FILE_ERROR.getStatusCode()).setMessage(e.getMessage())); @@ -764,6 +796,26 @@ private void executeTabletConversionOnException( setRealStatement(analysis); } + boolean isTemporaryUnavailableDueToPipeSchemaNotReady(final Throwable throwable) { + if (!isGeneratedByPipe + || !isVerifySchema + || IoTDBDescriptor.getInstance().getConfig().isAutoCreateSchemaEnabled()) { + return false; + } + + Throwable current = throwable; + while (current != null) { + final String message = current.getMessage(); + if (message != null + && message.contains(MISSING_SCHEMA_MESSAGE) + && message.contains(AUTO_CREATE_SCHEMA_HINT_MESSAGE)) { + return true; + } + current = current.getCause(); + } + return false; + } + private boolean shouldSkipConversion(LoadAnalyzeException e) { return (e instanceof LoadAnalyzeTypeMismatchException) && !isConvertOnTypeMismatch; } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/receiver/PipeStatementTsStatusVisitorTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/receiver/PipeStatementTsStatusVisitorTest.java index 756d11818251f..ab3e0eda54324 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/receiver/PipeStatementTsStatusVisitorTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/receiver/PipeStatementTsStatusVisitorTest.java @@ -24,6 +24,7 @@ import org.apache.iotdb.commons.utils.StatusUtils; import org.apache.iotdb.db.pipe.receiver.protocol.thrift.IoTDBDataNodeReceiver; import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowsStatement; +import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement; import org.apache.iotdb.db.queryengine.plan.statement.metadata.template.BatchActivateTemplateStatement; import org.apache.iotdb.rpc.TSStatusCode; @@ -64,6 +65,18 @@ StatusUtils.OK, new TSStatus(TSStatusCode.OUT_OF_TTL.getStatusCode())))) .getCode()); } + @Test + public void testLoadTemporaryUnavailableClassification() throws Exception { + Assert.assertEquals( + TSStatusCode.PIPE_RECEIVER_TEMPORARY_UNAVAILABLE_EXCEPTION.getStatusCode(), + IoTDBDataNodeReceiver.STATEMENT_STATUS_VISITOR + .process( + LoadTsFileStatement.createUnchecked("temporary-unavailable.tsfile"), + new TSStatus(TSStatusCode.LOAD_TEMPORARY_UNAVAILABLE_EXCEPTION.getStatusCode()) + .setMessage("schema is not ready")) + .getCode()); + } + @Test public void testDatabaseNotExistRuntimeExceptionClassification() { Assert.assertEquals( diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzerTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzerTest.java index c533b0043e904..1bc6744766970 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzerTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzerTest.java @@ -19,10 +19,12 @@ package org.apache.iotdb.db.queryengine.plan.analyze.load; +import org.apache.iotdb.commons.exception.SemanticException; import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.commons.queryengine.plan.relational.metadata.ColumnSchema; import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory; import org.apache.iotdb.db.conf.IoTDBDescriptor; +import org.apache.iotdb.db.exception.load.LoadAnalyzeException; import org.apache.iotdb.db.exception.load.LoadAnalyzeTypeMismatchException; import org.apache.iotdb.db.exception.load.LoadRuntimeOutOfMemoryException; import org.apache.iotdb.db.queryengine.common.MPPQueryContext; @@ -198,6 +200,38 @@ public void testTreeSchemaVerifierShouldThrowMismatchWhenVerifyingDataType() thr } } + @Test + public void testPipeGeneratedLoadMissingSchemaShouldBeTemporaryWhenAutoCreateDisabled() + throws Exception { + final boolean originalAutoCreateSchemaEnabled = + IoTDBDescriptor.getInstance().getConfig().isAutoCreateSchemaEnabled(); + IoTDBDescriptor.getInstance().getConfig().setAutoCreateSchemaEnabled(false); + try (final LoadTsFileAnalyzer analyzer = + new LoadTsFileAnalyzer( + LoadTsFileStatement.createUnchecked("missing-schema.tsfile"), + true, + new MPPQueryContext(new QueryId("load_pipe_test")))) { + Assert.assertTrue( + analyzer.isTemporaryUnavailableDueToPipeSchemaNotReady( + new LoadAnalyzeException( + "Device root.sg.d1 does not exist in IoTDB and can not be created. " + + "Please check weather auto-create-schema is enabled."))); + Assert.assertTrue( + analyzer.isTemporaryUnavailableDueToPipeSchemaNotReady( + new SemanticException( + "Auto create or verify schema error. Detail: Measurement root.sg.d1.s1 " + + "does not exist in IoTDB and can not be created. " + + "Please check weather auto-create-schema is enabled."))); + Assert.assertFalse( + analyzer.isTemporaryUnavailableDueToPipeSchemaNotReady( + new LoadAnalyzeException("Data type mismatch for measurement root.sg.d1.s1"))); + } finally { + IoTDBDescriptor.getInstance() + .getConfig() + .setAutoCreateSchemaEnabled(originalAutoCreateSchemaEnabled); + } + } + private void writeTableTsFileWithMixedDevices(final File tsFile) throws Exception { if (tsFile.exists()) { Assert.assertTrue(tsFile.delete()); From 2b62bd984db74b89b292f49a9a98d1eb4d3dcf22 Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Fri, 26 Jun 2026 10:04:40 +0800 Subject: [PATCH 2/3] Address load tsfile schema retry review --- .../iotdb/db/i18n/DataNodeQueryMessages.java | 6 +++ .../iotdb/db/i18n/DataNodeQueryMessages.java | 6 +++ .../LoadAnalyzeMissingSchemaException.java | 27 ++++++++++++++ .../visitor/PipeStatementTSStatusVisitor.java | 5 +-- .../plan/analyze/load/LoadTsFileAnalyzer.java | 37 +++++++++++-------- .../TreeSchemaAutoCreatorAndVerifier.java | 22 ++++++----- .../PipeStatementTsStatusVisitorTest.java | 12 ++++++ .../analyze/load/LoadTsFileAnalyzerTest.java | 12 ++---- 8 files changed, 90 insertions(+), 37 deletions(-) create mode 100644 iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/load/LoadAnalyzeMissingSchemaException.java diff --git a/iotdb-core/datanode/src/main/i18n/en/org/apache/iotdb/db/i18n/DataNodeQueryMessages.java b/iotdb-core/datanode/src/main/i18n/en/org/apache/iotdb/db/i18n/DataNodeQueryMessages.java index f870290c09a82..96b138a9fe138 100644 --- a/iotdb-core/datanode/src/main/i18n/en/org/apache/iotdb/db/i18n/DataNodeQueryMessages.java +++ b/iotdb-core/datanode/src/main/i18n/en/org/apache/iotdb/db/i18n/DataNodeQueryMessages.java @@ -314,6 +314,12 @@ public final class DataNodeQueryMessages { "Empty file detected, will skip loading this file: {}"; public static final String AUTO_CREATE_OR_VERIFY_SCHEMA_ERROR = "Auto create or verify schema error."; + public static final String LOAD_TSFILE_DEVICE_SCHEMA_MISSING_AUTO_CREATE_DISABLED = + "Device %s does not exist in IoTDB and can not be created. Please check whether auto-create-schema is enabled."; + public static final String LOAD_TSFILE_MEASUREMENT_SCHEMA_MISSING_AUTO_CREATE_DISABLED = + "Measurement %s does not exist in IoTDB and can not be created. Please check whether auto-create-schema is enabled."; + public static final String PIPE_GENERATED_LOAD_TSFILE_WAITING_FOR_SCHEMA_METADATA = + "Pipe generated LoadTsFile is waiting for schema metadata to be transferred. Detail: %s"; public static final String FAILED_TO_FIND_TAG_COLUMN_MAPPING_FOR_TABLE = "Failed to find tag column mapping for table {}"; public static final String AUTO_CREATE_DATABASE_FAILED_BECAUSE = diff --git a/iotdb-core/datanode/src/main/i18n/zh/org/apache/iotdb/db/i18n/DataNodeQueryMessages.java b/iotdb-core/datanode/src/main/i18n/zh/org/apache/iotdb/db/i18n/DataNodeQueryMessages.java index b0dae59e19861..aa2436c442f71 100644 --- a/iotdb-core/datanode/src/main/i18n/zh/org/apache/iotdb/db/i18n/DataNodeQueryMessages.java +++ b/iotdb-core/datanode/src/main/i18n/zh/org/apache/iotdb/db/i18n/DataNodeQueryMessages.java @@ -312,6 +312,12 @@ public final class DataNodeQueryMessages { "检测到空文件,将跳过加载此文件:{}"; public static final String AUTO_CREATE_OR_VERIFY_SCHEMA_ERROR = "自动创建或验证 schema 出错。"; + public static final String LOAD_TSFILE_DEVICE_SCHEMA_MISSING_AUTO_CREATE_DISABLED = + "设备 %s 在 IoTDB 中不存在且无法被创建。请检查是否启用了 auto-create-schema。"; + public static final String LOAD_TSFILE_MEASUREMENT_SCHEMA_MISSING_AUTO_CREATE_DISABLED = + "时间序列 %s 在 IoTDB 中不存在且无法被创建。请检查是否启用了 auto-create-schema。"; + public static final String PIPE_GENERATED_LOAD_TSFILE_WAITING_FOR_SCHEMA_METADATA = + "Pipe 生成的 LoadTsFile 正在等待 schema 元数据传输完成。详情:%s"; public static final String FAILED_TO_FIND_TAG_COLUMN_MAPPING_FOR_TABLE = "未找到表 {} 的标签列映射"; public static final String AUTO_CREATE_DATABASE_FAILED_BECAUSE = diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/load/LoadAnalyzeMissingSchemaException.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/load/LoadAnalyzeMissingSchemaException.java new file mode 100644 index 0000000000000..9f5e129d56f45 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/load/LoadAnalyzeMissingSchemaException.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.exception.load; + +public class LoadAnalyzeMissingSchemaException extends LoadAnalyzeException { + + public LoadAnalyzeMissingSchemaException(final String message) { + super(message); + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementTSStatusVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementTSStatusVisitor.java index c7da4650f338a..df316b79367e4 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementTSStatusVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementTSStatusVisitor.java @@ -74,10 +74,7 @@ public TSStatus visitNode(final StatementNode node, final TSStatus status) { public TSStatus visitLoadFile( final LoadTsFileStatement loadTsFileStatement, final TSStatus status) { if (status.getCode() == TSStatusCode.SYSTEM_READ_ONLY.getStatusCode() - || status.getCode() == TSStatusCode.LOAD_TEMPORARY_UNAVAILABLE_EXCEPTION.getStatusCode() - || status.getCode() == TSStatusCode.LOAD_FILE_ERROR.getStatusCode() - && status.getMessage() != null - && status.getMessage().contains("memory")) { + || status.getCode() == TSStatusCode.LOAD_TEMPORARY_UNAVAILABLE_EXCEPTION.getStatusCode()) { return new TSStatus( TSStatusCode.PIPE_RECEIVER_TEMPORARY_UNAVAILABLE_EXCEPTION.getStatusCode()) .setMessage(status.getMessage()); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java index 2652735ce45e1..b1bdd077dc48d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java @@ -28,6 +28,7 @@ import org.apache.iotdb.commons.queryengine.utils.TimestampPrecisionUtils; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.exception.load.LoadAnalyzeException; +import org.apache.iotdb.db.exception.load.LoadAnalyzeMissingSchemaException; import org.apache.iotdb.db.exception.load.LoadAnalyzeTypeMismatchException; import org.apache.iotdb.db.exception.load.LoadEmptyFileException; import org.apache.iotdb.db.i18n.DataNodeQueryMessages; @@ -83,11 +84,6 @@ public class LoadTsFileAnalyzer implements AutoCloseable { private static final Logger LOGGER = LoggerFactory.getLogger(LoadTsFileAnalyzer.class); - private static final String MISSING_SCHEMA_MESSAGE = - "does not exist in IoTDB and can not be created"; - private static final String AUTO_CREATE_SCHEMA_HINT_MESSAGE = - "Please check weather auto-create-schema is enabled"; - private static final LoadTsFileCostMetricsSet LOAD_TSFILE_COST_METRICS_SET = LoadTsFileCostMetricsSet.getInstance(); @@ -227,8 +223,7 @@ public IAnalysis analyzeFileByFile(IAnalysis analysis) { executeTabletConversionOnException(analysis, e); return analysis; } catch (Exception e) { - if (isTemporaryUnavailableDueToPipeSchemaNotReady(e)) { - setFailAnalysisForTemporaryUnavailablePipeSchema(analysis, e); + if (setTemporaryUnavailableStatusIfNecessary(analysis, e)) { return analysis; } final String exceptionMessage = @@ -355,8 +350,7 @@ private boolean doAnalyzeFileByFile(IAnalysis analysis) { "The file %s is not a valid tsfile. Please check the input file.", tsFile.getPath())); } catch (Exception e) { - if (isTemporaryUnavailableDueToPipeSchemaNotReady(e)) { - setFailAnalysisForTemporaryUnavailablePipeSchema(analysis, e); + if (setTemporaryUnavailableStatusIfNecessary(analysis, e)) { return false; } final String exceptionMessage = @@ -698,7 +692,7 @@ private void setFailAnalysisForTemporaryUnavailablePipeSchema( final IAnalysis analysis, final Throwable throwable) { final String exceptionMessage = String.format( - "Pipe generated LoadTsFile is waiting for schema metadata to be transferred. Detail: %s", + DataNodeQueryMessages.PIPE_GENERATED_LOAD_TSFILE_WAITING_FOR_SCHEMA_METADATA, throwable.getMessage() == null ? throwable.getClass().getName() : throwable.getMessage()); @@ -710,8 +704,7 @@ private void setFailAnalysisForTemporaryUnavailablePipeSchema( private void executeTabletConversionOnException( final IAnalysis analysis, final LoadAnalyzeException e) { - if (isTemporaryUnavailableDueToPipeSchemaNotReady(e)) { - setFailAnalysisForTemporaryUnavailablePipeSchema(analysis, e); + if (setTemporaryUnavailableStatusIfNecessary(analysis, e)) { return; } @@ -796,6 +789,21 @@ private void executeTabletConversionOnException( setRealStatement(analysis); } + private boolean setTemporaryUnavailableStatusIfNecessary( + final IAnalysis analysis, final Throwable throwable) { + if (isTemporaryUnavailableDueToPipeSchemaNotReady(throwable)) { + setFailAnalysisForTemporaryUnavailablePipeSchema(analysis, throwable); + return true; + } + if (isGeneratedByPipe && LoadTsFileDataTypeConverter.isMemoryPressureException(throwable)) { + analysis.setFinishQueryAfterAnalyze(true); + analysis.setFailStatus(LoadTsFileDataTypeConverter.getMemoryPressureStatus(throwable)); + setRealStatement(analysis); + return true; + } + return false; + } + boolean isTemporaryUnavailableDueToPipeSchemaNotReady(final Throwable throwable) { if (!isGeneratedByPipe || !isVerifySchema @@ -805,10 +813,7 @@ boolean isTemporaryUnavailableDueToPipeSchemaNotReady(final Throwable throwable) Throwable current = throwable; while (current != null) { - final String message = current.getMessage(); - if (message != null - && message.contains(MISSING_SCHEMA_MESSAGE) - && message.contains(AUTO_CREATE_SCHEMA_HINT_MESSAGE)) { + if (current instanceof LoadAnalyzeMissingSchemaException) { return true; } current = current.getCause(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/TreeSchemaAutoCreatorAndVerifier.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/TreeSchemaAutoCreatorAndVerifier.java index d4f4210727e28..bb93ac5b2d81d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/TreeSchemaAutoCreatorAndVerifier.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/TreeSchemaAutoCreatorAndVerifier.java @@ -36,6 +36,7 @@ import org.apache.iotdb.db.auth.AuthorityChecker; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.exception.load.LoadAnalyzeException; +import org.apache.iotdb.db.exception.load.LoadAnalyzeMissingSchemaException; import org.apache.iotdb.db.exception.load.LoadAnalyzeTypeMismatchException; import org.apache.iotdb.db.exception.load.LoadFileException; import org.apache.iotdb.db.exception.load.LoadRuntimeOutOfMemoryException; @@ -109,7 +110,7 @@ public void setCurrentTimeIndex(final ITimeIndex timeIndex) { public void autoCreateAndVerify( TsFileSequenceReader reader, Map> device2TimeseriesMetadataList) - throws IOException, AuthException, LoadAnalyzeTypeMismatchException { + throws IOException, AuthException, LoadAnalyzeException { for (final Map.Entry> entry : device2TimeseriesMetadataList.entrySet()) { final IDeviceID device = entry.getKey(); @@ -198,14 +199,14 @@ public void flushAndClearDeviceIsAlignedCacheIfNecessary() throws SemanticExcept schemaCache.clearDeviceIsAlignedCacheIfNecessary(); } - public void flush() throws AuthException, LoadAnalyzeTypeMismatchException { + public void flush() throws AuthException, LoadAnalyzeException { doAutoCreateAndVerify(); schemaCache.clearTimeSeries(); } private void doAutoCreateAndVerify() - throws SemanticException, AuthException, LoadAnalyzeTypeMismatchException { + throws SemanticException, AuthException, LoadAnalyzeException { if (schemaCache.getDevice2TimeSeries().isEmpty()) { return; } @@ -235,6 +236,11 @@ private void doAutoCreateAndVerify() } else { handleException(e, loadTsFileAnalyzer.getStatementString()); } + } catch (LoadAnalyzeMissingSchemaException e) { + if (loadTsFileAnalyzer.isTemporaryUnavailableDueToPipeSchemaNotReady(e)) { + throw e; + } + handleException(e, loadTsFileAnalyzer.getStatementString()); } catch (Exception e) { if (e.getCause() instanceof LoadAnalyzeTypeMismatchException && loadTsFileAnalyzer.isConvertOnTypeMismatch()) { @@ -449,10 +455,9 @@ private void verifySchema(ISchemaTree schemaTree) .collect(Collectors.toList())); if (iotdbDeviceSchemaInfo == null) { - throw new LoadAnalyzeException( + throw new LoadAnalyzeMissingSchemaException( String.format( - "Device %s does not exist in IoTDB and can not be created. " - + "Please check weather auto-create-schema is enabled.", + DataNodeQueryMessages.LOAD_TSFILE_DEVICE_SCHEMA_MISSING_AUTO_CREATE_DISABLED, device)); } @@ -475,10 +480,9 @@ private void verifySchema(ISchemaTree schemaTree) final IMeasurementSchema tsFileSchema = tsfileTimeseriesSchemas.get(i); final IMeasurementSchema iotdbSchema = iotdbTimeseriesSchemas.get(i); if (iotdbSchema == null) { - throw new LoadAnalyzeException( + throw new LoadAnalyzeMissingSchemaException( String.format( - "Measurement %s does not exist in IoTDB and can not be created. " - + "Please check weather auto-create-schema is enabled.", + DataNodeQueryMessages.LOAD_TSFILE_MEASUREMENT_SCHEMA_MISSING_AUTO_CREATE_DISABLED, device + TsFileConstant.PATH_SEPARATOR + tsfileTimeseriesSchemas.get(i))); } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/receiver/PipeStatementTsStatusVisitorTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/receiver/PipeStatementTsStatusVisitorTest.java index ab3e0eda54324..a2771eff4f7a0 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/receiver/PipeStatementTsStatusVisitorTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/receiver/PipeStatementTsStatusVisitorTest.java @@ -77,6 +77,18 @@ public void testLoadTemporaryUnavailableClassification() throws Exception { .getCode()); } + @Test + public void testLoadFileErrorWithMemoryMessageIsNotClassifiedByMessage() throws Exception { + Assert.assertEquals( + TSStatusCode.LOAD_FILE_ERROR.getStatusCode(), + IoTDBDataNodeReceiver.STATEMENT_STATUS_VISITOR + .process( + LoadTsFileStatement.createUnchecked("memory-error.tsfile"), + new TSStatus(TSStatusCode.LOAD_FILE_ERROR.getStatusCode()) + .setMessage("memory pressure")) + .getCode()); + } + @Test public void testDatabaseNotExistRuntimeExceptionClassification() { Assert.assertEquals( diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzerTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzerTest.java index 1bc6744766970..42b58adcf6c2d 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzerTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzerTest.java @@ -19,12 +19,12 @@ package org.apache.iotdb.db.queryengine.plan.analyze.load; -import org.apache.iotdb.commons.exception.SemanticException; import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.commons.queryengine.plan.relational.metadata.ColumnSchema; import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.exception.load.LoadAnalyzeException; +import org.apache.iotdb.db.exception.load.LoadAnalyzeMissingSchemaException; import org.apache.iotdb.db.exception.load.LoadAnalyzeTypeMismatchException; import org.apache.iotdb.db.exception.load.LoadRuntimeOutOfMemoryException; import org.apache.iotdb.db.queryengine.common.MPPQueryContext; @@ -213,15 +213,11 @@ public void testPipeGeneratedLoadMissingSchemaShouldBeTemporaryWhenAutoCreateDis new MPPQueryContext(new QueryId("load_pipe_test")))) { Assert.assertTrue( analyzer.isTemporaryUnavailableDueToPipeSchemaNotReady( - new LoadAnalyzeException( - "Device root.sg.d1 does not exist in IoTDB and can not be created. " - + "Please check weather auto-create-schema is enabled."))); + new LoadAnalyzeMissingSchemaException("missing device schema"))); Assert.assertTrue( analyzer.isTemporaryUnavailableDueToPipeSchemaNotReady( - new SemanticException( - "Auto create or verify schema error. Detail: Measurement root.sg.d1.s1 " - + "does not exist in IoTDB and can not be created. " - + "Please check weather auto-create-schema is enabled."))); + new RuntimeException( + "wrapped", new LoadAnalyzeMissingSchemaException("missing measurement schema")))); Assert.assertFalse( analyzer.isTemporaryUnavailableDueToPipeSchemaNotReady( new LoadAnalyzeException("Data type mismatch for measurement root.sg.d1.s1"))); From db3cab8767a648cb2f08557b4fd1f74cef2b9a7a Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Fri, 26 Jun 2026 12:04:20 +0800 Subject: [PATCH 3/3] Fix load tsfile tests with real temp files --- .../receiver/PipeStatementTsStatusVisitorTest.java | 11 +++++++++-- .../plan/analyze/load/LoadTsFileAnalyzerTest.java | 5 ++++- 2 files changed, 13 insertions(+), 3 deletions(-) diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/receiver/PipeStatementTsStatusVisitorTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/receiver/PipeStatementTsStatusVisitorTest.java index a2771eff4f7a0..f2716d5c1a48a 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/receiver/PipeStatementTsStatusVisitorTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/receiver/PipeStatementTsStatusVisitorTest.java @@ -31,6 +31,7 @@ import org.junit.Assert; import org.junit.Test; +import java.io.File; import java.util.Arrays; import java.util.Collections; @@ -67,11 +68,14 @@ StatusUtils.OK, new TSStatus(TSStatusCode.OUT_OF_TTL.getStatusCode())))) @Test public void testLoadTemporaryUnavailableClassification() throws Exception { + final File tsFile = File.createTempFile("temporary-unavailable", ".tsfile"); + tsFile.deleteOnExit(); + Assert.assertEquals( TSStatusCode.PIPE_RECEIVER_TEMPORARY_UNAVAILABLE_EXCEPTION.getStatusCode(), IoTDBDataNodeReceiver.STATEMENT_STATUS_VISITOR .process( - LoadTsFileStatement.createUnchecked("temporary-unavailable.tsfile"), + LoadTsFileStatement.createUnchecked(tsFile.getAbsolutePath()), new TSStatus(TSStatusCode.LOAD_TEMPORARY_UNAVAILABLE_EXCEPTION.getStatusCode()) .setMessage("schema is not ready")) .getCode()); @@ -79,11 +83,14 @@ public void testLoadTemporaryUnavailableClassification() throws Exception { @Test public void testLoadFileErrorWithMemoryMessageIsNotClassifiedByMessage() throws Exception { + final File tsFile = File.createTempFile("memory-error", ".tsfile"); + tsFile.deleteOnExit(); + Assert.assertEquals( TSStatusCode.LOAD_FILE_ERROR.getStatusCode(), IoTDBDataNodeReceiver.STATEMENT_STATUS_VISITOR .process( - LoadTsFileStatement.createUnchecked("memory-error.tsfile"), + LoadTsFileStatement.createUnchecked(tsFile.getAbsolutePath()), new TSStatus(TSStatusCode.LOAD_FILE_ERROR.getStatusCode()) .setMessage("memory pressure")) .getCode()); diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzerTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzerTest.java index 42b58adcf6c2d..68f4bde29e6a6 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzerTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzerTest.java @@ -206,9 +206,12 @@ public void testPipeGeneratedLoadMissingSchemaShouldBeTemporaryWhenAutoCreateDis final boolean originalAutoCreateSchemaEnabled = IoTDBDescriptor.getInstance().getConfig().isAutoCreateSchemaEnabled(); IoTDBDescriptor.getInstance().getConfig().setAutoCreateSchemaEnabled(false); + final File tsFile = File.createTempFile("missing-schema", ".tsfile"); + tsFile.deleteOnExit(); + try (final LoadTsFileAnalyzer analyzer = new LoadTsFileAnalyzer( - LoadTsFileStatement.createUnchecked("missing-schema.tsfile"), + LoadTsFileStatement.createUnchecked(tsFile.getAbsolutePath()), true, new MPPQueryContext(new QueryId("load_pipe_test")))) { Assert.assertTrue(