Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,9 @@ public DremioDirectoryItemsResponse getDirectoryItems(S3PathResolver s3PathResol
directoryPath = bucketName + "/" + s3Service.getS3Path(s3PathResolver);
} else if (S3_DATAFLOW_REFERENCE_FOLDER_PATH.equals(s3PathResolver.getPath())) {
directoryPath = bucketName + "/" + s3Service.getTableAsFolderQueryPath(s3PathResolver, S3_REFERENCE_FOLDER_PATH);
} else {
} else if (S3_VIEWS_FOLDER_PATH.equals(s3PathResolver.getPath())) {
directoryPath = bucketName + "/" + s3Service.getTableAsFolderQueryPath(s3PathResolver, S3_VIEWS_FOLDER_PATH).replace("/views","");
}else {
directoryPath = bucketName + "/" + s3Service.getTableAsFolderQueryPath(s3PathResolver, S3_PROVIDER_PATH);
}
DremioDirectoryItemsResponse directoryItems = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ private String calculateS3Path(S3PathResolver s3PathResolver) {
switch (path) {
case S3_IMPORT_QUERY_PATH:
case S3_TABLE_AS_FOLDER_QUERY_PATH:
case S3_VIEWS_TABLE_AS_FOLDER_QUERY_PATH:
case S3_TABLE_NAME_QUERY_PATH:
case S3_IMPORT_CSV_FILE_QUERY_PATH:
case S3_TABLE_NAME_VALIDATE_QUERY_PATH:
Expand All @@ -113,6 +114,7 @@ private String calculateS3Path(S3PathResolver s3PathResolver) {
s3PathResolver.getValidationId(), fileName);
break;
case S3_TABLE_NAME_FOLDER_PATH:
case S3_VIEW_TABLE_NAME_FOLDER_PATH:
case S3_ATTACHMENTS_TABLE_PATH:
case S3_TABLE_NAME_FOLDER_PATH_FOR_VALID_PREFIX:
path = String.format(path, dataflowFolder, dataProviderFolder, datasetFolder,
Expand All @@ -125,6 +127,7 @@ private String calculateS3Path(S3PathResolver s3PathResolver) {
path = String.format(path, dataflowFolder, dataProviderFolder, datasetFolder, fileName);
break;
case S3_PROVIDER_PATH:
case S3_VIEWS_FOLDER_PATH:
case S3_SNAPSHOT_FOLDER_PATH:
case S3_VALIDATION_TABLE_PATH:
path = String.format(path, dataflowFolder, dataProviderFolder, datasetFolder);
Expand Down Expand Up @@ -254,16 +257,19 @@ private String calculateS3TableAsFolderPath(S3PathResolver s3PathResolver, Strin
return S3_DEFAULT_BUCKET + String.format(path, dataflowFolder, s3PathResolver.getTableName());
case S3_EXPORT_PREFILLED_TABLE_AS_FOLDER_QUERY_PATH:
case S3_TABLE_AS_FOLDER_QUERY_PATH:
case S3_VIEWS_TABLE_AS_FOLDER_QUERY_PATH:
if(BooleanUtils.isTrue(s3PathResolver.getIsIcebergTable())){
return S3_ICEBERG_BUCKET + String.format(path, dataflowFolder, dataProviderFolder, datasetFolder, s3PathResolver.getTableName());
}
return S3_DEFAULT_BUCKET + String.format(path, dataflowFolder, dataProviderFolder, datasetFolder, s3PathResolver.getTableName());
case S3_IMPORT_TABLE_NAME_FOLDER_PATH:
case S3_VALIDATION_TABLE_PATH:
case S3_TABLE_NAME_FOLDER_PATH:
case S3_VIEW_TABLE_NAME_FOLDER_PATH:
case S3_TABLE_NAME_FOLDER_PATH_FOR_VALID_PREFIX:
case S3_ATTACHMENTS_TABLE_PATH:
case S3_PROVIDER_PATH:
case S3_VIEWS_FOLDER_PATH:
return String.format(path, dataflowFolder, dataProviderFolder, datasetFolder, s3PathResolver.getTableName());
case S3_ROOT_DATAFLOW_FOLDER_PATH:
return String.format(path, dataflowFolder);
Expand Down Expand Up @@ -303,6 +309,7 @@ private String calculateS3TableAsFolderPath(S3PathResolver s3PathResolver) {
return S3_DEFAULT_BUCKET + String.format(path, dataflowFolder,
s3PathResolver.getTableName());
case S3_TABLE_NAME_FOLDER_PATH:
case S3_VIEW_TABLE_NAME_FOLDER_PATH:
case S3_TABLE_NAME_FOLDER_PATH_FOR_VALID_PREFIX:
return String.format(path, dataflowFolder, dataProviderFolder, datasetFolder,
s3PathResolver.getTableName());
Expand Down Expand Up @@ -336,6 +343,7 @@ private String calculateS3TableDCAsFolderPath(S3PathResolver s3PathResolver, Str
return S3_DEFAULT_BUCKET + String.format(path, dataflowFolder,
s3PathResolver.getTableName());
case S3_TABLE_AS_FOLDER_QUERY_PATH:
case S3_VIEWS_TABLE_AS_FOLDER_QUERY_PATH:
return S3_DEFAULT_BUCKET + String.format(path, dataflowFolder, dataProviderId,
datasetId, s3PathResolver.getTableName());
case S3_TABLE_NAME_EU_QUERY_PATH:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,10 @@ public final class LiteralConstants {

public static final String S3_SNAPSHOT_TABLE_NAME_VALIDATE_DC_PATH = "%s/%s/%s/snapshots/%s/validation";

public static final String S3_VIEWS_FOLDER_PATH = "%s/%s/%s/views/";

public static final String S3_VIEW_TABLE_NAME_FOLDER_PATH = "%s/%s/%s/views/%s";

/** The Constant S3_TABLE_NAME_VALIDATE_DC_QUERY_PATH: {@value}. */
public static final String S3_TABLE_NAME_VALIDATE_DC_QUERY_PATH = ".\"%s\".\"collections\".\"%s\".\"current\".\"%s_validate\".\"%s\".\"%s\"";

Expand All @@ -340,6 +344,7 @@ public final class LiteralConstants {
public static final String S3_TABLE_NAME_DC_QUERY_PATH = ".\"%s\".\"collections\".\"%s\".\"current\".\"%s\"";

public static final String S3_TABLE_AS_FOLDER_QUERY_PATH = ".\"%s\".\"%s\".\"%s\".\"current\".\"%s\"";
public static final String S3_VIEWS_TABLE_AS_FOLDER_QUERY_PATH = ".\"%s\".\"%s\".\"%s\".\"views\".\"%s\"";

public static final String S3_EXPORT_PREFILLED_TABLE_AS_FOLDER_QUERY_PATH = ".\"%s\".\"%s\".\"%s\".\"current\".\"exported\".\"%s\"";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
import org.apache.commons.lang3.BooleanUtils;
import org.apache.commons.lang3.StringUtils;
import org.eea.dataset.service.model.ImportFileInDremioInfo;
import org.eea.interfaces.vo.communication.UserNotificationVO;
import org.eea.lock.redis.LockEnum;
import org.eea.lock.redis.RedisLockService;
import org.eea.utils.UtilityClass;
Expand Down Expand Up @@ -60,14 +59,12 @@
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.core.io.InputStreamResource;
import org.springframework.core.io.Resource;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Pageable;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.scheduling.annotation.Async;
import org.springframework.security.access.prepost.PreAuthorize;
import org.springframework.security.concurrent.DelegatingSecurityContextRunnable;
import org.springframework.security.core.context.SecurityContext;
Expand All @@ -84,12 +81,10 @@
import javax.servlet.http.HttpServletResponse;
import java.io.*;
import java.util.*;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.stream.Collectors;

import static org.eea.interfaces.vo.dataset.enums.FileTypeEnum.CSV;
import static org.eea.utils.LiteralConstants.EXPORT_CSV;
import static org.eea.utils.LiteralConstants.EXPORT_PARQUET;
import static org.eea.utils.LiteralConstants.*;

/**
* The Class DatasetControllerImpl.
Expand Down Expand Up @@ -4594,4 +4589,38 @@ private void disableEditing(DatasetTableVO datasetTableVO) {
LOG.error(e.getMessage(), e);
}
}


@PostMapping("/createView")
public ResponseEntity<Void> createTypedView(
@RequestParam Long dataflowId,
@RequestParam Long providerId,
@RequestParam Long datasetId,
@RequestParam String tableName,
@RequestParam String tableSchemaId)
{

Boolean isBigDataflow = dataFlowControllerZuul.isBigDataflow(dataflowId);

if (!isBigDataflow) {
return ResponseEntity.badRequest().build();
}

try {
bigDataDatasetService.createTypedView(
dataflowId,
providerId,
datasetId,
tableSchemaId,
tableName
);

return ResponseEntity.ok().build();

} catch (Exception e) {
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build();
}


}
}
Original file line number Diff line number Diff line change
Expand Up @@ -362,4 +362,37 @@ byte[] getGeometryAsGeoJson(
* @param s3PathResolver table resolver
*/
boolean isTableEmpty(S3PathResolver s3PathResolver);

/**
* Creates a typed Dremio view for a given dataset table by reading its schema definition
* from Mongo metadata and generating a CTAS query that casts dynamic Mongo fields into
* Citus-compatible SQL types.
*
* <p>The method will:
* <ul>
* <li>Load dataset schema from MongoDB</li>
* <li>Resolve field types dynamically from FieldSchema</li>
* <li>Generate a Dremio CTAS query with TRY_CAST for safe conversions</li>
* <li>Create a "views" folder in Dremio if it does not exist</li>
* <li>Execute the CTAS query to generate a Parquet-backed typed view</li>
* </ul>
*
* <p>Any field that cannot be safely cast will be converted to NULL using TRY_CAST semantics.
*
* @param dataflowId the dataflow identifier used in the dataset path
* @param providerId the provider identifier used in the dataset path
* @param datasetId the dataset identifier in metadata storage
* @param tableSchemaId the schema identifier of the table to be transformed
* @param tableName the physical table name used in Dremio/S3 paths
*
* @throws Exception if schema resolution fails, Dremio promotion fails,
* or CTAS execution fails
*/
void createTypedView(
Long dataflowId,
Long providerId,
Long datasetId,
String tableSchemaId,
String tableName
) throws Exception;
}
Loading