diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/cases/WorkflowStartTestCase.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/cases/WorkflowStartTestCase.java index fb99e54c134d..d62c52f45d4a 100644 --- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/cases/WorkflowStartTestCase.java +++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/cases/WorkflowStartTestCase.java @@ -1861,4 +1861,107 @@ public void testWorkflowTimeout_WithAlertGroup_ShouldSendAlert() { masterContainer.assertAllResourceReleased(); } + @Test + @DisplayName("Test start a workflow which contains a dep task with timeout warn strategy") + public void testStartWorkflow_withTimeoutWarnTask() { + masterConfig.getServerLoadProtection().setEnabled(false); + + final String yaml = "/it/start/workflow_with_timeout_warn_task.yaml"; + final WorkflowTestCaseContext context = workflowTestCaseContextFactory.initializeContextFromYaml(yaml); + final WorkflowDefinition workflow = context.getWorkflow("workflow_with_timeout_warn_task"); + + final WorkflowOperator.WorkflowTriggerDTO workflowTriggerDTO = WorkflowOperator.WorkflowTriggerDTO + .builder() + .workflowDefinition(workflow) + .runWorkflowCommandParam(new RunWorkflowCommandParam()) + .warningGroupId(workflow.getWarningGroupId()) + .build(); + + final Integer workflowInstanceId = workflowOperator.manualTriggerWorkflow(workflowTriggerDTO); + + await() + .atMost(Duration.ofSeconds(90)) + .untilAsserted(() -> { + Assertions + .assertThat(repository.queryWorkflowInstance(workflow)) + .satisfiesExactly(workflowInstance -> assertThat( + workflowInstance.getState()) + .isEqualTo(WorkflowExecutionStatus.RUNNING_EXECUTION)); + + Assertions + .assertThat(repository.queryTaskInstance(workflow)) + .hasSize(1) + .satisfiesExactly(taskInstance -> { + assertThat(taskInstance.getName()) + .isEqualTo("dep_task_with_timeout_warn"); + assertThat(taskInstance.getState()) + .isEqualTo(TaskExecutionStatus.RUNNING_EXECUTION); + }); + + Assertions + .assertThat(repository.queryAlert(workflowInstanceId)) + .isNotEmpty() + .anySatisfy(alert -> { + assertThat(alert.getAlertType()) + .isEqualTo(AlertType.TASK_TIMEOUT); + }); + }); + + workflowOperator.stopWorkflowInstance(workflowInstanceId); + await() + .atMost(Duration.ofSeconds(30)) + .untilAsserted(() -> Assertions.assertThat(repository.queryWorkflowInstance(workflowInstanceId)) + .matches(w -> w.getState() == WorkflowExecutionStatus.STOP)); + masterContainer.assertAllResourceReleased(); + } + + @Test + @DisplayName("Test start a workflow which contains a dep task with timeout warn failed strategy") + public void testStartWorkflow_withTimeoutWarnFailedTask() { + masterConfig.getServerLoadProtection().setEnabled(false); + + final String yaml = "/it/start/workflow_with_timeout_warnfailed_task.yaml"; + final WorkflowTestCaseContext context = workflowTestCaseContextFactory.initializeContextFromYaml(yaml); + final WorkflowDefinition workflow = context.getWorkflow("workflow_with_timeout_warnfailed_task"); + + final WorkflowOperator.WorkflowTriggerDTO workflowTriggerDTO = WorkflowOperator.WorkflowTriggerDTO + .builder() + .workflowDefinition(workflow) + .runWorkflowCommandParam(new RunWorkflowCommandParam()) + .warningGroupId(workflow.getWarningGroupId()) + .build(); + + final Integer workflowInstanceId = workflowOperator.manualTriggerWorkflow(workflowTriggerDTO); + + await() + .atMost(Duration.ofSeconds(90)) + .untilAsserted(() -> { + Assertions + .assertThat(repository.queryWorkflowInstance(workflow)) + .satisfiesExactly(workflowInstance -> assertThat( + workflowInstance.getState()) + .isEqualTo(WorkflowExecutionStatus.STOP)); + + Assertions + .assertThat(repository.queryTaskInstance(workflow)) + .hasSize(1) + .satisfiesExactly(taskInstance -> { + assertThat(taskInstance.getName()) + .isEqualTo("dep_task_with_timeout_warnfailed"); + assertThat(taskInstance.getState()) + .isEqualTo(TaskExecutionStatus.KILL); + }); + + Assertions + .assertThat(repository.queryAlert(workflowInstanceId)) + .isNotEmpty() + .anySatisfy(alert -> { + assertThat(alert.getAlertType()) + .isEqualTo(AlertType.TASK_TIMEOUT); + }); + }); + + masterContainer.assertAllResourceReleased(); + } + } diff --git a/dolphinscheduler-master/src/test/resources/it/start/workflow_with_timeout_warn_task.yaml b/dolphinscheduler-master/src/test/resources/it/start/workflow_with_timeout_warn_task.yaml new file mode 100644 index 000000000000..b6047f5cf308 --- /dev/null +++ b/dolphinscheduler-master/src/test/resources/it/start/workflow_with_timeout_warn_task.yaml @@ -0,0 +1,65 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +project: + name: MasterIntegrationTest + code: 1 + description: This is a fake project + userId: 1 + userName: admin + createTime: 2024-08-12 00:00:00 + updateTime: 2021-08-12 00:00:00 + +workflows: + - name: workflow_with_timeout_warn_task + code: 1 + version: 1 + projectCode: 1 + description: This is a fake workflow with single timeout warn task + releaseState: ONLINE + warningGroupId: 1 + createTime: 2024-08-12 00:00:00 + updateTime: 2021-08-12 00:00:00 + userId: 1 + executionType: PARALLEL + +tasks: + - name: dep_task_with_timeout_warn + code: 1 + version: 1 + projectCode: 1 + userId: 1 + timeoutFlag: 'OPEN' + timeoutNotifyStrategy: 'WARN' + timeout: 1 + taskType: DEPENDENT + taskParams: '{"localParams":[],"resourceList":[],"dependence":{"checkInterval":10,"failurePolicy":"DEPENDENT_FAILURE_FAILURE","relation":"AND","dependTaskList":[{"relation":"AND","dependItemList":[{"dependentType":"DEPENDENT_ON_WORKFLOW","projectCode":1,"definitionCode":1,"depTaskCode":0,"cycle":"day","dateValue":"last1Days"}]}]}}' + workerGroup: default + createTime: 2024-08-12 00:00:00 + updateTime: 2021-08-12 00:00:00 + taskExecuteType: BATCH + +taskRelations: + - projectCode: 1 + workflowDefinitionCode: 1 + workflowDefinitionVersion: 1 + preTaskCode: 0 + preTaskVersion: 0 + postTaskCode: 1 + postTaskVersion: 1 + createTime: 2024-08-12 00:00:00 + updateTime: 2024-08-12 00:00:00 diff --git a/dolphinscheduler-master/src/test/resources/it/start/workflow_with_timeout_warnfailed_task.yaml b/dolphinscheduler-master/src/test/resources/it/start/workflow_with_timeout_warnfailed_task.yaml new file mode 100644 index 000000000000..20c2e50865c2 --- /dev/null +++ b/dolphinscheduler-master/src/test/resources/it/start/workflow_with_timeout_warnfailed_task.yaml @@ -0,0 +1,65 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +project: + name: MasterIntegrationTest + code: 1 + description: This is a fake project + userId: 1 + userName: admin + createTime: 2024-08-12 00:00:00 + updateTime: 2021-08-12 00:00:00 + +workflows: + - name: workflow_with_timeout_warnfailed_task + code: 1 + version: 1 + projectCode: 1 + description: This is a fake workflow with single timeout warnfailed task + releaseState: ONLINE + warningGroupId: 1 + createTime: 2024-08-12 00:00:00 + updateTime: 2021-08-12 00:00:00 + userId: 1 + executionType: PARALLEL + +tasks: + - name: dep_task_with_timeout_warnfailed + code: 1 + version: 1 + projectCode: 1 + userId: 1 + timeoutFlag: 'OPEN' + timeoutNotifyStrategy: 'WARNFAILED' + timeout: 1 + taskType: DEPENDENT + taskParams: '{"localParams":[],"resourceList":[],"dependence":{"checkInterval":10,"failurePolicy":"DEPENDENT_FAILURE_FAILURE","relation":"AND","dependTaskList":[{"relation":"AND","dependItemList":[{"dependentType":"DEPENDENT_ON_WORKFLOW","projectCode":1,"definitionCode":1,"depTaskCode":0,"cycle":"day","dateValue":"last1Days"}]}]}}' + workerGroup: default + createTime: 2024-08-12 00:00:00 + updateTime: 2021-08-12 00:00:00 + taskExecuteType: BATCH + +taskRelations: + - projectCode: 1 + workflowDefinitionCode: 1 + workflowDefinitionVersion: 1 + preTaskCode: 0 + preTaskVersion: 0 + postTaskCode: 1 + postTaskVersion: 1 + createTime: 2024-08-12 00:00:00 + updateTime: 2024-08-12 00:00:00