Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.apache.doris.analysis.Expr;
import org.apache.doris.analysis.ExprToSqlVisitor;
import org.apache.doris.analysis.ImportColumnDesc;
import org.apache.doris.analysis.Separator;
import org.apache.doris.analysis.ToSqlParams;
import org.apache.doris.analysis.UserIdentity;
Expand Down Expand Up @@ -468,6 +469,31 @@ protected void setRoutineLoadDesc(RoutineLoadDesc routineLoadDesc) {
}
}

protected RoutineLoadDesc buildRoutineLoadDescSnapshot() {
List<ImportColumnDesc> columnsInfo = null;
if (columnDescs != null && !columnDescs.descs.isEmpty()) {
columnsInfo = new ArrayList<>(columnDescs.descs);
}
return new RoutineLoadDesc(columnSeparator, lineDelimiter, columnsInfo, precedingFilter, whereExpr,
partitionNamesInfo, deleteCondition, mergeType, sequenceCol);
}

public void validateTargetTable(Database db, OlapTable targetTable) throws UserException {
if (isMultiTable) {
throw new AnalysisException("ALTER ROUTINE LOAD target table change only supports single-table job");
}
checkMeta(targetTable, buildRoutineLoadDescSnapshot());

targetTable.readLock();
try {
NereidsStreamLoadPlanner planner = new NereidsStreamLoadPlanner(db, targetTable,
toNereidsRoutineLoadTaskInfo());
planner.plan(new TUniqueId(0, 0));
} finally {
targetTable.readUnlock();
}
}

@Override
public long getId() {
return id;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -757,9 +757,12 @@ public void modifyProperties(AlterRoutineLoadCommand command) throws UserExcepti
}

modifyPropertiesInternal(jobProperties, dataSourceProperties);
if (command.hasTargetTable()) {
this.tableId = command.getTargetTableId();
}

AlterRoutineLoadJobOperationLog log = new AlterRoutineLoadJobOperationLog(this.id,
jobProperties, dataSourceProperties);
jobProperties, dataSourceProperties, command.getTargetTableId());
Env.getCurrentEnv().getEditLog().logAlterRoutineLoadJob(log);
} finally {
writeUnlock();
Expand Down Expand Up @@ -883,6 +886,9 @@ private void resetCloudProgress(Cloud.ResetRLProgressRequest.Builder builder) th
public void replayModifyProperties(AlterRoutineLoadJobOperationLog log) {
try {
modifyPropertiesInternal(log.getJobProperties(), (KafkaDataSourceProperties) log.getDataSourceProperties());
if (log.getTargetTableId() != 0) {
this.tableId = log.getTargetTableId();
}
} catch (UserException e) {
// should not happen
LOG.error("failed to replay modify kafka routine load job: {}", id, e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -689,9 +689,12 @@ public void modifyProperties(AlterRoutineLoadCommand command) throws UserExcepti
}

modifyPropertiesInternal(jobProperties, dataSourceProperties);
if (command.hasTargetTable()) {
this.tableId = command.getTargetTableId();
}

AlterRoutineLoadJobOperationLog log = new AlterRoutineLoadJobOperationLog(this.id,
jobProperties, dataSourceProperties);
jobProperties, dataSourceProperties, command.getTargetTableId());
Env.getCurrentEnv().getEditLog().logAlterRoutineLoadJob(log);
} finally {
writeUnlock();
Expand Down Expand Up @@ -785,6 +788,9 @@ public void replayModifyProperties(AlterRoutineLoadJobOperationLog log) {
try {
modifyPropertiesInternal(log.getJobProperties(),
(KinesisDataSourceProperties) log.getDataSourceProperties());
if (log.getTargetTableId() != 0) {
this.tableId = log.getTargetTableId();
}
} catch (UserException e) {
LOG.error("failed to replay modify kinesis routine load job: {}", id, e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9284,6 +9284,10 @@ public LogicalPlan visitAlterRoutineLoad(DorisParser.AlterRoutineLoadContext ctx
}
LabelNameInfo labelNameInfo = new LabelNameInfo(dbName, jobName);

if (ctx.table != null) {
return new AlterRoutineLoadCommand(labelNameInfo, ctx.table.getText());
}

Map<String, String> properties = new HashMap<>();
if (ctx.properties != null) {
properties.putAll(visitPropertyClause(ctx.properties));
Expand All @@ -9309,7 +9313,8 @@ public LogicalPlan visitAlterRoutineLoad(DorisParser.AlterRoutineLoadContext ctx
}
}

return new AlterRoutineLoadCommand(labelNameInfo, loadPropertyMap, properties, dataSourceMapProperties);
return new AlterRoutineLoadCommand(labelNameInfo, null,
loadPropertyMap, properties, dataSourceMapProperties);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,25 +21,31 @@
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.RandomDistributionInfo;
import org.apache.doris.catalog.Table;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.FeNameFormat;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.common.util.Util;
import org.apache.doris.datasource.InternalCatalog;
import org.apache.doris.datasource.property.fileformat.CsvFileFormatProperties;
import org.apache.doris.datasource.property.fileformat.JsonFileFormatProperties;
import org.apache.doris.load.RoutineLoadDesc;
import org.apache.doris.load.routineload.AbstractDataSourceProperties;
import org.apache.doris.load.routineload.RoutineLoadDataSourcePropertyFactory;
import org.apache.doris.load.routineload.RoutineLoadJob;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.nereids.trees.plans.PlanType;
import org.apache.doris.nereids.trees.plans.commands.info.CreateRoutineLoadInfo;
import org.apache.doris.nereids.trees.plans.commands.info.LabelNameInfo;
import org.apache.doris.nereids.trees.plans.commands.load.LoadProperty;
import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.StmtExecutor;
import org.apache.doris.thrift.TUniqueKeyUpdateMode;

import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
Expand Down Expand Up @@ -85,10 +91,12 @@ public class AlterRoutineLoadCommand extends AlterCommand {
.build();

private final LabelNameInfo labelNameInfo;
private final String targetTableName;
private final Map<String, LoadProperty> loadPropertyMap;
private RoutineLoadDesc routineLoadDesc;
private final Map<String, String> jobProperties;
private final Map<String, String> dataSourceMapProperties;
private long targetTableId;
private boolean isPartialUpdate;

// save analyzed job properties.
Expand All @@ -100,6 +108,7 @@ public class AlterRoutineLoadCommand extends AlterCommand {
* AlterRoutineLoadCommand
*/
public AlterRoutineLoadCommand(LabelNameInfo labelNameInfo,
String targetTableName,
Map<String, LoadProperty> loadPropertyMap,
Map<String, String> jobProperties,
Map<String, String> dataSourceMapProperties) {
Expand All @@ -108,6 +117,7 @@ public AlterRoutineLoadCommand(LabelNameInfo labelNameInfo,
Objects.requireNonNull(jobProperties, "jobProperties is null");
Objects.requireNonNull(dataSourceMapProperties, "dataSourceMapProperties is null");
this.labelNameInfo = labelNameInfo;
this.targetTableName = targetTableName;
this.loadPropertyMap = loadPropertyMap == null ? Maps.newHashMap() : loadPropertyMap;
this.jobProperties = jobProperties;
this.dataSourceMapProperties = dataSourceMapProperties;
Expand All @@ -118,7 +128,11 @@ public AlterRoutineLoadCommand(LabelNameInfo labelNameInfo,
public AlterRoutineLoadCommand(LabelNameInfo labelNameInfo,
Map<String, String> jobProperties,
Map<String, String> dataSourceMapProperties) {
this(labelNameInfo, Maps.newHashMap(), jobProperties, dataSourceMapProperties);
this(labelNameInfo, null, Maps.newHashMap(), jobProperties, dataSourceMapProperties);
}

public AlterRoutineLoadCommand(LabelNameInfo labelNameInfo, String targetTableName) {
this(labelNameInfo, targetTableName, Maps.newHashMap(), Maps.newHashMap(), Maps.newHashMap());
}

public String getDbName() {
Expand All @@ -133,6 +147,18 @@ public Map<String, String> getAnalyzedJobProperties() {
return analyzedJobProperties;
}

public boolean hasTargetTable() {
return !StringUtil.isEmpty(targetTableName);
}

public String getTargetTableName() {
return targetTableName;
}

public long getTargetTableId() {
return targetTableId;
}

public boolean hasDataSourceProperty() {
return MapUtils.isNotEmpty(dataSourceMapProperties);
}
Expand Down Expand Up @@ -161,18 +187,27 @@ public void doRun(ConnectContext ctx, StmtExecutor executor) throws Exception {
public void validate(ConnectContext ctx) throws UserException {
labelNameInfo.validate(ctx);
FeNameFormat.checkCommonName(NAME_TYPE, labelNameInfo.getLabel());
if (hasTargetTable() && (MapUtils.isNotEmpty(loadPropertyMap) || MapUtils.isNotEmpty(jobProperties)
|| MapUtils.isNotEmpty(dataSourceMapProperties))) {
throw new AnalysisException("ALTER ROUTINE LOAD target table change does not support other properties");
}
// check routine load job properties include desired concurrent number etc.
checkJobProperties();
// check load properties
RoutineLoadJob job = Env.getCurrentEnv().getRoutineLoadManager()
.getJob(getDbName(), getJobName());
this.routineLoadDesc = CreateRoutineLoadInfo.checkLoadProperties(ctx, loadPropertyMap,
job.getDbFullName(), job.getTableName(), job.isMultiTable(), job.getMergeType());
if (MapUtils.isNotEmpty(loadPropertyMap)) {
this.routineLoadDesc = CreateRoutineLoadInfo.checkLoadProperties(ctx, loadPropertyMap,
job.getDbFullName(), job.getTableName(), job.isMultiTable(), job.getMergeType());
}
// check data source properties
checkDataSourceProperties();
if (hasTargetTable()) {
validateTargetTable(ctx, job);
}
checkPartialUpdate();
if (analyzedJobProperties.isEmpty() && MapUtils.isEmpty(dataSourceMapProperties)
&& routineLoadDesc == null) {
&& MapUtils.isEmpty(loadPropertyMap) && !hasTargetTable()) {
throw new AnalysisException("No properties are specified");
}
}
Expand Down Expand Up @@ -329,21 +364,61 @@ private void checkDataSourceProperties() throws UserException {
}

private void checkPartialUpdate() throws UserException {
if (!isPartialUpdate) {
RoutineLoadJob job = Env.getCurrentEnv().getRoutineLoadManager()
.getJob(getDbName(), getJobName());
TUniqueKeyUpdateMode uniqueKeyUpdateMode = getEffectiveUniqueKeyUpdateMode(job);
if (uniqueKeyUpdateMode == TUniqueKeyUpdateMode.UPSERT) {
return;
}
RoutineLoadJob job = Env.getCurrentEnv().getRoutineLoadManager()
.getJob(getDbName(), getDbName());
if (job.isMultiTable()) {
throw new AnalysisException("load by PARTIAL_COLUMNS is not supported in multi-table load.");
}
Database db = Env.getCurrentInternalCatalog().getDbOrAnalysisException(job.getDbFullName());
Table table = db.getTableOrAnalysisException(job.getTableName());
if (isPartialUpdate && !((OlapTable) table).getEnableUniqueKeyMergeOnWrite()) {
String tableName = hasTargetTable() ? targetTableName : job.getTableName();
Table table = db.getTableOrAnalysisException(tableName);
if (!((OlapTable) table).getEnableUniqueKeyMergeOnWrite()) {
throw new AnalysisException("load by PARTIAL_COLUMNS is only supported in unique table MoW");
}
}

private TUniqueKeyUpdateMode getEffectiveUniqueKeyUpdateMode(RoutineLoadJob job) {
if (analyzedJobProperties.containsKey(CreateRoutineLoadInfo.UNIQUE_KEY_UPDATE_MODE)) {
return TUniqueKeyUpdateMode.valueOf(
analyzedJobProperties.get(CreateRoutineLoadInfo.UNIQUE_KEY_UPDATE_MODE));
}
if (analyzedJobProperties.containsKey(CreateRoutineLoadInfo.PARTIAL_COLUMNS) && isPartialUpdate
&& job.getUniqueKeyUpdateMode() == TUniqueKeyUpdateMode.UPSERT) {
return TUniqueKeyUpdateMode.UPDATE_FIXED_COLUMNS;
}
return job.getUniqueKeyUpdateMode();
}

private void validateTargetTable(ConnectContext ctx, RoutineLoadJob job) throws UserException {
if (job.isMultiTable()) {
throw new AnalysisException("ALTER ROUTINE LOAD target table change only supports single-table job");
}
Database db = Env.getCurrentInternalCatalog().getDbOrAnalysisException(job.getDbFullName());
Table table = db.getTableOrAnalysisException(targetTableName);
if (!(table instanceof OlapTable)) {
throw new AnalysisException("ALTER ROUTINE LOAD target table only supports OLAP table");
}
OlapTable olapTable = (OlapTable) table;
if (olapTable.isTemporary()) {
throw new AnalysisException("Do not support load for temporary table " + olapTable.getDisplayName());
}
if (!Env.getCurrentEnv().getAccessManager().checkTblPriv(ctx, InternalCatalog.INTERNAL_CATALOG_NAME,
job.getDbFullName(), targetTableName, PrivPredicate.LOAD)) {
ErrorReport.reportAnalysisException(ErrorCode.ERR_TABLEACCESS_DENIED_ERROR, "LOAD",
ctx.getQualifiedUser(), ctx.getRemoteIP(), job.getDbFullName() + ": " + targetTableName);
}
if (job.isLoadToSingleTablet()
&& !(olapTable.getDefaultDistributionInfo() instanceof RandomDistributionInfo)) {
throw new AnalysisException("if load_to_single_tablet set to true, the olap table must be with random distribution");
}
job.validateTargetTable(db, olapTable);
this.targetTableId = olapTable.getId();
}

@Override
public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
return visitor.visitAlterRoutineLoadCommand(this, context);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,20 @@ public class AlterRoutineLoadJobOperationLog implements Writable {
private Map<String, String> jobProperties;
@SerializedName(value = "dataSourceProperties")
private AbstractDataSourceProperties dataSourceProperties;
@SerializedName(value = "targetTableId")
private long targetTableId;

public AlterRoutineLoadJobOperationLog(long jobId, Map<String, String> jobProperties,
AbstractDataSourceProperties dataSourceProperties) {
this(jobId, jobProperties, dataSourceProperties, 0L);
}

public AlterRoutineLoadJobOperationLog(long jobId, Map<String, String> jobProperties,
AbstractDataSourceProperties dataSourceProperties, long targetTableId) {
this.jobId = jobId;
this.jobProperties = jobProperties;
this.dataSourceProperties = dataSourceProperties;
this.targetTableId = targetTableId;
}

public long getJobId() {
Expand All @@ -57,6 +65,10 @@ public AbstractDataSourceProperties getDataSourceProperties() {
return dataSourceProperties;
}

public long getTargetTableId() {
return targetTableId;
}

public static AlterRoutineLoadJobOperationLog read(DataInput in) throws IOException {
String json = Text.readString(in);
return GsonUtils.GSON.fromJson(json, AlterRoutineLoadJobOperationLog.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.apache.doris.nereids.trees.plans.commands.info.LabelNameInfo;
import org.apache.doris.nereids.trees.plans.commands.load.LoadProperty;
import org.apache.doris.nereids.trees.plans.commands.load.LoadSeparator;
import org.apache.doris.persist.AlterRoutineLoadJobOperationLog;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.thrift.TResourceInfo;

Expand Down Expand Up @@ -415,6 +416,24 @@ public void testFromCreateStmt() throws UserException {
}
}

@Test
public void testReplayModifyPropertiesSwitchesTargetTableWithoutResettingProgress() {
KafkaRoutineLoadJob routineLoadJob = new KafkaRoutineLoadJob(1L, "kafka_routine_load_job", 1L,
101L, "127.0.0.1:9020", "topic1", UserIdentity.ADMIN);
Map<Integer, Long> partitionToOffset = Maps.newHashMap();
partitionToOffset.put(1, 123L);
KafkaProgress progress = new KafkaProgress(partitionToOffset);
Deencapsulation.setField(routineLoadJob, "progress", progress);

AlterRoutineLoadJobOperationLog log = new AlterRoutineLoadJobOperationLog(1L,
Maps.newHashMap(), null, 202L);
routineLoadJob.replayModifyProperties(log);

Assert.assertEquals(202L, routineLoadJob.getTableId());
Assert.assertSame(progress, routineLoadJob.getProgress());
Assert.assertEquals(Long.valueOf(123L), ((KafkaProgress) routineLoadJob.getProgress()).getOffsetByPartition(1));
}

private CreateRoutineLoadInfo initCreateRoutineLoadInfo() {
Map<String, String> properties = Maps.newHashMap();
properties.put(CreateRoutineLoadInfo.DESIRED_CONCURRENT_NUMBER_PROPERTY, "2");
Expand Down
Loading
Loading