diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeReceiverAutoCreateDisabledIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeReceiverAutoCreateDisabledIT.java index 158dd552c201d..58aced70ed392 100644 --- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeReceiverAutoCreateDisabledIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeReceiverAutoCreateDisabledIT.java @@ -36,6 +36,7 @@ import java.sql.ResultSetMetaData; import java.sql.SQLException; import java.sql.Statement; +import java.util.Arrays; import java.util.HashSet; import java.util.Set; @@ -61,14 +62,16 @@ protected void setupConfig() { .getConfig() .getCommonConfig() .setDataReplicationFactor(1) - .setSchemaReplicationFactor(1); + .setSchemaReplicationFactor(1) + .setPipeAutoSplitFullEnabled(true); receiverEnv .getConfig() .getCommonConfig() .setAutoCreateSchemaEnabled(false) .setDatanodeMemoryProportion("3:3:1:1:1:0") .setDataReplicationFactor(1) - .setSchemaReplicationFactor(1); + .setSchemaReplicationFactor(1) + .setPipeAutoSplitFullEnabled(true); } @Test @@ -122,6 +125,63 @@ public void testReceiverAutoCreateSchemaDisabledWithSpecialTimeSeries() throws E } } + @Test + public void testAutoSplitHistoryTsFileWithDeletionWhenReceiverAutoCreateSchemaDisabled() + throws Exception { + TestUtils.executeNonQueries( + senderEnv, + Arrays.asList( + "create database root.sg", + "create timeseries root.sg.non_aligned.s1 with datatype=INT32", + "create timeseries root.sg.non_aligned.s2 with datatype=DOUBLE", + "create aligned timeseries root.sg.aligned(s1 INT32, s2 DOUBLE)", + "create timeseries root.sg.deleted_measurement.s1 with datatype=INT32", + "create timeseries root.sg.deleted_measurement.s2 with datatype=DOUBLE", + "insert into root.sg.non_aligned(time, s1, s2) values(1, 1, 1.0), (2, 2, 2.0), (3, 3, 3.0)", + "insert into root.sg.aligned(time, s1, s2) values(1, 10, 10.0), (2, 20, 20.0), (3, 30, 30.0)", + "insert into root.sg.deleted_measurement(time, s1, s2) values(1, 100, 100.0), (2, 200, 200.0)", + "flush", + "delete from root.sg.non_aligned.s1 where time > 2", + "delete from root.sg.aligned.* where time > 2", + "delete timeseries root.sg.deleted_measurement.s1", + "flush")); + + awaitUntilFlush(senderEnv); + + TestUtils.executeNonQuery( + senderEnv, + String.format( + "create pipe test with source ('inclusion'='all', 'source.history.enable'='true', 'source.realtime.mode'='batch') " + + "with sink ('sink'='iotdb-thrift-sink', 'sink.node-urls'='%s')", + receiverEnv.getDataNodeWrapper(0).getIpAndPortString())); + + TestUtils.assertDataEventuallyOnEnv( + receiverEnv, + "select * from root.sg.non_aligned", + "Time,root.sg.non_aligned.s1,root.sg.non_aligned.s2,", + new HashSet<>(Arrays.asList("1,1,1.0,", "2,2,2.0,", "3,null,3.0,"))); + TestUtils.assertDataEventuallyOnEnv( + receiverEnv, + "select * from root.sg.aligned", + "Time,root.sg.aligned.s1,root.sg.aligned.s2,", + new HashSet<>(Arrays.asList("1,10,10.0,", "2,20,20.0,"))); + TestUtils.assertDataEventuallyOnEnv( + receiverEnv, + "select * from root.sg.deleted_measurement", + "Time,root.sg.deleted_measurement.s2,", + new HashSet<>(Arrays.asList("1,100.0,", "2,200.0,"))); + TestUtils.assertDataEventuallyOnEnv( + receiverEnv, + "count timeseries root.sg.deleted_measurement.*", + "count(timeseries),", + new HashSet<>(Arrays.asList("1,"))); + TestUtils.assertDataEventuallyOnEnv( + receiverEnv, + "show devices root.sg.aligned", + "Device,IsAligned,Template,TTL(ms),", + new HashSet<>(Arrays.asList("root.sg.aligned,true,null,INF,"))); + } + private QueryResult queryForResult(final Statement statement, final String sql) throws SQLException { try (final ResultSet resultSet = statement.executeQuery(sql)) { diff --git a/iotdb-core/datanode/src/main/i18n/en/org/apache/iotdb/db/i18n/DataNodeQueryMessages.java b/iotdb-core/datanode/src/main/i18n/en/org/apache/iotdb/db/i18n/DataNodeQueryMessages.java index f870290c09a82..96b138a9fe138 100644 --- a/iotdb-core/datanode/src/main/i18n/en/org/apache/iotdb/db/i18n/DataNodeQueryMessages.java +++ b/iotdb-core/datanode/src/main/i18n/en/org/apache/iotdb/db/i18n/DataNodeQueryMessages.java @@ -314,6 +314,12 @@ public final class DataNodeQueryMessages { "Empty file detected, will skip loading this file: {}"; public static final String AUTO_CREATE_OR_VERIFY_SCHEMA_ERROR = "Auto create or verify schema error."; + public static final String LOAD_TSFILE_DEVICE_SCHEMA_MISSING_AUTO_CREATE_DISABLED = + "Device %s does not exist in IoTDB and can not be created. Please check whether auto-create-schema is enabled."; + public static final String LOAD_TSFILE_MEASUREMENT_SCHEMA_MISSING_AUTO_CREATE_DISABLED = + "Measurement %s does not exist in IoTDB and can not be created. Please check whether auto-create-schema is enabled."; + public static final String PIPE_GENERATED_LOAD_TSFILE_WAITING_FOR_SCHEMA_METADATA = + "Pipe generated LoadTsFile is waiting for schema metadata to be transferred. Detail: %s"; public static final String FAILED_TO_FIND_TAG_COLUMN_MAPPING_FOR_TABLE = "Failed to find tag column mapping for table {}"; public static final String AUTO_CREATE_DATABASE_FAILED_BECAUSE = diff --git a/iotdb-core/datanode/src/main/i18n/zh/org/apache/iotdb/db/i18n/DataNodeQueryMessages.java b/iotdb-core/datanode/src/main/i18n/zh/org/apache/iotdb/db/i18n/DataNodeQueryMessages.java index b0dae59e19861..aa2436c442f71 100644 --- a/iotdb-core/datanode/src/main/i18n/zh/org/apache/iotdb/db/i18n/DataNodeQueryMessages.java +++ b/iotdb-core/datanode/src/main/i18n/zh/org/apache/iotdb/db/i18n/DataNodeQueryMessages.java @@ -312,6 +312,12 @@ public final class DataNodeQueryMessages { "检测到空文件,将跳过加载此文件:{}"; public static final String AUTO_CREATE_OR_VERIFY_SCHEMA_ERROR = "自动创建或验证 schema 出错。"; + public static final String LOAD_TSFILE_DEVICE_SCHEMA_MISSING_AUTO_CREATE_DISABLED = + "设备 %s 在 IoTDB 中不存在且无法被创建。请检查是否启用了 auto-create-schema。"; + public static final String LOAD_TSFILE_MEASUREMENT_SCHEMA_MISSING_AUTO_CREATE_DISABLED = + "时间序列 %s 在 IoTDB 中不存在且无法被创建。请检查是否启用了 auto-create-schema。"; + public static final String PIPE_GENERATED_LOAD_TSFILE_WAITING_FOR_SCHEMA_METADATA = + "Pipe 生成的 LoadTsFile 正在等待 schema 元数据传输完成。详情:%s"; public static final String FAILED_TO_FIND_TAG_COLUMN_MAPPING_FOR_TABLE = "未找到表 {} 的标签列映射"; public static final String AUTO_CREATE_DATABASE_FAILED_BECAUSE = diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/load/LoadAnalyzeMissingSchemaException.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/load/LoadAnalyzeMissingSchemaException.java new file mode 100644 index 0000000000000..9f5e129d56f45 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/load/LoadAnalyzeMissingSchemaException.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.exception.load; + +public class LoadAnalyzeMissingSchemaException extends LoadAnalyzeException { + + public LoadAnalyzeMissingSchemaException(final String message) { + super(message); + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementTSStatusVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementTSStatusVisitor.java index ff2805b2e7dee..df316b79367e4 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementTSStatusVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementTSStatusVisitor.java @@ -74,9 +74,7 @@ public TSStatus visitNode(final StatementNode node, final TSStatus status) { public TSStatus visitLoadFile( final LoadTsFileStatement loadTsFileStatement, final TSStatus status) { if (status.getCode() == TSStatusCode.SYSTEM_READ_ONLY.getStatusCode() - || status.getCode() == TSStatusCode.LOAD_FILE_ERROR.getStatusCode() - && status.getMessage() != null - && status.getMessage().contains("memory")) { + || status.getCode() == TSStatusCode.LOAD_TEMPORARY_UNAVAILABLE_EXCEPTION.getStatusCode()) { return new TSStatus( TSStatusCode.PIPE_RECEIVER_TEMPORARY_UNAVAILABLE_EXCEPTION.getStatusCode()) .setMessage(status.getMessage()); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java index e357b86e0c736..b1bdd077dc48d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java @@ -28,6 +28,7 @@ import org.apache.iotdb.commons.queryengine.utils.TimestampPrecisionUtils; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.exception.load.LoadAnalyzeException; +import org.apache.iotdb.db.exception.load.LoadAnalyzeMissingSchemaException; import org.apache.iotdb.db.exception.load.LoadAnalyzeTypeMismatchException; import org.apache.iotdb.db.exception.load.LoadEmptyFileException; import org.apache.iotdb.db.i18n.DataNodeQueryMessages; @@ -222,6 +223,9 @@ public IAnalysis analyzeFileByFile(IAnalysis analysis) { executeTabletConversionOnException(analysis, e); return analysis; } catch (Exception e) { + if (setTemporaryUnavailableStatusIfNecessary(analysis, e)) { + return analysis; + } final String exceptionMessage = String.format( "Auto create or verify schema error when executing statement %s. Detail: %s.", @@ -346,6 +350,9 @@ private boolean doAnalyzeFileByFile(IAnalysis analysis) { "The file %s is not a valid tsfile. Please check the input file.", tsFile.getPath())); } catch (Exception e) { + if (setTemporaryUnavailableStatusIfNecessary(analysis, e)) { + return false; + } final String exceptionMessage = String.format( "Loading file %s failed. Detail: %s", @@ -681,8 +688,26 @@ private void setFailAnalysisForAuthException(IAnalysis analysis, AuthException e analysis.setFailStatus(RpcUtils.getStatus(e.getCode(), e.getMessage())); } + private void setFailAnalysisForTemporaryUnavailablePipeSchema( + final IAnalysis analysis, final Throwable throwable) { + final String exceptionMessage = + String.format( + DataNodeQueryMessages.PIPE_GENERATED_LOAD_TSFILE_WAITING_FOR_SCHEMA_METADATA, + throwable.getMessage() == null + ? throwable.getClass().getName() + : throwable.getMessage()); + analysis.setFinishQueryAfterAnalyze(true); + analysis.setFailStatus( + RpcUtils.getStatus(TSStatusCode.LOAD_TEMPORARY_UNAVAILABLE_EXCEPTION, exceptionMessage)); + setRealStatement(analysis); + } + private void executeTabletConversionOnException( final IAnalysis analysis, final LoadAnalyzeException e) { + if (setTemporaryUnavailableStatusIfNecessary(analysis, e)) { + return; + } + if (shouldSkipConversion(e)) { analysis.setFailStatus( new TSStatus(TSStatusCode.LOAD_FILE_ERROR.getStatusCode()).setMessage(e.getMessage())); @@ -764,6 +789,38 @@ private void executeTabletConversionOnException( setRealStatement(analysis); } + private boolean setTemporaryUnavailableStatusIfNecessary( + final IAnalysis analysis, final Throwable throwable) { + if (isTemporaryUnavailableDueToPipeSchemaNotReady(throwable)) { + setFailAnalysisForTemporaryUnavailablePipeSchema(analysis, throwable); + return true; + } + if (isGeneratedByPipe && LoadTsFileDataTypeConverter.isMemoryPressureException(throwable)) { + analysis.setFinishQueryAfterAnalyze(true); + analysis.setFailStatus(LoadTsFileDataTypeConverter.getMemoryPressureStatus(throwable)); + setRealStatement(analysis); + return true; + } + return false; + } + + boolean isTemporaryUnavailableDueToPipeSchemaNotReady(final Throwable throwable) { + if (!isGeneratedByPipe + || !isVerifySchema + || IoTDBDescriptor.getInstance().getConfig().isAutoCreateSchemaEnabled()) { + return false; + } + + Throwable current = throwable; + while (current != null) { + if (current instanceof LoadAnalyzeMissingSchemaException) { + return true; + } + current = current.getCause(); + } + return false; + } + private boolean shouldSkipConversion(LoadAnalyzeException e) { return (e instanceof LoadAnalyzeTypeMismatchException) && !isConvertOnTypeMismatch; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/TreeSchemaAutoCreatorAndVerifier.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/TreeSchemaAutoCreatorAndVerifier.java index d4f4210727e28..bb93ac5b2d81d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/TreeSchemaAutoCreatorAndVerifier.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/TreeSchemaAutoCreatorAndVerifier.java @@ -36,6 +36,7 @@ import org.apache.iotdb.db.auth.AuthorityChecker; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.exception.load.LoadAnalyzeException; +import org.apache.iotdb.db.exception.load.LoadAnalyzeMissingSchemaException; import org.apache.iotdb.db.exception.load.LoadAnalyzeTypeMismatchException; import org.apache.iotdb.db.exception.load.LoadFileException; import org.apache.iotdb.db.exception.load.LoadRuntimeOutOfMemoryException; @@ -109,7 +110,7 @@ public void setCurrentTimeIndex(final ITimeIndex timeIndex) { public void autoCreateAndVerify( TsFileSequenceReader reader, Map> device2TimeseriesMetadataList) - throws IOException, AuthException, LoadAnalyzeTypeMismatchException { + throws IOException, AuthException, LoadAnalyzeException { for (final Map.Entry> entry : device2TimeseriesMetadataList.entrySet()) { final IDeviceID device = entry.getKey(); @@ -198,14 +199,14 @@ public void flushAndClearDeviceIsAlignedCacheIfNecessary() throws SemanticExcept schemaCache.clearDeviceIsAlignedCacheIfNecessary(); } - public void flush() throws AuthException, LoadAnalyzeTypeMismatchException { + public void flush() throws AuthException, LoadAnalyzeException { doAutoCreateAndVerify(); schemaCache.clearTimeSeries(); } private void doAutoCreateAndVerify() - throws SemanticException, AuthException, LoadAnalyzeTypeMismatchException { + throws SemanticException, AuthException, LoadAnalyzeException { if (schemaCache.getDevice2TimeSeries().isEmpty()) { return; } @@ -235,6 +236,11 @@ private void doAutoCreateAndVerify() } else { handleException(e, loadTsFileAnalyzer.getStatementString()); } + } catch (LoadAnalyzeMissingSchemaException e) { + if (loadTsFileAnalyzer.isTemporaryUnavailableDueToPipeSchemaNotReady(e)) { + throw e; + } + handleException(e, loadTsFileAnalyzer.getStatementString()); } catch (Exception e) { if (e.getCause() instanceof LoadAnalyzeTypeMismatchException && loadTsFileAnalyzer.isConvertOnTypeMismatch()) { @@ -449,10 +455,9 @@ private void verifySchema(ISchemaTree schemaTree) .collect(Collectors.toList())); if (iotdbDeviceSchemaInfo == null) { - throw new LoadAnalyzeException( + throw new LoadAnalyzeMissingSchemaException( String.format( - "Device %s does not exist in IoTDB and can not be created. " - + "Please check weather auto-create-schema is enabled.", + DataNodeQueryMessages.LOAD_TSFILE_DEVICE_SCHEMA_MISSING_AUTO_CREATE_DISABLED, device)); } @@ -475,10 +480,9 @@ private void verifySchema(ISchemaTree schemaTree) final IMeasurementSchema tsFileSchema = tsfileTimeseriesSchemas.get(i); final IMeasurementSchema iotdbSchema = iotdbTimeseriesSchemas.get(i); if (iotdbSchema == null) { - throw new LoadAnalyzeException( + throw new LoadAnalyzeMissingSchemaException( String.format( - "Measurement %s does not exist in IoTDB and can not be created. " - + "Please check weather auto-create-schema is enabled.", + DataNodeQueryMessages.LOAD_TSFILE_MEASUREMENT_SCHEMA_MISSING_AUTO_CREATE_DISABLED, device + TsFileConstant.PATH_SEPARATOR + tsfileTimeseriesSchemas.get(i))); } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/receiver/PipeStatementTsStatusVisitorTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/receiver/PipeStatementTsStatusVisitorTest.java index 756d11818251f..f2716d5c1a48a 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/receiver/PipeStatementTsStatusVisitorTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/receiver/PipeStatementTsStatusVisitorTest.java @@ -24,12 +24,14 @@ import org.apache.iotdb.commons.utils.StatusUtils; import org.apache.iotdb.db.pipe.receiver.protocol.thrift.IoTDBDataNodeReceiver; import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowsStatement; +import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement; import org.apache.iotdb.db.queryengine.plan.statement.metadata.template.BatchActivateTemplateStatement; import org.apache.iotdb.rpc.TSStatusCode; import org.junit.Assert; import org.junit.Test; +import java.io.File; import java.util.Arrays; import java.util.Collections; @@ -64,6 +66,36 @@ StatusUtils.OK, new TSStatus(TSStatusCode.OUT_OF_TTL.getStatusCode())))) .getCode()); } + @Test + public void testLoadTemporaryUnavailableClassification() throws Exception { + final File tsFile = File.createTempFile("temporary-unavailable", ".tsfile"); + tsFile.deleteOnExit(); + + Assert.assertEquals( + TSStatusCode.PIPE_RECEIVER_TEMPORARY_UNAVAILABLE_EXCEPTION.getStatusCode(), + IoTDBDataNodeReceiver.STATEMENT_STATUS_VISITOR + .process( + LoadTsFileStatement.createUnchecked(tsFile.getAbsolutePath()), + new TSStatus(TSStatusCode.LOAD_TEMPORARY_UNAVAILABLE_EXCEPTION.getStatusCode()) + .setMessage("schema is not ready")) + .getCode()); + } + + @Test + public void testLoadFileErrorWithMemoryMessageIsNotClassifiedByMessage() throws Exception { + final File tsFile = File.createTempFile("memory-error", ".tsfile"); + tsFile.deleteOnExit(); + + Assert.assertEquals( + TSStatusCode.LOAD_FILE_ERROR.getStatusCode(), + IoTDBDataNodeReceiver.STATEMENT_STATUS_VISITOR + .process( + LoadTsFileStatement.createUnchecked(tsFile.getAbsolutePath()), + new TSStatus(TSStatusCode.LOAD_FILE_ERROR.getStatusCode()) + .setMessage("memory pressure")) + .getCode()); + } + @Test public void testDatabaseNotExistRuntimeExceptionClassification() { Assert.assertEquals( diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzerTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzerTest.java index c533b0043e904..68f4bde29e6a6 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzerTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzerTest.java @@ -23,6 +23,8 @@ import org.apache.iotdb.commons.queryengine.plan.relational.metadata.ColumnSchema; import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory; import org.apache.iotdb.db.conf.IoTDBDescriptor; +import org.apache.iotdb.db.exception.load.LoadAnalyzeException; +import org.apache.iotdb.db.exception.load.LoadAnalyzeMissingSchemaException; import org.apache.iotdb.db.exception.load.LoadAnalyzeTypeMismatchException; import org.apache.iotdb.db.exception.load.LoadRuntimeOutOfMemoryException; import org.apache.iotdb.db.queryengine.common.MPPQueryContext; @@ -198,6 +200,37 @@ public void testTreeSchemaVerifierShouldThrowMismatchWhenVerifyingDataType() thr } } + @Test + public void testPipeGeneratedLoadMissingSchemaShouldBeTemporaryWhenAutoCreateDisabled() + throws Exception { + final boolean originalAutoCreateSchemaEnabled = + IoTDBDescriptor.getInstance().getConfig().isAutoCreateSchemaEnabled(); + IoTDBDescriptor.getInstance().getConfig().setAutoCreateSchemaEnabled(false); + final File tsFile = File.createTempFile("missing-schema", ".tsfile"); + tsFile.deleteOnExit(); + + try (final LoadTsFileAnalyzer analyzer = + new LoadTsFileAnalyzer( + LoadTsFileStatement.createUnchecked(tsFile.getAbsolutePath()), + true, + new MPPQueryContext(new QueryId("load_pipe_test")))) { + Assert.assertTrue( + analyzer.isTemporaryUnavailableDueToPipeSchemaNotReady( + new LoadAnalyzeMissingSchemaException("missing device schema"))); + Assert.assertTrue( + analyzer.isTemporaryUnavailableDueToPipeSchemaNotReady( + new RuntimeException( + "wrapped", new LoadAnalyzeMissingSchemaException("missing measurement schema")))); + Assert.assertFalse( + analyzer.isTemporaryUnavailableDueToPipeSchemaNotReady( + new LoadAnalyzeException("Data type mismatch for measurement root.sg.d1.s1"))); + } finally { + IoTDBDescriptor.getInstance() + .getConfig() + .setAutoCreateSchemaEnabled(originalAutoCreateSchemaEnabled); + } + } + private void writeTableTsFileWithMixedDevices(final File tsFile) throws Exception { if (tsFile.exists()) { Assert.assertTrue(tsFile.delete());