[DSIP-79][Task] Add datavines task for data quality #16863
zixi0825 wants to merge 64 commits into apache:dev
… dev-datavines # Conflicts: # dolphinscheduler-ui/src/views/projects/workflow/components/dag/dag.module.scss
# Conflicts: # dolphinscheduler-task-plugin/pom.xml # dolphinscheduler-ui/src/locales/en_US/project.ts # dolphinscheduler-ui/src/locales/zh_CN/project.ts
| @TestMethodOrder(MethodOrderer.MethodName.class) | ||
| @DolphinScheduler(composeFiles = "docker/datavines-task/docker-compose.yaml") | ||
| @DisableIfTestFails | ||
| public class DatavinesTaskE2ETest extends BaseWorkflowE2ETest { |
You should add this to dolphinscheduler/.github/workflows/e2e.yml (lines 137 to 138 in 5314ac0).
|
|
||
| @Test | ||
| @Order(10) | ||
| void testRunDatavinesTasks_SuccessCase() { |
It would be better to also add a test case that covers the failure path.
| MASTER_MAX_CPU_LOAD_AVG: 400 | ||
| MASTER_RESERVED_MEMORY: 0.01 | ||
| WORKER_TENANT_AUTO_CREATE: 'true' |
These configurations need to be consistent with other test cases.
| parse(doPost(address + DatavinesTaskConstants.JOB_EXECUTION_KILL + jobExecutionId, token)); | ||
| } | ||
|
|
||
| public static JsonNode parse(String res) { |
It's better to reuse the existing JSONUtils instead of adding a new parse method.
Pull request overview
Adds a new Datavines task type to DolphinScheduler to support triggering Datavines data quality jobs from workflows (UI configuration + worker task plugin + docs/E2E coverage).
Changes:
- UI: register `DATAVINES` task type, add form fields (address/jobId/token/failureBlock), and add DAG icon + i18n strings.
- Backend: add `dolphinscheduler-task-datavines` plugin module implementing task channel, parameters, and HTTP calls to Datavines OpenAPI.
- Docs/E2E: add Datavines task docs (EN/ZH) and an E2E workflow test with a mock server.
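The backend flow summarized above (trigger a job, poll its execution status, then fetch the result) can be sketched roughly as follows. This is a minimal illustration, not the plugin's code: the nested `Api` interface, the `FAILURE` and `KILL` status strings, the exit code values, and the polling parameters are all assumptions. Only the `SUCCESS` status and the execute/status/result calls appear in the PR's unit tests.

```java
import java.util.Set;

final class DatavinesFlowSketch {

    /** Hypothetical client abstraction; the PR itself uses static RequestUtils helpers. */
    interface Api {
        String executeJob(String jobId);                   // returns a job execution id
        String getJobExecutionStatus(String executionId);  // e.g. "SUCCESS"
        boolean getJobExecutionResult(String executionId); // true when the quality check passed
    }

    // Assumed exit code values, chosen for this sketch only.
    static final int EXIT_CODE_SUCCESS = 0;
    static final int EXIT_CODE_FAILURE = -1;

    // Terminal states other than SUCCESS are assumptions.
    private static final Set<String> FAILED_STATES = Set.of("FAILURE", "KILL");

    /** Triggers the job, then polls until a terminal state or until maxPolls is exhausted. */
    static int run(Api api, String jobId, int maxPolls, long pollIntervalMs) {
        String executionId = api.executeJob(jobId);
        for (int i = 0; i < maxPolls; i++) {
            String status = api.getJobExecutionStatus(executionId);
            if ("SUCCESS".equals(status)) {
                // The job finished; the quality result decides the exit code.
                return api.getJobExecutionResult(executionId)
                        ? EXIT_CODE_SUCCESS
                        : EXIT_CODE_FAILURE;
            }
            if (FAILED_STATES.contains(status)) {
                return EXIT_CODE_FAILURE;
            }
            try {
                Thread.sleep(pollIntervalMs);
            } catch (InterruptedException e) {
                // Preserve the interrupt flag and treat the task as failed.
                Thread.currentThread().interrupt();
                return EXIT_CODE_FAILURE;
            }
        }
        return EXIT_CODE_FAILURE; // no terminal state within the polling budget
    }
}
```

Bounding the number of polls keeps a worker thread from waiting forever if the remote service never reports a terminal state. Whether the real task should time out this way is a design choice for the PR.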
Reviewed changes
Copilot reviewed 33 out of 38 changed files in this pull request and generated 11 comments.
| File | Description |
|---|---|
| dolphinscheduler-ui/src/views/projects/workflow/components/dag/dag.module.scss | Adds Datavines task icon + hover icon styling in the DAG. |
| dolphinscheduler-ui/src/views/projects/task/constants/task-type.ts | Adds DATAVINES to UI task type union and task type metadata map. |
| dolphinscheduler-ui/src/views/projects/task/components/node/types.ts | Extends task parameter typing to include Datavines fields. |
| dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-datavines.ts | New UI task node model + form composition for Datavines. |
| dolphinscheduler-ui/src/views/projects/task/components/node/tasks/index.ts | Registers DATAVINES to the task form factory map. |
| dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts | Serializes Datavines form fields into taskParams. |
| dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-datavines.ts | New Datavines-specific form fields (address/jobId/token/failureBlock + custom params). |
| dolphinscheduler-ui/src/views/projects/task/components/node/fields/index.ts | Exports the new Datavines field builder. |
| dolphinscheduler-ui/src/store/project/types.ts | Adds DATAVINES to the store-side task type union. |
| dolphinscheduler-ui/src/store/project/task-type.ts | Adds DATAVINES to store task type metadata map. |
| dolphinscheduler-ui/src/locales/zh_CN/project.ts | Adds Chinese i18n strings for Datavines form fields. |
| dolphinscheduler-ui/src/locales/en_US/project.ts | Adds English i18n strings for Datavines form fields. |
| dolphinscheduler-task-plugin/pom.xml | Adds dolphinscheduler-task-datavines module to task plugins build. |
| dolphinscheduler-task-plugin/dolphinscheduler-task-datavines/pom.xml | New Datavines task plugin module (deps + shade config). |
| dolphinscheduler-task-plugin/dolphinscheduler-task-datavines/src/main/java/.../DatavinesTask.java | Implements the Datavines remote task execution/tracking/cancel logic. |
| dolphinscheduler-task-plugin/dolphinscheduler-task-datavines/src/main/java/.../DatavinesParameters.java | Defines Datavines task parameters and validation. |
| dolphinscheduler-task-plugin/dolphinscheduler-task-datavines/src/main/java/.../DatavinesTaskChannel.java | Implements parameter parsing + task creation for the plugin. |
| dolphinscheduler-task-plugin/dolphinscheduler-task-datavines/src/main/java/.../DatavinesTaskChannelFactory.java | Registers the Datavines task channel via SPI. |
| dolphinscheduler-task-plugin/dolphinscheduler-task-datavines/src/main/java/.../DatavinesTaskConstants.java | Adds Datavines OpenAPI endpoints/status/constants. |
| dolphinscheduler-task-plugin/dolphinscheduler-task-datavines/src/main/java/.../DatavinesTaskException.java | Adds a Datavines-specific runtime exception. |
| dolphinscheduler-task-plugin/dolphinscheduler-task-datavines/src/main/java/.../utils/RequestUtils.java | Adds HTTP helper utilities for calling Datavines OpenAPI. |
| dolphinscheduler-task-plugin/dolphinscheduler-task-datavines/src/test/java/.../DatavinesTaskTest.java | Adds unit tests for task behavior (with static mocking). |
| dolphinscheduler-task-plugin/dolphinscheduler-task-datavines/src/test/java/.../DatavinesTaskChannelTest.java | Adds unit tests for channel parsing and task creation. |
| dolphinscheduler-task-plugin/dolphinscheduler-task-datavines/src/test/java/.../DatavinesTaskChannelFactoryTest.java | Adds unit tests for SPI factory name and instantiation. |
| dolphinscheduler-task-plugin/dolphinscheduler-task-all/pom.xml | Includes the Datavines task artifact in the “task-all” bundle. |
| dolphinscheduler-e2e/.../docker/datavines-task/mockserver-config.json | MockServer expectations for Datavines OpenAPI endpoints used in E2E. |
| dolphinscheduler-e2e/.../docker/datavines-task/docker-compose.yaml | E2E compose to run standalone DS + mock Datavines server. |
| dolphinscheduler-e2e/.../pages/project/workflow/task/DatavinesTaskForm.java | Adds Selenium page object for filling Datavines task form. |
| dolphinscheduler-e2e/.../pages/project/workflow/WorkflowForm.java | Adds DATAVINES task type wiring to workflow form helper. |
| dolphinscheduler-e2e/.../cases/tasks/DatavinesTaskE2ETest.java | Adds E2E test that creates/runs a workflow with Datavines task. |
| dolphinscheduler-api/src/main/resources/task-type-config.yaml | Adds DATAVINES under a new dataQuality group (and updates ML list). |
| docs/docs/zh/guide/task/datavines.md | Adds Chinese documentation page for Datavines task. |
| docs/docs/en/guide/task/datavines.md | Adds English documentation page for Datavines task. |
|
|
||
| @Override | ||
| public boolean checkParameters() { | ||
| return StringUtils.isNotEmpty(this.address) && StringUtils.isNotEmpty(this.jobId); |
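The `checkParameters` hunk above only checks that `address` and `jobId` are non-empty. A stricter check might also require the address to parse as an absolute http(s) URL. The following is a sketch using only `java.net.URI`; the class and method names are hypothetical, and `token` is left unchecked here, as in the original.

```java
import java.net.URI;
import java.net.URISyntaxException;

final class DatavinesParameterCheckSketch {

    /** Returns true when jobId is non-blank and address is an absolute http(s) URL with a host. */
    static boolean isValid(String address, String jobId) {
        if (address == null || address.trim().isEmpty()
                || jobId == null || jobId.trim().isEmpty()) {
            return false;
        }
        try {
            URI uri = new URI(address.trim());
            String scheme = uri.getScheme();
            boolean httpScheme = "http".equalsIgnoreCase(scheme) || "https".equalsIgnoreCase(scheme);
            // A value such as "localhost:5600" parses as an opaque URI with no host.
            return httpScheme && uri.getHost() != null;
        } catch (URISyntaxException e) {
            return false;
        }
    }
}
```

Rejecting a malformed address at validation time would surface the problem before the first HTTP call, instead of during it.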
| public static String doGet(String url, String token) { | ||
| String result = ""; | ||
| HttpClient httpClient = HttpClientBuilder.create().build(); | ||
| HttpGet httpGet = null; | ||
| try { | ||
| URIBuilder uriBuilder = new URIBuilder(url); | ||
| URI uri = uriBuilder.build(); | ||
| httpGet = new HttpGet(uri); | ||
| httpGet.setHeader("Authorization", "Bearer " + token); | ||
| log.info("access url: {}", uri); | ||
| HttpResponse response = httpClient.execute(httpGet); | ||
| if (response.getStatusLine().getStatusCode() == HttpStatus.SC_OK) { | ||
| result = EntityUtils.toString(response.getEntity()); | ||
| log.info("datavines task succeed with results: {}", result); | ||
| } else { | ||
| log.error("datavines task terminated,response: {}", response); | ||
| } | ||
| } catch (IllegalArgumentException ie) { | ||
| log.error("datavines task terminated: {}", ie.getMessage()); | ||
| } catch (Exception e) { | ||
| log.error("datavines task terminated: ", e); | ||
| } finally { | ||
| if (null != httpGet) { | ||
| httpGet.releaseConnection(); | ||
| } | ||
| } | ||
| return result; |
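The `doGet` hunk above logs every failure and then returns an empty string, so a caller cannot tell an HTTP error from an empty response body. One alternative is to let failures propagate as exceptions. The sketch below illustrates that idea with the JDK's built-in `java.net.http.HttpClient` (Java 11+) rather than the Apache HttpClient used in the PR; the class name, method names, and timeout values are assumptions.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.time.Duration;

final class DatavinesHttpSketch {

    // One shared client; the timeout values are illustrative assumptions.
    private static final HttpClient CLIENT = HttpClient.newBuilder()
            .connectTimeout(Duration.ofSeconds(10))
            .build();

    /** Builds a GET request that carries the token as a Bearer credential. */
    static HttpRequest buildGet(String url, String token) {
        return HttpRequest.newBuilder(URI.create(url))
                .timeout(Duration.ofSeconds(30))
                .header("Authorization", "Bearer " + token)
                .GET()
                .build();
    }

    /** Returns the response body, or throws if the status is not 200. */
    static String doGet(String url, String token) throws IOException, InterruptedException {
        HttpResponse<String> response =
                CLIENT.send(buildGet(url, token), HttpResponse.BodyHandlers.ofString());
        if (response.statusCode() != 200) {
            throw new IOException("Datavines request failed with HTTP " + response.statusCode());
        }
        return response.body();
    }
}
```

With this shape the task code can catch the exception and mark the task as failed with a specific message, rather than inferring failure from an empty string.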
| void submitApplicationExecutesJobSuccessfully() { | ||
| when(taskExecutionContext.getTaskParams()) | ||
| .thenReturn("{\"address\":\"http://localhost\",\"jobId\":\"1\",\"token\":\"token\"}"); | ||
| datavinesTask.init(); | ||
| assertDoesNotThrow(() -> datavinesTask.submitApplication()); | ||
| } | ||
|
|
||
| @Test | ||
| void trackApplicationStatusJobExecutionSuccessSetsExitCodeSuccess() throws TaskException { | ||
| JsonNode executeJobResult = RequestUtils.parse("{\"code\":\"200\",\"data\":\"1\"}"); | ||
| JsonNode executeStatus = RequestUtils.parse("{\"code\":\"200\",\"data\":\"SUCCESS\"}"); | ||
| JsonNode executeResult = RequestUtils.parse("{\"code\":\"200\",\"data\":\"1\"}"); | ||
| try (MockedStatic<RequestUtils> requestUtilsStatic = Mockito.mockStatic(RequestUtils.class)) { | ||
| when(taskExecutionContext.getTaskParams()) | ||
| .thenReturn("{\"address\":\"http://localhost\",\"jobId\":\"1\",\"token\":\"token\"}"); | ||
| datavinesTask.init(); | ||
|
|
||
| requestUtilsStatic.when(() -> RequestUtils.executeJob(Mockito.any(), Mockito.any(), Mockito.any())) | ||
| .thenReturn(executeJobResult); | ||
| datavinesTask.submitApplication(); | ||
|
|
||
| requestUtilsStatic | ||
| .when(() -> RequestUtils.getJobExecutionStatus(Mockito.any(), Mockito.any(), Mockito.any())) | ||
| .thenReturn(executeStatus); | ||
|
|
||
| requestUtilsStatic | ||
| .when(() -> RequestUtils.getJobExecutionResult(Mockito.any(), Mockito.any(), Mockito.any())) | ||
| .thenReturn(executeResult); | ||
|
|
||
| datavinesTask.trackApplicationStatus(); | ||
| Assertions.assertEquals(EXIT_CODE_SUCCESS, datavinesTask.getExitStatusCode()); | ||
| } | ||
| } | ||
|
|
||
| @Test | ||
| void trackApplicationStatusJobExecutionFailureSetsExitCodeFailure() throws TaskException { | ||
| when(taskExecutionContext.getTaskParams()) | ||
| .thenReturn("{\"address\":\"http://localhost\",\"jobId\":\"1\",\"token\":\"token\"}"); | ||
| datavinesTask.init(); | ||
| datavinesTask.submitApplication(); | ||
| datavinesTask.trackApplicationStatus(); | ||
| Assertions.assertEquals(EXIT_CODE_FAILURE, datavinesTask.getExitStatusCode()); |
| jobId?: string | ||
| token?: string | ||
| failureBlock?: string |
| if (result instanceof MissingNode || result instanceof NullNode) { | ||
| errorHandle(DatavinesTaskConstants.API_ERROR_MSG); | ||
| isCorrect = false; | ||
| } else if (result.get(DatavinesTaskConstants.API_RESULT_CODE) | ||
| .asInt() != DatavinesTaskConstants.API_RESULT_CODE_SUCCESS) { | ||
| errorHandle(result.get(DatavinesTaskConstants.API_RESULT_MSG)); |
| package org.apache.dolphinscheduler.plugin.task.datavines; | ||
|
|
||
| /** | ||
| * Custom DinkyTaskConstants |
| |-------------------|-------------------------------------------------------------------------------------------------------| | ||
| | Datavines Address | The URL for the Datavines service, e.g., http://localhost:5600. | | ||
| | Datavines Job ID | The unique job id for a datavines job. | | ||
| | Datavines token | The Datawines service access token can be obtained through token management on the Datavines service. | |
| - 'OPENMLDB' | ||
| - 'DVC' | ||
| - 'SAGEMAKER' | ||
| - 'PYTORCH' |
| import static org.junit.Assert.assertThrows; | ||
| import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; |
| // Use address-taskId as app id | ||
| setAppIds(String.format(DatavinesTaskConstants.APPIDS_FORMAT, address, this.jobExecutionId)); |
Purpose of the pull request
See #16113 for detail.
close #16113
Brief change log
Verify this pull request
This pull request is code cleanup without any test coverage.
(or)
This pull request is already covered by existing tests, such as (please describe tests).
(or)
This change added tests and can be verified as follows:
(or)
Pull Request Notice
If your pull request contains an incompatible change, you should also add it to docs/docs/en/guide/upgrade/incompatible.md