Skip to content

[DSIP-79][Task] Add datavines task for data quality#16863

Open
zixi0825 wants to merge 64 commits intoapache:devfrom
zixi0825:dev-datavines
Open

[DSIP-79][Task] Add datavines task for data quality#16863
zixi0825 wants to merge 64 commits intoapache:devfrom
zixi0825:dev-datavines

Conversation

@zixi0825
Copy link
Copy Markdown
Member

@zixi0825 zixi0825 commented Dec 1, 2024

Purpose of the pull request

See #16113 for detail.

close #16113

Brief change log

Verify this pull request

This pull request is code cleanup without any test coverage.

(or)

This pull request is already covered by existing tests, such as (please describe tests).

(or)

This change added tests and can be verified as follows:

(or)

Pull Request Notice

Pull Request Notice

If your pull request contain incompatible change, you should also add it to docs/docs/en/guide/upgrede/incompatible.md

@SbloodyS SbloodyS added the DSIP label Dec 1, 2024
@SbloodyS SbloodyS added this to the 3.3.0 milestone Dec 1, 2024
@SbloodyS SbloodyS changed the title [Feature][Task] Add datavines task for data quality [DSIP-79][Task] Add datavines task for data quality Dec 1, 2024
@TestMethodOrder(MethodOrderer.MethodName.class)
@DolphinScheduler(composeFiles = "docker/datavines-task/docker-compose.yaml")
@DisableIfTestFails
public class DatavinesTaskE2ETest extends BaseWorkflowE2ETest {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You should add this to

- name: DolphinDBDataSourceE2ETest
class: org.apache.dolphinscheduler.e2e.cases.DolphinDBDataSourceE2ETest


@Test
@Order(10)
void testRunDatavinesTasks_SuccessCase() {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's better to add a failed test case.

Comment on lines +24 to +26
MASTER_MAX_CPU_LOAD_AVG: 400
MASTER_RESERVED_MEMORY: 0.01
WORKER_TENANT_AUTO_CREATE: 'true'
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These configurations need to be consistent with other test cases.

parse(doPost(address + DatavinesTaskConstants.JOB_EXECUTION_KILL + jobExecutionId, token));
}

public static JsonNode parse(String res) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's better to use JSONUtils instead of creating a new one.

Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Adds a new Datavines task type to DolphinScheduler to support triggering Datavines data quality jobs from workflows (UI configuration + worker task plugin + docs/E2E coverage).

Changes:

  • UI: register DATAVINES task type, add form fields (address/jobId/token/failureBlock), and add DAG icon + i18n strings.
  • Backend: add dolphinscheduler-task-datavines plugin module implementing task channel, parameters, and HTTP calls to Datavines OpenAPI.
  • Docs/E2E: add Datavines task docs (EN/ZH) and an E2E workflow test with a mock server.

Reviewed changes

Copilot reviewed 33 out of 38 changed files in this pull request and generated 11 comments.

Show a summary per file
File Description
dolphinscheduler-ui/src/views/projects/workflow/components/dag/dag.module.scss Adds Datavines task icon + hover icon styling in the DAG.
dolphinscheduler-ui/src/views/projects/task/constants/task-type.ts Adds DATAVINES to UI task type union and task type metadata map.
dolphinscheduler-ui/src/views/projects/task/components/node/types.ts Extends task parameter typing to include Datavines fields.
dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-datavines.ts New UI task node model + form composition for Datavines.
dolphinscheduler-ui/src/views/projects/task/components/node/tasks/index.ts Registers DATAVINES to the task form factory map.
dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts Serializes Datavines form fields into taskParams.
dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-datavines.ts New Datavines-specific form fields (address/jobId/token/failureBlock + custom params).
dolphinscheduler-ui/src/views/projects/task/components/node/fields/index.ts Exports the new Datavines field builder.
dolphinscheduler-ui/src/store/project/types.ts Adds DATAVINES to the store-side task type union.
dolphinscheduler-ui/src/store/project/task-type.ts Adds DATAVINES to store task type metadata map.
dolphinscheduler-ui/src/locales/zh_CN/project.ts Adds Chinese i18n strings for Datavines form fields.
dolphinscheduler-ui/src/locales/en_US/project.ts Adds English i18n strings for Datavines form fields.
dolphinscheduler-task-plugin/pom.xml Adds dolphinscheduler-task-datavines module to task plugins build.
dolphinscheduler-task-plugin/dolphinscheduler-task-datavines/pom.xml New Datavines task plugin module (deps + shade config).
dolphinscheduler-task-plugin/dolphinscheduler-task-datavines/src/main/java/.../DatavinesTask.java Implements the Datavines remote task execution/tracking/cancel logic.
dolphinscheduler-task-plugin/dolphinscheduler-task-datavines/src/main/java/.../DatavinesParameters.java Defines Datavines task parameters and validation.
dolphinscheduler-task-plugin/dolphinscheduler-task-datavines/src/main/java/.../DatavinesTaskChannel.java Implements parameter parsing + task creation for the plugin.
dolphinscheduler-task-plugin/dolphinscheduler-task-datavines/src/main/java/.../DatavinesTaskChannelFactory.java Registers the Datavines task channel via SPI.
dolphinscheduler-task-plugin/dolphinscheduler-task-datavines/src/main/java/.../DatavinesTaskConstants.java Adds Datavines OpenAPI endpoints/status/constants.
dolphinscheduler-task-plugin/dolphinscheduler-task-datavines/src/main/java/.../DatavinesTaskException.java Adds a Datavines-specific runtime exception.
dolphinscheduler-task-plugin/dolphinscheduler-task-datavines/src/main/java/.../utils/RequestUtils.java Adds HTTP helper utilities for calling Datavines OpenAPI.
dolphinscheduler-task-plugin/dolphinscheduler-task-datavines/src/test/java/.../DatavinesTaskTest.java Adds unit tests for task behavior (with static mocking).
dolphinscheduler-task-plugin/dolphinscheduler-task-datavines/src/test/java/.../DatavinesTaskChannelTest.java Adds unit tests for channel parsing and task creation.
dolphinscheduler-task-plugin/dolphinscheduler-task-datavines/src/test/java/.../DatavinesTaskChannelFactoryTest.java Adds unit tests for SPI factory name and instantiation.
dolphinscheduler-task-plugin/dolphinscheduler-task-all/pom.xml Includes the Datavines task artifact in the “task-all” bundle.
dolphinscheduler-e2e/.../docker/datavines-task/mockserver-config.json MockServer expectations for Datavines OpenAPI endpoints used in E2E.
dolphinscheduler-e2e/.../docker/datavines-task/docker-compose.yaml E2E compose to run standalone DS + mock Datavines server.
dolphinscheduler-e2e/.../pages/project/workflow/task/DatavinesTaskForm.java Adds Selenium page object for filling Datavines task form.
dolphinscheduler-e2e/.../pages/project/workflow/WorkflowForm.java Adds DATAVINES task type wiring to workflow form helper.
dolphinscheduler-e2e/.../cases/tasks/DatavinesTaskE2ETest.java Adds E2E test that creates/runs a workflow with Datavines task.
dolphinscheduler-api/src/main/resources/task-type-config.yaml Adds DATAVINES under a new dataQuality group (and updates ML list).
docs/docs/zh/guide/task/datavines.md Adds Chinese documentation page for Datavines task.
docs/docs/en/guide/task/datavines.md Adds English documentation page for Datavines task.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.


@Override
public boolean checkParameters() {
return StringUtils.isNotEmpty(this.address) && StringUtils.isNotEmpty(this.jobId);
Comment on lines +73 to +99
public static String doGet(String url, String token) {
String result = "";
HttpClient httpClient = HttpClientBuilder.create().build();
HttpGet httpGet = null;
try {
URIBuilder uriBuilder = new URIBuilder(url);
URI uri = uriBuilder.build();
httpGet = new HttpGet(uri);
httpGet.setHeader("Authorization", "Bearer " + token);
log.info("access url: {}", uri);
HttpResponse response = httpClient.execute(httpGet);
if (response.getStatusLine().getStatusCode() == HttpStatus.SC_OK) {
result = EntityUtils.toString(response.getEntity());
log.info("datavines task succeed with results: {}", result);
} else {
log.error("datavines task terminated,response: {}", response);
}
} catch (IllegalArgumentException ie) {
log.error("datavines task terminated: {}", ie.getMessage());
} catch (Exception e) {
log.error("datavines task terminated: ", e);
} finally {
if (null != httpGet) {
httpGet.releaseConnection();
}
}
return result;
Comment on lines +80 to +121
void submitApplicationExecutesJobSuccessfully() {
when(taskExecutionContext.getTaskParams())
.thenReturn("{\"address\":\"http://localhost\",\"jobId\":\"1\",\"token\":\"token\"}");
datavinesTask.init();
assertDoesNotThrow(() -> datavinesTask.submitApplication());
}

@Test
void trackApplicationStatusJobExecutionSuccessSetsExitCodeSuccess() throws TaskException {
JsonNode executeJobResult = RequestUtils.parse("{\"code\":\"200\",\"data\":\"1\"}");
JsonNode executeStatus = RequestUtils.parse("{\"code\":\"200\",\"data\":\"SUCCESS\"}");
JsonNode executeResult = RequestUtils.parse("{\"code\":\"200\",\"data\":\"1\"}");
try (MockedStatic<RequestUtils> requestUtilsStatic = Mockito.mockStatic(RequestUtils.class)) {
when(taskExecutionContext.getTaskParams())
.thenReturn("{\"address\":\"http://localhost\",\"jobId\":\"1\",\"token\":\"token\"}");
datavinesTask.init();

requestUtilsStatic.when(() -> RequestUtils.executeJob(Mockito.any(), Mockito.any(), Mockito.any()))
.thenReturn(executeJobResult);
datavinesTask.submitApplication();

requestUtilsStatic
.when(() -> RequestUtils.getJobExecutionStatus(Mockito.any(), Mockito.any(), Mockito.any()))
.thenReturn(executeStatus);

requestUtilsStatic
.when(() -> RequestUtils.getJobExecutionResult(Mockito.any(), Mockito.any(), Mockito.any()))
.thenReturn(executeResult);

datavinesTask.trackApplicationStatus();
Assertions.assertEquals(EXIT_CODE_SUCCESS, datavinesTask.getExitStatusCode());
}
}

@Test
void trackApplicationStatusJobExecutionFailureSetsExitCodeFailure() throws TaskException {
when(taskExecutionContext.getTaskParams())
.thenReturn("{\"address\":\"http://localhost\",\"jobId\":\"1\",\"token\":\"token\"}");
datavinesTask.init();
datavinesTask.submitApplication();
datavinesTask.trackApplicationStatus();
Assertions.assertEquals(EXIT_CODE_FAILURE, datavinesTask.getExitStatusCode());
Comment on lines +479 to +481
jobId?: string
token?: string
failureBlock?: string
Comment on lines +171 to +176
if (result instanceof MissingNode || result instanceof NullNode) {
errorHandle(DatavinesTaskConstants.API_ERROR_MSG);
isCorrect = false;
} else if (result.get(DatavinesTaskConstants.API_RESULT_CODE)
.asInt() != DatavinesTaskConstants.API_RESULT_CODE_SUCCESS) {
errorHandle(result.get(DatavinesTaskConstants.API_RESULT_MSG));
package org.apache.dolphinscheduler.plugin.task.datavines;

/**
* Custom DinkyTaskConstants
Comment thread docs/docs/en/guide/task/datavines.md Outdated
|-------------------|-------------------------------------------------------------------------------------------------------|
| Datavines Address | The URL for the Datavines service, e.g., http://localhost:5600. |
| Datavines Job ID | The unique job id for a datavines job. |
| Datavines token | The Datawines service access token can be obtained through token management on the Datavines service. |
- 'OPENMLDB'
- 'DVC'
- 'SAGEMAKER'
- 'PYTORCH'
Comment on lines +22 to +23
import static org.junit.Assert.assertThrows;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
Comment on lines +101 to +102
// Use address-taskId as app id
setAppIds(String.format(DatavinesTaskConstants.APPIDS_FORMAT, address, this.jobExecutionId));
@github-actions github-actions bot added the CI&CD label Apr 16, 2026
@sonarqubecloud
Copy link
Copy Markdown

Quality Gate Failed Quality Gate failed

Failed conditions
0.0% Coverage on New Code (required ≥ 60%)

See analysis details on SonarQube Cloud

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[DSIP-79][Task] Add Datavines task to better support data quality

4 participants