Skip to content

Commit 725f180

Browse files
author
luxl
committed
Refactor dependent backfill into resolve and trigger methods.
Split downstream discovery from execution, add coverage for transitive resolution, and ensure parallel backfill still triggers downstream backfills for every date chunk by isolating the visited-code state per chunk. Made-with: Cursor
1 parent 364e7bc commit 725f180

2 files changed

Lines changed: 232 additions & 73 deletions

File tree

dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/BackfillWorkflowExecutorDelegate.java

Lines changed: 96 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -37,9 +37,12 @@
3737
import org.apache.dolphinscheduler.registry.api.enums.RegistryNodeType;
3838

3939
import java.time.ZonedDateTime;
40+
import java.util.ArrayDeque;
4041
import java.util.ArrayList;
4142
import java.util.Collections;
43+
import java.util.Deque;
4244
import java.util.HashSet;
45+
import java.util.LinkedHashSet;
4346
import java.util.List;
4447
import java.util.Map;
4548
import java.util.Set;
@@ -177,7 +180,8 @@ private Integer doBackfillWorkflow(final BackfillWorkflowDTO backfillWorkflowDTO
177180
}
178181
final BackfillWorkflowDTO.BackfillParamsDTO backfillParams = backfillWorkflowDTO.getBackfillParams();
179182
if (backfillParams.getBackfillDependentMode() == ComplementDependentMode.ALL_DEPENDENT) {
180-
final Set<Long> effectiveVisitedCodes = visitedCodes == null ? new HashSet<>() : visitedCodes;
183+
final Set<Long> effectiveVisitedCodes =
184+
visitedCodes == null ? new HashSet<>() : new HashSet<>(visitedCodes);
181185
effectiveVisitedCodes.add(backfillWorkflowDTO.getWorkflowDefinition().getCode());
182186
doBackfillDependentWorkflow(backfillWorkflowDTO, backfillDateTimes, effectiveVisitedCodes);
183187
}
@@ -195,54 +199,63 @@ protected WorkflowBackfillTriggerResponse triggerBackfillWorkflow(final Workflow
195199
private void doBackfillDependentWorkflow(final BackfillWorkflowDTO backfillWorkflowDTO,
196200
final List<ZonedDateTime> backfillDateTimes,
197201
final Set<Long> visitedCodes) {
198-
// 1) Query downstream dependent workflows for the current workflow
199202
final WorkflowDefinition upstreamWorkflow = backfillWorkflowDTO.getWorkflowDefinition();
200203
final long upstreamWorkflowCode = upstreamWorkflow.getCode();
204+
final BackfillWorkflowDTO.BackfillParamsDTO originalParams = backfillWorkflowDTO.getBackfillParams();
201205

202-
List<DependentWorkflowDefinition> downstreamDefinitions =
203-
workflowLineageService.queryDownstreamDependentWorkflowDefinitions(upstreamWorkflowCode);
204-
205-
if (downstreamDefinitions == null || downstreamDefinitions.isEmpty()) {
206+
final List<WorkflowDefinition> downstreamWorkflowList = resolveDownstreamWorkflows(
207+
upstreamWorkflowCode, visitedCodes, originalParams.isAllLevelDependent());
208+
if (downstreamWorkflowList.isEmpty()) {
206209
log.info("No downstream dependent workflows found for workflow code {}", upstreamWorkflowCode);
207210
return;
208211
}
209-
// downstreamDefinitions may contain multiple entries for the same downstream workflow code
210-
// (different dependent task lineage). We should only traverse each downstream workflow once
211-
// (visitedCodes check), but trigger all dependent nodes within that downstream workflow by
212-
// aggregating distinct taskDefinitionCodes into startNodes.
213-
final Map<Long, List<DependentWorkflowDefinition>> downstreamDefinitionsByCode =
214-
downstreamDefinitions.stream()
215-
.collect(Collectors.groupingBy(DependentWorkflowDefinition::getWorkflowDefinitionCode));
216-
final Set<Long> downstreamCodes = downstreamDefinitionsByCode.keySet();
217-
final List<WorkflowDefinition> downstreamWorkflowList = workflowDefinitionDao.queryByCodes(downstreamCodes);
218-
// Each workflow code maps to a single WorkflowDefinition (code is unique in t_ds_workflow_definition).
219-
// We still group by code to simplify lookup and keep the code robust if this ever changes.
220-
final Map<Long, List<WorkflowDefinition>> downstreamWorkflowMapByCode = downstreamWorkflowList.stream()
221-
.collect(Collectors.groupingBy(WorkflowDefinition::getCode));
222-
223-
// 2) Reuse upstream business dates for downstream backfill (same instants/zones as the chunk passed to
224-
// doBackfillWorkflow; avoids List<String> -> system-default parse -> dateToString drift)
225-
final List<ZonedDateTime> upstreamBackfillDates = new ArrayList<>(backfillDateTimes);
212+
triggerResolvedDownstreamWorkflows(
213+
backfillWorkflowDTO, backfillDateTimes, visitedCodes, downstreamWorkflowList);
214+
}
226215

227-
// 3) Iterate downstream workflows and build/trigger corresponding BackfillWorkflowDTO
228-
for (Map.Entry<Long, List<DependentWorkflowDefinition>> entry : downstreamDefinitionsByCode.entrySet()) {
229-
long downstreamCode = entry.getKey();
230-
// Prevent self-dependency and circular dependency chains.
231-
// We only traverse each downstream workflow once.
232-
if (visitedCodes.contains(downstreamCode)) {
233-
log.warn("Skip already visited dependent workflow {}", downstreamCode);
234-
continue;
216+
private List<WorkflowDefinition> resolveDownstreamWorkflows(final long upstreamWorkflowCode,
217+
final Set<Long> visitedCodes,
218+
final boolean allLevelDependent) {
219+
final Set<Long> downstreamCodes = new LinkedHashSet<>();
220+
if (allLevelDependent) {
221+
final Deque<Long> pendingWorkflows = new ArrayDeque<>();
222+
pendingWorkflows.add(upstreamWorkflowCode);
223+
while (!pendingWorkflows.isEmpty()) {
224+
final Long currentWorkflowCode = pendingWorkflows.removeFirst();
225+
for (Long directDownstreamCode : queryDirectDownstreamWorkflowCodes(currentWorkflowCode)) {
226+
if (directDownstreamCode == upstreamWorkflowCode) {
227+
continue;
228+
}
229+
if (visitedCodes.contains(directDownstreamCode)) {
230+
continue;
231+
}
232+
if (downstreamCodes.add(directDownstreamCode)) {
233+
pendingWorkflows.addLast(directDownstreamCode);
234+
}
235+
}
235236
}
237+
} else {
238+
for (Long directDownstreamCode : queryDirectDownstreamWorkflowCodes(upstreamWorkflowCode)) {
239+
if (directDownstreamCode == upstreamWorkflowCode || visitedCodes.contains(directDownstreamCode)) {
240+
continue;
241+
}
242+
downstreamCodes.add(directDownstreamCode);
243+
}
244+
}
236245

237-
// Simplification: Downstream backfill is always full, startNodes=null, workerGroup uses
238-
// workflowDefinition's own config
239-
// Only grouping and deduplication are needed, all aggregation logic is omitted
246+
if (downstreamCodes.isEmpty()) {
247+
return Collections.emptyList();
248+
}
240249

241-
WorkflowDefinition downstreamWorkflow = null;
242-
List<WorkflowDefinition> workflowCandidates = downstreamWorkflowMapByCode.get(downstreamCode);
243-
if (workflowCandidates != null && !workflowCandidates.isEmpty()) {
244-
downstreamWorkflow = workflowCandidates.get(0); // code is unique, just take the first one
245-
}
250+
final Map<Long, WorkflowDefinition> downstreamWorkflowMapByCode = workflowDefinitionDao
251+
.queryByCodes(downstreamCodes)
252+
.stream()
253+
.collect(Collectors.toMap(WorkflowDefinition::getCode, workflowDefinition -> workflowDefinition,
254+
(left, right) -> left));
255+
256+
final List<WorkflowDefinition> downstreamWorkflows = new ArrayList<>();
257+
for (Long downstreamCode : downstreamCodes) {
258+
final WorkflowDefinition downstreamWorkflow = downstreamWorkflowMapByCode.get(downstreamCode);
246259
if (downstreamWorkflow == null) {
247260
log.warn("Skip dependent workflow {}, workflow definition not found", downstreamCode);
248261
continue;
@@ -251,40 +264,54 @@ private void doBackfillDependentWorkflow(final BackfillWorkflowDTO backfillWorkf
251264
log.warn("Skip dependent workflow {}, release state is not ONLINE", downstreamCode);
252265
continue;
253266
}
267+
downstreamWorkflows.add(downstreamWorkflow);
268+
}
269+
return downstreamWorkflows;
270+
}
254271

255-
// Currently, reuse the same business date list as upstream for downstream backfill;
256-
// later we can refine the dates based on dependency cycle configuration in dependentWorkflowDefinition
257-
// (taskParams).
258-
BackfillWorkflowDTO.BackfillParamsDTO originalParams = backfillWorkflowDTO.getBackfillParams();
259-
boolean allLevelDependent = originalParams.isAllLevelDependent();
260-
ComplementDependentMode downstreamDependentMode =
261-
allLevelDependent ? originalParams.getBackfillDependentMode() : ComplementDependentMode.OFF_MODE;
262-
263-
BackfillWorkflowDTO.BackfillParamsDTO dependentParams = BackfillWorkflowDTO.BackfillParamsDTO.builder()
264-
// If upstream runs in PARALLEL mode, force downstream to SERIAL to avoid
265-
// re-chunking the already sliced date list; otherwise keep the original runMode.
266-
.runMode(originalParams.getRunMode() == RunMode.RUN_MODE_PARALLEL ? RunMode.RUN_MODE_SERIAL
267-
: originalParams.getRunMode())
268-
.backfillDateList(upstreamBackfillDates)
269-
.expectedParallelismNumber(originalParams.getExpectedParallelismNumber())
270-
// Control whether downstream will continue triggering its own dependencies based on
271-
// allLevelDependent flag
272-
.backfillDependentMode(downstreamDependentMode)
273-
.allLevelDependent(allLevelDependent)
274-
.executionOrder(originalParams.getExecutionOrder())
275-
.build();
272+
private Set<Long> queryDirectDownstreamWorkflowCodes(final long workflowCode) {
273+
final List<DependentWorkflowDefinition> downstreamDefinitions =
274+
workflowLineageService.queryDownstreamDependentWorkflowDefinitions(workflowCode);
275+
if (downstreamDefinitions == null || downstreamDefinitions.isEmpty()) {
276+
return Collections.emptySet();
277+
}
278+
return downstreamDefinitions.stream()
279+
.map(DependentWorkflowDefinition::getWorkflowDefinitionCode)
280+
.collect(Collectors.toCollection(LinkedHashSet::new));
281+
}
282+
283+
private void triggerResolvedDownstreamWorkflows(final BackfillWorkflowDTO backfillWorkflowDTO,
284+
final List<ZonedDateTime> backfillDateTimes,
285+
final Set<Long> visitedCodes,
286+
final List<WorkflowDefinition> downstreamWorkflows) {
287+
final long upstreamWorkflowCode = backfillWorkflowDTO.getWorkflowDefinition().getCode();
288+
final List<ZonedDateTime> upstreamBackfillDates = new ArrayList<>(backfillDateTimes);
289+
final BackfillWorkflowDTO.BackfillParamsDTO originalParams = backfillWorkflowDTO.getBackfillParams();
290+
final boolean allLevelDependent = originalParams.isAllLevelDependent();
291+
292+
for (WorkflowDefinition downstreamWorkflow : downstreamWorkflows) {
293+
final long downstreamCode = downstreamWorkflow.getCode();
294+
if (visitedCodes.contains(downstreamCode)) {
295+
log.warn("Skip already visited dependent workflow {}", downstreamCode);
296+
continue;
297+
}
276298

277-
// Simplified design notes:
278-
// 1. Downstream backfill is always full, startNodes=null, no more aggregation of dependent nodes
279-
// 2. workerGroup is directly taken from the downstream workflowDefinition's own config, if null then use
280-
// system default workerGroup
281-
// 3. Only grouping deduplication and visitedCodes check, all complex aggregation logic is omitted
282-
// This implementation is the simplest and most controllable, suitable for full backfill and workerGroup
283-
// based on itself
284-
BackfillWorkflowDTO dependentBackfillDTO = BackfillWorkflowDTO.builder()
299+
final BackfillWorkflowDTO.BackfillParamsDTO dependentParams =
300+
BackfillWorkflowDTO.BackfillParamsDTO.builder()
301+
.runMode(originalParams.getRunMode() == RunMode.RUN_MODE_PARALLEL ? RunMode.RUN_MODE_SERIAL
302+
: originalParams.getRunMode())
303+
.backfillDateList(upstreamBackfillDates)
304+
.expectedParallelismNumber(originalParams.getExpectedParallelismNumber())
305+
// Downstream expansion has already been decided in resolution stage.
306+
.backfillDependentMode(ComplementDependentMode.OFF_MODE)
307+
.allLevelDependent(allLevelDependent)
308+
.executionOrder(originalParams.getExecutionOrder())
309+
.build();
310+
311+
final BackfillWorkflowDTO dependentBackfillDTO = BackfillWorkflowDTO.builder()
285312
.loginUser(backfillWorkflowDTO.getLoginUser())
286313
.workflowDefinition(downstreamWorkflow)
287-
.startNodes(null) // Full backfill, simplified design
314+
.startNodes(null)
288315
.failureStrategy(backfillWorkflowDTO.getFailureStrategy())
289316
.taskDependType(backfillWorkflowDTO.getTaskDependType())
290317
.execType(backfillWorkflowDTO.getExecType())
@@ -304,8 +331,6 @@ private void doBackfillDependentWorkflow(final BackfillWorkflowDTO backfillWorkf
304331
downstreamCode, upstreamWorkflowCode,
305332
backfillDateTimes.stream().map(DateUtils::dateToString).collect(Collectors.toList()));
306333

307-
// Mark as visited to prevent infinite recursion. Actual dependency chains are usually short, recursion is
308-
// safe.
309334
visitedCodes.add(downstreamCode);
310335
executeWithVisitedCodes(dependentBackfillDTO, visitedCodes);
311336
}

0 commit comments

Comments (0)