From fb5a55bd0dba94541d407b508fd74da3130dceef Mon Sep 17 00:00:00 2001 From: Refrain Date: Fri, 26 Jun 2026 11:51:32 +0800 Subject: [PATCH 1/2] [feature](fe) Add routine load target-table alter support ### What problem does this PR solve? Issue Number: None Related PR: None Problem Summary: Allow paused single-table Routine Load jobs to switch their target table with ALTER ROUTINE LOAD FOR [db.]job ON table_name, while preserving existing progress and replaying the new table binding from edit log. ### Release note Support ALTER ROUTINE LOAD ... ON table_name
to switch the target table for paused single-table routine load jobs. ### Check List (For Author) - Test: FE unit test - "/data/data3/huangruixin/include/src-master/apache-maven-3.9.9/bin/mvn -pl fe-core -am -DskipITs -Dcheckstyle.skip=true -DfailIfNoTests=false -Dtest=org.apache.doris.nereids.trees.plans.commands.AlterRoutineLoadCommandTest,org.apache.doris.persist.AlterRoutineLoadOperationLogTest,org.apache.doris.load.routineload.KafkaRoutineLoadJobTest test" - Kinesis unit tests skipped per user request - Behavior changed: Yes (new ALTER ROUTINE LOAD target-table switch behavior) - Does this need documentation: Yes (documented in /data/data3/huangruixin/docs/routine-load-alter-table-design.html) --- .../load/routineload/RoutineLoadJob.java | 25 +++ .../kafka/KafkaRoutineLoadJob.java | 8 +- .../kinesis/KinesisRoutineLoadJob.java | 8 +- .../nereids/parser/LogicalPlanBuilder.java | 7 +- .../commands/AlterRoutineLoadCommand.java | 73 ++++++- .../AlterRoutineLoadJobOperationLog.java | 12 ++ .../routineload/KafkaRoutineLoadJobTest.java | 19 ++ .../commands/AlterRoutineLoadCommandTest.java | 180 +++++++++++++++-- .../AlterRoutineLoadOperationLogTest.java | 63 ++++++ .../org/apache/doris/nereids/DorisParser.g4 | 11 +- .../test_routine_load_alter.groovy | 191 ++++++++++++++++++ 11 files changed, 572 insertions(+), 25 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java index 3223ff913594e1..e7c69da5a77488 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java @@ -468,6 +468,31 @@ protected void setRoutineLoadDesc(RoutineLoadDesc routineLoadDesc) { } } + protected RoutineLoadDesc buildRoutineLoadDescSnapshot() { + List columnsInfo = null; + if (columnDescs != null && !columnDescs.descs.isEmpty()) { + columnsInfo = new 
ArrayList<>(columnDescs.descs); + } + return new RoutineLoadDesc(columnSeparator, lineDelimiter, columnsInfo, precedingFilter, whereExpr, + partitionNamesInfo, deleteCondition, mergeType, sequenceCol); + } + + public void validateTargetTable(Database db, OlapTable targetTable) throws UserException { + if (isMultiTable) { + throw new AnalysisException("ALTER ROUTINE LOAD target table change only supports single-table job"); + } + checkMeta(targetTable, buildRoutineLoadDescSnapshot()); + + targetTable.readLock(); + try { + NereidsStreamLoadPlanner planner = new NereidsStreamLoadPlanner(db, targetTable, + toNereidsRoutineLoadTaskInfo()); + planner.plan(new TUniqueId(0, 0)); + } finally { + targetTable.readUnlock(); + } + } + @Override public long getId() { return id; diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/kafka/KafkaRoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/kafka/KafkaRoutineLoadJob.java index dcb683e9e227dd..c66f5e2f1a5e70 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/kafka/KafkaRoutineLoadJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/kafka/KafkaRoutineLoadJob.java @@ -757,9 +757,12 @@ public void modifyProperties(AlterRoutineLoadCommand command) throws UserExcepti } modifyPropertiesInternal(jobProperties, dataSourceProperties); + if (command.hasTargetTable()) { + this.tableId = command.getTargetTableId(); + } AlterRoutineLoadJobOperationLog log = new AlterRoutineLoadJobOperationLog(this.id, - jobProperties, dataSourceProperties); + jobProperties, dataSourceProperties, command.getTargetTableId()); Env.getCurrentEnv().getEditLog().logAlterRoutineLoadJob(log); } finally { writeUnlock(); @@ -883,6 +886,9 @@ private void resetCloudProgress(Cloud.ResetRLProgressRequest.Builder builder) th public void replayModifyProperties(AlterRoutineLoadJobOperationLog log) { try { modifyPropertiesInternal(log.getJobProperties(), (KafkaDataSourceProperties) 
log.getDataSourceProperties()); + if (log.getTargetTableId() != 0) { + this.tableId = log.getTargetTableId(); + } } catch (UserException e) { // should not happen LOG.error("failed to replay modify kafka routine load job: {}", id, e); diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/kinesis/KinesisRoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/kinesis/KinesisRoutineLoadJob.java index 0c05889929924e..a60f1b1f309d76 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/kinesis/KinesisRoutineLoadJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/kinesis/KinesisRoutineLoadJob.java @@ -689,9 +689,12 @@ public void modifyProperties(AlterRoutineLoadCommand command) throws UserExcepti } modifyPropertiesInternal(jobProperties, dataSourceProperties); + if (command.hasTargetTable()) { + this.tableId = command.getTargetTableId(); + } AlterRoutineLoadJobOperationLog log = new AlterRoutineLoadJobOperationLog(this.id, - jobProperties, dataSourceProperties); + jobProperties, dataSourceProperties, command.getTargetTableId()); Env.getCurrentEnv().getEditLog().logAlterRoutineLoadJob(log); } finally { writeUnlock(); @@ -785,6 +788,9 @@ public void replayModifyProperties(AlterRoutineLoadJobOperationLog log) { try { modifyPropertiesInternal(log.getJobProperties(), (KinesisDataSourceProperties) log.getDataSourceProperties()); + if (log.getTargetTableId() != 0) { + this.tableId = log.getTargetTableId(); + } } catch (UserException e) { LOG.error("failed to replay modify kinesis routine load job: {}", id, e); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java index c470bc789d9e78..4116eab04c844a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java +++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java @@ -9284,6 +9284,10 @@ public LogicalPlan visitAlterRoutineLoad(DorisParser.AlterRoutineLoadContext ctx } LabelNameInfo labelNameInfo = new LabelNameInfo(dbName, jobName); + if (ctx.table != null) { + return new AlterRoutineLoadCommand(labelNameInfo, ctx.table.getText()); + } + Map properties = new HashMap<>(); if (ctx.properties != null) { properties.putAll(visitPropertyClause(ctx.properties)); @@ -9309,7 +9313,8 @@ public LogicalPlan visitAlterRoutineLoad(DorisParser.AlterRoutineLoadContext ctx } } - return new AlterRoutineLoadCommand(labelNameInfo, loadPropertyMap, properties, dataSourceMapProperties); + return new AlterRoutineLoadCommand(labelNameInfo, null, + loadPropertyMap, properties, dataSourceMapProperties); } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterRoutineLoadCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterRoutineLoadCommand.java index 367480c5d934d9..dce80149607c60 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterRoutineLoadCommand.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterRoutineLoadCommand.java @@ -21,12 +21,16 @@ import org.apache.doris.catalog.Database; import org.apache.doris.catalog.Env; import org.apache.doris.catalog.OlapTable; +import org.apache.doris.catalog.RandomDistributionInfo; import org.apache.doris.catalog.Table; import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.ErrorCode; +import org.apache.doris.common.ErrorReport; import org.apache.doris.common.FeNameFormat; import org.apache.doris.common.UserException; import org.apache.doris.common.util.TimeUtils; import org.apache.doris.common.util.Util; +import org.apache.doris.datasource.InternalCatalog; import org.apache.doris.datasource.property.fileformat.CsvFileFormatProperties; import 
org.apache.doris.datasource.property.fileformat.JsonFileFormatProperties; import org.apache.doris.load.RoutineLoadDesc; @@ -40,6 +44,7 @@ import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor; import org.apache.doris.qe.ConnectContext; import org.apache.doris.qe.StmtExecutor; +import org.apache.doris.mysql.privilege.PrivPredicate; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Maps; @@ -85,10 +90,12 @@ public class AlterRoutineLoadCommand extends AlterCommand { .build(); private final LabelNameInfo labelNameInfo; + private final String targetTableName; private final Map loadPropertyMap; private RoutineLoadDesc routineLoadDesc; private final Map jobProperties; private final Map dataSourceMapProperties; + private long targetTableId; private boolean isPartialUpdate; // save analyzed job properties. @@ -100,6 +107,7 @@ public class AlterRoutineLoadCommand extends AlterCommand { * AlterRoutineLoadCommand */ public AlterRoutineLoadCommand(LabelNameInfo labelNameInfo, + String targetTableName, Map loadPropertyMap, Map jobProperties, Map dataSourceMapProperties) { @@ -108,6 +116,7 @@ public AlterRoutineLoadCommand(LabelNameInfo labelNameInfo, Objects.requireNonNull(jobProperties, "jobProperties is null"); Objects.requireNonNull(dataSourceMapProperties, "dataSourceMapProperties is null"); this.labelNameInfo = labelNameInfo; + this.targetTableName = targetTableName; this.loadPropertyMap = loadPropertyMap == null ? 
Maps.newHashMap() : loadPropertyMap; this.jobProperties = jobProperties; this.dataSourceMapProperties = dataSourceMapProperties; @@ -118,7 +127,11 @@ public AlterRoutineLoadCommand(LabelNameInfo labelNameInfo, public AlterRoutineLoadCommand(LabelNameInfo labelNameInfo, Map jobProperties, Map dataSourceMapProperties) { - this(labelNameInfo, Maps.newHashMap(), jobProperties, dataSourceMapProperties); + this(labelNameInfo, null, Maps.newHashMap(), jobProperties, dataSourceMapProperties); + } + + public AlterRoutineLoadCommand(LabelNameInfo labelNameInfo, String targetTableName) { + this(labelNameInfo, targetTableName, Maps.newHashMap(), Maps.newHashMap(), Maps.newHashMap()); } public String getDbName() { @@ -133,6 +146,18 @@ public Map getAnalyzedJobProperties() { return analyzedJobProperties; } + public boolean hasTargetTable() { + return !StringUtil.isEmpty(targetTableName); + } + + public String getTargetTableName() { + return targetTableName; + } + + public long getTargetTableId() { + return targetTableId; + } + public boolean hasDataSourceProperty() { return MapUtils.isNotEmpty(dataSourceMapProperties); } @@ -161,18 +186,27 @@ public void doRun(ConnectContext ctx, StmtExecutor executor) throws Exception { public void validate(ConnectContext ctx) throws UserException { labelNameInfo.validate(ctx); FeNameFormat.checkCommonName(NAME_TYPE, labelNameInfo.getLabel()); + if (hasTargetTable() && (MapUtils.isNotEmpty(loadPropertyMap) || MapUtils.isNotEmpty(jobProperties) + || MapUtils.isNotEmpty(dataSourceMapProperties))) { + throw new AnalysisException("ALTER ROUTINE LOAD target table change does not support other properties"); + } // check routine load job properties include desired concurrent number etc. 
checkJobProperties(); // check load properties RoutineLoadJob job = Env.getCurrentEnv().getRoutineLoadManager() .getJob(getDbName(), getJobName()); - this.routineLoadDesc = CreateRoutineLoadInfo.checkLoadProperties(ctx, loadPropertyMap, - job.getDbFullName(), job.getTableName(), job.isMultiTable(), job.getMergeType()); + if (MapUtils.isNotEmpty(loadPropertyMap)) { + this.routineLoadDesc = CreateRoutineLoadInfo.checkLoadProperties(ctx, loadPropertyMap, + job.getDbFullName(), job.getTableName(), job.isMultiTable(), job.getMergeType()); + } // check data source properties checkDataSourceProperties(); + if (hasTargetTable()) { + validateTargetTable(ctx, job); + } checkPartialUpdate(); if (analyzedJobProperties.isEmpty() && MapUtils.isEmpty(dataSourceMapProperties) - && routineLoadDesc == null) { + && MapUtils.isEmpty(loadPropertyMap) && !hasTargetTable()) { throw new AnalysisException("No properties are specified"); } } @@ -333,17 +367,44 @@ private void checkPartialUpdate() throws UserException { return; } RoutineLoadJob job = Env.getCurrentEnv().getRoutineLoadManager() - .getJob(getDbName(), getDbName()); + .getJob(getDbName(), getJobName()); if (job.isMultiTable()) { throw new AnalysisException("load by PARTIAL_COLUMNS is not supported in multi-table load."); } Database db = Env.getCurrentInternalCatalog().getDbOrAnalysisException(job.getDbFullName()); - Table table = db.getTableOrAnalysisException(job.getTableName()); + String tableName = hasTargetTable() ? 
targetTableName : job.getTableName(); + Table table = db.getTableOrAnalysisException(tableName); if (isPartialUpdate && !((OlapTable) table).getEnableUniqueKeyMergeOnWrite()) { throw new AnalysisException("load by PARTIAL_COLUMNS is only supported in unique table MoW"); } } + private void validateTargetTable(ConnectContext ctx, RoutineLoadJob job) throws UserException { + if (job.isMultiTable()) { + throw new AnalysisException("ALTER ROUTINE LOAD target table change only supports single-table job"); + } + Database db = Env.getCurrentInternalCatalog().getDbOrAnalysisException(job.getDbFullName()); + Table table = db.getTableOrAnalysisException(targetTableName); + if (!(table instanceof OlapTable)) { + throw new AnalysisException("ALTER ROUTINE LOAD target table only supports OLAP table"); + } + OlapTable olapTable = (OlapTable) table; + if (olapTable.isTemporary()) { + throw new AnalysisException("Do not support load for temporary table " + olapTable.getDisplayName()); + } + if (!Env.getCurrentEnv().getAccessManager().checkTblPriv(ctx, InternalCatalog.INTERNAL_CATALOG_NAME, + job.getDbFullName(), targetTableName, PrivPredicate.LOAD)) { + ErrorReport.reportAnalysisException(ErrorCode.ERR_TABLEACCESS_DENIED_ERROR, "LOAD", + ctx.getQualifiedUser(), ctx.getRemoteIP(), job.getDbFullName() + ": " + targetTableName); + } + if (job.isLoadToSingleTablet() + && !(olapTable.getDefaultDistributionInfo() instanceof RandomDistributionInfo)) { + throw new AnalysisException("if load_to_single_tablet set to true, the olap table must be with random distribution"); + } + job.validateTargetTable(db, olapTable); + this.targetTableId = olapTable.getId(); + } + @Override public R accept(PlanVisitor visitor, C context) { return visitor.visitAlterRoutineLoadCommand(this, context); diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/AlterRoutineLoadJobOperationLog.java b/fe/fe-core/src/main/java/org/apache/doris/persist/AlterRoutineLoadJobOperationLog.java index 
4729882f7927fb..ae7c8ad214de7c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/persist/AlterRoutineLoadJobOperationLog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/persist/AlterRoutineLoadJobOperationLog.java @@ -37,12 +37,20 @@ public class AlterRoutineLoadJobOperationLog implements Writable { private Map jobProperties; @SerializedName(value = "dataSourceProperties") private AbstractDataSourceProperties dataSourceProperties; + @SerializedName(value = "targetTableId") + private long targetTableId; public AlterRoutineLoadJobOperationLog(long jobId, Map jobProperties, AbstractDataSourceProperties dataSourceProperties) { + this(jobId, jobProperties, dataSourceProperties, 0L); + } + + public AlterRoutineLoadJobOperationLog(long jobId, Map jobProperties, + AbstractDataSourceProperties dataSourceProperties, long targetTableId) { this.jobId = jobId; this.jobProperties = jobProperties; this.dataSourceProperties = dataSourceProperties; + this.targetTableId = targetTableId; } public long getJobId() { @@ -57,6 +65,10 @@ public AbstractDataSourceProperties getDataSourceProperties() { return dataSourceProperties; } + public long getTargetTableId() { + return targetTableId; + } + public static AlterRoutineLoadJobOperationLog read(DataInput in) throws IOException { String json = Text.readString(in); return GsonUtils.GSON.fromJson(json, AlterRoutineLoadJobOperationLog.class); diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java index 8b442012ff2c04..7a1d11513256c8 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java @@ -44,6 +44,7 @@ import org.apache.doris.nereids.trees.plans.commands.info.LabelNameInfo; import org.apache.doris.nereids.trees.plans.commands.load.LoadProperty; import 
org.apache.doris.nereids.trees.plans.commands.load.LoadSeparator; +import org.apache.doris.persist.AlterRoutineLoadJobOperationLog; import org.apache.doris.qe.ConnectContext; import org.apache.doris.thrift.TResourceInfo; @@ -415,6 +416,24 @@ public void testFromCreateStmt() throws UserException { } } + @Test + public void testReplayModifyPropertiesSwitchesTargetTableWithoutResettingProgress() { + KafkaRoutineLoadJob routineLoadJob = new KafkaRoutineLoadJob(1L, "kafka_routine_load_job", 1L, + 101L, "127.0.0.1:9020", "topic1", UserIdentity.ADMIN); + Map partitionToOffset = Maps.newHashMap(); + partitionToOffset.put(1, 123L); + KafkaProgress progress = new KafkaProgress(partitionToOffset); + Deencapsulation.setField(routineLoadJob, "progress", progress); + + AlterRoutineLoadJobOperationLog log = new AlterRoutineLoadJobOperationLog(1L, + Maps.newHashMap(), null, 202L); + routineLoadJob.replayModifyProperties(log); + + Assert.assertEquals(202L, routineLoadJob.getTableId()); + Assert.assertSame(progress, routineLoadJob.getProgress()); + Assert.assertEquals(Long.valueOf(123L), ((KafkaProgress) routineLoadJob.getProgress()).getOffsetByPartition(1)); + } + private CreateRoutineLoadInfo initCreateRoutineLoadInfo() { Map properties = Maps.newHashMap(); properties.put(CreateRoutineLoadInfo.DESIRED_CONCURRENT_NUMBER_PROPERTY, "2"); diff --git a/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/plans/commands/AlterRoutineLoadCommandTest.java b/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/plans/commands/AlterRoutineLoadCommandTest.java index 0989bc127b046b..f52b269bfb79e6 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/plans/commands/AlterRoutineLoadCommandTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/plans/commands/AlterRoutineLoadCommandTest.java @@ -19,16 +19,23 @@ import org.apache.doris.catalog.Database; import org.apache.doris.catalog.Env; +import org.apache.doris.catalog.OlapTable; +import 
org.apache.doris.catalog.RandomDistributionInfo; import org.apache.doris.catalog.Table; +import org.apache.doris.common.AnalysisException; import org.apache.doris.datasource.InternalCatalog; import org.apache.doris.load.loadv2.LoadTask; import org.apache.doris.load.routineload.RoutineLoadJob; import org.apache.doris.load.routineload.RoutineLoadManager; +import org.apache.doris.mysql.privilege.AccessControllerManager; +import org.apache.doris.nereids.exceptions.ParseException; +import org.apache.doris.nereids.parser.NereidsParser; import org.apache.doris.nereids.trees.plans.commands.info.CreateRoutineLoadInfo; import org.apache.doris.nereids.trees.plans.commands.info.LabelNameInfo; import org.apache.doris.qe.ConnectContext; import org.apache.doris.qe.QueryState; import org.apache.doris.qe.SessionVariable; +import org.apache.doris.thrift.TUniqueKeyUpdateMode; import com.google.common.collect.Maps; import org.junit.jupiter.api.AfterEach; @@ -41,8 +48,16 @@ import java.util.Map; public class AlterRoutineLoadCommandTest { + private static final NereidsParser PARSER = new NereidsParser(); + private Env env; private ConnectContext connectContext; + private AccessControllerManager accessManager; + private InternalCatalog catalog; + private Database db; + private OlapTable currentTable; + private RoutineLoadManager routineLoadManager; + private RoutineLoadJob routineLoadJob; private MockedStatic envMockedStatic; private MockedStatic ctxMockedStatic; @@ -50,26 +65,38 @@ public class AlterRoutineLoadCommandTest { public void setUp() throws Exception { env = Mockito.mock(Env.class); connectContext = Mockito.mock(ConnectContext.class); + accessManager = Mockito.mock(AccessControllerManager.class); + catalog = Mockito.mock(InternalCatalog.class); + db = Mockito.mock(Database.class); + currentTable = Mockito.mock(OlapTable.class); + routineLoadManager = Mockito.mock(RoutineLoadManager.class); + routineLoadJob = Mockito.mock(RoutineLoadJob.class); envMockedStatic = 
Mockito.mockStatic(Env.class); ctxMockedStatic = Mockito.mockStatic(ConnectContext.class); envMockedStatic.when(Env::getCurrentEnv).thenReturn(env); ctxMockedStatic.when(ConnectContext::get).thenReturn(connectContext); Mockito.when(connectContext.getSessionVariable()).thenReturn(new SessionVariable()); Mockito.when(connectContext.getState()).thenReturn(new QueryState()); - InternalCatalog catalog = Mockito.mock(InternalCatalog.class); - Database db = Mockito.mock(Database.class); - Table tbl = Mockito.mock(Table.class); + Mockito.when(connectContext.getQualifiedUser()).thenReturn("testUser"); + Mockito.when(connectContext.getRemoteIP()).thenReturn("127.0.0.1"); envMockedStatic.when(Env::getCurrentInternalCatalog).thenReturn(catalog); Mockito.doReturn(db).when(catalog).getDbOrAnalysisException(Mockito.anyString()); - Mockito.doReturn(tbl).when(db).getTableOrAnalysisException(Mockito.anyString()); - Mockito.when(env.getRoutineLoadManager()).thenReturn(Mockito.mock(RoutineLoadManager.class)); - RoutineLoadManager rlm = env.getRoutineLoadManager(); - RoutineLoadJob rlJob = Mockito.mock(RoutineLoadJob.class); - Mockito.when(rlm.getJob(Mockito.anyString(), Mockito.anyString())).thenReturn(rlJob); - Mockito.when(rlJob.getDbFullName()).thenReturn("testDb"); - Mockito.when(rlJob.getTableName()).thenReturn("testTable"); - Mockito.when(rlJob.isMultiTable()).thenReturn(false); - Mockito.when(rlJob.getMergeType()).thenReturn(LoadTask.MergeType.APPEND); + Mockito.doReturn(currentTable).when(db).getTableOrAnalysisException(Mockito.anyString()); + Mockito.when(env.getRoutineLoadManager()).thenReturn(routineLoadManager); + Mockito.when(env.getAccessManager()).thenReturn(accessManager); + Mockito.when(accessManager.checkTblPriv(Mockito.any(ConnectContext.class), Mockito.anyString(), + Mockito.anyString(), Mockito.anyString(), Mockito.any())).thenReturn(true); + Mockito.when(routineLoadManager.getJob(Mockito.anyString(), Mockito.anyString())).thenReturn(routineLoadJob); + 
Mockito.when(routineLoadJob.getDbFullName()).thenReturn("testDb"); + Mockito.when(routineLoadJob.getTableName()).thenReturn("testTable"); + Mockito.when(routineLoadJob.getDbId()).thenReturn(1000L); + Mockito.when(routineLoadJob.getTableId()).thenReturn(2000L); + Mockito.when(routineLoadJob.isMultiTable()).thenReturn(false); + Mockito.when(routineLoadJob.getMergeType()).thenReturn(LoadTask.MergeType.APPEND); + Mockito.when(routineLoadJob.isLoadToSingleTablet()).thenReturn(false); + Mockito.when(routineLoadJob.getUniqueKeyUpdateMode()).thenReturn(TUniqueKeyUpdateMode.UPSERT); + Mockito.when(currentTable.getType()).thenReturn(Table.TableType.OLAP); + Mockito.when(currentTable.isTemporary()).thenReturn(false); } @AfterEach @@ -86,6 +113,14 @@ private void runBefore() { Mockito.when(connectContext.isSkipAuth()).thenReturn(true); } + private void mockTargetTable(Table table) { + try { + Mockito.doReturn(table).when(db).getTableOrAnalysisException("testTable2"); + } catch (AnalysisException e) { + throw new RuntimeException(e); + } + } + @Test public void testValidate() { runBefore(); @@ -115,4 +150,125 @@ public void testValidate() { Assertions.assertTrue(command.getAnalyzedJobProperties().containsKey(CreateRoutineLoadInfo.STRICT_MODE)); Assertions.assertTrue(command.getAnalyzedJobProperties().containsKey(CreateRoutineLoadInfo.TIMEZONE)); } + + @Test + public void testParseAlterRoutineLoadOnTargetTable() { + AlterRoutineLoadCommand command = (AlterRoutineLoadCommand) PARSER.parseSingle( + "ALTER ROUTINE LOAD FOR testDb.label1 ON testTable2"); + Assertions.assertEquals("testDb", command.getDbName()); + Assertions.assertEquals("label1", command.getJobName()); + Assertions.assertTrue(command.hasTargetTable()); + Assertions.assertEquals("testTable2", command.getTargetTableName()); + } + + @Test + public void testParseAlterRoutineLoadOnTargetTableRejectMixedProperties() { + Assertions.assertThrows(ParseException.class, () -> PARSER.parseSingle( + "ALTER ROUTINE LOAD FOR 
testDb.label1 ON testTable2 PROPERTIES(\"max_error_number\"=\"1\")")); + Assertions.assertThrows(ParseException.class, () -> PARSER.parseSingle( + "ALTER ROUTINE LOAD FOR testDb.label1 ON testTable2 FROM KAFKA(\"kafka_offsets\"=\"100\")")); + Assertions.assertThrows(ParseException.class, () -> PARSER.parseSingle( + "ALTER ROUTINE LOAD FOR testDb.label1 ON testTable2 COLUMNS(k1)")); + } + + @Test + public void testValidateTargetTableOnlyDoesNotRequireOtherProperties() throws Exception { + runBefore(); + mockTargetTable(currentTable); + + AlterRoutineLoadCommand command = (AlterRoutineLoadCommand) PARSER.parseSingle( + "ALTER ROUTINE LOAD FOR testDb.label1 ON testTable2"); + Assertions.assertDoesNotThrow(() -> command.validate(connectContext)); + Assertions.assertEquals("testTable2", command.getTargetTableName()); + } + + @Test + public void testValidateTargetTableRejectsMultiTableJob() throws Exception { + runBefore(); + Mockito.when(routineLoadJob.isMultiTable()).thenReturn(true); + AlterRoutineLoadCommand command = (AlterRoutineLoadCommand) PARSER.parseSingle( + "ALTER ROUTINE LOAD FOR testDb.label1 ON testTable2"); + + Assertions.assertTrue(Assertions.assertThrows(Exception.class, () -> command.validate(connectContext)) + .getMessage().contains("single-table")); + } + + @Test + public void testValidateTargetTableRejectsWithoutLoadPrivilege() throws Exception { + runBefore(); + mockTargetTable(currentTable); + Mockito.when(accessManager.checkTblPriv(Mockito.any(ConnectContext.class), Mockito.anyString(), + Mockito.eq("testDb"), Mockito.eq("testTable2"), Mockito.any())).thenReturn(false); + AlterRoutineLoadCommand command = (AlterRoutineLoadCommand) PARSER.parseSingle( + "ALTER ROUTINE LOAD FOR testDb.label1 ON testTable2"); + + Assertions.assertTrue(Assertions.assertThrows(Exception.class, () -> command.validate(connectContext)) + .getMessage().contains("LOAD")); + } + + @Test + public void testValidateTargetTableRejectsTemporaryTable() throws Exception { + 
runBefore(); + OlapTable tempTable = Mockito.mock(OlapTable.class); + Mockito.when(tempTable.getType()).thenReturn(Table.TableType.OLAP); + Mockito.when(tempTable.isTemporary()).thenReturn(true); + mockTargetTable(tempTable); + AlterRoutineLoadCommand command = (AlterRoutineLoadCommand) PARSER.parseSingle( + "ALTER ROUTINE LOAD FOR testDb.label1 ON testTable2"); + + Assertions.assertTrue(Assertions.assertThrows(Exception.class, () -> command.validate(connectContext)) + .getMessage().contains("temporary table")); + } + + @Test + public void testValidateTargetTableRejectsLoadToSingleTabletWithoutRandomDistribution() throws Exception { + runBefore(); + OlapTable newTable = Mockito.mock(OlapTable.class); + Mockito.when(newTable.getType()).thenReturn(Table.TableType.OLAP); + Mockito.when(newTable.isTemporary()).thenReturn(false); + Mockito.when(newTable.getDefaultDistributionInfo()).thenReturn(null); + mockTargetTable(newTable); + Mockito.when(routineLoadJob.isLoadToSingleTablet()).thenReturn(true); + AlterRoutineLoadCommand command = (AlterRoutineLoadCommand) PARSER.parseSingle( + "ALTER ROUTINE LOAD FOR testDb.label1 ON testTable2"); + + Assertions.assertTrue(Assertions.assertThrows(Exception.class, () -> command.validate(connectContext)) + .getMessage().contains("load_to_single_tablet")); + } + + @Test + public void testValidateTargetTableAllowsLoadToSingleTabletWithRandomDistribution() throws Exception { + runBefore(); + OlapTable newTable = Mockito.mock(OlapTable.class); + RandomDistributionInfo distributionInfo = Mockito.mock(RandomDistributionInfo.class); + Mockito.when(newTable.getType()).thenReturn(Table.TableType.OLAP); + Mockito.when(newTable.isTemporary()).thenReturn(false); + Mockito.when(newTable.getDefaultDistributionInfo()).thenReturn(distributionInfo); + mockTargetTable(newTable); + Mockito.when(routineLoadJob.isLoadToSingleTablet()).thenReturn(true); + AlterRoutineLoadCommand command = (AlterRoutineLoadCommand) PARSER.parseSingle( + "ALTER ROUTINE LOAD 
FOR testDb.label1 ON testTable2"); + + Assertions.assertDoesNotThrow(() -> command.validate(connectContext)); + } + + @Test + public void testValidateTargetTablePassesTargetTableToJobValidation() throws Exception { + runBefore(); + OlapTable targetTable = Mockito.mock(OlapTable.class); + Mockito.when(targetTable.getType()).thenReturn(Table.TableType.OLAP); + Mockito.when(targetTable.isTemporary()).thenReturn(false); + Mockito.doReturn(targetTable).when(db).getTableOrAnalysisException("testTable2"); + Mockito.when(routineLoadJob.isLoadToSingleTablet()).thenReturn(false); + Mockito.doAnswer(invocation -> { + Assertions.assertSame(db, invocation.getArgument(0)); + Assertions.assertSame(targetTable, invocation.getArgument(1)); + return null; + }).when(routineLoadJob).validateTargetTable(Mockito.any(Database.class), Mockito.any(OlapTable.class)); + + AlterRoutineLoadCommand command = (AlterRoutineLoadCommand) PARSER.parseSingle( + "ALTER ROUTINE LOAD FOR testDb.label1 ON testTable2"); + + Assertions.assertDoesNotThrow(() -> command.validate(connectContext)); + } } diff --git a/fe/fe-core/src/test/java/org/apache/doris/persist/AlterRoutineLoadOperationLogTest.java b/fe/fe-core/src/test/java/org/apache/doris/persist/AlterRoutineLoadOperationLogTest.java index 8a1550d48f5d13..063a6067581c17 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/persist/AlterRoutineLoadOperationLogTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/persist/AlterRoutineLoadOperationLogTest.java @@ -19,14 +19,18 @@ import org.apache.doris.common.UserException; import org.apache.doris.common.util.TimeUtils; +import org.apache.doris.common.io.Text; import org.apache.doris.load.routineload.kafka.KafkaConfiguration; import org.apache.doris.load.routineload.kafka.KafkaDataSourceProperties; import org.apache.doris.nereids.trees.plans.commands.info.CreateRoutineLoadInfo; +import org.apache.doris.persist.gson.GsonUtils; import com.google.common.collect.Maps; import org.junit.Assert; import 
org.junit.Test; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.File; @@ -86,4 +90,63 @@ public void testSerializeAlterRoutineLoadOperationLog() throws IOException, User } + @Test + public void testSerializeAlterRoutineLoadOperationLogWithTargetTableId() throws Exception { + long jobId = 1000; + long targetTableId = 2001; + Map jobProperties = Maps.newHashMap(); + Map dataSourceProperties = Maps.newHashMap(); + dataSourceProperties.put("property.group.id", "mygroup"); + KafkaDataSourceProperties routineLoadDataSourceProperties = new KafkaDataSourceProperties( + dataSourceProperties); + routineLoadDataSourceProperties.setAlter(true); + routineLoadDataSourceProperties.setTimezone(TimeUtils.DEFAULT_TIME_ZONE); + routineLoadDataSourceProperties.analyze(); + + AlterRoutineLoadJobOperationLog log = new AlterRoutineLoadJobOperationLog(jobId, jobProperties, + routineLoadDataSourceProperties, targetTableId); + + ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); + try (DataOutputStream out = new DataOutputStream(byteArrayOutputStream)) { + log.write(out); + } + + AlterRoutineLoadJobOperationLog readLog; + try (DataInputStream in = new DataInputStream( + new ByteArrayInputStream(byteArrayOutputStream.toByteArray()))) { + readLog = AlterRoutineLoadJobOperationLog.read(in); + } + + Assert.assertEquals(targetTableId, readLog.getTargetTableId()); + } + + @Test + public void testDeserializeAlterRoutineLoadOperationLogWithoutTargetTableId() throws Exception { + long jobId = 1000; + Map jobProperties = Maps.newHashMap(); + jobProperties.put(CreateRoutineLoadInfo.DESIRED_CONCURRENT_NUMBER_PROPERTY, "5"); + Map dataSourceProperties = Maps.newHashMap(); + dataSourceProperties.put("property.group.id", "mygroup"); + KafkaDataSourceProperties routineLoadDataSourceProperties = new KafkaDataSourceProperties( + dataSourceProperties); + 
routineLoadDataSourceProperties.setAlter(true); + routineLoadDataSourceProperties.setTimezone(TimeUtils.DEFAULT_TIME_ZONE); + routineLoadDataSourceProperties.analyze(); + AlterRoutineLoadJobOperationLog log = new AlterRoutineLoadJobOperationLog(jobId, + jobProperties, routineLoadDataSourceProperties, 0L); + String legacyJson = GsonUtils.GSON.toJson(log).replace(",\"targetTableId\":0", ""); + ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); + try (DataOutputStream out = new DataOutputStream(byteArrayOutputStream)) { + Text.writeString(out, legacyJson); + } + + AlterRoutineLoadJobOperationLog readLog; + try (DataInputStream in = new DataInputStream( + new ByteArrayInputStream(byteArrayOutputStream.toByteArray()))) { + readLog = AlterRoutineLoadJobOperationLog.read(in); + } + + Assert.assertEquals(0L, readLog.getTargetTableId()); + Assert.assertEquals("5", readLog.getJobProperties().get(CreateRoutineLoadInfo.DESIRED_CONCURRENT_NUMBER_PROPERTY)); + } } diff --git a/fe/fe-sql-parser/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4 b/fe/fe-sql-parser/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4 index cd2391b1775f1e..d16c184be0a814 100644 --- a/fe/fe-sql-parser/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4 +++ b/fe/fe-sql-parser/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4 @@ -334,10 +334,13 @@ supportedAlterStatement | ALTER SYSTEM RENAME COMPUTE GROUP name=identifier newName=identifier #alterSystemRenameComputeGroup | ALTER RESOURCE name=identifierOrText properties=propertyClause? #alterResource | ALTER REPOSITORY name=identifier properties=propertyClause? #alterRepository - | ALTER ROUTINE LOAD FOR name=multipartIdentifier - (loadProperty (COMMA loadProperty)*)? - properties=propertyClause? - (FROM type=identifier LEFT_PAREN propertyItemList RIGHT_PAREN)? #alterRoutineLoad + | ALTER ROUTINE LOAD FOR name=multipartIdentifier + ( + ON table=identifier + | (loadProperty (COMMA loadProperty)*)? 
+ properties=propertyClause? + (FROM type=identifier LEFT_PAREN propertyItemList RIGHT_PAREN)? + ) #alterRoutineLoad | ALTER COLOCATE GROUP name=multipartIdentifier SET LEFT_PAREN propertyItemList RIGHT_PAREN #alterColocateGroup | ALTER USER (IF EXISTS)? grantUserIdentify diff --git a/regression-test/suites/load_p0/routine_load/test_routine_load_alter.groovy b/regression-test/suites/load_p0/routine_load/test_routine_load_alter.groovy index 32571b5e29abd8..f19ac5bb729904 100644 --- a/regression-test/suites/load_p0/routine_load/test_routine_load_alter.groovy +++ b/regression-test/suites/load_p0/routine_load/test_routine_load_alter.groovy @@ -16,6 +16,7 @@ // under the License. import org.apache.kafka.clients.admin.AdminClient +import org.apache.kafka.clients.admin.NewTopic import org.apache.kafka.clients.producer.KafkaProducer import org.apache.kafka.clients.producer.ProducerRecord import org.apache.kafka.clients.producer.ProducerConfig @@ -293,5 +294,195 @@ suite("test_routine_load_alter","p0") { sql "stop routine load for ${jobName}" sql "truncate table ${tableName}" } + + // test alter target table + def srcTableName = "test_routine_load_alter_src" + def dstTableName = "test_routine_load_alter_dst" + def alterTargetTopic = "test_routine_load_alter_target_table_${System.currentTimeMillis()}" + def alterTargetJob = "test_alter_target_table_${System.currentTimeMillis()}" + def alterTopicProducer = null + def alterTopicAdmin = null + try { + sql """ DROP TABLE IF EXISTS ${srcTableName} """ + sql """ DROP TABLE IF EXISTS ${dstTableName} """ + sql """ + CREATE TABLE IF NOT EXISTS ${srcTableName} ( + `k1` int(20) NULL, + `k2` string NULL, + `v1` date NULL, + `v2` string NULL, + `v3` datetime NULL, + `v4` string NULL + ) ENGINE=OLAP + DUPLICATE KEY(`k1`) + COMMENT 'OLAP' + DISTRIBUTED BY HASH(`k1`) BUCKETS 3 + PROPERTIES ("replication_allocation" = "tag.location.default: 1"); + """ + sql """ + CREATE TABLE IF NOT EXISTS ${dstTableName} ( + `k1` int(20) NULL, + `k2` string 
NULL, + `v1` date NULL, + `v2` string NULL, + `v3` datetime NULL, + `v4` string NULL + ) ENGINE=OLAP + DUPLICATE KEY(`k1`) + COMMENT 'OLAP' + DISTRIBUTED BY HASH(`k1`) BUCKETS 3 + PROPERTIES ("replication_allocation" = "tag.location.default: 1"); + """ + sql "sync" + + def topicProps = new Properties() + topicProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "${kafka_broker}".toString()) + alterTopicAdmin = AdminClient.create(topicProps) + alterTopicAdmin.createTopics([new NewTopic(alterTargetTopic, 1, (short) 1)]).all().get() + + def producerProps = new Properties() + producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "${kafka_broker}".toString()) + producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, + "org.apache.kafka.common.serialization.StringSerializer") + producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, + "org.apache.kafka.common.serialization.StringSerializer") + producerProps.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "10000") + producerProps.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "10000") + alterTopicProducer = new KafkaProducer<>(producerProps) + + def firstBatch = new File("""${context.file.parent}/data/${kafkaCsvTpoics[0]}.csv""").readLines() + firstBatch.each { line -> + alterTopicProducer.send(new ProducerRecord<>(alterTargetTopic, null, line)).get() + } + alterTopicProducer.flush() + + sql """ + CREATE ROUTINE LOAD ${alterTargetJob} ON ${srcTableName} + COLUMNS TERMINATED BY "," + PROPERTIES + ( + "max_batch_interval" = "5", + "max_batch_rows" = "300000", + "max_batch_size" = "209715200" + ) + FROM KAFKA + ( + "kafka_broker_list" = "${kafka_broker}", + "kafka_topic" = "${alterTargetTopic}", + "kafka_partitions" = "0", + "kafka_offsets" = "OFFSET_BEGINNING" + ); + """ + sql "sync" + + count = 0 + while (true) { + def res = sql "select count(*) from ${srcTableName}" + def state = sql "show routine load for ${alterTargetJob}" + log.info("routine load state: ${state[0][8].toString()}".toString()) + log.info("routine 
load statistic: ${state[0][14].toString()}".toString()) + log.info("reason of state changed: ${state[0][17].toString()}".toString()) + if (res[0][0] >= 3) { + break + } + if (count >= 120) { + log.error("routine load can not visible for long time") + assertEquals(3, res[0][0]) + break + } + sleep(1000) + count++ + } + + sql "pause routine load for ${alterTargetJob}" + def showBeforeAlter = sql "show routine load for ${alterTargetJob}" + assertEquals(srcTableName, showBeforeAlter[0][6].toString()) + def progressBeforeAlter = showBeforeAlter[0][15].toString() + + sql "ALTER ROUTINE LOAD FOR ${alterTargetJob} ON ${dstTableName}" + + def showAfterAlter = sql "show routine load for ${alterTargetJob}" + assertEquals(dstTableName, showAfterAlter[0][6].toString()) + assertEquals(progressBeforeAlter, showAfterAlter[0][15].toString()) + + def secondBatch = [ + "4,eab,2023-07-16,def,2023-07-21:05:48:31,ghi", + "5,eab,2023-07-17,def,2023-07-22:05:48:31,ghi", + "6,eab,2023-07-18,def,2023-07-23:05:48:31,ghi" + ] + secondBatch.each { line -> + alterTopicProducer.send(new ProducerRecord<>(alterTargetTopic, null, line)).get() + } + alterTopicProducer.flush() + + sql "resume routine load for ${alterTargetJob}" + + count = 0 + def stableCount = 0 + while (true) { + def srcCount = sql "select count(*) from ${srcTableName}" + def dstCount = sql "select count(*) from ${dstTableName}" + long srcCountValue = (srcCount[0][0] as Number).longValue() + long dstCountValue = (dstCount[0][0] as Number).longValue() + log.info("src count: ${srcCountValue}".toString()) + log.info("dst count: ${dstCountValue}".toString()) + if (srcCountValue == 3 && dstCountValue == 3) { + stableCount++ + if (stableCount >= 5) { + sleep(2000) + def finalSrcCount = sql "select count(*) from ${srcTableName}" + def finalDstCount = sql "select count(*) from ${dstTableName}" + assertEquals(3L, (finalSrcCount[0][0] as Number).longValue()) + assertEquals(3L, (finalDstCount[0][0] as Number).longValue()) + break + } + } else 
{ + stableCount = 0 + if (srcCountValue > 3 || dstCountValue > 3) { + assertEquals(3L, srcCountValue) + assertEquals(3L, dstCountValue) + } + } + if (count >= 120) { + log.error("routine load target table alter can not visible for long time") + assertEquals(3L, srcCountValue) + assertEquals(3L, dstCountValue) + break + } + sleep(1000) + count++ + } + + def srcRows = sql "select k1, k2 from ${srcTableName} order by k1" + def dstRows = sql "select k1, k2 from ${dstTableName} order by k1" + assertEquals(3, srcRows.size()) + assertEquals(3, dstRows.size()) + assertEquals(1, srcRows[0][0]) + assertEquals("eab", srcRows[0][1]) + assertEquals(2, srcRows[1][0]) + assertEquals("eab", srcRows[1][1]) + assertEquals(3, srcRows[2][0]) + assertEquals("eab", srcRows[2][1]) + assertEquals(4, dstRows[0][0]) + assertEquals("eab", dstRows[0][1]) + assertEquals(5, dstRows[1][0]) + assertEquals("eab", dstRows[1][1]) + assertEquals(6, dstRows[2][0]) + assertEquals("eab", dstRows[2][1]) + } finally { + try { + sql "stop routine load for ${alterTargetJob}" + } catch (Exception e) { + logger.warn("failed to stop alter target routine load: ${e.message}".toString()) + } + if (alterTopicProducer != null) { + alterTopicProducer.close() + } + if (alterTopicAdmin != null) { + alterTopicAdmin.close() + } + sql "truncate table ${srcTableName}" + sql "truncate table ${dstTableName}" + } } } From 03cf6269e260ad9bfbb2ce4fa13b26076317f650 Mon Sep 17 00:00:00 2001 From: Refrain Date: Fri, 26 Jun 2026 14:39:11 +0800 Subject: [PATCH 2/2] [fix](fe) Tighten routine load alter validation ### What problem does this PR solve? Issue Number: None Related PR: #64878 Problem Summary: Follow-up review found two issues in the new routine load target-table alter support. First, `AlterRoutineLoadCommand` had an import order regression that could fail FE checkstyle. 
Second, alter validation only rechecked `PARTIAL_COLUMNS=true` from the current command, which left the effective partial-update state under-validated when the existing job or a `unique_key_update_mode` change required merge-on-write semantics. This change restores import ordering, validates the effective unique key update mode against the destination table, and adds focused FE unit coverage for those cases. ### Release note Routine Load alter now rejects target-table or unique-key-update changes that are incompatible with partial update requirements. ### Check List (For Author) - Test: Unit Test - Behavior changed: Yes - Does this need documentation: No --- .../load/routineload/RoutineLoadJob.java | 3 +- .../commands/AlterRoutineLoadCommand.java | 24 ++++++++++--- .../commands/AlterRoutineLoadCommandTest.java | 34 +++++++++++++++++++ 3 files changed, 55 insertions(+), 6 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java index e7c69da5a77488..407ce7ecd86fba 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java @@ -19,6 +19,7 @@ import org.apache.doris.analysis.Expr; import org.apache.doris.analysis.ExprToSqlVisitor; +import org.apache.doris.analysis.ImportColumnDesc; import org.apache.doris.analysis.Separator; import org.apache.doris.analysis.ToSqlParams; import org.apache.doris.analysis.UserIdentity; @@ -469,7 +470,7 @@ protected void setRoutineLoadDesc(RoutineLoadDesc routineLoadDesc) { } protected RoutineLoadDesc buildRoutineLoadDescSnapshot() { - List columnsInfo = null; + List columnsInfo = null; if (columnDescs != null && !columnDescs.descs.isEmpty()) { columnsInfo = new ArrayList<>(columnDescs.descs); } diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterRoutineLoadCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterRoutineLoadCommand.java index dce80149607c60..7d392e5d6d6fea 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterRoutineLoadCommand.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterRoutineLoadCommand.java @@ -37,6 +37,7 @@ import org.apache.doris.load.routineload.AbstractDataSourceProperties; import org.apache.doris.load.routineload.RoutineLoadDataSourcePropertyFactory; import org.apache.doris.load.routineload.RoutineLoadJob; +import org.apache.doris.mysql.privilege.PrivPredicate; import org.apache.doris.nereids.trees.plans.PlanType; import org.apache.doris.nereids.trees.plans.commands.info.CreateRoutineLoadInfo; import org.apache.doris.nereids.trees.plans.commands.info.LabelNameInfo; @@ -44,7 +45,7 @@ import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor; import org.apache.doris.qe.ConnectContext; import org.apache.doris.qe.StmtExecutor; -import org.apache.doris.mysql.privilege.PrivPredicate; +import org.apache.doris.thrift.TUniqueKeyUpdateMode; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Maps; @@ -363,22 +364,35 @@ private void checkDataSourceProperties() throws UserException { } private void checkPartialUpdate() throws UserException { - if (!isPartialUpdate) { - return; - } RoutineLoadJob job = Env.getCurrentEnv().getRoutineLoadManager() .getJob(getDbName(), getJobName()); + TUniqueKeyUpdateMode uniqueKeyUpdateMode = getEffectiveUniqueKeyUpdateMode(job); + if (uniqueKeyUpdateMode == TUniqueKeyUpdateMode.UPSERT) { + return; + } if (job.isMultiTable()) { throw new AnalysisException("load by PARTIAL_COLUMNS is not supported in multi-table load."); } Database db = Env.getCurrentInternalCatalog().getDbOrAnalysisException(job.getDbFullName()); String tableName = 
hasTargetTable() ? targetTableName : job.getTableName(); Table table = db.getTableOrAnalysisException(tableName); - if (isPartialUpdate && !((OlapTable) table).getEnableUniqueKeyMergeOnWrite()) { + if (!((OlapTable) table).getEnableUniqueKeyMergeOnWrite()) { throw new AnalysisException("load by PARTIAL_COLUMNS is only supported in unique table MoW"); } } + private TUniqueKeyUpdateMode getEffectiveUniqueKeyUpdateMode(RoutineLoadJob job) { + if (analyzedJobProperties.containsKey(CreateRoutineLoadInfo.UNIQUE_KEY_UPDATE_MODE)) { + return TUniqueKeyUpdateMode.valueOf( + analyzedJobProperties.get(CreateRoutineLoadInfo.UNIQUE_KEY_UPDATE_MODE)); + } + if (analyzedJobProperties.containsKey(CreateRoutineLoadInfo.PARTIAL_COLUMNS) && isPartialUpdate + && job.getUniqueKeyUpdateMode() == TUniqueKeyUpdateMode.UPSERT) { + return TUniqueKeyUpdateMode.UPDATE_FIXED_COLUMNS; + } + return job.getUniqueKeyUpdateMode(); + } + private void validateTargetTable(ConnectContext ctx, RoutineLoadJob job) throws UserException { if (job.isMultiTable()) { throw new AnalysisException("ALTER ROUTINE LOAD target table change only supports single-table job"); diff --git a/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/plans/commands/AlterRoutineLoadCommandTest.java b/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/plans/commands/AlterRoutineLoadCommandTest.java index f52b269bfb79e6..93aa02e72b179f 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/plans/commands/AlterRoutineLoadCommandTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/plans/commands/AlterRoutineLoadCommandTest.java @@ -271,4 +271,38 @@ public void testValidateTargetTablePassesTargetTableToJobValidation() throws Exc Assertions.assertDoesNotThrow(() -> command.validate(connectContext)); } + + @Test + public void testValidateTargetTableRejectsExistingFlexiblePartialUpdateOnNonMowTable() throws Exception { + runBefore(); + OlapTable targetTable = Mockito.mock(OlapTable.class); + 
Mockito.when(targetTable.getType()).thenReturn(Table.TableType.OLAP); + Mockito.when(targetTable.isTemporary()).thenReturn(false); + Mockito.when(targetTable.getEnableUniqueKeyMergeOnWrite()).thenReturn(false); + Mockito.doReturn(targetTable).when(db).getTableOrAnalysisException("testTable2"); + Mockito.when(routineLoadJob.getUniqueKeyUpdateMode()).thenReturn(TUniqueKeyUpdateMode.UPDATE_FLEXIBLE_COLUMNS); + Mockito.doThrow(new AnalysisException("Only unique key merge on write support partial update")) + .when(routineLoadJob).validateTargetTable(Mockito.any(Database.class), Mockito.any(OlapTable.class)); + + AlterRoutineLoadCommand command = (AlterRoutineLoadCommand) PARSER.parseSingle( + "ALTER ROUTINE LOAD FOR testDb.label1 ON testTable2"); + + Assertions.assertTrue(Assertions.assertThrows(Exception.class, () -> command.validate(connectContext)) + .getMessage().contains("partial update")); + } + + @Test + public void testValidateRejectsUniqueKeyUpdateModeOnNonMowTable() { + runBefore(); + Mockito.when(routineLoadJob.getUniqueKeyUpdateMode()).thenReturn(TUniqueKeyUpdateMode.UPSERT); + Mockito.when(currentTable.getEnableUniqueKeyMergeOnWrite()).thenReturn(false); + Map jobProperties = Maps.newHashMap(); + jobProperties.put(CreateRoutineLoadInfo.UNIQUE_KEY_UPDATE_MODE, "UPDATE_FIXED_COLUMNS"); + + AlterRoutineLoadCommand command = new AlterRoutineLoadCommand( + new LabelNameInfo("testDb", "label1"), jobProperties, Maps.newHashMap()); + + Assertions.assertTrue(Assertions.assertThrows(Exception.class, () -> command.validate(connectContext)) + .getMessage().contains("PARTIAL_COLUMNS")); + } }