diff --git a/common-interfaces/src/main/java/org/eea/interfaces/controller/dataset/DatasetSnapshotController.java b/common-interfaces/src/main/java/org/eea/interfaces/controller/dataset/DatasetSnapshotController.java index 63eb35d341..c31c154f18 100644 --- a/common-interfaces/src/main/java/org/eea/interfaces/controller/dataset/DatasetSnapshotController.java +++ b/common-interfaces/src/main/java/org/eea/interfaces/controller/dataset/DatasetSnapshotController.java @@ -334,4 +334,10 @@ void downloadHistoricReleasesCSV(@PathVariable Long datasetId, @RequestParam("da @PutMapping("/private/rollBackSnapshotRecord/{jobId}") void rollBackSnapshotRecord(@PathVariable Long jobId, @RequestParam("dataflowId") Long dataflowId, @RequestParam("providerId") Long providerId); + @PostMapping(value = "/private/releasePrecheck/{jobId}") + void precheckReleaseJob(@PathVariable("jobId") Long jobId); + + @PutMapping(value = "/private/startQueuedReleaseJob/{jobId}", produces = MediaType.APPLICATION_JSON_VALUE) + void startQueuedReleaseJob(@PathVariable("jobId") Long jobId); + } diff --git a/dataset-service/src/main/java/org/eea/dataset/controller/DatasetSnapshotControllerImpl.java b/dataset-service/src/main/java/org/eea/dataset/controller/DatasetSnapshotControllerImpl.java index 77efa7dc39..807c74d140 100644 --- a/dataset-service/src/main/java/org/eea/dataset/controller/DatasetSnapshotControllerImpl.java +++ b/dataset-service/src/main/java/org/eea/dataset/controller/DatasetSnapshotControllerImpl.java @@ -9,10 +9,7 @@ import org.apache.commons.lang3.BooleanUtils; import org.eea.dataset.persistence.metabase.domain.ReportingDataset; import org.eea.dataset.persistence.metabase.repository.ReportingDatasetRepository; -import org.eea.dataset.service.DatasetSchemaService; -import org.eea.dataset.service.DatasetSnapshotService; -import org.eea.dataset.service.DatasetTableService; -import org.eea.dataset.service.ResolveSnapshotTable; +import org.eea.dataset.service.*; import org.eea.exception.EEAErrorMessage; import org.eea.exception.EEAException; import org.eea.interfaces.controller.communication.NotificationController.NotificationControllerZuul; @@ -129,6 +126,8 @@ public class DatasetSnapshotControllerImpl implements DatasetSnapshotController @Autowired private KafkaSenderUtils kafkaSenderUtils; + @Autowired + private ReleasePrecheckService releasePrecheckService; @Value("${eea.authorization.key}") private String eeaAuthorizationKey; @@ -1247,4 +1246,22 @@ public void rollBackSnapshotRecord(@PathVariable Long jobId, @RequestParam("data resolveSnapshotTable.rollBackSnapshotTableValues(jobId, dataflowId, providerId); } + @Override + @PostMapping(value = "/private/releasePrecheck/{jobId}") + public void precheckReleaseJob(@PathVariable("jobId") Long jobId) { + releasePrecheckService.precheckOrThrow(jobId); + } + + @Override + @HystrixCommand + @PutMapping(value = "/private/startQueuedReleaseJob/{jobId}", produces = MediaType.APPLICATION_JSON_VALUE) + @ApiOperation(value = "Start queued release job", hidden = true) + @PreAuthorize("isAuthenticated()") + public void startQueuedReleaseJob(@PathVariable("jobId") Long jobId) { + try { + releasePrecheckService.startQueuedReleaseJob(jobId); + } catch (EEAException e) { + throw new RuntimeException(e); + } + } } diff --git a/dataset-service/src/main/java/org/eea/dataset/io/kafka/commands/CheckBlockersDataSnapshotCommand.java b/dataset-service/src/main/java/org/eea/dataset/io/kafka/commands/CheckBlockersDataSnapshotCommand.java index 4ee39a598f..f02ec735a8 100644 --- a/dataset-service/src/main/java/org/eea/dataset/io/kafka/commands/CheckBlockersDataSnapshotCommand.java +++ b/dataset-service/src/main/java/org/eea/dataset/io/kafka/commands/CheckBlockersDataSnapshotCommand.java @@ -1,56 +1,36 @@ package org.eea.dataset.io.kafka.commands; -import org.eea.datalake.service.S3Helper; -import org.eea.datalake.service.S3Service; -import org.eea.datalake.service.model.S3PathResolver; -import org.eea.dataset.persistence.data.repository.ValidationRepository; import org.eea.dataset.persistence.metabase.domain.DataSetMetabase; -import org.eea.dataset.persistence.metabase.domain.Task; import org.eea.dataset.persistence.metabase.repository.DataSetMetabaseRepository; -import org.eea.dataset.persistence.metabase.repository.TaskRepository; import org.eea.dataset.service.DatasetSnapshotService; import org.eea.exception.EEAException; import org.eea.interfaces.controller.dataflow.DataFlowController.DataFlowControllerZuul; import org.eea.interfaces.controller.orchestrator.JobController.JobControllerZuul; import org.eea.interfaces.controller.orchestrator.JobHistoryController.JobHistoryControllerZuul; -import org.eea.interfaces.controller.orchestrator.JobProcessController.JobProcessControllerZuul; -import org.eea.interfaces.controller.recordstore.ProcessController.ProcessControllerZuul; import org.eea.interfaces.controller.ums.UserManagementController.UserManagementControllerZull; import org.eea.interfaces.vo.dataflow.DataFlowVO; -import org.eea.interfaces.vo.dataset.CreateSnapshotVO; -import org.eea.interfaces.vo.dataset.enums.ErrorTypeEnum; -import org.eea.interfaces.vo.orchestrator.JobProcessVO; import org.eea.interfaces.vo.orchestrator.JobVO; -import org.eea.interfaces.vo.orchestrator.enums.JobInfoEnum; import org.eea.interfaces.vo.orchestrator.enums.JobStatusEnum; import org.eea.interfaces.vo.orchestrator.enums.JobTypeEnum; -import org.eea.interfaces.vo.recordstore.enums.ProcessStatusEnum; -import org.eea.interfaces.vo.recordstore.enums.ProcessTypeEnum; import org.eea.interfaces.vo.ums.TokenVO; import org.eea.kafka.commands.AbstractEEAEventHandlerCommand; import org.eea.kafka.domain.EEAEventVO; import org.eea.kafka.domain.EventType; import org.eea.kafka.domain.NotificationVO; import org.eea.kafka.utils.KafkaSenderUtils; -import org.eea.multitenancy.TenantResolver; import org.eea.security.authorization.AdminUserAuthorization; import org.eea.utils.LiteralConstants; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Lazy; -import org.springframework.jdbc.core.JdbcTemplate; -import org.springframework.jdbc.support.rowset.SqlRowSet; import org.springframework.security.core.context.SecurityContextHolder; import org.springframework.stereotype.Component; import java.sql.Timestamp; -import java.text.SimpleDateFormat; import java.util.*; -import static org.eea.utils.LiteralConstants.*; /** * The Class PropagateNewFieldCommand. @@ -74,10 +54,6 @@ public class CheckBlockersDataSnapshotCommand extends AbstractEEAEventHandlerCom @Autowired private DataSetMetabaseRepository dataSetMetabaseRepository; - /** The validation repository. */ - @Autowired - private ValidationRepository validationRepository; - /** The kafka sender utils. */ @Lazy @Autowired @@ -87,11 +63,6 @@ public class CheckBlockersDataSnapshotCommand extends AbstractEEAEventHandlerCom @Autowired private DatasetSnapshotService datasetSnapshotService; - /** The process controller zuul */ - @Autowired - private ProcessControllerZuul processControllerZuul; - - /** The dataflow controller zuul */ @Autowired private DataFlowControllerZuul dataFlowControllerZuul; @@ -104,38 +75,17 @@ public class CheckBlockersDataSnapshotCommand extends AbstractEEAEventHandlerCom @Autowired private JobHistoryControllerZuul jobHistoryControllerZuul; - /** The job process controller zuul */ - @Autowired - private JobProcessControllerZuul jobProcessControllerZuul; - @Autowired private UserManagementControllerZull userManagementControllerZull; @Autowired private AdminUserAuthorization adminUserAuthorization; - @Autowired - @Qualifier("dremioJdbcTemplate") - private JdbcTemplate dremioJdbcTemplate; - - @Autowired - private S3Helper s3Helper; - - @Autowired - private S3Service s3Service; - - @Autowired - private TaskRepository taskRepository; /** * The Constant LOG. */ private static final Logger LOG = LoggerFactory.getLogger(CheckBlockersDataSnapshotCommand.class); - /** - * The default release process priority - */ - private int defaultReleaseProcessPriority = 20; - /** * Gets the event type. * @@ -179,9 +129,9 @@ public void execute(EEAEventVO eeaEventVO) throws EEAException { // with one id we take all the datasets with the same dataProviderId and dataflowId DataSetMetabase dataset = - dataSetMetabaseRepository.findById(datasetId).orElse(new DataSetMetabase()); + dataSetMetabaseRepository.findById(datasetId).orElse(new DataSetMetabase()); List datasets = dataSetMetabaseRepository.getDatasetIdsByDataflowIdAndDataProviderId( - dataset.getDataflowId(), dataset.getDataProviderId()); + dataset.getDataflowId(), dataset.getDataProviderId()); Collections.sort(datasets); String dataflowName = null; @@ -203,10 +153,12 @@ public void execute(EEAEventVO eeaEventVO) throws EEAException { parameters.put("userId", userId); parameters.put("datasetId", datasets); parameters.put("silentRelease", silentRelease); + parameters.put("validate", false); if(validationJobId != null){ parameters.put("validationJobId", validationJobId); } - JobVO releaseJob = new JobVO(null, JobTypeEnum.RELEASE, JobStatusEnum.IN_PROGRESS, ts, ts, parameters, user,true, dataset.getDataflowId(), dataset.getDataProviderId(), null,null, dataflowName,null, null, null, null); + + JobVO releaseJob = new JobVO(null, JobTypeEnum.RELEASE, JobStatusEnum.QUEUED, ts, ts, parameters, user, true, dataset.getDataflowId(), dataset.getDataProviderId(), null, null, dataflowName, null, null, null, null); waitForValidationJobIfInProgress(validationJobId, 2000); @@ -221,8 +173,8 @@ public void execute(EEAEventVO eeaEventVO) throws EEAException { value.put(LiteralConstants.USER, user); value.put("release_job_id", releaseJob.getId()); kafkaSenderUtils.releaseNotificableKafkaEvent(EventType.RELEASE_REFUSED_EVENT, value, - NotificationVO.builder().user(user).dataflowId(dataset.getDataflowId()).providerId(dataset.getDataProviderId()) - .error("There is another job with status QUEUED or IN_PROGRESS for dataflowId " + dataset.getDataflowId() + " and providerId " + dataset.getDataProviderId()).build()); + NotificationVO.builder().user(user).dataflowId(dataset.getDataflowId()).providerId(dataset.getDataProviderId()) + .error("There is another job with status QUEUED or IN_PROGRESS for dataflowId " + dataset.getDataflowId() + " and providerId " + dataset.getDataProviderId()).build()); return; } else{ @@ -232,118 +184,19 @@ public void execute(EEAEventVO eeaEventVO) throws EEAException { value.put(LiteralConstants.USER, user); value.put("release_job_id", releaseJob.getId()); kafkaSenderUtils.releaseNotificableKafkaEvent(EventType.SILENT_RELEASE_FAILED_EVENT, value, - NotificationVO.builder().user(user).dataflowId(dataset.getDataflowId()).providerId(dataset.getDataProviderId()) - .error("There is another job with status QUEUED or IN_PROGRESS for dataflowId " + dataset.getDataflowId() + " and providerId " + dataset.getDataProviderId()).build()); + NotificationVO.builder().user(user).dataflowId(dataset.getDataflowId()).providerId(dataset.getDataProviderId()) + .error("There is another job with status QUEUED or IN_PROGRESS for dataflowId " + dataset.getDataflowId() + " and providerId " + dataset.getDataProviderId()).build()); return; } } releaseJob = addReleaseJob(user, dataset, releaseJob, statusToInsert); - // we check if one or more dataset have error, if have we create a notification and abort - // process of releasing - boolean haveBlockers = false; - for (Long id : datasets) { - if (dataflow!=null && dataflow.getBigData()!=null && dataflow.getBigData()) { - S3PathResolver s3PathResolver = new S3PathResolver(dataset.getDataflowId(), dataset.getDataProviderId(), id, S3_VALIDATION); - StringBuilder blockersQueryBuilder = new StringBuilder(); - blockersQueryBuilder.append("select ").append(PARQUET_RECORD_ID_COLUMN_HEADER).append(" from ").append(s3Service.getTableAsFolderQueryPath(s3PathResolver, S3_TABLE_AS_FOLDER_QUERY_PATH)) - .append(" where validation_level='BLOCKER' limit 1"); - if (s3Helper.checkFolderExist(s3PathResolver, S3_VALIDATION_TABLE_PATH)) { - SqlRowSet blockersRowSet = dremioJdbcTemplate.queryForRowSet(blockersQueryBuilder.toString()); - if (blockersRowSet.next()) { - haveBlockers = true; - failRelease(datasetId, user, dataset, releaseJob); - break; - } - } - } else { - setTenant(id); - if (validationRepository.existsByLevelError(ErrorTypeEnum.BLOCKER)) { - haveBlockers = true; - failRelease(datasetId, user, dataset, releaseJob); - break; - } - } - } - - if(validationJobId != null && !haveBlockers) { - List processIds = jobProcessControllerZuul.findProcessesByJobId(validationJobId); - for(String processId: processIds) { - //check if there were canceled tasks that were related to blockers or canceled tasks without a rule id. If the list is not empty we need to fail the release - List canceledBlockerTasks = taskRepository.findAllByProcessIdAndStatusAndLevelErrorBlocker(processId, ProcessStatusEnum.CANCELED.toString()); - if (canceledBlockerTasks != null && canceledBlockerTasks.size() > 0) { - LOG.info("Found canceled tasks with blockers for validationJobId {} and processId {}", validationJobId, processId); - haveBlockers = true; - jobControllerZuul.updateJobInfo(releaseJob.getId(), JobInfoEnum.ERROR_RELEASE_CANCELED_BLOCKERS, null); - failRelease(datasetId, user, dataset, releaseJob); - break; - } - } - - // If none canceled tasks with blocker errors were found check for any canceled tasks and write a warning to Release Job - if (!haveBlockers){ - Task jobHasCanceledTask = taskRepository.findFirstByProcessIdInAndStatus(processIds, ProcessStatusEnum.CANCELED); - if (jobHasCanceledTask != null) { - LOG.info("Found canceled task(s) without blockers for validationJobId {}", validationJobId); - jobControllerZuul.updateJobInfo(releaseJob.getId(), JobInfoEnum.WARNING_HAS_CANCELED_VALIDATION_TASKS, null); - }} - } - // If none blocker errors has found, we have to release datasets one by one - if (!haveBlockers) { - LOG.info( - "Releasing datasets process continues. At this point, the datasets from the dataflowId {}, dataProviderId {} and jobId {} have no blockers", - dataset.getDataflowId(), dataset.getDataProviderId(), releaseJob.getId()); - - LOG.info("Creating the first release process for dataflowId {}, dataProviderId {}, jobId {}", dataset.getDataflowId(), dataset.getDataProviderId(), releaseJob.getId()); - String processId = UUID.randomUUID().toString(); - Boolean isProcessCreated = processControllerZuul.updateProcess(datasets.get(0), dataset.getDataflowId(), - ProcessStatusEnum.IN_PROGRESS, ProcessTypeEnum.RELEASE, processId, user, defaultReleaseProcessPriority, true); - LOG.info("Created the first release process for dataflowId {}, dataProviderId {}, jobId {} and processId {} dataset id {} success: {}", dataset.getDataflowId(), dataset.getDataProviderId(), releaseJob.getId(), processId, datasetId, isProcessCreated); - - CreateSnapshotVO createSnapshotVO = new CreateSnapshotVO(); - createSnapshotVO.setReleased(true); - createSnapshotVO.setAutomatic(Boolean.TRUE); - - //force date to UTC and description to CET - SimpleDateFormat dateFormatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); - Date dateRelease = new Date(); - dateFormatter.setTimeZone(TimeZone.getTimeZone(LiteralConstants.EUROPE_ZONE_ID)); - createSnapshotVO.setDescription("Release " + dateFormatter.format(dateRelease) + " CET"); - dateFormatter.setTimeZone(TimeZone.getTimeZone("UTC")); - - LOG.info("Creating jobProcess for dataflowId {}, dataProviderId {}, jobId {} and release processId {}", dataset.getDataflowId(), dataset.getDataProviderId(), releaseJob.getId(), processId); - JobProcessVO jobProcessVO = new JobProcessVO(null, releaseJob.getId(), processId); - jobProcessControllerZuul.save(jobProcessVO); - LOG.info("Created jobProcess for dataflowId {}, dataProviderId {}, jobId {} and release processId {}", dataset.getDataflowId(), dataset.getDataProviderId(), releaseJob.getId(), processId); - - datasetSnapshotService.addSnapshot(datasets.get(0), createSnapshotVO, null, - dateFormatter.format(dateRelease), false, processId); - } } catch (Exception e) { LOG.error("Unexpected error! Error executing event {}. Message: {}", eeaEventVO, e.getMessage()); } } - private void failRelease(Long datasetId, String user, DataSetMetabase dataset, JobVO releaseJob) throws EEAException { - // Release the locks - datasetSnapshotService.releaseLocksRelatedToRelease(dataset.getDataflowId(), - dataset.getDataProviderId()); - LOG.error( - "Error in the releasing process of the dataflowId {}, dataProviderId {} and jobId {}, the datasets have blocker errors", - dataset.getDataflowId(), dataset.getDataProviderId(), releaseJob.getId()); - - releaseJob.setJobStatus(JobStatusEnum.FAILED); - jobControllerZuul.updateJobStatus(releaseJob.getId(), JobStatusEnum.FAILED); - - kafkaSenderUtils.releaseNotificableKafkaEvent(EventType.RELEASE_BLOCKERS_FAILED_EVENT, null, - NotificationVO.builder() - .user(user) - .datasetId(datasetId) - .error("One or more datasets have blockers errors, Release aborted") - .providerId(dataset.getDataProviderId()).build()); - } - private JobVO addReleaseJob(String user, DataSetMetabase dataset, JobVO releaseJob, JobStatusEnum statusToInsert) { LOG.info("Adding release job for dataflowId {}, dataProviderId {} and creator {} with status {}", dataset.getDataflowId(), dataset.getDataProviderId(), user, statusToInsert); releaseJob = jobControllerZuul.save(releaseJob); @@ -352,15 +205,6 @@ private JobVO addReleaseJob(String user, DataSetMetabase dataset, JobVO releaseJ return releaseJob; } - /** - * Sets the tenant. - * - * @param idDataset the new tenant - */ - private void setTenant(Long idDataset) { - TenantResolver.setTenantName(String.format(LiteralConstants.DATASET_FORMAT_NAME, idDataset)); - } - /** * Waits briefly if the given validation job is still in progress, * to avoid race conditions when triggering the release process. @@ -377,7 +221,7 @@ private void waitForValidationJobIfInProgress(Long validationJobId, long waitMil JobVO validationJob = jobControllerZuul.findJobById(validationJobId); if (validationJob != null && JobStatusEnum.IN_PROGRESS.equals(validationJob.getJobStatus())) { LOG.info("Validation job {} still in progress. Sleeping {} ms before release eligibility check.", - validationJobId, waitMillis); + validationJobId, waitMillis); Thread.sleep(waitMillis); } } catch (InterruptedException ie) { diff --git a/dataset-service/src/main/java/org/eea/dataset/io/kafka/commands/ReleaseDataSnapshotsCommand.java b/dataset-service/src/main/java/org/eea/dataset/io/kafka/commands/ReleaseDataSnapshotsCommand.java index a3c75ae2b5..d32b5fde36 100644 --- a/dataset-service/src/main/java/org/eea/dataset/io/kafka/commands/ReleaseDataSnapshotsCommand.java +++ b/dataset-service/src/main/java/org/eea/dataset/io/kafka/commands/ReleaseDataSnapshotsCommand.java @@ -153,7 +153,9 @@ public void execute(EEAEventVO eeaEventVO) throws EEAException { if(jobId != null){ JobVO jobVO = jobControllerZuul.findJobById(jobId); if(jobVO != null) { - if (!jobVO.getJobStatus().equals(JobStatusEnum.IN_PROGRESS)) { + JobStatusEnum status = jobVO.getJobStatus(); + // Skip only terminal failure/cancel cases. Allow QUEUED, IN_PROGRESS and FINISHED to proceed and run cleanups. + if (JobStatusEnum.FAILED.equals(status) || JobStatusEnum.CANCELED.equals(status) || JobStatusEnum.CANCELED_BY_ADMIN.equals(status)) { return; } else { Map parameters = jobVO.getParameters(); diff --git a/dataset-service/src/main/java/org/eea/dataset/service/ReleasePrecheckService.java b/dataset-service/src/main/java/org/eea/dataset/service/ReleasePrecheckService.java new file mode 100644 index 0000000000..e0867abff6 --- /dev/null +++ b/dataset-service/src/main/java/org/eea/dataset/service/ReleasePrecheckService.java @@ -0,0 +1,284 @@ +package org.eea.dataset.service; + +import org.eea.datalake.service.S3Helper; +import org.eea.datalake.service.S3Service; +import org.eea.datalake.service.model.S3PathResolver; +import org.eea.dataset.persistence.data.repository.ValidationRepository; +import org.eea.dataset.persistence.metabase.domain.Task; +import org.eea.dataset.persistence.metabase.repository.TaskRepository; +import org.eea.exception.EEAException; +import org.eea.interfaces.controller.dataflow.DataFlowController.DataFlowControllerZuul; +import org.eea.interfaces.controller.orchestrator.JobController.JobControllerZuul; +import org.eea.interfaces.controller.orchestrator.JobProcessController.JobProcessControllerZuul; +import org.eea.interfaces.controller.recordstore.ProcessController.ProcessControllerZuul; +import org.eea.interfaces.vo.dataset.CreateSnapshotVO; +import org.eea.interfaces.vo.dataset.DataSetMetabaseVO; +import org.eea.interfaces.vo.dataset.enums.ErrorTypeEnum; +import org.eea.interfaces.vo.orchestrator.JobProcessVO; +import org.eea.interfaces.vo.orchestrator.JobVO; +import org.eea.interfaces.vo.orchestrator.enums.JobInfoEnum; +import org.eea.interfaces.vo.recordstore.enums.ProcessStatusEnum; +import org.eea.interfaces.vo.recordstore.enums.ProcessTypeEnum; +import org.eea.multitenancy.TenantResolver; +import org.eea.thread.ThreadPropertiesManager; +import org.eea.utils.LiteralConstants; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.jdbc.core.JdbcTemplate; +import org.springframework.jdbc.support.rowset.SqlRowSet; +import org.springframework.security.core.context.SecurityContextHolder; +import org.springframework.stereotype.Service; +import org.springframework.web.server.ResponseStatusException; +import org.springframework.http.HttpStatus; + +import java.text.SimpleDateFormat; +import java.util.*; +import java.util.stream.Collectors; + +import static org.eea.utils.LiteralConstants.DATASET_FORMAT_NAME; +import static org.eea.utils.LiteralConstants.PARQUET_RECORD_ID_COLUMN_HEADER; +import static org.eea.utils.LiteralConstants.S3_TABLE_AS_FOLDER_QUERY_PATH; +import static org.eea.utils.LiteralConstants.S3_VALIDATION; +import static org.eea.utils.LiteralConstants.S3_VALIDATION_TABLE_PATH; + +@Service +public class ReleasePrecheckService { + + private static final Logger LOG = LoggerFactory.getLogger(ReleasePrecheckService.class); + + @Autowired + private JobControllerZuul jobControllerZuul; + + @Autowired + private JobProcessControllerZuul jobProcessControllerZuul; + + @Autowired + private DataFlowControllerZuul dataFlowControllerZuul; + + @Autowired + private ProcessControllerZuul processControllerZuul; + + @Autowired + private ValidationRepository validationRepository; + + @Autowired + private DatasetMetabaseService datasetMetabaseService; + + @Autowired + private DatasetSnapshotService datasetSnapshotService; + + @Autowired + private TaskRepository taskRepository; + + @Autowired + @Qualifier("dremioJdbcTemplate") + private JdbcTemplate dremioJdbcTemplate; + + @Autowired + private S3Helper s3Helper; + + @Autowired + private S3Service s3Service; + + // Default priority used when the first real RELEASE process is created. + private int defaultReleaseProcessPriority = 20; + + /** + * Checks whether a queued RELEASE job is allowed to continue. + * + * This method contains the blocker and canceled-task checks that used to live in the + * old CheckBlockersDataSnapshotCommand. The main difference now is that the RELEASE + * job already exists and is about to start, so we do the checks here just before the + * actual release execution begins. + * + * Corresponding ticket #297462. + */ + public void precheckOrThrow(Long releaseJobId) { + JobVO releaseJob = jobControllerZuul.findJobById(releaseJobId); + Map parameters = releaseJob.getParameters(); + + // Parameters are taken directly from the queued RELEASE job that was created after + // VALIDATION_RELEASE_FINISHED_EVENT. + // Cast in case of id deserialization. + Long dataflowId = Long.valueOf((Integer) parameters.get("dataflowId")); + Long dataProviderId = Long.valueOf((Integer) parameters.get("dataProviderId")); + List datasets = ((List) parameters.get("datasetId")).stream() + .map(obj -> Long.valueOf(String.valueOf(obj))) + .collect(Collectors.toList()); + Long validationJobId = parameters.containsKey("validationJobId") ? Long.valueOf((Integer) parameters.get("validationJobId")): null; + + // For bigData dataflows we cannot use the normal validationRepository check because + // blocker information lives in parquet validation output and must be read through Dremio. + final boolean isBigData = dataFlowControllerZuul.isBigDataflow(dataflowId); + + boolean haveBlockers = false; + for (Long datasetId : datasets) { + if (isBigData) { + if (hasBigDataBlockers(dataflowId, dataProviderId, datasetId)) { + haveBlockers = true; + break; + } + } else { + // For non-bigdata, validationRepository reads from the dataset_id schema. + setTenant(datasetId); + if (validationRepository.existsByLevelError(ErrorTypeEnum.BLOCKER)) { + haveBlockers = true; + break; + } + } + } + + // If at least one dataset has a BLOCKER, the RELEASE must stop immediately. + if (haveBlockers) { + throw new ResponseStatusException(HttpStatus.PRECONDITION_FAILED, + "One or more datasets have blockers errors, Release aborted"); + } + + // After blockers, inspect the validation job that preceded this RELEASE. + if (validationJobId != null) { + List processIds = jobProcessControllerZuul.findProcessesByJobId(validationJobId); + for (String processId : processIds) { + //check if there were canceled tasks that were related to blockers or canceled tasks without a rule id. If the list is not empty we need to fail the release. + List canceledBlockerTasks = taskRepository.findAllByProcessIdAndStatusAndLevelErrorBlocker( + processId, ProcessStatusEnum.CANCELED.toString()); + if (canceledBlockerTasks != null && canceledBlockerTasks.size() > 0) { + LOG.info("Found canceled tasks with blockers for validationJobId {} and processId {}", validationJobId, processId); + jobControllerZuul.updateJobInfo(releaseJobId, JobInfoEnum.ERROR_RELEASE_CANCELED_BLOCKERS, null); + throw new ResponseStatusException(HttpStatus.PRECONDITION_FAILED, + "Release aborted: canceled validation tasks with blocker errors found"); + } + } + + // If none canceled tasks with blocker errors were found check for any canceled tasks and write a warning to Release Job. + Task jobHasCanceledTask = taskRepository.findFirstByProcessIdInAndStatus(processIds, ProcessStatusEnum.CANCELED); + if (jobHasCanceledTask != null) { + LOG.info("Found canceled task(s) without blockers for validationJobId {}", validationJobId); + jobControllerZuul.updateJobInfo(releaseJobId, JobInfoEnum.WARNING_HAS_CANCELED_VALIDATION_TASKS, null); + } + } + } + + /** + * Starts the real queued RELEASE job. + * + * This method intentionally stays close to the old CheckBlockersDataSnapshotCommand logic: + * - sort datasets + * - pick the first dataset + * - clear release-related locks + * - create the first RELEASE process + * - create the job_process relation + * - start the first snapshot + * + * After this first dataset starts, the existing one-by-one release flow continues through + * ReleaseDataSnapshotsCommand as before. + */ + public void startQueuedReleaseJob(Long jobId) throws EEAException { + JobVO releaseJob = jobControllerZuul.findJobById(jobId); + Map parameters = releaseJob.getParameters(); + List datasets = ((List) parameters.get("datasetId")).stream() + .map(obj -> Long.valueOf(String.valueOf(obj))) + .collect(Collectors.toList()); + + Collections.sort(datasets); + + // The first dataset is used to kick off the actual RELEASE process. + Long firstDatasetId = datasets.get(0); + DataSetMetabaseVO dataset = datasetMetabaseService.findDatasetMetabase(firstDatasetId); + String user = releaseJob.getCreatorUsername() != null + ? releaseJob.getCreatorUsername() + : SecurityContextHolder.getContext().getAuthentication().getName(); + + // Keep the effective user in thread context because deeper layers already rely on it. + ThreadPropertiesManager.setVariable("user", user); + + // Validation-for-release leaves locks that are useful during validation but must be removed + // before the real RELEASE starts, otherwise the release kickoff is blocked. + datasetSnapshotService.releaseLocksRelatedToRelease(dataset.getDataflowId(), dataset.getDataProviderId()); + + LOG.info( + "Releasing datasets process continues. At this point, the datasets from the dataflowId {}, dataProviderId {} and jobId {} have no blockers", + dataset.getDataflowId(), dataset.getDataProviderId(), releaseJob.getId()); + + LOG.info("Creating the first release process for dataflowId {}, dataProviderId {}, jobId {}", + dataset.getDataflowId(), dataset.getDataProviderId(), releaseJob.getId()); + + // Create the first RELEASE process in recordstore. + String processId = UUID.randomUUID().toString(); + Boolean isProcessCreated = processControllerZuul.updateProcess( + firstDatasetId, + dataset.getDataflowId(), + ProcessStatusEnum.IN_PROGRESS, + ProcessTypeEnum.RELEASE, + processId, + user, + defaultReleaseProcessPriority, + true); + + LOG.info("Created the first release process for dataflowId {}, dataProviderId {}, jobId {} and processId {} dataset id {} success: {}", + dataset.getDataflowId(), dataset.getDataProviderId(), releaseJob.getId(), processId, firstDatasetId, isProcessCreated); + + // Build the snapshot payload exactly like the old release kickoff logic did. + CreateSnapshotVO createSnapshotVO = new CreateSnapshotVO(); + createSnapshotVO.setReleased(true); + createSnapshotVO.setAutomatic(Boolean.TRUE); + + // Description is stored as CET text, while the release date sent downstream is UTC. + SimpleDateFormat dateFormatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); + Date dateRelease = new Date(); + dateFormatter.setTimeZone(TimeZone.getTimeZone(LiteralConstants.EUROPE_ZONE_ID)); + createSnapshotVO.setDescription("Release " + dateFormatter.format(dateRelease) + " CET"); + dateFormatter.setTimeZone(TimeZone.getTimeZone("UTC")); + + LOG.info("Creating jobProcess for dataflowId {}, dataProviderId {}, jobId {} and release processId {}", + dataset.getDataflowId(), dataset.getDataProviderId(), releaseJob.getId(), processId); + + // Persist the link between the RELEASE job and the first process. + JobProcessVO jobProcessVO = new JobProcessVO(null, releaseJob.getId(), processId); + jobProcessControllerZuul.save(jobProcessVO); + + LOG.info("Created jobProcess for dataflowId {}, dataProviderId {}, jobId {} and release processId {}", + dataset.getDataflowId(), dataset.getDataProviderId(), releaseJob.getId(), processId); + + // Start the first actual snapshot creation. From here the standard one-by-one release + // continuation takes over. + datasetSnapshotService.addSnapshot( + firstDatasetId, + createSnapshotVO, + null, + dateFormatter.format(dateRelease), + false, + processId + ); + } + + /** + * Big-data blocker check. + * + * For big-data dataflows, blockers are stored in parquet validation output instead of the + * normal validation schema, so they must be queried through Dremio. + */ + private boolean hasBigDataBlockers(Long dataflowId, Long providerId, Long datasetId) { + S3PathResolver s3PathResolver = new S3PathResolver(dataflowId, providerId, datasetId, S3_VALIDATION); + if (!s3Helper.checkFolderExist(s3PathResolver, S3_VALIDATION_TABLE_PATH)) { + return false; + } + StringBuilder blockersQueryBuilder = new StringBuilder(); + blockersQueryBuilder.append("select ") + .append(PARQUET_RECORD_ID_COLUMN_HEADER) + .append(" from ") + .append(s3Service.getTableAsFolderQueryPath(s3PathResolver, S3_TABLE_AS_FOLDER_QUERY_PATH)) + .append(" where validation_level='BLOCKER' limit 1"); + SqlRowSet blockersRowSet = dremioJdbcTemplate.queryForRowSet(blockersQueryBuilder.toString()); + return blockersRowSet.next(); + } + + /** + * Switches tenant to the dataset schema so validationRepository points to the correct + * dataset_ validation tables. + */ + private void setTenant(Long idDataset) { + TenantResolver.setTenantName(String.format(DATASET_FORMAT_NAME, idDataset)); + } +} diff --git a/dataset-service/src/test/java/org/eea/dataset/io/kafka/commands/CheckBlockersDataSnapshotCommandTest.java b/dataset-service/src/test/java/org/eea/dataset/io/kafka/commands/CheckBlockersDataSnapshotCommandTest.java index 782f5068ce..e2712ef27f 100644 --- a/dataset-service/src/test/java/org/eea/dataset/io/kafka/commands/CheckBlockersDataSnapshotCommandTest.java +++ b/dataset-service/src/test/java/org/eea/dataset/io/kafka/commands/CheckBlockersDataSnapshotCommandTest.java @@ -132,12 +132,9 @@ public void testExecuteWithoutBlockers() throws EEAException { JobVO jobVO = new JobVO(Long.valueOf(1), JobTypeEnum.RELEASE, JobStatusEnum.QUEUED, new Timestamp(System.currentTimeMillis()), new Timestamp(System.currentTimeMillis()), null, "test", true, 1L, 1L, 1L, null, null, null, null, null, null); Mockito.when(jobControllerZuul.save(any(JobVO.class))).thenReturn(jobVO); Mockito.doNothing().when(jobHistoryControllerZuul).save(any(JobVO.class)); - Mockito.when(validationRepository.existsByLevelError(ErrorTypeEnum.BLOCKER)).thenReturn(false); ProcessVO processVO = new ProcessVO(); processVO.setProcessId("jkhiuh"); - Mockito.when(processControllerZuul.updateProcess(anyLong(), anyLong(), any(ProcessStatusEnum.class), any(ProcessTypeEnum.class), anyString(), anyString(), anyInt(), anyBoolean())).thenReturn(true); JobProcessVO jobProcessVO = new JobProcessVO(Long.valueOf(1), Long.valueOf(1), "jkhiuh"); - Mockito.when(jobProcessControllerZuul.save(any(JobProcessVO.class))).thenReturn(jobProcessVO); checkBlockersDataSnapshotCommand.execute(eeaEventVO); Mockito.verify(dataSetMetabaseRepository, times(1)).findById(1L); @@ -170,12 +167,10 @@ public void testExecuteWithBlockers() throws EEAException { datasetMetabase.getDataflowId(), datasetMetabase.getDataProviderId())) .thenReturn(datasetsId); Mockito.when(jobControllerZuul.checkEligibilityOfJob(anyString(), anyBoolean(), anyLong(), anyLong(), anyList())).thenReturn(JobStatusEnum.QUEUED); - JobVO jobVO = new JobVO(Long.valueOf(1), JobTypeEnum.RELEASE, JobStatusEnum.QUEUED, new Timestamp(System.currentTimeMillis()), new Timestamp(System.currentTimeMillis()), null, "test", true, 1L, 1L, 1L, null, null, null, null, null, null); + JobVO jobVO = new JobVO(Long.valueOf(1), JobTypeEnum.RELEASE, JobStatusEnum.QUEUED, new Timestamp(System.currentTimeMillis()), new Timestamp(System.currentTimeMillis()), null, "test", true, 1L, 1L, 1L, null, null, null, null, null,null); Mockito.when(jobControllerZuul.save(any(JobVO.class))).thenReturn(jobVO); - Mockito.when(validationRepository.existsByLevelError(ErrorTypeEnum.BLOCKER)).thenReturn(true); checkBlockersDataSnapshotCommand.execute(eeaEventVO); - Mockito.verify(validationRepository, times(1)).existsByLevelError(ErrorTypeEnum.BLOCKER); - + Mockito.verifyNoInteractions(validationRepository); } } diff --git a/dataset-service/src/test/java/org/eea/dataset/service/ReleasePrecheckServiceTest.java b/dataset-service/src/test/java/org/eea/dataset/service/ReleasePrecheckServiceTest.java new file mode 100644 index 0000000000..397a8b9af3 --- /dev/null +++ b/dataset-service/src/test/java/org/eea/dataset/service/ReleasePrecheckServiceTest.java @@ -0,0 +1,135 @@ +package org.eea.dataset.service; + +import org.eea.dataset.persistence.data.repository.ValidationRepository; +import org.eea.dataset.persistence.metabase.domain.Task; +import org.eea.dataset.persistence.metabase.repository.TaskRepository; +import org.eea.interfaces.controller.dataflow.DataFlowController.DataFlowControllerZuul; +import org.eea.interfaces.controller.orchestrator.JobController.JobControllerZuul; +import org.eea.interfaces.controller.orchestrator.JobProcessController.JobProcessControllerZuul; +import org.eea.interfaces.vo.dataset.enums.ErrorTypeEnum; +import org.eea.interfaces.vo.orchestrator.JobVO; +import org.eea.interfaces.vo.orchestrator.enums.JobInfoEnum; +import org.eea.interfaces.vo.recordstore.enums.ProcessStatusEnum; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.junit.MockitoJUnitRunner; +import org.springframework.web.server.ResponseStatusException; + +import java.sql.Timestamp; +import java.util.*; + +import static org.junit.Assert.fail; +import static org.mockito.ArgumentMatchers.*; +import static org.mockito.Mockito.*; + +@RunWith(MockitoJUnitRunner.class) +public class ReleasePrecheckServiceTest { + + @InjectMocks + private ReleasePrecheckService releasePrecheckService; + + @Mock + private JobControllerZuul jobControllerZuul; + + @Mock + private JobProcessControllerZuul jobProcessControllerZuul; + + @Mock + private DataFlowControllerZuul dataFlowControllerZuul; + + @Mock + private ValidationRepository validationRepository; + + @Mock + private TaskRepository taskRepository; + + @Mock + private org.eea.datalake.service.S3Helper s3Helper; + + @Mock + private org.eea.datalake.service.S3Service s3Service; + + private Map parameters; + + @Before + public void setUp() { + parameters = new HashMap<>(); + parameters.put("dataflowId", 61); + parameters.put("dataProviderId", 2); + parameters.put("datasetId", Arrays.asList(750L)); + parameters.put("validationJobId", 1892); + + JobVO releaseJob = new JobVO(1947L, null, null, new Timestamp(System.currentTimeMillis()), new Timestamp(System.currentTimeMillis()), parameters, "user1", true, 61L, 2L, null, null, null, null, null, null, null); + + Mockito.when(jobControllerZuul.findJobById(1947L)).thenReturn(releaseJob); + Mockito.when(dataFlowControllerZuul.isBigDataflow(61L)).thenReturn(false); + } + + @Test + public void testWithBlockers() { + Mockito.when(validationRepository.existsByLevelError(ErrorTypeEnum.BLOCKER)).thenReturn(true); + + try { + releasePrecheckService.precheckOrThrow(1947L); + fail("Expected ResponseStatusException to be thrown"); + } catch (ResponseStatusException e) { + // expected + } + + Mockito.verify(validationRepository, times(1)).existsByLevelError(ErrorTypeEnum.BLOCKER); + Mockito.verifyNoInteractions(jobProcessControllerZuul); + Mockito.verify(jobControllerZuul, never()).updateJobInfo(anyLong(), any(), any()); + } + + @Test + public void testWithCanceledBlockerTasks() { + Mockito.when(validationRepository.existsByLevelError(ErrorTypeEnum.BLOCKER)).thenReturn(false); + Mockito.when(jobProcessControllerZuul.findProcessesByJobId(1892L)).thenReturn(Collections.singletonList("proc-1")); + + Task canceledBlockerTask = new Task(); + Mockito.when(taskRepository.findAllByProcessIdAndStatusAndLevelErrorBlocker("proc-1", ProcessStatusEnum.CANCELED.toString())).thenReturn(Collections.singletonList(canceledBlockerTask)); + + try { + releasePrecheckService.precheckOrThrow(1947L); + fail("Expected ResponseStatusException to be thrown"); + } catch (ResponseStatusException e) { + // expected + } + + Mockito.verify(jobControllerZuul, times(1)).updateJobInfo(eq(1947L), eq(JobInfoEnum.ERROR_RELEASE_CANCELED_BLOCKERS), isNull()); + } + + @Test + public void testWithCanceledNonBlockerTasks() { + Mockito.when(validationRepository.existsByLevelError(ErrorTypeEnum.BLOCKER)).thenReturn(false); + Mockito.when(jobProcessControllerZuul.findProcessesByJobId(1892L)).thenReturn(Collections.singletonList("proc-1")); + + Mockito.when(taskRepository.findAllByProcessIdAndStatusAndLevelErrorBlocker("proc-1", ProcessStatusEnum.CANCELED.toString())).thenReturn(Collections.emptyList()); + + Task canceledTask = new Task(); + Mockito.when(taskRepository.findFirstByProcessIdInAndStatus(anyList(), eq(ProcessStatusEnum.CANCELED))).thenReturn(canceledTask); + + releasePrecheckService.precheckOrThrow(1947L); + + Mockito.verify(jobControllerZuul, times(1)).updateJobInfo(eq(1947L), eq(JobInfoEnum.WARNING_HAS_CANCELED_VALIDATION_TASKS), isNull()); + } + + @Test + public void testWithoutIssues() { + Mockito.when(validationRepository.existsByLevelError(ErrorTypeEnum.BLOCKER)).thenReturn(false); + Mockito.when(jobProcessControllerZuul.findProcessesByJobId(1892L)).thenReturn(Collections.singletonList("proc-1")); + + Mockito.when(taskRepository.findAllByProcessIdAndStatusAndLevelErrorBlocker("proc-1", ProcessStatusEnum.CANCELED.toString())).thenReturn(Collections.emptyList()); + + Mockito.when(taskRepository.findFirstByProcessIdInAndStatus(anyList(), eq(ProcessStatusEnum.CANCELED))).thenReturn(null); + + releasePrecheckService.precheckOrThrow(1947L); + + Mockito.verify(jobControllerZuul, never()).updateJobInfo(eq(1947L), eq(JobInfoEnum.ERROR_RELEASE_CANCELED_BLOCKERS), isNull()); + Mockito.verify(jobControllerZuul, never()).updateJobInfo(eq(1947L), eq(JobInfoEnum.WARNING_HAS_CANCELED_VALIDATION_TASKS), isNull()); + } +} \ No newline at end of file diff --git a/orchestrator-service/src/main/java/org/eea/orchestrator/io/notification/events/ReleaseValidationBlockersFailEvent.java b/orchestrator-service/src/main/java/org/eea/orchestrator/io/notification/events/ReleaseValidationBlockersFailEvent.java new file mode 100644 index 0000000000..c7a76db469 --- /dev/null +++ b/orchestrator-service/src/main/java/org/eea/orchestrator/io/notification/events/ReleaseValidationBlockersFailEvent.java @@ -0,0 +1,54 @@ +package org.eea.orchestrator.io.notification.events; + +import org.eea.exception.EEAException; +import org.eea.interfaces.controller.dataset.DatasetController.DataSetControllerZuul; +import org.eea.kafka.domain.EventType; +import org.eea.kafka.domain.NotificationVO; +import org.eea.notification.event.NotificableEventHandler; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import java.util.HashMap; +import java.util.Map; + +/** + * The Class RestoreDatasetSnapshotFailedEvent. + */ +@Component +public class ReleaseValidationBlockersFailEvent implements NotificableEventHandler { + + + @Autowired + private DataSetControllerZuul datasetService; + + /** + * Gets the event type. + * + * @return the event type + */ + @Override + public EventType getEventType() { + return EventType.RELEASE_BLOCKERS_FAILED_EVENT; + } + + /** + * Gets the map. + * + * @param notificationVO the notification VO + * @return the map + * @throws EEAException the EEA exception + */ + @Override + public Map getMap(NotificationVO notificationVO) throws EEAException { + Long dataflowId = notificationVO.getDataflowId() != null ? notificationVO.getDataflowId() + : datasetService.getDataFlowIdById(notificationVO.getDatasetId()); + + Map notification = new HashMap<>(); + notification.put("user", notificationVO.getUser()); + notification.put("dataflowId", dataflowId); + notification.put("providerId", notificationVO.getProviderId()); + notification.put("error", notificationVO.getError()); + return notification; + } + +} diff --git a/orchestrator-service/src/main/java/org/eea/orchestrator/scheduling/JobForExecutingQueuedJobs.java b/orchestrator-service/src/main/java/org/eea/orchestrator/scheduling/JobForExecutingQueuedJobs.java index 6e445ee71a..b55e9913db 100644 --- a/orchestrator-service/src/main/java/org/eea/orchestrator/scheduling/JobForExecutingQueuedJobs.java +++ b/orchestrator-service/src/main/java/org/eea/orchestrator/scheduling/JobForExecutingQueuedJobs.java @@ -53,7 +53,7 @@ private void init() { ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler(); scheduler.initialize(); scheduler.schedule(() -> executeQueuedJobs(), - new CronTrigger("0 */1 * * * *")); + new CronTrigger("0 */1 * * * *")); } /** @@ -62,7 +62,7 @@ private void init() { public void executeQueuedJobs() { try { List jobs = jobService.getJobsByStatus(JobStatusEnum.QUEUED); - if(jobs == null || jobs.size() == 0){ + if(jobs == null || jobs.isEmpty()){ return; } LOG.info("Running scheduled task executeQueuedJobs"); @@ -74,18 +74,28 @@ public void executeQueuedJobs() { LOG.info("Job with id {} and of type {} can not be executed right now.", job.getId(), job.getJobType().getValue()); continue; } + LOG.info("Trying to execute job with id {} and of type {}", job.getId(), job.getJobType().getValue()); - if (job.getJobType() == JobTypeEnum.VALIDATION && !job.isRelease()) { - LOG.info("Job with id {} and of type {} will be executed.", job.getId(), job.getJobType().getValue()); + if (job.getJobType() == JobTypeEnum.VALIDATION) { + LOG.info("Validation job with id {} will be executed. release={}", job.getId(), job.isRelease()); //call validation mechanism jobService.prepareAndExecuteValidationJob(job); - } else if (job.getJobType() == JobTypeEnum.VALIDATION && job.isRelease()) { - //check if another release is already running for the dataflow, but for another provider - //TODO for #297462 remove following check because validations for release will be executed in parallel for the same dataflow id + } else if (job.getJobType() == JobTypeEnum.RELEASE) { if (!jobService.canExecuteReleaseOnDataflow(job.getDataflowId())) { + LOG.info("Release job with id {} can not be executed because another release is already running for dataflowId {}", job.getId(), job.getDataflowId()); continue; } - LOG.info("Job with id {} and of type {} will be executed.", job.getId(), job.getJobType().getValue()); + + LOG.info("Release job with id {} will be executed.", job.getId()); + jobService.updateJobStatus(job.getId(), JobStatusEnum.IN_PROGRESS); + try { + jobService.precheckReleaseJobOrThrow(job.getId()); + } catch (Exception e) { + jobService.failReleaseAfterPrecheckException(job, e); + continue; + } + + LOG.info("Release job with id {} passed precheck. Starting release execution.", job.getId()); //call release mechanism jobService.prepareAndExecuteReleaseJob(job); } else if (job.getJobType() == JobTypeEnum.FILE_EXPORT) { diff --git a/orchestrator-service/src/main/java/org/eea/orchestrator/service/JobService.java b/orchestrator-service/src/main/java/org/eea/orchestrator/service/JobService.java index d5497b0b1e..19d639d9ac 100644 --- a/orchestrator-service/src/main/java/org/eea/orchestrator/service/JobService.java +++ b/orchestrator-service/src/main/java/org/eea/orchestrator/service/JobService.java @@ -97,4 +97,9 @@ JobsVO getJobs(Pageable pageable, boolean asc, String sortedColumn, Long jobId, List findActiveJobsRelatedToADatasetId(Long datasetId, Long dataflowId, Long providerId); - JobStatusEnum checkEligibilityOfPreparationJob(String jobType, Long datasetId, String preparationCode);} + JobStatusEnum checkEligibilityOfPreparationJob(String jobType, Long datasetId, String preparationCode); + + void precheckReleaseJobOrThrow(Long releaseJobId); + + void failReleaseAfterPrecheckException(JobVO job, Exception e) throws EEAException; +} diff --git a/orchestrator-service/src/main/java/org/eea/orchestrator/service/impl/JobServiceImpl.java b/orchestrator-service/src/main/java/org/eea/orchestrator/service/impl/JobServiceImpl.java index 28192a68de..cb03288d6a 100644 --- a/orchestrator-service/src/main/java/org/eea/orchestrator/service/impl/JobServiceImpl.java +++ b/orchestrator-service/src/main/java/org/eea/orchestrator/service/impl/JobServiceImpl.java @@ -341,21 +341,45 @@ private boolean isDatasetInActiveReleaseOrValidationJob(List datasetIds) { public void prepareAndExecuteValidationJob(JobVO jobVO) { Job job = jobMapper.classToEntity(jobVO); Map parameters = job.getParameters(); + + // Validation for release block. + if (jobVO.isRelease()) { + Long dataflowId = Long.valueOf((Integer) parameters.get("dataflowId")); + Long dataProviderId = Long.valueOf((Integer) parameters.get("dataProviderId")); + boolean restrictFromPublic = false; + Object restrictObj = parameters.get("restrictFromPublic"); + if (restrictObj instanceof Boolean) { + restrictFromPublic = (Boolean) restrictObj; + } + + boolean validate = true; + Object validateObj = parameters.get("validate"); + if (validateObj instanceof Boolean) { + validate = (Boolean) validateObj; + } + + dataSetSnapshotControllerZuul.createReleaseSnapshots(dataflowId, dataProviderId, restrictFromPublic, validate, job.getId()); + return; + } + Long datasetId = Long.valueOf((Integer) parameters.get("datasetId")); - Boolean released = (Boolean) parameters.get("released"); + boolean released = false; + Object releasedObj = parameters.get("released"); String preparationCode = jobVO.getPreparationCode(); + if (releasedObj instanceof Boolean) { + released = (Boolean) releasedObj; + } + validationControllerZuul.validateDataSetData(datasetId, released, job.getId(), preparationCode); } + /** + * Calls startQueuedReleaseJob from dataset-service to start the process creation for the QUEUED RELEASE job. + * @param jobVO + */ @Override public void prepareAndExecuteReleaseJob(JobVO jobVO) { - Job job = jobMapper.classToEntity(jobVO); - Map parameters = job.getParameters(); - Long dataflowId = Long.valueOf((Integer) parameters.get("dataflowId")); - Long dataProviderId = Long.valueOf((Integer) parameters.get("dataProviderId")); - Boolean restrictFromPublic = (Boolean) parameters.get("restrictFromPublic"); - Boolean validate = (Boolean) parameters.get("validate"); - dataSetSnapshotControllerZuul.createReleaseSnapshots(dataflowId, dataProviderId, restrictFromPublic, validate, jobVO.getId()); + dataSetSnapshotControllerZuul.startQueuedReleaseJob(jobVO.getId()); } @Override @@ -440,11 +464,8 @@ public JobVO findById(Long jobId) { @Override public boolean canExecuteReleaseOnDataflow(Long dataflowId) { - List jobs = jobRepository.findByDataflowIdAndJobTypeInAndJobStatusAndRelease(dataflowId, Arrays.asList(JobTypeEnum.VALIDATION, JobTypeEnum.RELEASE), JobStatusEnum.IN_PROGRESS, true); - if (jobs.size() > 0) { - return false; - } - return true; + List jobs = jobRepository.findByDataflowIdAndJobTypeInAndJobStatusAndRelease(dataflowId, Arrays.asList(JobTypeEnum.RELEASE), JobStatusEnum.IN_PROGRESS, true); + return jobs == null || jobs.isEmpty(); } @Override @@ -875,4 +896,49 @@ private boolean inProgressOrQueuedJobExists(Long datasetId, String preparationCo preparationCode ); } + + /** + * Calls precheckReleaseJob from dataset-service to evaluate if the RELEASE can create processes. + */ + @Override + public void precheckReleaseJobOrThrow(Long jobId) { + dataSetSnapshotControllerZuul.precheckReleaseJob(jobId); + } + + /** + * This part was originally in the old CheckBlockersDataSnapshotCommand implementation. Since the RELEASE job is now + * set IN_PROGRESS in orchestrator, the FAILED status is also moved in orchestrator. + */ + @Override + public void failReleaseAfterPrecheckException(JobVO job, Exception e) throws EEAException { + updateJobStatus(job.getId(), JobStatusEnum.FAILED); + + String message = (e.getMessage() != null && !e.getMessage().isBlank()) + ? e.getMessage() + : "One or more datasets have blockers errors, Release aborted"; + + boolean silentRelease = false; + if (job.getParameters() != null && job.getParameters().get("silentRelease") instanceof Boolean) { + silentRelease = (Boolean) job.getParameters().get("silentRelease"); + } + + EventType eventType = silentRelease + ? EventType.SILENT_RELEASE_FAILED_EVENT + : EventType.RELEASE_BLOCKERS_FAILED_EVENT; + + try{ + kafkaSenderUtils.releaseNotificableKafkaEvent(eventType, null, + NotificationVO.builder() + .user(job.getCreatorUsername()) + .dataflowId(job.getDataflowId()) + .datasetId(job.getDatasetId()) + .error(message) + .providerId(job.getProviderId()).build()); + } finally { + if (job.getDataflowId() != null && job.getProviderId() != null) { + // Always clean uup locks on failure. + dataSetSnapshotControllerZuul.releaseLocksFromReleaseDatasets(job.getDataflowId(), job.getProviderId()); + } + } + } } diff --git a/validation-service/src/main/java/org/eea/validation/util/ValidationHelper.java b/validation-service/src/main/java/org/eea/validation/util/ValidationHelper.java index ec9d665a64..633e11e575 100644 --- a/validation-service/src/main/java/org/eea/validation/util/ValidationHelper.java +++ b/validation-service/src/main/java/org/eea/validation/util/ValidationHelper.java @@ -1721,7 +1721,7 @@ private boolean checkFinishedValidations(Long datasetId, String processId, Long true); } } - else if (processControllerZuul.isProcessFinished(processId)) { + else { if (jobId != null) { jobControllerZuul.updateJobStatus(jobId, JobStatusEnum.FINISHED); }