From 20403ee15df3519608157bde27ae903cc9c824d0 Mon Sep 17 00:00:00 2001 From: Christos Kalaitzis Date: Thu, 29 Jan 2026 11:59:58 +0200 Subject: [PATCH] preparation datasets create empty tables usefull for validation --- .../datalake/service/impl/S3ServiceImpl.java | 21 +- .../service/model/S3PathResolver.java | 9 + .../java/org/eea/utils/LiteralConstants.java | 9 + .../PreparationDatasetControllerImpl.java | 13 + .../dataset/service/CreateEmptyTables.java | 6 +- .../service/impl/CreateEmptyTablesImpl.java | 334 ++++++++------ .../impl/CreateEmptyTablesImplTest.java | 407 ++++++++++++++++++ 7 files changed, 674 insertions(+), 125 deletions(-) create mode 100644 dataset-service/src/test/java/org/eea/dataset/service/impl/CreateEmptyTablesImplTest.java diff --git a/common-utitlities/src/main/java/org/eea/datalake/service/impl/S3ServiceImpl.java b/common-utitlities/src/main/java/org/eea/datalake/service/impl/S3ServiceImpl.java index 6e32d5743d..38754ae909 100644 --- a/common-utitlities/src/main/java/org/eea/datalake/service/impl/S3ServiceImpl.java +++ b/common-utitlities/src/main/java/org/eea/datalake/service/impl/S3ServiceImpl.java @@ -14,9 +14,6 @@ import org.springframework.context.annotation.Primary; import org.springframework.stereotype.Service; -import java.text.SimpleDateFormat; -import java.util.Date; - import static org.eea.utils.LiteralConstants.*; @@ -234,6 +231,24 @@ private String calculateS3TableAsFolderPath(S3PathResolver s3PathResolver, Strin String dataCollectionFolder = formatFolderName(s3PathResolver.getDatasetId(), S3_DATA_COLLECTION_PATTERN); String euDatasetFolder = formatFolderName(s3PathResolver.getDatasetId(), S3_EU_DATASET_PATTERN); + String preparationSetCode = s3PathResolver.getPreparationSetCode(); + + // handling preparation dataset case + if (StringUtils.isNotBlank(preparationSetCode)) { + + switch (path) { + case S3_TABLE_AS_FOLDER_QUERY_PATH: + return S3_DEFAULT_BUCKET + String.format(S3_TABLE_NAME_PREPARATION_DATASET_FOLDER_PATH, 
dataflowFolder, dataProviderFolder, datasetFolder, preparationSetCode, s3PathResolver.getTableName()); + case S3_TABLE_NAME_FOLDER_PATH: + case S3_TABLE_NAME_PATH: + return String.format(S3_TABLE_NAME_PREPARATION_DATASET_FOLDER_PATH, dataflowFolder, dataProviderFolder, datasetFolder, preparationSetCode, s3PathResolver.getTableName()); + case S3_TABLE_NAME_QUERY_PATH: + return String.format(S3_TABLE_NAME_PREPARATION_DATASET_QUERY_PATH, dataflowFolder, dataProviderFolder, datasetFolder, preparationSetCode, s3PathResolver.getTableName()); + default: + break; + } + } + switch (path) { case S3_EXPORT_PREFILLED_TABLE_FILE_PATH: case S3_IMPORT_FILE_PATH: diff --git a/common-utitlities/src/main/java/org/eea/datalake/service/model/S3PathResolver.java b/common-utitlities/src/main/java/org/eea/datalake/service/model/S3PathResolver.java index 1d8316a690..6290ce536a 100644 --- a/common-utitlities/src/main/java/org/eea/datalake/service/model/S3PathResolver.java +++ b/common-utitlities/src/main/java/org/eea/datalake/service/model/S3PathResolver.java @@ -46,6 +46,15 @@ public class S3PathResolver { /** The isIcebergTable. 
*/ private Boolean isIcebergTable; + /** Name of the preparation dataset, if applicable */ + private String preparationSetCode; + + public S3PathResolver(long dataflowId, long dataProviderId, long datasetId, + String tableName, String filename, String path, String preparationSetName) { + this(dataflowId, dataProviderId, datasetId, tableName, filename, path); + this.preparationSetCode = preparationSetName; + } + public S3PathResolver(long dataflowId, long dataProviderId, long datasetId) { this.dataflowId = dataflowId; this.dataProviderId = dataProviderId; diff --git a/common-utitlities/src/main/java/org/eea/utils/LiteralConstants.java b/common-utitlities/src/main/java/org/eea/utils/LiteralConstants.java index 43bb64431e..9f6e6cd73a 100644 --- a/common-utitlities/src/main/java/org/eea/utils/LiteralConstants.java +++ b/common-utitlities/src/main/java/org/eea/utils/LiteralConstants.java @@ -184,6 +184,15 @@ public final class LiteralConstants { /** The Constant NO_IMPORT_IN_PROGRESS. */ public static final String NO_IMPORT_IN_PROGRESS = "There is no import process in progress"; + /** The Constant S3_PREPARATION_DATASET_PATH: {@value}. */ + public static final String S3_PREPARATION_DATASET_PATH = "%s/%s/%s/current/preparation/%s/%s"; + + /** The Constant S3_TABLE_NAME_PREPARATION_DATASET_FOLDER_PATH: {@value}. */ + public static final String S3_TABLE_NAME_PREPARATION_DATASET_FOLDER_PATH = "%s/%s/%s/current/preparation/%s/%s/%s"; + + /** The Constant S3_TABLE_NAME_PREPARATION_DATASET_QUERY_PATH: {@value}. */ + public static final String S3_TABLE_NAME_PREPARATION_DATASET_QUERY_PATH = ".\"%s\".\"%s\".\"%s\".\"current\".\"preparation\".\"%s\".\"%s\".\"%s\""; + /** The Constant S3_NAME_PATTERN_LENGTH: {@value}. 
*/ public static final int S3_NAME_PATTERN_LENGTH = 7; diff --git a/dataset-service/src/main/java/org/eea/dataset/controller/PreparationDatasetControllerImpl.java b/dataset-service/src/main/java/org/eea/dataset/controller/PreparationDatasetControllerImpl.java index cc7285873f..017e8d3d73 100644 --- a/dataset-service/src/main/java/org/eea/dataset/controller/PreparationDatasetControllerImpl.java +++ b/dataset-service/src/main/java/org/eea/dataset/controller/PreparationDatasetControllerImpl.java @@ -8,9 +8,11 @@ import io.swagger.annotations.ApiResponse; import io.swagger.annotations.ApiResponses; import org.apache.commons.lang3.StringUtils; +import org.eea.dataset.service.CreateEmptyTables; import org.eea.dataset.service.PreparationDatasetService; import org.eea.exception.EEAException; import org.eea.interfaces.controller.dataset.PreparationDatasetController; +import org.eea.interfaces.vo.dataset.DataSetMetabaseVO; import org.eea.interfaces.vo.dataset.PreparationDatasetVO; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -39,6 +41,9 @@ public class PreparationDatasetControllerImpl @Autowired private PreparationDatasetService preparationDatasetService; + @Autowired + private CreateEmptyTables createEmptyTables; + /** * List preparation datasets by dataflow id and provider id. 
*/ @@ -168,4 +173,12 @@ public void deletePreparationDatasetById( "Unexpected error while deleting preparation dataset"); } } + + @PostMapping("/run-creation-for-preparation-dataset") + @ApiOperation("Create preparation datasets inside Dremio") + public void runCreationForPreparationDataset( + @RequestBody DataSetMetabaseVO parentDataset, + @RequestParam("preparationSetName") String preparationSetName) throws EEAException { + createEmptyTables.runCreationForPreparationDataset(parentDataset, preparationSetName); + } } diff --git a/dataset-service/src/main/java/org/eea/dataset/service/CreateEmptyTables.java b/dataset-service/src/main/java/org/eea/dataset/service/CreateEmptyTables.java index 520d0e7b4a..b9b2cf635d 100644 --- a/dataset-service/src/main/java/org/eea/dataset/service/CreateEmptyTables.java +++ b/dataset-service/src/main/java/org/eea/dataset/service/CreateEmptyTables.java @@ -1,6 +1,5 @@ package org.eea.dataset.service; -import org.eea.datalake.service.DremioHelperService; import org.eea.datalake.service.model.S3PathResolver; import org.eea.exception.EEAException; import org.eea.interfaces.vo.dataset.DataSetMetabaseVO; @@ -32,4 +31,9 @@ public interface CreateEmptyTables { * @throws Exception exception */ void deleteTableIfEmpty(String tableSchemaName, S3PathResolver tablePathResolver) throws Exception; + + void runCreationForPreparationDataset(DataSetMetabaseVO parentDataset, String preparationSetName) throws EEAException; + + void runCreationForPreparationDatasetTable(DataSetMetabaseVO parentDataset, String tableSchemaId, String preparationSetName) throws EEAException; + } diff --git a/dataset-service/src/main/java/org/eea/dataset/service/impl/CreateEmptyTablesImpl.java b/dataset-service/src/main/java/org/eea/dataset/service/impl/CreateEmptyTablesImpl.java index c7f82596ec..aae910c245 100644 --- a/dataset-service/src/main/java/org/eea/dataset/service/impl/CreateEmptyTablesImpl.java +++ 
b/dataset-service/src/main/java/org/eea/dataset/service/impl/CreateEmptyTablesImpl.java @@ -40,59 +40,121 @@ @RequiredArgsConstructor public class CreateEmptyTablesImpl implements CreateEmptyTables { - private final S3Helper s3Helper; - private final DremioHelperService dremioHelperService; - private final SpatialDataHandling spatialDataHandling; - private final SchemasRepository schemasRepository; + private final S3Helper s3Helper; + private final DremioHelperService dremioHelperService; + private final SpatialDataHandling spatialDataHandling; + private final SchemasRepository schemasRepository; - @Autowired - private JdbcTemplate dremioJdbcTemplate; + @Autowired + private JdbcTemplate dremioJdbcTemplate; - private static final Logger LOG = LoggerFactory.getLogger(CreateEmptyTablesImpl.class); + private static final Logger LOG = LoggerFactory.getLogger(CreateEmptyTablesImpl.class); - final String ILLEGAL_CHAR_MARKER = "Illegal character in"; + final String ILLEGAL_CHAR_MARKER = "Illegal character in"; - @Value("${parquet.file.path}") - private String parquetFilePath; + @Value("${parquet.file.path}") + private String parquetFilePath; - @Override - public void runCreationForOneDataset(DataSetMetabaseVO dataset) throws EEAException { - DataSetSchema schema = schemasRepository.findByIdDataSetSchema(new ObjectId(dataset.getDatasetSchema())); + @Override + public void runCreationForOneDataset(DataSetMetabaseVO dataset) throws EEAException { + DataSetSchema schema = schemasRepository.findByIdDataSetSchema(new ObjectId(dataset.getDatasetSchema())); - for (TableSchema tableSchema : schema.getTableSchemas()) { - processTableSchema(dataset, tableSchema); + for (TableSchema tableSchema : schema.getTableSchemas()) { + processTableSchema(dataset, tableSchema); + } + } + + @Override + public void runCreationForSpecificTableSchema(DataSetMetabaseVO dataset, String tableSchemaId) throws EEAException { + DataSetSchema schema = schemasRepository.findByIdDataSetSchema(new 
ObjectId(dataset.getDatasetSchema())); + TableSchema targetTableSchema = schema.getTableSchemas().stream() + .filter(table -> table.getIdTableSchema().toString().equals(tableSchemaId)) + .findFirst() + .orElseThrow(() -> new EEAException("Table schema with ID " + tableSchemaId + " not found")); + processTableSchema(dataset, targetTableSchema); + } + + private void processTableSchema(DataSetMetabaseVO dataset, TableSchema tableSchema) throws EEAException { + S3PathResolver s3TablePathResolver = new S3PathResolver( + dataset.getDataflowId(), + dataset.getDataProviderId() != null ? dataset.getDataProviderId() : 0L, + dataset.getId(), + tableSchema.getNameTableSchema(), + tableSchema.getNameTableSchema(), + getRightPath(dataset, true) + ); + + try { + if (dataset.getDatasetTypeEnum().equals(DatasetTypeEnum.DESIGN)) { + deleteTableIfEmpty(tableSchema.getNameTableSchema(), s3TablePathResolver); + } + boolean folderExists = s3Helper.checkFolderExist(s3TablePathResolver, getRightPath(dataset, false)); + if (!folderExists) { + createEmptyTableWithFields(dataset, tableSchema, s3TablePathResolver); + } + } catch (EEAException eea) { + throw eea; + } catch (Exception e) { + LOG.error("Something went wrong, trying to create empty tables for dataflowId {} and datasetId {} , with exception message: {}", dataset.getDataflowId(), dataset.getId(), e.getMessage()); + throw new EEAException("Something went wrong, trying to create empty tables with message: " + e.getMessage()); + } + } + + /** + * Extracted method: regenerates parquet table and uploads to S3. + * Used for both normal and preparation datasets. 
+ */ + protected void regenerateTable(DataSetMetabaseVO dataset, TableSchema tableSchema, List fields, S3PathResolver s3PathResolver) throws Exception { + String file = "0_0_0.parquet"; + String parquetFile = parquetFilePath + file; + try { + Schema schema1 = Schema.createRecord("Data", null, null, false, fields); + + try { + dremioHelperService.deleteFileFromR3IfExists(parquetFile); + try (ParquetWriter writer = AvroParquetWriter + .builder(new Path(parquetFile)) + .withSchema(schema1) + .withCompressionCodec(CompressionCodecName.SNAPPY) + .withPageSize(4 * 1024) + .withRowGroupSize(16 * 1024) + .build()) { + } catch (Exception e1) { + LOG.error("Error creating parquet file {},{}", parquetFile, e1.getMessage()); + throw new EEAException(e1.getMessage()); + } + + // Set S3 import path, preserving preparationSetName if present + S3PathResolver s3ImportPathResolver = getImportS3PathForParquet(dataset, tableSchema, file); + if (s3PathResolver.getPreparationSetCode() != null) { + s3ImportPathResolver.setPreparationSetCode(s3PathResolver.getPreparationSetCode()); + } + + String pathToS3ForImport = s3Helper.getS3Service().getS3Path(s3ImportPathResolver); + String tablePathQuery = s3Helper.getS3Service().getTableAsFolderQueryPath(s3ImportPathResolver, getRightPath(dataset, true)); + + s3Helper.uploadFileToBucket(pathToS3ForImport, parquetFile); + dremioHelperService.refreshTableMetadataAndPromote(null, tablePathQuery, s3ImportPathResolver, tableSchema.getNameTableSchema()); + + } catch (Exception e) { + LOG.error(e.getMessage(), e); + } + + } catch (Exception ex) { + LOG.error(ex.getMessage()); + } finally { + dremioHelperService.deleteFileFromR3IfExists(parquetFile); + } } - } - - @Override - public void runCreationForSpecificTableSchema(DataSetMetabaseVO dataset, String tableSchemaId) throws EEAException { - DataSetSchema schema = schemasRepository.findByIdDataSetSchema(new ObjectId(dataset.getDatasetSchema())); - TableSchema targetTableSchema = 
schema.getTableSchemas().stream() - .filter(table -> table.getIdTableSchema().toString().equals(tableSchemaId)) - .findFirst() - .orElseThrow(() -> new EEAException("Table schema with ID " + tableSchemaId + " not found")); - processTableSchema(dataset, targetTableSchema); - } - - private void processTableSchema(DataSetMetabaseVO dataset, TableSchema tableSchema) throws EEAException { - S3PathResolver s3TablePathResolver = new S3PathResolver( - dataset.getDataflowId(), - dataset.getDataProviderId() != null ? dataset.getDataProviderId() : 0L, - dataset.getId(), - tableSchema.getNameTableSchema(), - tableSchema.getNameTableSchema(), - getRightPath(dataset, true) - ); - - try { - if (dataset.getDatasetTypeEnum().equals(DatasetTypeEnum.DESIGN)) { - deleteTableIfEmpty(tableSchema.getNameTableSchema(), s3TablePathResolver); - } - boolean folderExists = s3Helper.checkFolderExist(s3TablePathResolver, getRightPath(dataset, false)); - if (!folderExists) { - - List fieldSchemas = tableSchema.getRecordSchema().getFieldSchema(); + /** + * Extracted logic for creating fields and regenerating parquet tables. + * This is now reused for both normal datasets and preparation datasets. 
+ */ + protected void createEmptyTableWithFields(DataSetMetabaseVO dataset, TableSchema tableSchema, S3PathResolver s3PathResolver) throws Exception { + List fieldSchemas = new ArrayList<>(tableSchema.getRecordSchema().getFieldSchema()); + + // Prepend mandatory fields FieldSchema recordIdSchema = new FieldSchema(); recordIdSchema.setHeaderName("record_id"); recordIdSchema.setType(DataType.TEXT); @@ -106,95 +168,125 @@ private void processTableSchema(DataSetMetabaseVO dataset, TableSchema tableSche String currentField = ""; try { - List fields = new ArrayList<>(); - for (FieldSchema field : fieldSchemas) { - currentField = field.getHeaderName(); - if (spatialDataHandling.getGeoJsonEnums().contains(field.getType())) { - fields.add(new Schema.Field(field.getHeaderName(), Schema.create(Schema.Type.BYTES))); - } else { - fields.add(new Schema.Field(field.getHeaderName(), Schema.create(Schema.Type.STRING))); + List fields = new ArrayList<>(); + for (FieldSchema field : fieldSchemas) { + currentField = field.getHeaderName(); + if (spatialDataHandling.getGeoJsonEnums().contains(field.getType())) { + fields.add(new Schema.Field(field.getHeaderName(), Schema.create(Schema.Type.BYTES))); + } else { + fields.add(new Schema.Field(field.getHeaderName(), Schema.create(Schema.Type.STRING))); + } } - } - regenerateTables(dataset, tableSchema, fields); + regenerateTable(dataset, tableSchema, fields, s3PathResolver); } catch (Exception e) { - String msg = e.getMessage(); - if (msg != null && msg.contains(ILLEGAL_CHAR_MARKER)) { - throw new EEAException(EEAErrorMessage.ERROR_ILLEGAL_HEADER_CHARACTER + currentField); - } - throw e; + String msg = e.getMessage(); + if (msg != null && msg.contains(ILLEGAL_CHAR_MARKER)) { + throw new EEAException(EEAErrorMessage.ERROR_ILLEGAL_HEADER_CHARACTER + currentField); + } + throw e; } - } - } catch (EEAException eea) { - throw eea; - } catch (Exception e) { - LOG.error("Something went wrong, trying to create empty tables for dataflowId {} and 
datasetId {} , with exception message: {}", dataset.getDataflowId(), dataset.getId(), e.getMessage()); - throw new EEAException("Something went wrong, trying to create empty tables with message: " + e.getMessage()); } - } - - private void regenerateTables(DataSetMetabaseVO dataset, TableSchema tableSchema, List fields) throws Exception { - Schema schema1 = Schema.createRecord("Data", null, null, false, fields); - String file = "0_0_0.parquet"; - String parquetFile = parquetFilePath + file; - try { - dremioHelperService.deleteFileFromR3IfExists(parquetFile); - try (ParquetWriter writer = AvroParquetWriter - .builder(new Path(parquetFile)) - .withSchema(schema1) - .withCompressionCodec(CompressionCodecName.SNAPPY) - .withPageSize(4 * 1024) - .withRowGroupSize(16 * 1024) - .build()) { - } catch (Exception e1) { - LOG.error("Error creating parquet file {},{}", parquetFile, e1.getMessage()); - throw new EEAException(e1.getMessage()); - } - - S3PathResolver s3PathResolver = getImportS3PathForParquet(dataset, tableSchema, file); - String pathToS3ForImport = s3Helper.getS3Service().getS3Path(s3PathResolver); - String tablePath1 = s3Helper.getS3Service().getTableAsFolderQueryPath(s3PathResolver, getRightPath(dataset, true)); - - s3Helper.uploadFileToBucket(pathToS3ForImport, parquetFile); - dremioHelperService.refreshTableMetadataAndPromote(null, tablePath1, s3PathResolver, tableSchema.getNameTableSchema()); - } catch (Exception ex) { - LOG.error(ex.getMessage()); - } finally { - dremioHelperService.deleteFileFromR3IfExists(parquetFile); + + @Override + public void deleteTableIfEmpty(String tableSchemaName, S3PathResolver tablePathResolver) throws Exception { + String tablePath = s3Helper.getS3Service().getTableAsFolderQueryPath(tablePathResolver, S3_TABLE_AS_FOLDER_QUERY_PATH); + if (s3Helper.checkFolderExist(tablePathResolver, S3_TABLE_NAME_FOLDER_PATH) && !dremioHelperService.checkFolderPromoted(tablePathResolver, tablePathResolver.getTableName())) { + 
dremioHelperService.promoteFolderOrFile(tablePathResolver, tablePathResolver.getTableName()); + } + String numberOfRecordsQuery = "SELECT COUNT (*) FROM " + tablePath; + if (s3Helper.checkFolderExist(tablePathResolver, S3_TABLE_NAME_FOLDER_PATH) && dremioJdbcTemplate.queryForObject(numberOfRecordsQuery, Long.class) == 0) { + dremioHelperService.demoteFolderOrFile(tablePathResolver, tableSchemaName); + s3Helper.deleteFolder(tablePathResolver, S3_TABLE_NAME_FOLDER_PATH); + } } - } - @Override - public void deleteTableIfEmpty(String tableSchemaName, S3PathResolver tablePathResolver) throws Exception { - String tablePath = s3Helper.getS3Service().getTableAsFolderQueryPath(tablePathResolver, S3_TABLE_AS_FOLDER_QUERY_PATH); - if (s3Helper.checkFolderExist(tablePathResolver, S3_TABLE_NAME_FOLDER_PATH) && !dremioHelperService.checkFolderPromoted(tablePathResolver, tablePathResolver.getTableName())) { - dremioHelperService.promoteFolderOrFile(tablePathResolver, tablePathResolver.getTableName()); + private S3PathResolver getImportS3PathForParquet(DataSetMetabaseVO dataset, TableSchema tableSchema, String parquetFilename) { + S3PathResolver s3PathResolver = new S3PathResolver(dataset.getDataflowId(), dataset.getDataProviderId() != null ? 
dataset.getDataProviderId() : 0L, dataset.getId(), tableSchema.getNameTableSchema(), parquetFilename, getImportPathForParquet(dataset)); + s3PathResolver.setParquetFolder(tableSchema.getNameTableSchema() + "_" + UUID.randomUUID()); + return s3PathResolver; } - String numberOfRecordsQuery = "SELECT COUNT (*) FROM " + tablePath; - if (s3Helper.checkFolderExist(tablePathResolver, S3_TABLE_NAME_FOLDER_PATH) && dremioJdbcTemplate.queryForObject(numberOfRecordsQuery, Long.class) == 0) { - dremioHelperService.demoteFolderOrFile(tablePathResolver, tableSchemaName); - s3Helper.deleteFolder(tablePathResolver, S3_TABLE_NAME_FOLDER_PATH); + + private String getRightPath(DataSetMetabaseVO dataset, boolean isQueryPath) { + if (Objects.requireNonNull(dataset.getDatasetTypeEnum()) == DatasetTypeEnum.REFERENCE) { + return isQueryPath ? S3_DATAFLOW_REFERENCE_QUERY_PATH : S3_DATAFLOW_REFERENCE_FOLDER_PATH; + } + return isQueryPath ? S3_TABLE_AS_FOLDER_QUERY_PATH : S3_TABLE_NAME_FOLDER_PATH; } - } - private S3PathResolver getImportS3PathForParquet(DataSetMetabaseVO dataset, TableSchema tableSchema, String parquetFilename) { - S3PathResolver s3PathResolver = new S3PathResolver(dataset.getDataflowId(), dataset.getDataProviderId() != null ? dataset.getDataProviderId() : 0L, dataset.getId(), tableSchema.getNameTableSchema(), parquetFilename, getImportPathForParquet(dataset)); - s3PathResolver.setParquetFolder(tableSchema.getNameTableSchema() + "_" + UUID.randomUUID()); - return s3PathResolver; - } + private String getImportPathForParquet(DataSetMetabaseVO dataset) { + if (Objects.requireNonNull(dataset.getDatasetTypeEnum()) == DatasetTypeEnum.REFERENCE) { + return S3_DATAFLOW_REFERENCE_PATH; + } + return S3_TABLE_NAME_WITH_PARQUET_FOLDER_PATH; + } - private String getRightPath(DataSetMetabaseVO dataset, boolean isQueryPath) { - if (Objects.requireNonNull(dataset.getDatasetTypeEnum()) == DatasetTypeEnum.REFERENCE) { - return isQueryPath ? 
S3_DATAFLOW_REFERENCE_QUERY_PATH : S3_DATAFLOW_REFERENCE_FOLDER_PATH; + // ========================== Preparation Dataset Methods ========================== + + /** + * Public method to run creation for a full preparation dataset. + * Uses preparationSetName explicitly. + */ + @Override + public void runCreationForPreparationDataset(DataSetMetabaseVO parentDataset, String preparationSetName) throws EEAException { + DataSetSchema schema = schemasRepository.findByIdDataSetSchema(new ObjectId(parentDataset.getDatasetSchema())); + for (TableSchema tableSchema : schema.getTableSchemas()) { + processTableSchemaForPreparationDataset(parentDataset, tableSchema, preparationSetName); + } } - return isQueryPath ? S3_TABLE_AS_FOLDER_QUERY_PATH : S3_TABLE_NAME_FOLDER_PATH; - } - private String getImportPathForParquet(DataSetMetabaseVO dataset) { - if (Objects.requireNonNull(dataset.getDatasetTypeEnum()) == DatasetTypeEnum.REFERENCE) { - return S3_DATAFLOW_REFERENCE_PATH; + /** + * Public method to run creation for a specific table in a preparation dataset. + */ + @Override + public void runCreationForPreparationDatasetTable(DataSetMetabaseVO parentDataset, String tableSchemaId, String preparationSetName) throws EEAException { + DataSetSchema schema = schemasRepository.findByIdDataSetSchema(new ObjectId(parentDataset.getDatasetSchema())); + TableSchema targetTableSchema = schema.getTableSchemas().stream() + .filter(table -> table.getIdTableSchema().toString().equals(tableSchemaId)) + .findFirst() + .orElseThrow(() -> new EEAException("Table schema with ID " + tableSchemaId + " not found")); + processTableSchemaForPreparationDataset(parentDataset, targetTableSchema, preparationSetName); + } + + /** + * Private method: process a table for a preparation dataset. + * Key differences from normal table: + * 1. preparationSetName is set on S3PathResolver + * 2. 
Everything else (folder checks, field generation, parquet) is reused + */ + private void processTableSchemaForPreparationDataset(DataSetMetabaseVO parentDataset, TableSchema tableSchema, + String preparationSetName) throws EEAException { + S3PathResolver s3PathResolver = new S3PathResolver( + parentDataset.getDataflowId(), + parentDataset.getDataProviderId() != null ? parentDataset.getDataProviderId() : 0L, + parentDataset.getId(), + tableSchema.getNameTableSchema(), + tableSchema.getNameTableSchema(), + getRightPath(parentDataset, true) + ); + + // Set explicit preparation dataset name + s3PathResolver.setPreparationSetCode(preparationSetName); + + try { + LOG.info("[CHRIS] parentDataset.getDatasetTypeEnum().equals(DatasetTypeEnum.DESIGN)"); + if (parentDataset.getDatasetTypeEnum().equals(DatasetTypeEnum.DESIGN)) { + LOG.info("[CHRIS] deleteTableIfEmpty"); + deleteTableIfEmpty(tableSchema.getNameTableSchema(), s3PathResolver); + } + + LOG.info("[CHRIS] boolean folderExists"); + boolean folderExists = s3Helper.checkFolderExist(s3PathResolver, getRightPath(parentDataset, false)); + if (!folderExists) { + LOG.info("[CHRIS] createEmptyTableWithFields"); + createEmptyTableWithFields(parentDataset, tableSchema, s3PathResolver); + } + } catch (EEAException eea) { + throw eea; + } catch (Exception e) { + LOG.error("Error creating preparation dataset table for datasetId {}: {}", parentDataset.getId(), e.getMessage()); + throw new EEAException("Error creating preparation dataset table: " + e.getMessage()); + } } - return S3_TABLE_NAME_WITH_PARQUET_FOLDER_PATH; - } } diff --git a/dataset-service/src/test/java/org/eea/dataset/service/impl/CreateEmptyTablesImplTest.java b/dataset-service/src/test/java/org/eea/dataset/service/impl/CreateEmptyTablesImplTest.java new file mode 100644 index 0000000000..3e293e6276 --- /dev/null +++ b/dataset-service/src/test/java/org/eea/dataset/service/impl/CreateEmptyTablesImplTest.java @@ -0,0 +1,407 @@ +package org.eea.dataset.service.impl; + 
+import org.bson.types.ObjectId; +import org.eea.datalake.service.DremioHelperService; +import org.eea.datalake.service.S3Helper; +import org.eea.datalake.service.S3Service; +import org.eea.datalake.service.SpatialDataHandling; +import org.eea.datalake.service.model.S3PathResolver; +import org.eea.dataset.persistence.schemas.domain.DataSetSchema; +import org.eea.dataset.persistence.schemas.domain.FieldSchema; +import org.eea.dataset.persistence.schemas.domain.RecordSchema; +import org.eea.dataset.persistence.schemas.domain.TableSchema; +import org.eea.dataset.persistence.schemas.repository.SchemasRepository; +import org.eea.exception.EEAException; +import org.eea.interfaces.vo.dataset.DataSetMetabaseVO; +import org.eea.interfaces.vo.dataset.enums.DataType; +import org.eea.interfaces.vo.dataset.enums.DatasetTypeEnum; +import org.junit.Before; +import org.junit.Test; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.MockitoAnnotations; +import org.springframework.jdbc.core.JdbcTemplate; +import org.springframework.test.util.ReflectionTestUtils; + +import java.util.Collections; +import java.util.List; + +import static org.mockito.Mockito.times; +import static org.eea.utils.LiteralConstants.*; + +/** + * Test class for CreateEmptyTablesImpl. 
+ */ +public class CreateEmptyTablesImplTest { + + @InjectMocks + private CreateEmptyTablesImpl createEmptyTables; + + private CreateEmptyTablesImpl createEmptyTablesSpy; + + @Mock + private S3Helper s3Helper; + + @Mock + private DremioHelperService dremioHelperService; + + @Mock + private SpatialDataHandling spatialDataHandling; + + @Mock + private SchemasRepository schemasRepository; + + @Mock + private JdbcTemplate dremioJdbcTemplate; + + @Mock + private S3Service s3Service; + + @Before + public void initMocks() { + MockitoAnnotations.openMocks(this); + + // IMPORTANT: + // We spy the service to disable heavy side-effect methods + // (filesystem, parquet writer, S3 upload, etc.) + createEmptyTablesSpy = Mockito.spy(createEmptyTables); + + ReflectionTestUtils.setField(createEmptyTablesSpy, "parquetFilePath", "/tmp/"); + } + + // ============================================================================ + // Normal dataset tests + // ============================================================================ + + @Test + public void testRunCreationForOneDataset_createsTablesWhenFolderDoesNotExist1() throws Exception { + + // GIVEN + DataSetMetabaseVO dataset = new DataSetMetabaseVO(); + dataset.setId(1L); + dataset.setDataflowId(10L); + dataset.setDataProviderId(20L); + dataset.setDatasetSchema("507f1f77bcf86cd799439011"); + // Using REFERENCE to avoid trigger for DESIGN cleanup logic `deleteTableIfEmpty` + // This test should only verify table creation when the folder doesn't exist + dataset.setDatasetTypeEnum(DatasetTypeEnum.REFERENCE); + + FieldSchema fieldSchema = new FieldSchema(); + fieldSchema.setHeaderName("test_column"); + fieldSchema.setType(DataType.TEXT); + + RecordSchema recordSchema = new RecordSchema(); + recordSchema.setFieldSchema(List.of(fieldSchema)); + + TableSchema tableSchema = new TableSchema(); + tableSchema.setNameTableSchema("my_table"); + tableSchema.setRecordSchema(recordSchema); + + DataSetSchema dataSetSchema = new DataSetSchema(); 
+ dataSetSchema.setTableSchemas(List.of(tableSchema)); + + Mockito.when(schemasRepository.findByIdDataSetSchema(Mockito.any(ObjectId.class))) + .thenReturn(dataSetSchema); + + Mockito.when(s3Helper.checkFolderExist(Mockito.any(), Mockito.anyString())) + .thenReturn(false); + + Mockito.when(spatialDataHandling.getGeoJsonEnums()) + .thenReturn(Collections.emptyList()); + + // CRITICAL: disable heavy side-effects + Mockito.doNothing() + .when(createEmptyTablesSpy) + .regenerateTable( + Mockito.any(), + Mockito.any(), + Mockito.anyList(), + Mockito.any() + ); + + // WHEN + createEmptyTablesSpy.runCreationForOneDataset(dataset); + + // THEN + Mockito.verify(schemasRepository, times(1)) + .findByIdDataSetSchema(Mockito.any(ObjectId.class)); + + Mockito.verify(s3Helper, Mockito.atLeastOnce()) + .checkFolderExist(Mockito.any(), Mockito.anyString()); + + Mockito.verify(createEmptyTablesSpy, times(1)) + .regenerateTable( + Mockito.any(DataSetMetabaseVO.class), + Mockito.any(TableSchema.class), + Mockito.anyList(), + Mockito.any(S3PathResolver.class) + ); + } + + @Test + public void testRunCreationForOneDataset_doesNothingWhenFolderExists() throws Exception { + // GIVEN + DataSetMetabaseVO dataSetMetabaseVO = new DataSetMetabaseVO(); + dataSetMetabaseVO.setId(1L); + dataSetMetabaseVO.setDataflowId(10L); + dataSetMetabaseVO.setDatasetSchema("507f1f77bcf86cd799439011"); + dataSetMetabaseVO.setDatasetTypeEnum(DatasetTypeEnum.REFERENCE); + + TableSchema tableSchema = new TableSchema(); + tableSchema.setNameTableSchema("table"); + tableSchema.setRecordSchema(new RecordSchema()); + + DataSetSchema dataSetSchema = new DataSetSchema(); + dataSetSchema.setTableSchemas(List.of(tableSchema)); + + Mockito.when(schemasRepository.findByIdDataSetSchema(Mockito.any())) + .thenReturn(dataSetSchema); + + Mockito.when(s3Helper.checkFolderExist(Mockito.any(), Mockito.anyString())) + .thenReturn(true); + + // WHEN + createEmptyTablesSpy.runCreationForOneDataset(dataSetMetabaseVO); + + // THEN + 
Mockito.verify(createEmptyTablesSpy, Mockito.never())
+                .createEmptyTableWithFields(Mockito.any(), Mockito.any(), Mockito.any());
+    }
+
+    /**
+     * Expects {@code runCreationForSpecificTableSchema} to throw {@link EEAException} when the
+     * stubbed dataset schema contains no table schemas and the id {@code "missing-id"} is requested.
+     */
+    @Test(expected = EEAException.class)
+    public void testRunCreationForSpecificTableSchema_throwsWhenTableSchemaNotFound() throws Exception {
+        // GIVEN
+        DataSetMetabaseVO dataSetMetabaseVO = new DataSetMetabaseVO();
+        dataSetMetabaseVO.setDatasetSchema("507f1f77bcf86cd799439011");
+
+        DataSetSchema dataSetSchema = new DataSetSchema();
+        dataSetSchema.setTableSchemas(Collections.emptyList());
+
+        Mockito.when(schemasRepository.findByIdDataSetSchema(Mockito.any()))
+                .thenReturn(dataSetSchema);
+
+        // WHEN
+        createEmptyTablesSpy.runCreationForSpecificTableSchema(dataSetMetabaseVO, "missing-id");
+
+        // THEN: the EEAException is asserted through @Test(expected = ...), so no explicit check follows
+    }
+
+    // ============================================================================
+    // Preparation dataset tests
+    // ============================================================================
+
+    /**
+     * Calls {@code runCreationForPreparationDataset} with the code {@code "section_a_granada"} and
+     * verifies that {@code createEmptyTableWithFields} is invoked with the same dataset, the single
+     * stubbed table schema, and a resolver whose {@code getPreparationSetCode()} equals that code.
+     * The schema lookup returns one table, {@code checkFolderExist} is stubbed to {@code false}, and
+     * {@code createEmptyTableWithFields} itself is stubbed out on the spy.
+     *
+     * <p>NOTE(review): the method name says "PreparationSetName" while the assertion reads
+     * {@code getPreparationSetCode()}.
+     */
+    @Test
+    public void testRunCreationForPreparationDataset_setsPreparationSetNameOnS3PathResolver() throws Exception {
+        // GIVEN
+        DataSetMetabaseVO parentDataSetMetabaseVO = new DataSetMetabaseVO();
+        parentDataSetMetabaseVO.setId(1L);
+        parentDataSetMetabaseVO.setDataflowId(10L);
+        parentDataSetMetabaseVO.setDatasetSchema("507f1f77bcf86cd799439011");
+        parentDataSetMetabaseVO.setDatasetTypeEnum(DatasetTypeEnum.REFERENCE);
+
+        TableSchema tableSchema = new TableSchema();
+        tableSchema.setNameTableSchema("table");
+        tableSchema.setRecordSchema(new RecordSchema());
+
+        DataSetSchema dataSetSchema = new DataSetSchema();
+        dataSetSchema.setTableSchemas(List.of(tableSchema));
+
+        Mockito.when(schemasRepository.findByIdDataSetSchema(Mockito.any()))
+                .thenReturn(dataSetSchema);
+
+        Mockito.when(s3Helper.checkFolderExist(Mockito.any(), Mockito.anyString()))
+                .thenReturn(false);
+
+        // Stub the real table creation on the spy so only the arguments it receives are checked
+        Mockito.doNothing()
+                .when(createEmptyTablesSpy)
+                .createEmptyTableWithFields(Mockito.any(), Mockito.any(), Mockito.any());
+
+        // WHEN
+        createEmptyTablesSpy.runCreationForPreparationDataset(
+                parentDataSetMetabaseVO,
+                "section_a_granada"
+        );
+
+        // THEN
+        Mockito.verify(createEmptyTablesSpy)
+                .createEmptyTableWithFields(
+                        Mockito.eq(parentDataSetMetabaseVO),
+                        Mockito.eq(tableSchema),
+                        Mockito.argThat(resolver ->
+                                "section_a_granada".equals(resolver.getPreparationSetCode())
+                        )
+                );
+    }
+
+    /**
+     * Calls {@code runCreationForPreparationDatasetTable} with the id of the first of two stubbed
+     * table schemas and verifies that {@code createEmptyTableWithFields} is invoked exactly once for
+     * that first table schema.
+     *
+     * <p>NOTE(review): nothing here verifies that {@code tableSchema2} is NOT processed, so the
+     * "only" in the method name is not actually asserted; consider adding a
+     * {@code Mockito.never()} verification for {@code tableSchema2}.
+     */
+    @Test
+    public void testRunCreationForPreparationDatasetTable_onlyProcessesSingleTable() throws Exception {
+        // GIVEN
+        DataSetMetabaseVO dataSetMetabaseVO = new DataSetMetabaseVO();
+        dataSetMetabaseVO.setId(1L);
+        dataSetMetabaseVO.setDataflowId(10L);
+        dataSetMetabaseVO.setDatasetSchema("507f1f77bcf86cd799439011");
+        dataSetMetabaseVO.setDatasetTypeEnum(DatasetTypeEnum.REFERENCE);
+
+        TableSchema tableSchema1 = new TableSchema();
+        tableSchema1.setIdTableSchema(new ObjectId());
+        tableSchema1.setNameTableSchema("t1");
+        tableSchema1.setRecordSchema(new RecordSchema());
+
+        TableSchema tableSchema2 = new TableSchema();
+        tableSchema2.setIdTableSchema(new ObjectId());
+        tableSchema2.setNameTableSchema("t2");
+        tableSchema2.setRecordSchema(new RecordSchema());
+
+        DataSetSchema dataSetSchema = new DataSetSchema();
+        dataSetSchema.setTableSchemas(List.of(tableSchema1, tableSchema2));
+
+        Mockito.when(schemasRepository.findByIdDataSetSchema(Mockito.any()))
+                .thenReturn(dataSetSchema);
+
+        Mockito.when(s3Helper.checkFolderExist(Mockito.any(), Mockito.anyString()))
+                .thenReturn(false);
+
+        // Stub the real table creation on the spy so only the invocation itself is observed
+        Mockito.doNothing()
+                .when(createEmptyTablesSpy)
+                .createEmptyTableWithFields(Mockito.any(), Mockito.any(), Mockito.any());
+
+        // WHEN
+        createEmptyTablesSpy.runCreationForPreparationDatasetTable(
+                dataSetMetabaseVO,
+                tableSchema1.getIdTableSchema().toString(),
+                "prep"
+        );
+
+        // THEN: exactly one creation call for tableSchema1 (the resolver argument is not inspected)
+        Mockito.verify(createEmptyTablesSpy, Mockito.times(1))
+                .createEmptyTableWithFields(
+                        Mockito.eq(dataSetMetabaseVO),
+                        Mockito.eq(tableSchema1),
+                        Mockito.any()
+                );
+    }
+
+    // ============================================================================
+    // DESIGN dataset cleanup tests
+    // ============================================================================
+
+    /**
+     * Calls {@code deleteTableIfEmpty} on {@code createEmptyTables} (not the spy) with the folder
+     * reported as existing, the table reported as promoted and a stubbed row count of {@code 0},
+     * then verifies that {@code demoteFolderOrFile} and {@code deleteFolder} are each called once.
+     */
+    @Test
+    public void testDeleteTableIfEmpty_demotesAndDeletesWhenEmpty() throws Exception {
+        // GIVEN
+        S3PathResolver tablePathResolver = Mockito.mock(S3PathResolver.class);
+        Mockito.when(tablePathResolver.getTableName()).thenReturn("table");
+
+        // Ensure s3Helper.getS3Service() returns a mock
+        S3Service s3ServiceMock = Mockito.mock(S3Service.class);
+        Mockito.when(s3Helper.getS3Service()).thenReturn(s3ServiceMock);
+        Mockito.when(s3ServiceMock.getTableAsFolderQueryPath(Mockito.any(), Mockito.anyString()))
+                .thenReturn("fake/table/path");
+
+        // Folder exists
+        Mockito.when(s3Helper.checkFolderExist(Mockito.eq(tablePathResolver), Mockito.eq(S3_TABLE_NAME_FOLDER_PATH)))
+                .thenReturn(true);
+
+        // Table promoted
+        Mockito.when(dremioHelperService.checkFolderPromoted(Mockito.eq(tablePathResolver), Mockito.eq("table")))
+                .thenReturn(true);
+
+        // Zero rows → triggers deletion
+        Mockito.when(dremioJdbcTemplate.queryForObject(Mockito.anyString(), Mockito.eq(Long.class)))
+                .thenReturn(0L);
+
+        // Inject mocks into class under test to avoid spy issues
+        ReflectionTestUtils.setField(createEmptyTables, "s3Helper", s3Helper);
+        ReflectionTestUtils.setField(createEmptyTables, "dremioHelperService", dremioHelperService);
+        ReflectionTestUtils.setField(createEmptyTables, "dremioJdbcTemplate", dremioJdbcTemplate);
+
+        // WHEN
+        createEmptyTables.deleteTableIfEmpty("table", tablePathResolver);
+
+        // THEN
+        Mockito.verify(dremioHelperService, Mockito.times(1))
+                .demoteFolderOrFile(tablePathResolver, "table");
+
+        Mockito.verify(s3Helper, Mockito.times(1))
+                .deleteFolder(tablePathResolver, S3_TABLE_NAME_FOLDER_PATH);
+    }
+
+    /**
+     * Calls {@code deleteTableIfEmpty} with the folder reported as existing and a stubbed row count
+     * of {@code 5}, then verifies that neither {@code demoteFolderOrFile} nor {@code deleteFolder}
+     * is ever called.
+     *
+     * <p>NOTE(review): unlike the empty-table test, {@code checkFolderPromoted} is not stubbed here.
+     * If {@code dremioHelperService} is a plain Mockito mock it will answer {@code false}, so this
+     * test may pass without the row-count query being reached; confirm against
+     * {@code deleteTableIfEmpty}, which is not visible in this chunk.
+     */
+    @Test
+    public void testDeleteTableIfEmpty_doesNothingWhenNotEmpty_fixed() throws Exception {
+        // GIVEN
+        S3PathResolver tablePathResolver = Mockito.mock(S3PathResolver.class);
+        Mockito.when(tablePathResolver.getTableName()).thenReturn("table");
+
+        // Mock S3Service
+        S3Service s3ServiceMock = Mockito.mock(S3Service.class);
+        Mockito.when(s3Helper.getS3Service()).thenReturn(s3ServiceMock);
+        Mockito.when(s3ServiceMock.getTableAsFolderQueryPath(Mockito.any(), Mockito.anyString()))
+                .thenReturn("fake/table/path");
+
+        // Folder exists
+        Mockito.when(s3Helper.checkFolderExist(Mockito.eq(tablePathResolver), Mockito.eq(S3_TABLE_NAME_FOLDER_PATH)))
+                .thenReturn(true);
+
+        // Table has >0 rows → should do nothing
+        Mockito.when(dremioJdbcTemplate.queryForObject(Mockito.anyString(), Mockito.eq(Long.class)))
+                .thenReturn(5L);
+
+        // Inject mocks to avoid spy/reference issues
+        ReflectionTestUtils.setField(createEmptyTables, "s3Helper", s3Helper);
+        ReflectionTestUtils.setField(createEmptyTables, "dremioHelperService", dremioHelperService);
+        ReflectionTestUtils.setField(createEmptyTables, "dremioJdbcTemplate", dremioJdbcTemplate);
+
+        // WHEN
+        createEmptyTables.deleteTableIfEmpty("table", tablePathResolver);
+
+        // THEN
+        Mockito.verify(dremioHelperService, Mockito.never())
+                .demoteFolderOrFile(Mockito.any(), Mockito.any());
+
+        Mockito.verify(s3Helper, Mockito.never())
+                .deleteFolder(Mockito.any(), Mockito.anyString());
+    }
+
+    // ============================================================================
+    // Error handling tests
+    // ============================================================================
+
+    /**
+     * Expects {@code runCreationForOneDataset} to throw {@link EEAException} when
+     * {@code regenerateTable} on the spy is stubbed to throw a {@link RuntimeException} with the
+     * message {@code "Illegal character in"}.
+     *
+     * <p>NOTE(review): the failure is forced by the {@code doThrow} stub for any arguments, not by
+     * the {@code "bad@header"} field name itself; the translation of that RuntimeException into an
+     * EEAException happens in code that is not visible in this chunk.
+     */
+    @Test(expected = EEAException.class)
+    public void testIllegalCharacterInFieldHeaderThrowsEEAException() throws Exception {
+        // GIVEN
+        DataSetMetabaseVO dataSetMetabaseVO = new DataSetMetabaseVO();
+        dataSetMetabaseVO.setId(1L);
+        dataSetMetabaseVO.setDataflowId(10L);
+        dataSetMetabaseVO.setDatasetSchema("507f1f77bcf86cd799439011");
+        dataSetMetabaseVO.setDatasetTypeEnum(DatasetTypeEnum.REFERENCE);
+
+        FieldSchema fieldSchema = new FieldSchema();
+        fieldSchema.setHeaderName("bad@header");
+        fieldSchema.setType(DataType.TEXT);
+
+        RecordSchema recordSchema = new RecordSchema();
+        recordSchema.setFieldSchema(List.of(fieldSchema));
+
+        TableSchema tableSchema = new TableSchema();
+        tableSchema.setNameTableSchema("table");
+        tableSchema.setRecordSchema(recordSchema);
+
+        DataSetSchema dataSetSchema = new DataSetSchema();
+        dataSetSchema.setTableSchemas(List.of(tableSchema));
+
+        Mockito.when(schemasRepository.findByIdDataSetSchema(Mockito.any()))
+                .thenReturn(dataSetSchema);
+
+        Mockito.when(spatialDataHandling.getGeoJsonEnums())
+                .thenReturn(Collections.emptyList());
+
+        // Force the failure on the spy regardless of the arguments passed to regenerateTable
+        Mockito.doThrow(new RuntimeException("Illegal character in"))
+                .when(createEmptyTablesSpy)
+                .regenerateTable(Mockito.any(), Mockito.any(), Mockito.anyList(), Mockito.any());
+
+        // WHEN
+        createEmptyTablesSpy.runCreationForOneDataset(dataSetMetabaseVO);
+
+        // THEN: the EEAException is asserted through @Test(expected = ...), so no explicit check follows
+    }
+}