Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
2e2eea8
close #17748
Feb 27, 2026
8c8429c
fix: prevent circular dependency and wrong startNodes in backfill dep…
Feb 27, 2026
9321ce0
refactor: replace ThreadLocal with explicit parameter for cycle detec…
Feb 27, 2026
e05bb03
[Chore] Apply spotless formatting for backfill workflow executor files
Mar 23, 2026
2579df7
fix review
Mar 23, 2026
4584651
fix(api): isolate visited codes per parallel backfill chunk
Mar 25, 2026
72164a8
fix(api): remove redundant dependent-trigger testing seam
Mar 25, 2026
7cbf200
fix(api): propagate ZonedDateTime chunk into dependent backfill
Mar 26, 2026
5a83f5c
test(api): remove unit tests for private dependent backfill
Mar 26, 2026
01ba88d
fix(api): align dependent routing and prevent double-splitting
Mar 26, 2026
542f0cb
test(api): add dependent backfill traversal branches
Mar 26, 2026
526ac69
Update dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler…
det101 Mar 26, 2026
c3a50f2
fix(api): aggregate dependent nodes per downstream workflow
Mar 26, 2026
abf142a
chore(api): initialize visitedCodes at execute entry
Mar 26, 2026
66d7817
Apply suggestion from @Copilot
det101 Mar 26, 2026
f81fde1
Update dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler…
det101 Mar 26, 2026
08cdc10
Update dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler…
det101 Mar 26, 2026
4503093
fix: handle null/zero expectedParallelismNumber to prevent Arithmetic…
Mar 26, 2026
f346fb3
fix: cap parallelism to listDate size and route dependent backfill th…
Mar 26, 2026
2775ed5
refactor: revert parallelism guard to original if/else style and shar…
Mar 26, 2026
351d12b
docs: clarify force-SERIAL rationale and recursion depth assumption
Mar 26, 2026
69a4ac2
Refactor: translate all code comments to English and unify test logic…
Mar 26, 2026
1a53393
fix spotless
Mar 27, 2026
37ddf8a
fix spotless
Mar 27, 2026
a8aac0d
Fix JDK version syntax compatibility issues
Mar 30, 2026
fc9fd1e
Merge branch 'dev' into DSIP-95
SbloodyS Mar 31, 2026
585fc36
Merge branch 'dev' into DSIP-95
SbloodyS Apr 7, 2026
364e7bc
Merge dev into DSIP-95
Apr 7, 2026
725f180
Refactor dependent backfill into resolve and trigger methods.
Apr 8, 2026
fb41680
Merge branch 'dev' into DSIP-95
det101 Apr 9, 2026
308a6c2
Implement downstream workflow resolution with optional offline filter…
Apr 10, 2026
3ab0433
Merge branch 'dev' into DSIP-95
SbloodyS Apr 13, 2026
5bc37c5
fix spotless
Apr 14, 2026
0fa16ce
Merge branch 'DSIP-95' of github.com:det101/dolphinscheduler into pr-…
Apr 14, 2026
57f2ef9
Merge branch 'dev' into DSIP-95
ruanwenjun Apr 14, 2026
c8c5443
Update dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler…
det101 Apr 15, 2026
9c1c3f9
fix spotless
Apr 15, 2026
de1a4d5
Merge branch 'dev' into DSIP-95
ruanwenjun Apr 15, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.dolphinscheduler.api.executor.workflow;

import org.apache.dolphinscheduler.api.exceptions.ServiceException;
import org.apache.dolphinscheduler.api.service.WorkflowLineageService;
import org.apache.dolphinscheduler.api.validator.workflow.BackfillWorkflowDTO;
import org.apache.dolphinscheduler.common.enums.ComplementDependentMode;
import org.apache.dolphinscheduler.common.enums.ExecutionOrder;
Expand Down Expand Up @@ -49,17 +50,57 @@
@Component
public class BackfillWorkflowExecutorDelegate implements IExecutorDelegate<BackfillWorkflowDTO, List<Integer>> {

@Autowired
private WorkflowLineageService workflowLineageService;

@Autowired

Check warning on line 56 in dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/BackfillWorkflowExecutorDelegate.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Remove this field injection and use constructor injection instead.

See more on https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&issues=AZ0Y58rTS-WNYeg2Pm0M&open=AZ0Y58rTS-WNYeg2Pm0M&pullRequest=18003
private RegistryClient registryClient;

/**
 * Entry point for a backfill request; delegates to
 * {@link #executeWithDependentExpansion(BackfillWorkflowDTO)}, which optionally expands
 * downstream dependent workflows before submitting to the master.
 *
 * @param backfillWorkflowDTO validated backfill request
 * @return workflow instance ids created for the root (and any downstream) backfills
 */
@Override
public List<Integer> execute(final BackfillWorkflowDTO backfillWorkflowDTO) {
    return executeWithDependentExpansion(backfillWorkflowDTO);
}

/**
 * Expands optional downstream workflows, then submits root and each downstream in list order.
 * <p>
 * {@link RunMode} (serial vs parallel date sharding) is taken only from the <strong>root</strong>
 * {@code backfillWorkflowDTO}'s {@link BackfillWorkflowDTO.BackfillParamsDTO#getRunMode()}; downstream DTOs
 * mirror the same mode in their params for consistency.
 *
 * @param backfillWorkflowDTO root backfill request
 * @return workflow instance ids of the root and every submitted downstream backfill, in submission order
 */
List<Integer> executeWithDependentExpansion(final BackfillWorkflowDTO backfillWorkflowDTO) {
    // todo: directly call the master api to do backfill
    final List<BackfillWorkflowDTO> dependentBackfillDtos = new ArrayList<>();
    dependentBackfillDtos.add(backfillWorkflowDTO);
    if (backfillWorkflowDTO.getBackfillParams()
            .getBackfillDependentMode() == ComplementDependentMode.ALL_DEPENDENT) {
        // Resolve downstream dependents from stored lineage; offline workflows are filtered out.
        final List<WorkflowDefinition> downstreamWorkflowList =
                workflowLineageService.resolveDownstreamWorkflowDefinitionCodes(
                        backfillWorkflowDTO.getWorkflowDefinition().getCode(),
                        backfillWorkflowDTO.getBackfillParams().isAllLevelDependent(),
                        true);
        if (downstreamWorkflowList.isEmpty()) {
            log.info("No downstream dependent workflows found for workflow code {}",
                    backfillWorkflowDTO.getWorkflowDefinition().getCode());
        } else {
            dependentBackfillDtos.addAll(buildResolvedDownstreamBackfillDtos(backfillWorkflowDTO,
                    backfillWorkflowDTO.getBackfillParams().getBackfillDateList(),
                    downstreamWorkflowList));
        }
    }
    final List<Integer> workflowInstanceIdList = new ArrayList<>();
    // RunMode is defined by the root request only (not per downstream DTO).
    if (backfillWorkflowDTO.getBackfillParams().getRunMode() == RunMode.RUN_MODE_SERIAL) {
        for (BackfillWorkflowDTO dependentDto : dependentBackfillDtos) {
            workflowInstanceIdList.addAll(doSerialBackfillWorkflow(dependentDto));
        }
    } else {
        for (BackfillWorkflowDTO dependentDto : dependentBackfillDtos) {
            workflowInstanceIdList.addAll(doParallelBackfillWorkflow(dependentDto));
        }
    }
    return workflowInstanceIdList;
}
Comment thread
det101 marked this conversation as resolved.

private List<Integer> doSerialBackfillWorkflow(final BackfillWorkflowDTO backfillWorkflowDTO) {
Expand All @@ -71,9 +112,7 @@
Collections.sort(backfillTimeList);
}

final Integer workflowInstanceId = doBackfillWorkflow(
backfillWorkflowDTO,
backfillTimeList.stream().map(DateUtils::dateToString).collect(Collectors.toList()));
final Integer workflowInstanceId = doBackfillWorkflow(backfillWorkflowDTO, backfillTimeList);
return Lists.newArrayList(workflowInstanceId);
}

Expand All @@ -92,8 +131,7 @@
final List<Integer> workflowInstanceIdList = Lists.newArrayList();
for (List<ZonedDateTime> stringDate : splitDateTime(listDate, expectedParallelismNumber)) {
final Integer workflowInstanceId = doBackfillWorkflow(
backfillWorkflowDTO,
stringDate.stream().map(DateUtils::dateToString).collect(Collectors.toList()));
backfillWorkflowDTO, stringDate);
workflowInstanceIdList.add(workflowInstanceId);
}
return workflowInstanceIdList;
Expand Down Expand Up @@ -124,12 +162,15 @@
}

private Integer doBackfillWorkflow(final BackfillWorkflowDTO backfillWorkflowDTO,
final List<String> backfillTimeList) {
final List<ZonedDateTime> backfillDateTimes) {
final Server masterServer = registryClient.getRandomServer(RegistryNodeType.MASTER).orElse(null);
if (masterServer == null) {
throw new ServiceException("no master server available");
}

final List<String> backfillTimeList =
backfillDateTimes.stream().map(DateUtils::dateToString).collect(Collectors.toList());

final WorkflowDefinition workflowDefinition = backfillWorkflowDTO.getWorkflowDefinition();
final WorkflowBackfillTriggerRequest backfillTriggerRequest = WorkflowBackfillTriggerRequest.builder()
.userId(backfillWorkflowDTO.getLoginUser().getId())
Expand All @@ -149,22 +190,76 @@
.dryRun(backfillWorkflowDTO.getDryRun())
.build();

final WorkflowBackfillTriggerResponse backfillTriggerResponse = Clients
.withService(IWorkflowControlClient.class)
.withHost(masterServer.getHost() + ":" + masterServer.getPort())
.backfillTriggerWorkflow(backfillTriggerRequest);
final WorkflowBackfillTriggerResponse backfillTriggerResponse =
triggerBackfillWorkflow(backfillTriggerRequest, masterServer);
if (!backfillTriggerResponse.isSuccess()) {
throw new ServiceException("Backfill workflow failed: " + backfillTriggerResponse.getMessage());
}
final BackfillWorkflowDTO.BackfillParamsDTO backfillParams = backfillWorkflowDTO.getBackfillParams();
if (backfillParams.getBackfillDependentMode() == ComplementDependentMode.ALL_DEPENDENT) {
doBackfillDependentWorkflow(backfillWorkflowDTO, backfillTimeList);
}
return backfillTriggerResponse.getWorkflowInstanceId();
}

private void doBackfillDependentWorkflow(final BackfillWorkflowDTO backfillWorkflowDTO,
final List<String> backfillTimeList) {
// todo:
/**
 * Sends the backfill trigger request to the given master server.
 * Kept {@code protected} so subclasses/tests can override the RPC seam.
 */
protected WorkflowBackfillTriggerResponse triggerBackfillWorkflow(final WorkflowBackfillTriggerRequest request,
                                                                  final Server masterServer) {
    final String masterAddress = masterServer.getHost() + ":" + masterServer.getPort();
    return Clients
            .withService(IWorkflowControlClient.class)
            .withHost(masterAddress)
            .backfillTriggerWorkflow(request);
}

/**
 * Builds {@link BackfillWorkflowDTO} list for resolved downstream workflows.
 * {@link RunMode} in each downstream {@link BackfillWorkflowDTO.BackfillParamsDTO} matches the root (see
 * {@link #executeWithDependentExpansion(BackfillWorkflowDTO)}).
 *
 * @param backfillWorkflowDTO root request whose user/strategy/param settings are copied onto each downstream DTO
 * @param backfillDateTimes   backfill dates of the root; every downstream workflow is backfilled for the same dates
 * @param downstreamWorkflows resolved downstream workflow definitions
 * @return one backfill DTO per downstream workflow, in the same order as {@code downstreamWorkflows}
 */
private List<BackfillWorkflowDTO> buildResolvedDownstreamBackfillDtos(final BackfillWorkflowDTO backfillWorkflowDTO,
                                                                      final List<ZonedDateTime> backfillDateTimes,
                                                                      final List<WorkflowDefinition> downstreamWorkflows) {
    final long upstreamWorkflowCode = backfillWorkflowDTO.getWorkflowDefinition().getCode();
    // Snapshot of the root's backfill dates, reused (same list instance) by every downstream params DTO.
    final List<ZonedDateTime> upstreamBackfillDates = new ArrayList<>(backfillDateTimes);
    final BackfillWorkflowDTO.BackfillParamsDTO originalParams = backfillWorkflowDTO.getBackfillParams();
    final boolean allLevelDependent = originalParams.isAllLevelDependent();

    final List<BackfillWorkflowDTO> result = new ArrayList<>();
    for (WorkflowDefinition downstreamWorkflow : downstreamWorkflows) {
        final long downstreamCode = downstreamWorkflow.getCode();

        final BackfillWorkflowDTO.BackfillParamsDTO dependentParams =
                BackfillWorkflowDTO.BackfillParamsDTO.builder()
                        // Same as root; executor also branches on root RunMode only.
                        .runMode(originalParams.getRunMode())
                        .backfillDateList(upstreamBackfillDates)
                        .expectedParallelismNumber(originalParams.getExpectedParallelismNumber())
                        // Downstream expansion has already been decided in resolution stage.
                        .backfillDependentMode(ComplementDependentMode.OFF_MODE)
                        .allLevelDependent(allLevelDependent)
                        .executionOrder(originalParams.getExecutionOrder())
                        .build();

        final BackfillWorkflowDTO dependentBackfillDTO = BackfillWorkflowDTO.builder()
                .loginUser(backfillWorkflowDTO.getLoginUser())
                .workflowDefinition(downstreamWorkflow)
                // NOTE(review): startNodes left null — presumably so the downstream workflow starts
                // from its own root tasks rather than the upstream's start nodes; confirm intent.
                .startNodes(null)
                .failureStrategy(backfillWorkflowDTO.getFailureStrategy())
                .taskDependType(backfillWorkflowDTO.getTaskDependType())
                .execType(backfillWorkflowDTO.getExecType())
                .warningType(backfillWorkflowDTO.getWarningType())
                // Warning group is taken from the downstream definition, not the root request.
                .warningGroupId(downstreamWorkflow.getWarningGroupId())
                .runMode(dependentParams.getRunMode())
                .workflowInstancePriority(backfillWorkflowDTO.getWorkflowInstancePriority())
                .workerGroup(backfillWorkflowDTO.getWorkerGroup())
                .tenantCode(backfillWorkflowDTO.getTenantCode())
                .environmentCode(backfillWorkflowDTO.getEnvironmentCode())
                .startParamList(backfillWorkflowDTO.getStartParamList())
                .dryRun(backfillWorkflowDTO.getDryRun())
                .backfillParams(dependentParams)
                .build();

        log.info("Built dependent backfill DTO for workflow {} (upstream {}) with backfill dates {}",
                downstreamCode, upstreamWorkflowCode,
                backfillDateTimes.stream().map(DateUtils::dateToString).collect(Collectors.toList()));

        result.add(dependentBackfillDTO);
    }
    return result;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.dolphinscheduler.dao.entity.DependentWorkflowDefinition;
import org.apache.dolphinscheduler.dao.entity.WorkFlowLineage;
import org.apache.dolphinscheduler.dao.entity.WorkFlowRelationDetail;
import org.apache.dolphinscheduler.dao.entity.WorkflowDefinition;
import org.apache.dolphinscheduler.dao.entity.WorkflowTaskLineage;

import java.util.List;
Expand All @@ -42,6 +43,37 @@ public interface WorkflowLineageService {
*/
List<DependentWorkflowDefinition> queryDownstreamDependentWorkflowDefinitions(Long workflowDefinitionCode);

/**
 * Resolve downstream workflow definitions that depend on the given root workflow, using stored lineage.
 * <ul>
 * <li>{@code allLevelDependent == false}: only direct dependents of the root.</li>
 * <li>{@code allLevelDependent == true}: transitive dependents (BFS over direct dependents), skipping the root
 * when it reappears as an edge target (cycle back to root).</li>
 * </ul>
 * Convenience overload: delegates to
 * {@link #resolveDownstreamWorkflowDefinitionCodes(long, boolean, boolean)} with
 * {@code filterOfflineWorkflow = false}, i.e. offline workflows are included.
 *
 * @param rootWorkflowDefinitionCode workflow to start from (not included in the result)
 * @param allLevelDependent {@code true} for transitive closure, {@code false} for one hop only
 * @return ordered distinct downstream workflow definitions (stable order: BFS / insertion order)
 */
default List<WorkflowDefinition> resolveDownstreamWorkflowDefinitionCodes(long rootWorkflowDefinitionCode,
                                                                          boolean allLevelDependent) {
    return resolveDownstreamWorkflowDefinitionCodes(rootWorkflowDefinitionCode, allLevelDependent, false);
}

/**
* Resolve downstream workflow definitions and optionally filter offline workflows.
* When {@code filterOfflineWorkflow} is true, offline workflow definitions are excluded from the result and are
* not expanded further during transitive traversal.
*
* @param rootWorkflowDefinitionCode workflow to start from (not included in the result)
* @param allLevelDependent {@code true} for transitive closure, {@code false} for one hop only
* @param filterOfflineWorkflow whether offline workflows should be filtered out during traversal
* @return ordered distinct downstream workflow definitions (stable order: BFS / insertion order)
*/
List<WorkflowDefinition> resolveDownstreamWorkflowDefinitionCodes(long rootWorkflowDefinitionCode,
boolean allLevelDependent,
boolean filterOfflineWorkflow);

/**
* Query and return tasks dependence with string format, is a wrapper of queryTaskDepOnTask and task query method.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.dolphinscheduler.api.exceptions.ServiceException;
import org.apache.dolphinscheduler.api.service.WorkflowLineageService;
import org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.dolphinscheduler.common.enums.ReleaseState;
import org.apache.dolphinscheduler.dao.entity.DependentLineageTask;
import org.apache.dolphinscheduler.dao.entity.DependentWorkflowDefinition;
import org.apache.dolphinscheduler.dao.entity.Project;
Expand All @@ -37,9 +38,15 @@

import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;

Expand Down Expand Up @@ -319,6 +326,112 @@
return dependentLineageTaskList;
}

@Override
public List<WorkflowDefinition> resolveDownstreamWorkflowDefinitionCodes(long rootWorkflowDefinitionCode,

Check failure on line 330 in dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkflowLineageServiceImpl.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Refactor this method to reduce its Cognitive Complexity from 40 to the 15 allowed.

See more on https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&issues=AZ2LRs_b2Y5X5wifwlKf&open=AZ2LRs_b2Y5X5wifwlKf&pullRequest=18003
boolean allLevelDependent,
boolean filterOfflineWorkflow) {

Set<Long> resultCodes = new LinkedHashSet<>();
Set<Long> visitedWorkflowCodes = new HashSet<>();
List<Long> frontier = Collections.singletonList(rootWorkflowDefinitionCode);
Map<Long, WorkflowDefinition> workflowDefinitionCache = new HashMap<>();

while (!CollectionUtils.isEmpty(frontier)) {
List<Long> currentLevel = new ArrayList<>(frontier);
frontier = new ArrayList<>();

for (Long upstreamCode : currentLevel) {

Check warning on line 343 in dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkflowLineageServiceImpl.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Reduce the total number of break and continue statements in this loop to use at most one.

See more on https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&issues=AZ2LRs_b2Y5X5wifwlKe&open=AZ2LRs_b2Y5X5wifwlKe&pullRequest=18003
if (!visitedWorkflowCodes.add(upstreamCode)) {
continue;
}
List<Long> directDownstreamCodes = queryDirectDownstreamWorkflowCodes(upstreamCode);
if (CollectionUtils.isEmpty(directDownstreamCodes)) {
continue;
}

cacheWorkflowDefinitions(directDownstreamCodes, workflowDefinitionCache);
for (Long downstreamCode : directDownstreamCodes) {

Check warning on line 353 in dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkflowLineageServiceImpl.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Reduce the total number of break and continue statements in this loop to use at most one.

See more on https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&issues=AZ2LRs_b2Y5X5wifwlKd&open=AZ2LRs_b2Y5X5wifwlKd&pullRequest=18003
if (downstreamCode == rootWorkflowDefinitionCode) {
// Skip cycle edge back to root; root should never appear in downstream result.
continue;
}
WorkflowDefinition downstreamWorkflow = workflowDefinitionCache.get(downstreamCode);
if (downstreamWorkflow == null) {
continue;
}
if (!resultCodes.add(downstreamCode)) {
continue;
}
if (allLevelDependent && (!filterOfflineWorkflow
|| downstreamWorkflow.getReleaseState() != ReleaseState.OFFLINE)) {
frontier.add(downstreamCode);
}
}
}

if (!allLevelDependent) {
break;
}
}

if (resultCodes.isEmpty()) {
return Collections.emptyList();
}

if (filterOfflineWorkflow) {
resultCodes = resultCodes.stream()
.filter(code -> {
WorkflowDefinition definition = workflowDefinitionCache.get(code);
return definition != null
&& definition.getReleaseState() != ReleaseState.OFFLINE;
})
.collect(Collectors.toCollection(LinkedHashSet::new));
if (resultCodes.isEmpty()) {
return Collections.emptyList();
}
}

List<WorkflowDefinition> orderedDefinitions = new ArrayList<>();
for (Long code : resultCodes) {
WorkflowDefinition definition = workflowDefinitionCache.get(code);
if (definition != null) {
orderedDefinitions.add(definition);
}
}
return orderedDefinitions;
}

/**
 * Returns the distinct workflow definition codes that hold a dependent-task lineage edge on the
 * given upstream workflow (one hop only), preserving first-seen order.
 */
private List<Long> queryDirectDownstreamWorkflowCodes(long upstreamWorkflowDefinitionCode) {
    final List<WorkflowTaskLineage> lineages =
            workflowTaskLineageDao.queryWorkFlowLineageByDept(Constants.DEFAULT_PROJECT_CODE,
                    upstreamWorkflowDefinitionCode,
                    Constants.DEPENDENT_ALL_TASK);
    if (CollectionUtils.isEmpty(lineages)) {
        return Collections.emptyList();
    }
    // LinkedHashSet deduplicates while keeping encounter order, matching stream distinct() semantics.
    final Set<Long> distinctCodes = new LinkedHashSet<>();
    for (WorkflowTaskLineage lineage : lineages) {
        distinctCodes.add(lineage.getWorkflowDefinitionCode());
    }
    return new ArrayList<>(distinctCodes);
}

/**
 * Batch-loads workflow definitions for any codes not yet present in the cache and stores them,
 * avoiding repeated single-row lookups during traversal. Codes the mapper does not return are
 * simply left uncached.
 */
private void cacheWorkflowDefinitions(Collection<Long> workflowDefinitionCodes,
                                      Map<Long, WorkflowDefinition> workflowDefinitionCache) {
    final List<Long> missingCodes = new ArrayList<>();
    for (Long code : workflowDefinitionCodes) {
        if (!workflowDefinitionCache.containsKey(code)) {
            missingCodes.add(code);
        }
    }
    if (missingCodes.isEmpty()) {
        return;
    }
    final List<WorkflowDefinition> loadedDefinitions = workflowDefinitionMapper.queryByCodes(missingCodes);
    if (CollectionUtils.isEmpty(loadedDefinitions)) {
        return;
    }
    loadedDefinitions.forEach(definition -> workflowDefinitionCache.put(definition.getCode(), definition));
}

@Override
public int updateWorkflowLineage(long workflowDefinitionCode, List<WorkflowTaskLineage> workflowTaskLineages) {
// Remove existing lineage first to keep data consistent
Expand Down
Loading
Loading