diff --git a/common-utitlities/src/main/java/org/eea/datalake/service/impl/DremioHelperServiceImpl.java b/common-utitlities/src/main/java/org/eea/datalake/service/impl/DremioHelperServiceImpl.java index cda5f2ef50..96f08fcd54 100644 --- a/common-utitlities/src/main/java/org/eea/datalake/service/impl/DremioHelperServiceImpl.java +++ b/common-utitlities/src/main/java/org/eea/datalake/service/impl/DremioHelperServiceImpl.java @@ -122,7 +122,9 @@ public DremioDirectoryItemsResponse getDirectoryItems(S3PathResolver s3PathResol directoryPath = bucketName + "/" + s3Service.getS3Path(s3PathResolver); } else if (S3_DATAFLOW_REFERENCE_FOLDER_PATH.equals(s3PathResolver.getPath())) { directoryPath = bucketName + "/" + s3Service.getTableAsFolderQueryPath(s3PathResolver, S3_REFERENCE_FOLDER_PATH); - } else { + } else if (S3_VIEWS_FOLDER_PATH.equals(s3PathResolver.getPath())) { + directoryPath = bucketName + "/" + s3Service.getTableAsFolderQueryPath(s3PathResolver, S3_VIEWS_FOLDER_PATH).replace("/views",""); + }else { directoryPath = bucketName + "/" + s3Service.getTableAsFolderQueryPath(s3PathResolver, S3_PROVIDER_PATH); } DremioDirectoryItemsResponse directoryItems = null; diff --git a/common-utitlities/src/main/java/org/eea/datalake/service/impl/S3ServiceImpl.java b/common-utitlities/src/main/java/org/eea/datalake/service/impl/S3ServiceImpl.java index c400b347ab..fc82922376 100644 --- a/common-utitlities/src/main/java/org/eea/datalake/service/impl/S3ServiceImpl.java +++ b/common-utitlities/src/main/java/org/eea/datalake/service/impl/S3ServiceImpl.java @@ -90,6 +90,7 @@ private String calculateS3Path(S3PathResolver s3PathResolver) { switch (path) { case S3_IMPORT_QUERY_PATH: case S3_TABLE_AS_FOLDER_QUERY_PATH: + case S3_VIEWS_TABLE_AS_FOLDER_QUERY_PATH: case S3_TABLE_NAME_QUERY_PATH: case S3_IMPORT_CSV_FILE_QUERY_PATH: case S3_TABLE_NAME_VALIDATE_QUERY_PATH: @@ -113,6 +114,7 @@ private String calculateS3Path(S3PathResolver s3PathResolver) { s3PathResolver.getValidationId(), fileName); break; case S3_TABLE_NAME_FOLDER_PATH: + case S3_VIEW_TABLE_NAME_FOLDER_PATH: case S3_ATTACHMENTS_TABLE_PATH: case S3_TABLE_NAME_FOLDER_PATH_FOR_VALID_PREFIX: path = String.format(path, dataflowFolder, dataProviderFolder, datasetFolder, @@ -125,6 +127,7 @@ private String calculateS3Path(S3PathResolver s3PathResolver) { path = String.format(path, dataflowFolder, dataProviderFolder, datasetFolder, fileName); break; case S3_PROVIDER_PATH: + case S3_VIEWS_FOLDER_PATH: case S3_SNAPSHOT_FOLDER_PATH: case S3_VALIDATION_TABLE_PATH: path = String.format(path, dataflowFolder, dataProviderFolder, datasetFolder); @@ -254,6 +257,7 @@ private String calculateS3TableAsFolderPath(S3PathResolver s3PathResolver, Strin return S3_DEFAULT_BUCKET + String.format(path, dataflowFolder, s3PathResolver.getTableName()); case S3_EXPORT_PREFILLED_TABLE_AS_FOLDER_QUERY_PATH: case S3_TABLE_AS_FOLDER_QUERY_PATH: + case S3_VIEWS_TABLE_AS_FOLDER_QUERY_PATH: if(BooleanUtils.isTrue(s3PathResolver.getIsIcebergTable())){ return S3_ICEBERG_BUCKET + String.format(path, dataflowFolder, dataProviderFolder, datasetFolder, s3PathResolver.getTableName()); } @@ -261,9 +265,11 @@ private String calculateS3TableAsFolderPath(S3PathResolver s3PathResolver, Strin case S3_IMPORT_TABLE_NAME_FOLDER_PATH: case S3_VALIDATION_TABLE_PATH: case S3_TABLE_NAME_FOLDER_PATH: + case S3_VIEW_TABLE_NAME_FOLDER_PATH: case S3_TABLE_NAME_FOLDER_PATH_FOR_VALID_PREFIX: case S3_ATTACHMENTS_TABLE_PATH: case S3_PROVIDER_PATH: + case S3_VIEWS_FOLDER_PATH: return String.format(path, dataflowFolder, dataProviderFolder, datasetFolder, s3PathResolver.getTableName()); case S3_ROOT_DATAFLOW_FOLDER_PATH: return String.format(path, dataflowFolder); @@ -303,6 +309,7 @@ private String calculateS3TableAsFolderPath(S3PathResolver s3PathResolver) { return S3_DEFAULT_BUCKET + String.format(path, dataflowFolder, s3PathResolver.getTableName()); case S3_TABLE_NAME_FOLDER_PATH: + case S3_VIEW_TABLE_NAME_FOLDER_PATH: case S3_TABLE_NAME_FOLDER_PATH_FOR_VALID_PREFIX: return String.format(path, dataflowFolder, dataProviderFolder, datasetFolder, s3PathResolver.getTableName()); @@ -336,6 +343,7 @@ private String calculateS3TableDCAsFolderPath(S3PathResolver s3PathResolver, Str return S3_DEFAULT_BUCKET + String.format(path, dataflowFolder, s3PathResolver.getTableName()); case S3_TABLE_AS_FOLDER_QUERY_PATH: + case S3_VIEWS_TABLE_AS_FOLDER_QUERY_PATH: return S3_DEFAULT_BUCKET + String.format(path, dataflowFolder, dataProviderId, datasetId, s3PathResolver.getTableName()); case S3_TABLE_NAME_EU_QUERY_PATH: diff --git a/common-utitlities/src/main/java/org/eea/utils/LiteralConstants.java b/common-utitlities/src/main/java/org/eea/utils/LiteralConstants.java index 0f152b2893..c437b4214a 100644 --- a/common-utitlities/src/main/java/org/eea/utils/LiteralConstants.java +++ b/common-utitlities/src/main/java/org/eea/utils/LiteralConstants.java @@ -321,6 +321,10 @@ public final class LiteralConstants { public static final String S3_SNAPSHOT_TABLE_NAME_VALIDATE_DC_PATH = "%s/%s/%s/snapshots/%s/validation"; + public static final String S3_VIEWS_FOLDER_PATH = "%s/%s/%s/views/"; + + public static final String S3_VIEW_TABLE_NAME_FOLDER_PATH = "%s/%s/%s/views/%s"; + /** The Constant S3_TABLE_NAME_VALIDATE_DC_QUERY_PATH: {@value}. */ public static final String S3_TABLE_NAME_VALIDATE_DC_QUERY_PATH = ".\"%s\".\"collections\".\"%s\".\"current\".\"%s_validate\".\"%s\".\"%s\""; @@ -340,6 +344,7 @@ public final class LiteralConstants { public static final String S3_TABLE_NAME_DC_QUERY_PATH = ".\"%s\".\"collections\".\"%s\".\"current\".\"%s\""; public static final String S3_TABLE_AS_FOLDER_QUERY_PATH = ".\"%s\".\"%s\".\"%s\".\"current\".\"%s\""; + public static final String S3_VIEWS_TABLE_AS_FOLDER_QUERY_PATH = ".\"%s\".\"%s\".\"%s\".\"views\".\"%s\""; public static final String S3_EXPORT_PREFILLED_TABLE_AS_FOLDER_QUERY_PATH = ".\"%s\".\"%s\".\"%s\".\"current\".\"exported\".\"%s\""; diff --git a/dataset-service/src/main/java/org/eea/dataset/controller/DatasetControllerImpl.java b/dataset-service/src/main/java/org/eea/dataset/controller/DatasetControllerImpl.java index 084705188e..df14e5a3f5 100644 --- a/dataset-service/src/main/java/org/eea/dataset/controller/DatasetControllerImpl.java +++ b/dataset-service/src/main/java/org/eea/dataset/controller/DatasetControllerImpl.java @@ -9,7 +9,6 @@ import org.apache.commons.lang3.BooleanUtils; import org.apache.commons.lang3.StringUtils; import org.eea.dataset.service.model.ImportFileInDremioInfo; -import org.eea.interfaces.vo.communication.UserNotificationVO; import org.eea.lock.redis.LockEnum; import org.eea.lock.redis.RedisLockService; import org.eea.utils.UtilityClass; @@ -60,14 +59,12 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.core.io.InputStreamResource; -import org.springframework.core.io.Resource; import org.springframework.data.domain.PageRequest; import org.springframework.data.domain.Pageable; import org.springframework.http.HttpHeaders; import org.springframework.http.HttpStatus; import org.springframework.http.MediaType; import org.springframework.http.ResponseEntity; -import org.springframework.scheduling.annotation.Async; import org.springframework.security.access.prepost.PreAuthorize; import org.springframework.security.concurrent.DelegatingSecurityContextRunnable; import org.springframework.security.core.context.SecurityContext; @@ -84,12 +81,10 @@ import javax.servlet.http.HttpServletResponse; import java.io.*; import java.util.*; -import java.util.concurrent.ThreadPoolExecutor; import java.util.stream.Collectors; import static org.eea.interfaces.vo.dataset.enums.FileTypeEnum.CSV; -import static org.eea.utils.LiteralConstants.EXPORT_CSV; -import static org.eea.utils.LiteralConstants.EXPORT_PARQUET; +import static org.eea.utils.LiteralConstants.*; /** * The Class DatasetControllerImpl. @@ -4594,4 +4589,38 @@ private void disableEditing(DatasetTableVO datasetTableVO) { LOG.error(e.getMessage(), e); } } + + + @PostMapping("/createView") + public ResponseEntity createTypedView( + @RequestParam Long dataflowId, + @RequestParam Long providerId, + @RequestParam Long datasetId, + @RequestParam String tableName, + @RequestParam String tableSchemaId) + { + + Boolean isBigDataflow = dataFlowControllerZuul.isBigDataflow(dataflowId); + + if (!isBigDataflow) { + return ResponseEntity.badRequest().build(); + } + + try { + bigDataDatasetService.createTypedView( + dataflowId, + providerId, + datasetId, + tableSchemaId, + tableName + ); + + return ResponseEntity.ok().build(); + + } catch (Exception e) { + return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build(); + } + + + } } diff --git a/dataset-service/src/main/java/org/eea/dataset/service/BigDataDatasetService.java b/dataset-service/src/main/java/org/eea/dataset/service/BigDataDatasetService.java index 0787438384..8b8178491e 100644 --- a/dataset-service/src/main/java/org/eea/dataset/service/BigDataDatasetService.java +++ b/dataset-service/src/main/java/org/eea/dataset/service/BigDataDatasetService.java @@ -362,4 +362,37 @@ byte[] getGeometryAsGeoJson( * @param s3PathResolver table resolver */ boolean isTableEmpty(S3PathResolver s3PathResolver); + + /** + * Creates a typed Dremio view for a given dataset table by reading its schema definition + * from Mongo metadata and generating a CTAS query that casts dynamic Mongo fields into + * Citus-compatible SQL types. + * + *

The method will: + *

+ * + *

Any field that cannot be safely cast will be converted to NULL using TRY_CAST semantics. + * + * @param dataflowId the dataflow identifier used in the dataset path + * @param providerId the provider identifier used in the dataset path + * @param datasetId the dataset identifier in metadata storage + * @param tableSchemaId the schema identifier of the table to be transformed + * @param tableName the physical table name used in Dremio/S3 paths + * + * @throws Exception if schema resolution fails, Dremio promotion fails, + * or CTAS execution fails + */ + void createTypedView( + Long dataflowId, + Long providerId, + Long datasetId, + String tableSchemaId, + String tableName + ) throws Exception; } diff --git a/dataset-service/src/main/java/org/eea/dataset/service/impl/BigDataDatasetServiceImpl.java b/dataset-service/src/main/java/org/eea/dataset/service/impl/BigDataDatasetServiceImpl.java index 8f3ec93e53..0bc78e1423 100644 --- a/dataset-service/src/main/java/org/eea/dataset/service/impl/BigDataDatasetServiceImpl.java +++ b/dataset-service/src/main/java/org/eea/dataset/service/impl/BigDataDatasetServiceImpl.java @@ -18,6 +18,7 @@ import org.eea.dataset.mapper.HelperMultipartFileMapper; import org.eea.dataset.persistence.metabase.domain.DatasetTable; import org.eea.dataset.persistence.schemas.domain.DataSetSchema; +import org.eea.dataset.persistence.schemas.domain.FieldSchema; import org.eea.dataset.persistence.schemas.domain.TableSchema; import org.eea.dataset.persistence.schemas.domain.pkcatalogue.PkCatalogueSchema; import org.eea.dataset.persistence.schemas.repository.PkCatalogueRepository; @@ -35,7 +36,6 @@ import org.eea.interfaces.controller.orchestrator.JobController.JobControllerZuul; import org.eea.interfaces.controller.orchestrator.JobProcessController.JobProcessControllerZuul; import org.eea.interfaces.controller.recordstore.ProcessController.ProcessControllerZuul; -import org.eea.interfaces.vo.communication.UserNotificationContentVO; import org.eea.interfaces.vo.dataflow.DataFlowVO; import org.eea.interfaces.vo.dataflow.DataProviderVO; import org.eea.interfaces.vo.dataflow.RepresentativeVO; @@ -67,7 +67,6 @@ import org.eea.lock.redis.RedisLockService; import org.eea.multitenancy.DatasetId; import org.eea.multitenancy.TenantResolver; -import org.eea.thread.ThreadPropertiesManager; import org.eea.utils.LiteralConstants; import org.eea.utils.UtilityClass; import org.slf4j.Logger; @@ -3304,4 +3303,284 @@ public boolean isTableEmpty(S3PathResolver s3PathResolver) { return false; } } + + @Override + public void createTypedView( + Long dataflowId, + Long providerId, + Long datasetId, + String tableSchemaId, + String tableName + ) throws Exception { + + DataSetMetabaseVO dataset = + datasetMetabaseService.findDatasetMetabase(datasetId); + + DataSetSchema schema = + schemasRepository.findById( + new ObjectId(dataset.getDatasetSchema()) + ).orElseThrow(); + + TableSchema tableSchema = + schema.getTableSchemas() + .stream() + .filter(t -> + t.getRecordSchema() + .getIdTableSchema() + .toString() + .equals(tableSchemaId) + ) + .findFirst() + .orElseThrow(); + + List fields = + tableSchema.getRecordSchema().getFieldSchema(); + + // SOURCE path + S3PathResolver currentPath = new S3PathResolver( + dataflowId, + providerId, + datasetId, + tableName + ); + + currentPath.setPath(S3_TABLE_AS_FOLDER_QUERY_PATH); + + String sourcePath = s3ServicePrivate.getS3Path(currentPath); + + // TARGET path + currentPath.setPath(S3_VIEWS_TABLE_AS_FOLDER_QUERY_PATH); + + String targetPath = s3ServicePrivate.getS3Path(currentPath); + + // Build select clause for populated tables + String selectClause = + fields.stream() + .map(this::buildCast) + .collect(Collectors.joining(",\n")); + + dropViewTableIfExists(targetPath); + deleteViewFolderIfExists(currentPath); + + Thread.sleep(3000); + + // STEP 1: create schema-only table + String createSql = + buildCreateTableSql(targetPath, fields); + + String processId = + dremioHelperService.executeSqlStatement(createSql); + + dremioHelperService.checkIfDremioProcessFinishedSuccessfully( + createSql, + processId, + null + ); + + // STEP 2: insert data (if any exists) + if (sourceTableHasRows(sourcePath)) { + + String insertSql = + buildInsertSql( + targetPath, + sourcePath, + selectClause + ); + + processId = + dremioHelperService.executeSqlStatement(insertSql); + + dremioHelperService.checkIfDremioProcessFinishedSuccessfully( + insertSql, + processId, + null + ); + } + } + + private boolean sourceTableHasRows(String sourcePath) { + try { + long rowCount = + dremioHelperService.getRowCount(sourcePath); + return rowCount > 0; + } catch (Exception e) { + return false; + } + } + + private String buildCreateTableSql( + String target, + List fields + ) { + + String columns = + fields.stream() + .map(field -> { + + String name = + "\"" + field.getHeaderName() + "\""; + + String type = + mapType(field); + + return name + " " + type; + }) + .collect(Collectors.joining(",\n")); + + return "CREATE TABLE " + + target + + " (\n" + + columns + + "\n)"; + } + + private String buildCast(FieldSchema field) { + + String name = "\"" + field.getHeaderName() + "\""; + String type = mapType(field); + + switch (type) { + case "VARCHAR": + return name + " AS " + name; + case "INTEGER": + + return "CASE " + + "WHEN REGEXP_LIKE(" + name + ", '^-?[0-9]+$') " + + "THEN CAST(" + name + " AS INTEGER) " + + "ELSE NULL END AS " + name; + case "DOUBLE": + + return "CASE " + + "WHEN REGEXP_LIKE(" + name + ", '^-?[0-9]+(\\\\.[0-9]+)?$') " + + "THEN CAST(" + name + " AS DOUBLE) " + + "ELSE NULL END AS " + name; + case "BOOLEAN": + + return "CASE " + + "WHEN LOWER(" + name + ") IN ('true','false') " + + "THEN CAST(" + name + " AS BOOLEAN) " + + "ELSE NULL END AS " + name; + case "DATE": + + return "CASE " + + "WHEN " + name + " IS NOT NULL " + + "THEN CAST(" + name + " AS DATE) " + + "ELSE NULL END AS " + name; + case "TIMESTAMP": + + return "CASE " + + "WHEN " + name + " IS NOT NULL " + + "THEN CAST(" + name + " AS TIMESTAMP) " + + "ELSE NULL END AS " + name; + } + + return name + " AS " + name; + } + + private String mapType(FieldSchema field) { + + String t = field.getType().getValue().toUpperCase(); + + switch (t) { + + case "NUMBER_INTEGER": + return "INTEGER"; + + case "NUMBER_DECIMAL": + return "DOUBLE"; + + case "BOOLEAN": + return "BOOLEAN"; + + case "DATE": + return "DATE"; + + case "DATETIME": + return "TIMESTAMP"; + + case "TEXT": + case "TEXTAREA": + case "LONG_TEXT": + case "CODELIST": + case "MULTISELECT_CODELIST": + case "LINK": + case "EXTERNAL_LINK": + case "URL": + case "PHONE": + case "EMAIL": + case "ATTACHMENT": + case "POINT": + case "LINESTRING": + case "POLYGON": + case "MULTIPOINT": + case "MULTILINESTRING": + case "MULTIPOLYGON": + case "GEOMETRYCOLLECTION": + default: + return "VARCHAR"; + } + } + + private String buildInsertSql( + String target, + String source, + String selectClause + ) { + + return "INSERT INTO " + + target + + " SELECT " + + selectClause + + " FROM " + + source; + } + + private void deleteViewFolderIfExists( + S3PathResolver s3PathResolver + ) { + + try { + + if (s3HelperPrivate.checkFolderExist( + s3PathResolver, + S3_VIEW_TABLE_NAME_FOLDER_PATH + )) { + + s3HelperPrivate.deleteFolder( + s3PathResolver, + S3_VIEW_TABLE_NAME_FOLDER_PATH + ); + + Thread.sleep(3000); + } + + } catch (Exception e) { + + LOG.warn( + "Could not delete existing views folder for dataset {}", + s3PathResolver.getDatasetId() + ); + } + } + + private void dropViewTableIfExists(String tablePath) { + + try { + + String sql = "DROP TABLE IF EXISTS " + tablePath; + + String processId = + dremioHelperService.executeSqlStatement(sql); + + dremioHelperService.checkIfDremioProcessFinishedSuccessfully( + sql, + processId, + 2000L + ); + + } catch (Exception e) { + + LOG.warn("Could not drop table {}", tablePath); + } + } } \ No newline at end of file